using System.Reflection; using Autofac; using Autofac.Core; using DotNetty.Codecs; using DotNetty.Common.Internal.Logging; using DotNetty.Handlers.Logging; using DotNetty.Handlers.Timeout; using DotNetty.Transport.Bootstrapping; using DotNetty.Transport.Channels; using DotNetty.Transport.Channels.Sockets; using HybirdFrameworkCore.Autofac; using HybirdFrameworkCore.Autofac.Attribute; using log4net; using Microsoft.Extensions.Logging; using LogLevel = DotNetty.Handlers.Logging.LogLevel; namespace HybirdFrameworkDriver.TcpClient; public class TcpClient : IDisposable where TH : IChannelHandler where TD : ByteToMessageDecoder, new() where TE : ChannelHandlerAdapter, new() { private static readonly ILog Log = LogManager.GetLogger(typeof(TcpClient)); private Bootstrap? _bootstrap; private IEventLoopGroup? _eventLoopGroup; public IChannel? Channel { get; set; } public bool Connected { get; set; } public string Host { get; set; } public int Port { get; set; } public bool AutoReconnect { get; set; } public LogLevel? LogLevel { get; set; } public void InitBootstrap(string host, int port, Action? channelInactiveHandler = null) { Host = host; Port = port; if (LogLevel != null) { InternalLoggerFactory.DefaultFactory.AddProvider(new Log4NetProvider()); } _bootstrap = new Bootstrap(); _eventLoopGroup = new MultithreadEventLoopGroup(); _bootstrap .Group(_eventLoopGroup) .Channel() .Option(ChannelOption.TcpNodelay, true) .Handler(new ActionChannelInitializer(channel => { var clientListenerHandler = new ClientListenerHandler(this, AutoReconnect); var pipeline = channel.Pipeline; if (LogLevel != null) { pipeline.AddLast(new LoggingHandler(LogLevel.Value)); } // 监听器 pipeline.AddLast(clientListenerHandler); pipeline.AddLast("idleStateHandler", new IdleStateHandler(30, 0, 0)); // 触发读取超时 // 可以添加编解码器等 ResolveDecode(pipeline); ResolveEncode(pipeline); ResolveHandler(pipeline); })); } private void ResolveEncode(IChannelPipeline pipeline) { pipeline.AddLast(new TE()); } private void ResolveDecode(IChannelPipeline pipeline) { pipeline.AddLast(new TD()); } private void ResolveHandler(IChannelPipeline pipeline) { var list = new List(); foreach (var reg in AppInfo.Container.ComponentRegistry.Registrations) foreach (var service in reg.Services) if (service is TypedService ts) if (MatchHandlers(ts)) list.Add(ts.ServiceType); var handlers = new List(); foreach (var type in list) { var resolve = AppInfo.Container.Resolve(type); handlers.Add((TH)resolve); } handlers.Sort((handler, msgHandler) => { var orderAttribute1 = handler.GetType().GetCustomAttribute(); var orderAttribute2 = msgHandler.GetType().GetCustomAttribute(); var h1Order = orderAttribute1?.Order ?? 0; var h2Order = orderAttribute2?.Order ?? 0; return h1Order.CompareTo(h2Order); }); foreach (var msgHandler in handlers) pipeline.AddLast(msgHandler); } private bool MatchHandlers(TypedService ts) { var interfaces = ts.ServiceType.GetInterfaces(); if (interfaces.Length > 0) foreach (var type in interfaces) if (type == typeof(TH)) return true; return false; } public void BaseConnect() { Connected = false; Log.Info($"begin to connect {Host}:{Port}"); while (!Connected) { try { Task task = _bootstrap!.ConnectAsync(Host, Port); Channel = task.Result; Connected = Channel.Open; } catch (Exception e) { Log.Info($"connect {Host}:{Port} {e}"); } Log.Info($"connect {Host}:{Port} {Connected}"); Thread.Sleep(1000); } } public void Close() { this.Channel?.CloseAsync().Wait(); this.Channel?.CloseCompletion.Wait(); _eventLoopGroup?.ShutdownGracefullyAsync().Wait(); } public void Dispose() { this.Close(); this.Connected = false; } }