using DotNetty.Buffers; using DotNetty.Codecs; using DotNetty.Codecs.Protobuf; using DotNetty.Common.Concurrency; using DotNetty.Handlers.Timeout; using DotNetty.Transport.Bootstrapping; using DotNetty.Transport.Channels; using DotNetty.Transport.Channels.Sockets; using System; using System.Collections; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Net.Sockets; using System.Text; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; namespace Module.Socket.Tool { public class TcpClientChargerTool { #region 定义锁 private object lockObj = new object(); //线程同步锁 #endregion 定义锁 #region 字段属性 private string _client_ip; /// /// 客户端IP地址 /// public string F_ClientIP { get { return _client_ip; } set { lock (lockObj) { _client_ip = value; } } } private int _client_port; /// /// 客户端端口号 /// public int F_ClientPort { get { return _client_port; } set { lock (lockObj) { _client_port = value; } } } public event EventHandler ConnectedStatusChanged; public event EventHandler DataSended; public event EventHandler ConnectLogEvent; #endregion private AutoResetEvent ChannelInitilizedEvent = new AutoResetEvent(false); private Bootstrap SocketBootstrap = new Bootstrap(); private MultithreadEventLoopGroup WorkGroup = new MultithreadEventLoopGroup(); public volatile bool Connected = false; public volatile IChannel NettyChannel; public Channel messageQueue; #region 类结构体 public TcpClientChargerTool() { UnboundedChannelOptions options = new UnboundedChannelOptions(); options.SingleReader = true; messageQueue = Channel.CreateUnbounded(options); } public TcpClientChargerTool(string ipAddr, int port) { UnboundedChannelOptions options = new UnboundedChannelOptions(); options.SingleReader = true; messageQueue = Channel.CreateUnbounded(options); _client_ip = ipAddr; _client_port = port; } #endregion 类结构体 private void InitBootstrap() { IByteBuffer delimiter = Unpooled.CopiedBuffer(new byte[] { 0x68 ,0xEE}); SocketBootstrap = new Bootstrap(); SocketBootstrap.Group(WorkGroup) .Channel() .Option(ChannelOption.TcpNodelay, true) .Option(ChannelOption.SoKeepalive, true) .Handler(new ActionChannelInitializer(channel => { IChannelPipeline pipeline = channel.Pipeline; // 在管道中添加 DelimiterBasedFrameDecoder,指定分隔符 //pipeline.AddLast(new DelimiterBasedFrameDecoder(2048, delimiter)); //pipeline.AddLast(new CustomFrameDecoder1()); //pipeline.AddLast(new CustomFrameDecoder2(new IByteBuffer[]{ delimiter }, false,false ));//效果不好 //pipeline.AddLast(new CustomFrameDecoder3()); pipeline.AddLast(new CustomFrameDecoder4(new IByteBuffer[] { delimiter }, false, false));//效果不好 pipeline.AddLast("idleStateHandler", new IdleStateHandler(30, 0, 0)); // 触发读取超时 pipeline.AddLast(new ReconnectHandler(this)); pipeline.AddLast(new ClientHandler(this)); })); } public async Task Connect() { await DoConnect(); } public void Disconnect() { WorkGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)); } public async Task Write(byte[] message) { try { await NettyChannel.WriteAndFlushAsync(Unpooled.WrappedBuffer(message)); } catch (Exception e) { Console.WriteLine($"{_client_ip} Error writing message: {e.Message}"); } } private async Task DoConnect() { Connected = false; SetConnectStatusEvent("连接初始化"); do { try { InitBootstrap(); var clientChannel = await SocketBootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(_client_ip), _client_port)); NettyChannel = clientChannel; if (clientChannel.Open) { Connected = true; SetConnectStatusEvent("连接成功"); } else { Connected = false; SetConnectStatusEvent("连接失败"); await Task.Delay(5 * 1000); } //ChannelInitilizedEvent.Set(); await Task.Delay(200); } catch (Exception ce) { SetConnectStatusEvent(ce.StackTrace); ConnectLogEventHandle( ce.StackTrace); Debug.Print("连接失败:" + ce.StackTrace); Debug.Print("connect fail ,reconnect after 5 seconds..." + _client_ip); await Task.Delay(5 * 1000); } } while (!Connected); } /// /// 设置连接状态字符串 /// /// 连接状态字符串。比如:“连接成功” private void SetConnectStatusEvent(string status) { string strClient = ""; if (NettyChannel != null && NettyChannel.RemoteAddress != null) { strClient = NettyChannel.RemoteAddress.ToString(); } string strTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + " "; string strConnectLog = strTime + "客户端" + strClient + "与服务端" + _client_ip.ToString() + ":" + _client_port.ToString() + " " + status + "!"; EventHandler ConnectedHandler; ConnectedHandler = ConnectedStatusChanged; if (ConnectedHandler != null) { ConnectedHandler.Invoke(this, new ViewLogEventArgs() { ConnectedStatus = Connected, IsContent = strConnectLog }); } } internal void ConnectLogEventHandle(string message) { if (ConnectLogEvent != null) { ConnectLogEvent.Invoke(this, new ViewLogEventArgs() { IsContent = message }); } } } public class CustomFrameDecoder : ByteToMessageDecoder { protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List output) { // 寻找消息头的位置 int headerIndex = IndexOf(input, 0x68); if (headerIndex == -1) { // 没有找到消息头 等待消息头 return; } // 跳过 消息头之前的数据 if (headerIndex > 0) { input.SkipBytes(headerIndex); } //if (input.GetByte(0) != 0x68) //{ // input.SkipBytes(1); // return; //} //if (input.GetByte(1) != 0xEE) //{ // input.SkipBytes(1); // return; //} // 如果长度不足 继续等待 if (input.ReadableBytes < 3) { // 数据不足,等待更多数据 return; } int length = (input.GetUnsignedShortLE(headerIndex + 2)) + 4; // 检查剩余可读字节数是否足够构成完整的帧 if (input.ReadableBytes < length) { // 数据不足,等待更多数据 return; } byte[] bytes = new byte[length]; input.ReadBytes(bytes); // 添加到输出列表 output.Add(bytes); } private readonly PacketParser packetParser = new PacketParser(); //protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List output) { // var outputBufferList = new List(); // var resultByte = new byte[input.ReadableBytes]; // input.ReadBytes(resultByte); // packetParser.TryParsing(ref resultByte, ref outputBufferList); // output.AddRange(outputBufferList); // input.Clear(); //} private int IndexOf(IByteBuffer buffer, byte value) { int readerIndex = buffer.ReaderIndex; int writerIndex = buffer.WriterIndex; for (int i = readerIndex; i < writerIndex; i++) { if (buffer.GetByte(i) == value) { return i - readerIndex; } } return -1; } public class PacketParser { private readonly List _bufferList = new List(); public void TryParsing(ref byte[] inBytes, ref List outBytes) { try { _bufferList.Add(inBytes); var tempBuffer = new byte[_bufferList.Sum(item => item.Length)]; var size = 0; foreach (var item in _bufferList) { item.CopyTo(tempBuffer, size); size += item.Length; } if (tempBuffer.Length < 4) return; var packetLen = BitConverter.ToUInt16(tempBuffer, 2); if (tempBuffer.Length < (4 + packetLen)) { return; } if (tempBuffer.Length == (4 + packetLen)) { _bufferList.Clear(); outBytes.Add(tempBuffer); } if (tempBuffer.Length > (4 + packetLen)) { var left = new byte[4 + packetLen]; Array.Copy(tempBuffer, 0, left, 0, left.Length); var right = new byte[tempBuffer.Length - left.Length]; Array.Copy(tempBuffer, left.Length, right, 0, right.Length); _bufferList.Clear(); outBytes.Add(left); TryParsing(ref right, ref outBytes); } } catch (Exception ex) { Debug.Print(ex.ToString()); } } } } public class ClientHandler : SimpleChannelInboundHandler { private readonly TcpClientChargerTool tcpClientChargerTool; public ClientHandler(TcpClientChargerTool tcpClientChargerTool) { this.tcpClientChargerTool = tcpClientChargerTool; } public static string BytesToHexStr(byte[] data) { StringBuilder sb = new StringBuilder(data.Length * 3); foreach (byte b in data) sb.Append(Convert.ToString(b, 16).PadLeft(2, '0') + " "); return sb.ToString().ToUpper(); } //protected override void ChannelRead0(IChannelHandlerContext context, IByteBuffer message) //{ // byte[] bytes = new byte[message.ReadableBytes ]; // message.ReadBytes(bytes); // Log.WriteLog($"RECV:{BytesToHexStr(bytes)}", "ChargerC001" ); // tcpClientChargerTool.messageQueue.Writer.WriteAsync(bytes.Skip(1).ToArray()).AsTask().Wait(); // //byte[] bytes = new byte[message.ReadableBytes]; // //message.ReadBytes(bytes); // //tcpClientChargerTool.messageQueue.Enqueue(bytes); //} protected override void ChannelRead0(IChannelHandlerContext context, byte[] message) { //Log.WriteLog($"RECV:{BytesToHexStr(message)}", "ChargerC001"); // byte[] bytes = new byte[message.ReadableBytes + 1]; //bytes[0] = 0x68; //message.ReadBytes(bytes, 1, message.ReadableBytes); message = message.Skip(1).ToArray(); tcpClientChargerTool.messageQueue.Writer.WriteAsync(message).AsTask().Wait(); //byte[] bytes = new byte[message.ReadableBytes]; //message.ReadBytes(bytes); //tcpClientChargerTool.messageQueue.Enqueue(bytes); } } class ReconnectHandler : ChannelHandlerAdapter { private readonly TcpClientChargerTool tcpClientChargerTool; public ReconnectHandler(TcpClientChargerTool tcpClientChargerTool) { this.tcpClientChargerTool = tcpClientChargerTool; } public override void UserEventTriggered(IChannelHandlerContext context, object evt) { if (evt is IdleStateEvent) { // 处理超时事件 tcpClientChargerTool.ConnectLogEventHandle("开始处理读写超时事件"); if (tcpClientChargerTool.NettyChannel.Open) { // 如果存在有效连接,关闭之前的连接 tcpClientChargerTool.NettyChannel.CloseAsync().Wait(); tcpClientChargerTool.NettyChannel.CloseCompletion.Wait(); // 等待关闭完成 tcpClientChargerTool.ConnectLogEventHandle("关闭之前的连接"); } } } public override async void ChannelInactive(IChannelHandlerContext context) { tcpClientChargerTool.ConnectLogEventHandle("Connection lost. Attempting to reconnect..."); await ReconnectAsync(context); } private async Task ReconnectAsync(IChannelHandlerContext context) { try { tcpClientChargerTool.ConnectLogEventHandle("开始重连"); await tcpClientChargerTool.Connect(); // 这里需要异步连接方法 tcpClientChargerTool.ConnectLogEventHandle("Reconnected to server."); } catch (Exception ex) { tcpClientChargerTool.ConnectLogEventHandle($"Reconnection failed: {ex.Message}"); System.Diagnostics.Debug.WriteLine($"Reconnection failed: {ex.Message}"); context.Channel.EventLoop.Schedule(() => ReconnectAsync(context), TimeSpan.FromSeconds(5)); } } } }