You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

475 lines
15 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Codecs.Protobuf;
using DotNetty.Common.Concurrency;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace Module.Socket.Tool
{
/// <summary>
/// DotNetty-based TCP client. It connects to <see cref="F_ClientIP"/>:<see cref="F_ClientPort"/>,
/// keeps retrying until the connection succeeds, and exposes <see cref="messageQueue"/>,
/// which the pipeline's <c>ClientHandler</c> fills with received frames.
/// </summary>
public class TcpClientChargerTool
{
    #region Synchronization
    // Guards writes to the endpoint fields below.
    private readonly object lockObj = new object();
    #endregion Synchronization

    #region Fields and properties
    private string _client_ip;
    /// <summary>
    /// IP address used as the remote endpoint when connecting
    /// (named "client IP" by the original author).
    /// </summary>
    public string F_ClientIP
    {
        get { return _client_ip; }
        set
        {
            lock (lockObj)
            {
                _client_ip = value;
            }
        }
    }

    private int _client_port;
    /// <summary>
    /// TCP port used as the remote endpoint when connecting
    /// (named "client port" by the original author).
    /// </summary>
    public int F_ClientPort
    {
        get { return _client_port; }
        set
        {
            lock (lockObj)
            {
                _client_port = value;
            }
        }
    }

    /// <summary>Raised by <c>SetConnectStatusEvent</c> with the current status text.</summary>
    public event EventHandler<ViewLogEventArgs> ConnectedStatusChanged;
    /// <summary>Declared for subscribers; this class never raises it.</summary>
    public event EventHandler<TcpDataSendEventArgs> DataSended;
    /// <summary>Raised by <see cref="ConnectLogEventHandle"/> with a free-form log message.</summary>
    public event EventHandler<ViewLogEventArgs> ConnectLogEvent;
    #endregion

    private Bootstrap SocketBootstrap = new Bootstrap();
    private MultithreadEventLoopGroup WorkGroup = new MultithreadEventLoopGroup();

    // Set by Disconnect(); stops DoConnect() from retrying on the shut-down event loop group.
    private volatile bool _disconnectRequested;

    /// <summary>True after a successful connect; reset when a connect starts or on <see cref="Disconnect"/>.</summary>
    public volatile bool Connected = false;
    /// <summary>The most recently connected channel, or null before the first successful connect.</summary>
    public volatile IChannel NettyChannel;
    /// <summary>Unbounded, single-reader queue of received frames.</summary>
    public Channel<byte[]> messageQueue;

    #region Constructors
    /// <summary>
    /// Creates a client without an endpoint; set <see cref="F_ClientIP"/> and
    /// <see cref="F_ClientPort"/> before calling <see cref="Connect"/>.
    /// </summary>
    public TcpClientChargerTool()
    {
        messageQueue = Channel.CreateUnbounded<byte[]>(new UnboundedChannelOptions { SingleReader = true });
    }

    /// <summary>Creates a client for the given endpoint.</summary>
    /// <param name="ipAddr">IP address to connect to.</param>
    /// <param name="port">TCP port to connect to.</param>
    public TcpClientChargerTool(string ipAddr, int port) : this()
    {
        _client_ip = ipAddr;
        _client_port = port;
    }
    #endregion Constructors

    /// <summary>
    /// Rebuilds <c>SocketBootstrap</c> with the channel options and the inbound pipeline
    /// (frame decoder, idle detection, reconnect handling, message handler).
    /// </summary>
    private void InitBootstrap()
    {
        IByteBuffer delimiter = Unpooled.CopiedBuffer(new byte[] { 0x68, 0xEE });
        SocketBootstrap = new Bootstrap();
        SocketBootstrap.Group(WorkGroup)
            .Channel<TcpSocketChannel>()
            .Option(ChannelOption.TcpNodelay, true)
            .Option(ChannelOption.SoKeepalive, true)
            .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
            {
                IChannelPipeline pipeline = channel.Pipeline;
                // Frame decoder keyed on the 0x68 0xEE delimiter.
                // NOTE(original author): this decoder was annotated "does not work well".
                pipeline.AddLast(new CustomFrameDecoder4(new IByteBuffer[] { delimiter }, false, false));
                // First argument is the reader-idle time in seconds; write/all idle are disabled.
                pipeline.AddLast("idleStateHandler", new IdleStateHandler(30, 0, 0));
                pipeline.AddLast(new ReconnectHandler(this));
                pipeline.AddLast(new ClientHandler(this));
            }));
    }

    /// <summary>
    /// Connects to the configured endpoint, retrying until it succeeds.
    /// Returns immediately without connecting once <see cref="Disconnect"/> has been called.
    /// </summary>
    public async Task Connect()
    {
        await DoConnect();
    }

    /// <summary>
    /// Closes the current channel and shuts down the event loop group.
    /// This is terminal: the group is not recreated, so the instance cannot connect again.
    /// </summary>
    public void Disconnect()
    {
        // Set the flag first so the reconnect triggered by the channel closing is a no-op.
        _disconnectRequested = true;
        Connected = false;

        IChannel channel = NettyChannel;
        if (channel != null)
        {
            // Queued before the shutdown request, so the event loop still runs it.
            channel.CloseAsync();
        }

        WorkGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));
    }

    /// <summary>
    /// Sends <paramref name="message"/> over the current channel.
    /// Failures are written to the console and not rethrown.
    /// </summary>
    /// <param name="message">Raw bytes to send; wrapped, not copied.</param>
    public async Task Write(byte[] message)
    {
        try
        {
            // Snapshot the volatile field so the null check and the call see the same channel.
            IChannel channel = NettyChannel;
            if (channel == null)
            {
                Console.WriteLine($"{_client_ip} Error writing message: channel is not connected");
                return;
            }

            await channel.WriteAndFlushAsync(Unpooled.WrappedBuffer(message));
        }
        catch (Exception e)
        {
            Console.WriteLine($"{_client_ip} Error writing message: {e.Message}");
        }
    }

    /// <summary>
    /// Connect loop: rebuilds the bootstrap and connects, waiting 5 seconds after every
    /// failure, until a channel is open or <see cref="Disconnect"/> has been requested.
    /// </summary>
    private async Task DoConnect()
    {
        if (_disconnectRequested)
        {
            return;
        }

        Connected = false;
        SetConnectStatusEvent("连接初始化");
        do
        {
            try
            {
                InitBootstrap();
                var endpoint = new IPEndPoint(IPAddress.Parse(_client_ip), _client_port);
                IChannel clientChannel = await SocketBootstrap.ConnectAsync(endpoint);

                if (_disconnectRequested)
                {
                    // Disconnect() raced with the connect: do not keep the new channel.
                    await clientChannel.CloseAsync();
                    return;
                }

                NettyChannel = clientChannel;
                if (clientChannel.Open)
                {
                    Connected = true;
                    SetConnectStatusEvent("连接成功");
                }
                else
                {
                    Connected = false;
                    SetConnectStatusEvent("连接失败");
                    await Task.Delay(5 * 1000);
                }

                await Task.Delay(200);
            }
            catch (Exception ce)
            {
                // Report the message as status and the full exception as log text;
                // the stack trace alone hides the actual reason (e.g. connection refused).
                SetConnectStatusEvent(ce.Message);
                ConnectLogEventHandle(ce.ToString());
                Debug.Print("连接失败:" + ce);
                Debug.Print("connect fail ,reconnect after 5 seconds..." + _client_ip);
                if (!_disconnectRequested)
                {
                    await Task.Delay(5 * 1000);
                }
            }
        } while (!Connected && !_disconnectRequested);
    }

    /// <summary>
    /// Builds a timestamped status line and raises <see cref="ConnectedStatusChanged"/>.
    /// </summary>
    /// <param name="status">Status text, for example "连接成功".</param>
    private void SetConnectStatusEvent(string status)
    {
        // NOTE(review): the text labels the channel's RemoteAddress as the client side;
        // LocalAddress may have been intended. Behavior kept as-is.
        IChannel channel = NettyChannel;
        string strClient = "";
        if (channel != null && channel.RemoteAddress != null)
        {
            strClient = channel.RemoteAddress.ToString();
        }

        string strTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + " ";
        // Interpolation tolerates a null _client_ip (default constructor, IP not yet set).
        string strConnectLog = $"{strTime}客户端{strClient}与服务端{_client_ip}:{_client_port} {status}";

        ConnectedStatusChanged?.Invoke(this,
            new ViewLogEventArgs() { ConnectedStatus = Connected, IsContent = strConnectLog });
    }

    /// <summary>Raises <see cref="ConnectLogEvent"/> with <paramref name="message"/> as content.</summary>
    /// <param name="message">Log text delivered to subscribers.</param>
    internal void ConnectLogEventHandle(string message)
    {
        ConnectLogEvent?.Invoke(this, new ViewLogEventArgs() { IsContent = message });
    }
}
/// <summary>
/// Length-prefixed frame decoder. As implemented, a frame starts with the byte 0x68,
/// carries an unsigned 16-bit little-endian length at offset 2, and occupies
/// <c>length + 4</c> bytes in total. Each complete frame is emitted as a <c>byte[]</c>.
/// </summary>
public class CustomFrameDecoder : ByteToMessageDecoder
{
    private const byte FrameHeader = 0x68;
    // Offset of the 16-bit length field, relative to the header byte.
    private const int LengthFieldOffset = 2;
    // Header byte + one more byte + 2-byte length field.
    private const int FrameOverhead = 4;

    /// <summary>
    /// Extracts at most one frame from <paramref name="input"/> per call; the base class
    /// calls again while data remains and progress is made.
    /// </summary>
    /// <param name="context">Handler context (unused).</param>
    /// <param name="input">Accumulated inbound bytes.</param>
    /// <param name="output">Receives the decoded frame as a <c>byte[]</c>.</param>
    protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
    {
        // Locate the header; the result is relative to the reader index.
        int headerIndex = IndexOf(input, FrameHeader);
        if (headerIndex == -1)
        {
            // No header anywhere: these bytes can never start a frame, so drop them
            // rather than letting the cumulation buffer grow.
            input.SkipBytes(input.ReadableBytes);
            return;
        }

        // Drop whatever precedes the header; the reader index now sits on the header.
        if (headerIndex > 0)
        {
            input.SkipBytes(headerIndex);
        }

        // The length field spans bytes 2..3, so four bytes are required to read it.
        if (input.ReadableBytes < FrameOverhead)
        {
            return;
        }

        // GetUnsignedShortLE takes an absolute buffer index, so anchor it on ReaderIndex.
        int length = input.GetUnsignedShortLE(input.ReaderIndex + LengthFieldOffset) + FrameOverhead;

        // Wait for more data until the whole frame is available.
        if (input.ReadableBytes < length)
        {
            return;
        }

        byte[] bytes = new byte[length];
        input.ReadBytes(bytes);
        output.Add(bytes);
    }

    /// <summary>
    /// Returns the offset of the first occurrence of <paramref name="value"/> relative to
    /// the buffer's reader index, or -1 when it is not among the readable bytes.
    /// </summary>
    private int IndexOf(IByteBuffer buffer, byte value)
    {
        int readerIndex = buffer.ReaderIndex;
        int writerIndex = buffer.WriterIndex;
        for (int i = readerIndex; i < writerIndex; i++)
        {
            if (buffer.GetByte(i) == value)
            {
                return i - readerIndex;
            }
        }
        return -1;
    }

    /// <summary>
    /// Stateful splitter for the same framing (2 leading bytes, 16-bit length read with
    /// <see cref="BitConverter"/>, frame size = length + 4). Incomplete data is kept
    /// between calls.
    /// </summary>
    public class PacketParser
    {
        private const int HeaderLength = 4;
        private const int LengthOffset = 2;

        // Bytes received so far that do not yet form a complete frame.
        private readonly List<byte[]> _bufferList = new List<byte[]>();

        /// <summary>
        /// Appends <paramref name="inBytes"/> to the pending data and moves every complete
        /// frame into <paramref name="outBytes"/>. Never throws; errors go to <c>Debug.Print</c>.
        /// </summary>
        /// <param name="inBytes">Newly received bytes; null or empty adds nothing.</param>
        /// <param name="outBytes">Receives complete frames; a new list is created if null.</param>
        public void TryParsing(ref byte[] inBytes, ref List<byte[]> outBytes)
        {
            try
            {
                if (outBytes == null)
                {
                    outBytes = new List<byte[]>();
                }

                // A null entry would make every later call fail, so never store one.
                if (inBytes != null && inBytes.Length > 0)
                {
                    _bufferList.Add(inBytes);
                }

                // Flatten the pending fragments into one contiguous buffer.
                int total = 0;
                foreach (byte[] fragment in _bufferList)
                {
                    total += fragment.Length;
                }
                byte[] pending = new byte[total];
                int written = 0;
                foreach (byte[] fragment in _bufferList)
                {
                    fragment.CopyTo(pending, written);
                    written += fragment.Length;
                }

                // Peel off complete frames one after another (iterative, no recursion).
                int offset = 0;
                while (pending.Length - offset >= HeaderLength)
                {
                    int frameLength = HeaderLength + BitConverter.ToUInt16(pending, offset + LengthOffset);
                    if (pending.Length - offset < frameLength)
                    {
                        break;
                    }

                    byte[] frame = new byte[frameLength];
                    Array.Copy(pending, offset, frame, 0, frameLength);
                    outBytes.Add(frame);
                    offset += frameLength;
                }

                // Keep only the unconsumed tail for the next call.
                _bufferList.Clear();
                if (offset < pending.Length)
                {
                    byte[] remainder = new byte[pending.Length - offset];
                    Array.Copy(pending, offset, remainder, 0, remainder.Length);
                    _bufferList.Add(remainder);
                }
            }
            catch (Exception ex)
            {
                Debug.Print(ex.ToString());
            }
        }
    }
}
/// <summary>
/// Terminal inbound handler: receives decoded frames as <c>byte[]</c>, strips the first
/// byte, and enqueues the rest on the owning client's <c>messageQueue</c>.
/// </summary>
public class ClientHandler : SimpleChannelInboundHandler<byte[]>
{
    private readonly TcpClientChargerTool tcpClientChargerTool;

    /// <summary>Creates a handler bound to the client whose queue receives the frames.</summary>
    /// <param name="tcpClientChargerTool">Owning client.</param>
    public ClientHandler(TcpClientChargerTool tcpClientChargerTool)
    {
        this.tcpClientChargerTool = tcpClientChargerTool;
    }

    /// <summary>
    /// Formats bytes as upper-case, two-digit hex values, each followed by a space
    /// (for example <c>"0A FF "</c>).
    /// </summary>
    /// <param name="data">Bytes to format.</param>
    /// <returns>The hex string; empty for an empty array.</returns>
    public static string BytesToHexStr(byte[] data)
    {
        StringBuilder sb = new StringBuilder(data.Length * 3);
        foreach (byte b in data)
        {
            // "X2" yields upper-case hex directly, with no culture-sensitive ToUpper pass.
            sb.Append(b.ToString("X2")).Append(' ');
        }
        return sb.ToString();
    }

    /// <summary>
    /// Drops the first byte of <paramref name="message"/> and enqueues the remainder.
    /// </summary>
    /// <param name="context">Handler context (unused).</param>
    /// <param name="message">Decoded frame.</param>
    protected override void ChannelRead0(IChannelHandlerContext context, byte[] message)
    {
        byte[] payload = message.Skip(1).ToArray();

        // This runs on the channel's event loop. TryWrite completes synchronously and is
        // the normal path for the unbounded queue this client creates.
        if (tcpClientChargerTool.messageQueue.Writer.TryWrite(payload))
        {
            return;
        }

        // Fall back to the original blocking write so a completed queue still surfaces
        // its exception and a caller-supplied bounded queue still applies back-pressure.
        tcpClientChargerTool.messageQueue.Writer.WriteAsync(payload).AsTask().Wait();
    }
}
/// <summary>
/// Pipeline handler that closes the channel when an idle event arrives and asks the
/// owning client to reconnect when the channel becomes inactive.
/// </summary>
class ReconnectHandler : ChannelHandlerAdapter
{
    private readonly TcpClientChargerTool tcpClientChargerTool;

    /// <summary>Creates a handler bound to the client that performs the reconnect.</summary>
    /// <param name="tcpClientChargerTool">Owning client.</param>
    public ReconnectHandler(TcpClientChargerTool tcpClientChargerTool)
    {
        this.tcpClientChargerTool = tcpClientChargerTool;
    }

    /// <summary>
    /// On an <c>IdleStateEvent</c>, closes this handler's channel if it is still open.
    /// Any other user event is forwarded to the next handler.
    /// </summary>
    public override void UserEventTriggered(IChannelHandlerContext context, object evt)
    {
        if (!(evt is IdleStateEvent))
        {
            // Not ours: keep the event flowing instead of swallowing it.
            context.FireUserEventTriggered(evt);
            return;
        }

        tcpClientChargerTool.ConnectLogEventHandle("开始处理读写超时事件");

        // Use the channel this event belongs to; the client's NettyChannel field may be
        // null or may already refer to a newer connection.
        if (context.Channel.Open)
        {
            // Runs on the event loop, so do not block; the helper logs when the close ends.
            CloseIdleChannelAsync(context);
        }
    }

    /// <summary>
    /// Logs the loss, lets later handlers see the inactive event, and starts a reconnect
    /// without blocking the event loop.
    /// </summary>
    public override void ChannelInactive(IChannelHandlerContext context)
    {
        tcpClientChargerTool.ConnectLogEventHandle("Connection lost. Attempting to reconnect...");
        context.FireChannelInactive();

        // Fire-and-forget: ReconnectAsync handles and logs its own failures.
        ReconnectAsync(context);
    }

    /// <summary>Closes the context's channel and logs the outcome; never throws.</summary>
    private async Task CloseIdleChannelAsync(IChannelHandlerContext context)
    {
        try
        {
            await context.Channel.CloseAsync();
            tcpClientChargerTool.ConnectLogEventHandle("关闭之前的连接");
        }
        catch (Exception ex)
        {
            tcpClientChargerTool.ConnectLogEventHandle($"Closing idle channel failed: {ex.Message}");
        }
    }

    /// <summary>
    /// Calls the client's <c>Connect()</c>. On failure it logs and schedules another
    /// attempt 5 seconds later on the channel's event loop. Never throws.
    /// </summary>
    private async Task ReconnectAsync(IChannelHandlerContext context)
    {
        try
        {
            tcpClientChargerTool.ConnectLogEventHandle("开始重连");
            await tcpClientChargerTool.Connect();
            tcpClientChargerTool.ConnectLogEventHandle("Reconnected to server.");
        }
        catch (Exception ex)
        {
            tcpClientChargerTool.ConnectLogEventHandle($"Reconnection failed: {ex.Message}");
            System.Diagnostics.Debug.WriteLine($"Reconnection failed: {ex.Message}");
            try
            {
                context.Channel.EventLoop.Schedule(() => ReconnectAsync(context), TimeSpan.FromSeconds(5));
            }
            catch (Exception scheduleEx)
            {
                // The event loop may already be shut down; report it instead of letting
                // the exception escape an unobserved task.
                tcpClientChargerTool.ConnectLogEventHandle(
                    $"Reconnection retry could not be scheduled: {scheduleEx.Message}");
            }
        }
    }
}
}