You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

475 lines
15 KiB

using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Codecs.Protobuf;
using DotNetty.Common.Concurrency;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace Module.Socket.Tool
{
public class TcpClientChargerTool
{
#region 定义锁
private object lockObj = new object(); //线程同步锁
#endregion 定义锁
#region 字段属性
private string _client_ip;

/// <summary>
/// IP address string used as the connection target: DoConnect parses it with
/// IPAddress.Parse and the status log prints it as the server side.
/// Writes are serialized through lockObj; reads are not locked.
/// </summary>
public string F_ClientIP
{
    get
    {
        return _client_ip;
    }
    set
    {
        lock (lockObj)
        {
            _client_ip = value;
        }
    }
}
private int _client_port;

/// <summary>
/// TCP port used together with F_ClientIP as the connection target in DoConnect.
/// Writes are serialized through lockObj; reads are not locked.
/// </summary>
public int F_ClientPort
{
    get
    {
        return _client_port;
    }
    set
    {
        lock (lockObj)
        {
            _client_port = value;
        }
    }
}
/// <summary>
/// Raised by SetConnectStatusEvent with the current Connected flag and a formatted status line.
/// </summary>
public event EventHandler<ViewLogEventArgs> ConnectedStatusChanged;
// NOTE(review): never raised in the visible code - confirm whether it is used elsewhere.
public event EventHandler<TcpDataSendEventArgs> DataSended;
/// <summary>
/// Raised by ConnectLogEventHandle with a free-form diagnostic message.
/// </summary>
public event EventHandler<ViewLogEventArgs> ConnectLogEvent;
#endregion
// NOTE(review): not used in the visible code; its only Set() call in DoConnect is commented out.
private AutoResetEvent ChannelInitilizedEvent = new AutoResetEvent(false);
// Replaced with a fresh Bootstrap on every InitBootstrap call.
private Bootstrap SocketBootstrap = new Bootstrap();
// Event loop group handed to every bootstrap; Disconnect shuts it down.
private MultithreadEventLoopGroup WorkGroup = new MultithreadEventLoopGroup();
// Set to true by DoConnect once it holds an open channel; reset to false when DoConnect starts.
public volatile bool Connected = false;
// Channel returned by the most recent successful ConnectAsync; null before the first one.
public volatile IChannel NettyChannel;
// Queue created in the constructors (unbounded, single reader); ClientHandler writes received frames into it.
public Channel<byte[]> messageQueue;
#region 类结构体
/// <summary>
/// Creates a client without a target address. The inbound message queue is
/// created as an unbounded channel configured for a single reader.
/// </summary>
public TcpClientChargerTool()
{
    messageQueue = Channel.CreateUnbounded<byte[]>(
        new UnboundedChannelOptions { SingleReader = true });
}
/// <summary>
/// Creates a client for the given target address. The inbound message queue
/// is set up by the parameterless constructor this one chains to.
/// </summary>
/// <param name="ipAddr">IP address string later parsed by DoConnect.</param>
/// <param name="port">TCP port of the target.</param>
public TcpClientChargerTool(string ipAddr, int port)
    : this()
{
    _client_ip = ipAddr;
    _client_port = port;
}
#endregion 类结构体
/// <summary>
/// Replaces SocketBootstrap with a freshly configured bootstrap: TCP socket
/// channel on WorkGroup, TCP_NODELAY and keep-alive enabled, and a pipeline
/// of frame decoder, idle detection, reconnect handler and client handler.
/// </summary>
private void InitBootstrap()
{
    // Two-byte sequence 0x68 0xEE handed to the frame decoder as its delimiter.
    IByteBuffer frameDelimiter = Unpooled.CopiedBuffer(new byte[] { 0x68, 0xEE });

    Action<ISocketChannel> configurePipeline = channel =>
    {
        IChannelPipeline pipeline = channel.Pipeline;

        // Earlier experiments (DelimiterBasedFrameDecoder(2048, delimiter),
        // CustomFrameDecoder1/2/3) are no longer installed; the original notes
        // marked the delimiter-based custom decoders as giving poor results.
        pipeline.AddLast(new CustomFrameDecoder4(new IByteBuffer[] { frameDelimiter }, false, false));

        // Read-idle detection; ReconnectHandler reacts to the resulting idle event.
        pipeline.AddLast("idleStateHandler", new IdleStateHandler(30, 0, 0));
        pipeline.AddLast(new ReconnectHandler(this));
        pipeline.AddLast(new ClientHandler(this));
    };

    SocketBootstrap = new Bootstrap();
    Bootstrap bootstrap = SocketBootstrap;
    bootstrap.Group(WorkGroup);
    bootstrap.Channel<TcpSocketChannel>();
    bootstrap.Option(ChannelOption.TcpNodelay, true);
    bootstrap.Option(ChannelOption.SoKeepalive, true);
    bootstrap.Handler(new ActionChannelInitializer<ISocketChannel>(configurePipeline));
}
/// <summary>
/// Connects to the configured endpoint. Delegates to DoConnect, which keeps
/// retrying until Connected becomes true.
/// </summary>
public async Task Connect() => await DoConnect();
/// <summary>
/// Starts a graceful shutdown of WorkGroup (100 ms quiet period, 1 s timeout)
/// and returns immediately; the shutdown task is not awaited.
/// </summary>
/// <remarks>
/// NOTE(review): NettyChannel is not closed explicitly here, and
/// ReconnectHandler.ChannelInactive starts a reconnect unconditionally -
/// confirm that this is the intended behaviour after a disconnect.
/// </remarks>
public void Disconnect()
{
    TimeSpan quietPeriod = TimeSpan.FromMilliseconds(100);
    TimeSpan shutdownTimeout = TimeSpan.FromSeconds(1);
    WorkGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout);
}
/// <summary>
/// Wraps <paramref name="message"/> in a buffer and writes and flushes it on
/// NettyChannel. Any failure (including a missing channel) is caught and
/// reported on the console; nothing is rethrown to the caller.
/// </summary>
/// <param name="message">Raw bytes to send.</param>
public async Task Write(byte[] message)
{
    try
    {
        IByteBuffer payload = Unpooled.WrappedBuffer(message);
        await NettyChannel.WriteAndFlushAsync(payload);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"{_client_ip} Error writing message: {ex.Message}");
    }
}
/// <summary>
/// Connects to _client_ip:_client_port, retrying until an open channel is
/// obtained. Each attempt rebuilds the bootstrap; a failed attempt is
/// followed by a 5 second pause before the next one.
/// </summary>
/// <remarks>
/// Failures are reported with the exception message (status event) and the
/// full exception text (log event and debug output). Previously only
/// <c>StackTrace</c> was reported, which omits the exception type and message
/// and can be null.
/// </remarks>
private async Task DoConnect()
{
    Connected = false;
    SetConnectStatusEvent("连接初始化");
    do
    {
        try
        {
            InitBootstrap();
            var endpoint = new IPEndPoint(IPAddress.Parse(_client_ip), _client_port);
            var clientChannel = await SocketBootstrap.ConnectAsync(endpoint);
            NettyChannel = clientChannel;
            if (clientChannel.Open)
            {
                Connected = true;
                SetConnectStatusEvent("连接成功");
            }
            else
            {
                Connected = false;
                SetConnectStatusEvent("连接失败");
                await Task.Delay(5 * 1000);
            }
            //ChannelInitilizedEvent.Set();
            await Task.Delay(200);
        }
        catch (Exception ce)
        {
            // ToString() carries type, message and stack trace in one string.
            string detail = ce.ToString();
            SetConnectStatusEvent("连接失败:" + ce.Message);
            ConnectLogEventHandle(detail);
            Debug.Print("连接失败:" + detail);
            Debug.Print("connect fail ,reconnect after 5 seconds..." + _client_ip);
            await Task.Delay(5 * 1000);
        }
    } while (!Connected);
}
/// <summary>
/// Builds a timestamped status line and raises ConnectedStatusChanged with it
/// and the current Connected flag.
/// </summary>
/// <param name="status">Status text appended to the line, e.g. "连接成功".</param>
/// <remarks>
/// Safe to call before an address has been configured: a null _client_ip is
/// rendered as an empty string instead of throwing.
/// NOTE(review): the part labelled as the client shows the channel's
/// RemoteAddress - confirm whether LocalAddress was intended.
/// </remarks>
private void SetConnectStatusEvent(string status)
{
    // Read the volatile field once so the null check and the use see the same channel.
    var channel = NettyChannel;
    string strClient = "";
    if (channel != null && channel.RemoteAddress != null)
    {
        strClient = channel.RemoteAddress.ToString();
    }

    string strTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + " ";
    // String concatenation treats a null _client_ip as empty.
    string strConnectLog = strTime + "客户端" + strClient + "与服务端" + _client_ip + ":" +
        _client_port.ToString() + " " + status;

    // ?.Invoke takes a snapshot of the delegate, so a concurrent unsubscribe cannot cause a null call.
    ConnectedStatusChanged?.Invoke(this,
        new ViewLogEventArgs() { ConnectedStatus = Connected, IsContent = strConnectLog });
}
/// <summary>
/// Raises ConnectLogEvent with <paramref name="message"/> as its content.
/// Does nothing when there are no subscribers.
/// </summary>
/// <param name="message">Diagnostic text to publish.</param>
internal void ConnectLogEventHandle(string message)
{
    // ?.Invoke evaluates the delegate once, which closes the window between
    // the null check and the call if the last subscriber is removed
    // concurrently. SetConnectStatusEvent snapshots its delegate in the same way.
    ConnectLogEvent?.Invoke(this, new ViewLogEventArgs() { IsContent = message });
}
}
public class CustomFrameDecoder : ByteToMessageDecoder
{
/// <summary>
/// Extracts one frame from <paramref name="input"/> when a complete one is
/// available and adds it to <paramref name="output"/> as a byte array.
/// </summary>
/// <remarks>
/// Layout implied by the code: a frame starts with 0x68, carries a 2-byte
/// little-endian length at offset 2, and is (length + 4) bytes long in total.
/// NOTE(review): the byte at offset 1 is not validated here (the disabled
/// checks suggest 0xEE) - confirm against the protocol.
/// </remarks>
/// <param name="context">Handler context (unused).</param>
/// <param name="input">Accumulated inbound bytes.</param>
/// <param name="output">Receives the decoded frame, header included.</param>
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
    // Start marker plus one byte plus the 2-byte length field.
    const int HeaderSize = 4;

    // Offset of the start marker relative to the current reader index.
    int headerOffset = IndexOf(input, 0x68);
    if (headerOffset == -1)
    {
        // None of the buffered bytes can start a frame; drop them so the
        // accumulation buffer does not grow without bound on garbage input.
        input.SkipBytes(input.ReadableBytes);
        return;
    }

    // Discard whatever precedes the start marker.
    if (headerOffset > 0)
    {
        input.SkipBytes(headerOffset);
    }

    // The length field occupies offsets 2..3, so four bytes must be readable.
    if (input.ReadableBytes < HeaderSize)
    {
        return;
    }

    // GetUnsignedShortLE takes an absolute buffer index. After the skip the
    // frame starts at ReaderIndex, so the field sits at ReaderIndex + 2.
    int length = input.GetUnsignedShortLE(input.ReaderIndex + 2) + HeaderSize;

    // Wait for the remainder of the frame.
    if (input.ReadableBytes < length)
    {
        return;
    }

    byte[] bytes = new byte[length];
    input.ReadBytes(bytes);
    output.Add(bytes);
}
// Parser instance referenced only by the disabled Decode variant below; the
// active Decode override does not use it.
private readonly PacketParser packetParser = new PacketParser();
// Disabled alternative Decode: it copied all readable bytes out of the buffer,
// delegated framing to packetParser, and then cleared the input buffer.
//protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output) {
// var outputBufferList = new List<byte[]>();
// var resultByte = new byte[input.ReadableBytes];
// input.ReadBytes(resultByte);
// packetParser.TryParsing(ref resultByte, ref outputBufferList);
// output.AddRange(outputBufferList);
// input.Clear();
//}
/// <summary>
/// Finds the first occurrence of <paramref name="value"/> in the readable
/// region of <paramref name="buffer"/> without moving its reader index.
/// </summary>
/// <param name="buffer">Buffer to scan.</param>
/// <param name="value">Byte to look for.</param>
/// <returns>
/// Offset of the match relative to the reader index, or -1 when the readable
/// region does not contain the byte.
/// </returns>
private int IndexOf(IByteBuffer buffer, byte value)
{
    int start = buffer.ReaderIndex;
    int readable = buffer.WriterIndex - start;

    int offset = 0;
    while (offset < readable)
    {
        if (buffer.GetByte(start + offset) == value)
        {
            return offset;
        }
        offset++;
    }

    return -1;
}
/// <summary>
/// Reassembles length-prefixed frames from arbitrarily split byte chunks.
/// A frame is a 4-byte header whose bytes 2..3 hold the payload length
/// (read with BitConverter.ToUInt16), followed by that many payload bytes.
/// Bytes that do not yet form a complete frame are kept for the next call.
/// </summary>
public class PacketParser
{
    // Header size; the payload length is stored at offset 2 inside it.
    private const int HeaderLength = 4;

    // Bytes received so far that do not yet form a complete frame.
    private readonly List<byte[]> _bufferList = new List<byte[]>();

    /// <summary>
    /// Appends <paramref name="inBytes"/> to the pending data and moves every
    /// complete frame into <paramref name="outBytes"/>.
    /// </summary>
    /// <param name="inBytes">Newly received chunk; null or empty chunks are ignored.</param>
    /// <param name="outBytes">
    /// Receives the complete frames in arrival order; a list is created when null is passed.
    /// </param>
    /// <remarks>
    /// Never throws: unexpected errors are written to the debug output.
    /// Frames are extracted in a loop rather than by recursion, so a chunk
    /// holding many frames cannot exhaust the stack.
    /// </remarks>
    public void TryParsing(ref byte[] inBytes, ref List<byte[]> outBytes)
    {
        try
        {
            // A null chunk used to be stored before failing, which made every
            // later call fail as well. Reject it up front instead.
            if (inBytes == null || inBytes.Length == 0)
            {
                return;
            }
            if (outBytes == null)
            {
                outBytes = new List<byte[]>();
            }

            _bufferList.Add(inBytes);
            byte[] pending = Flatten();

            int offset = 0;
            while (pending.Length - offset >= HeaderLength)
            {
                int frameLength = HeaderLength + BitConverter.ToUInt16(pending, offset + 2);
                if (pending.Length - offset < frameLength)
                {
                    break; // frame is still incomplete
                }

                var frame = new byte[frameLength];
                Array.Copy(pending, offset, frame, 0, frameLength);
                outBytes.Add(frame);
                offset += frameLength;
            }

            // Keep only the unconsumed tail, stored as a single chunk so the
            // next call does not have to re-join many small pieces.
            _bufferList.Clear();
            int remaining = pending.Length - offset;
            if (remaining > 0)
            {
                if (offset == 0)
                {
                    _bufferList.Add(pending);
                }
                else
                {
                    var tail = new byte[remaining];
                    Array.Copy(pending, offset, tail, 0, remaining);
                    _bufferList.Add(tail);
                }
            }
        }
        catch (Exception ex)
        {
            Debug.Print(ex.ToString());
        }
    }

    // Joins all pending chunks into one contiguous array.
    private byte[] Flatten()
    {
        int total = 0;
        foreach (var chunk in _bufferList)
        {
            total += chunk.Length;
        }

        var joined = new byte[total];
        int position = 0;
        foreach (var chunk in _bufferList)
        {
            chunk.CopyTo(joined, position);
            position += chunk.Length;
        }
        return joined;
    }
}
}
public class ClientHandler : SimpleChannelInboundHandler<byte[]>
{
// Owning client; received frames are pushed into its messageQueue.
private readonly TcpClientChargerTool tcpClientChargerTool;

/// <summary>
/// Creates the handler for the given client.
/// </summary>
/// <param name="tcpClientChargerTool">Client whose message queue receives the inbound frames.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="tcpClientChargerTool"/> is null. Failing here avoids a
/// NullReferenceException on the I/O thread when the first frame arrives.
/// </exception>
public ClientHandler(TcpClientChargerTool tcpClientChargerTool)
{
    if (tcpClientChargerTool == null)
    {
        throw new ArgumentNullException(nameof(tcpClientChargerTool));
    }
    this.tcpClientChargerTool = tcpClientChargerTool;
}
/// <summary>
/// Formats bytes as upper-case, two-digit hexadecimal values, each followed
/// by a single space (so a non-empty result ends with a space).
/// </summary>
/// <param name="data">Bytes to format; must not be null.</param>
/// <returns>For example "0A FF 00 " for { 0x0A, 0xFF, 0x00 }; empty for no bytes.</returns>
public static string BytesToHexStr(byte[] data)
{
    var hex = new StringBuilder(data.Length * 3);
    for (int i = 0; i < data.Length; i++)
    {
        hex.Append(data[i].ToString("X2"));
        hex.Append(' ');
    }
    return hex.ToString();
}
//protected override void ChannelRead0(IChannelHandlerContext context, IByteBuffer message)
//{
// byte[] bytes = new byte[message.ReadableBytes ];
// message.ReadBytes(bytes);
// Log.WriteLog($"RECV:{BytesToHexStr(bytes)}", "ChargerC001" );
// tcpClientChargerTool.messageQueue.Writer.WriteAsync(bytes.Skip(1).ToArray()).AsTask().Wait();
// //byte[] bytes = new byte[message.ReadableBytes];
// //message.ReadBytes(bytes);
// //tcpClientChargerTool.messageQueue.Enqueue(bytes);
//}
/// <summary>
/// Forwards a decoded frame to the owning client's message queue with its
/// first byte removed.
/// </summary>
/// <param name="context">Handler context (unused).</param>
/// <param name="message">Frame produced by the upstream decoder.</param>
/// <exception cref="ChannelClosedException">The message queue has been completed.</exception>
/// <remarks>
/// The queue is created unbounded, so TryWrite completes immediately. It
/// replaces a blocking Wait() on the event-loop thread.
/// NOTE(review): the dropped first byte is presumably the frame start
/// marker - confirm against the decoder in use.
/// </remarks>
protected override void ChannelRead0(IChannelHandlerContext context, byte[] message)
{
    byte[] payload = message.Skip(1).ToArray();

    if (!tcpClientChargerTool.messageQueue.Writer.TryWrite(payload))
    {
        // Keep the failure visible to the pipeline, as the blocking write did.
        throw new ChannelClosedException();
    }
}
}
class ReconnectHandler : ChannelHandlerAdapter
{
// Owning client; used for logging and to start reconnects.
private readonly TcpClientChargerTool tcpClientChargerTool;

/// <summary>
/// Creates the handler for the given client.
/// </summary>
/// <param name="tcpClientChargerTool">Client that is reconnected when its channel goes inactive.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="tcpClientChargerTool"/> is null. Failing here avoids a
/// NullReferenceException inside a pipeline callback later on.
/// </exception>
public ReconnectHandler(TcpClientChargerTool tcpClientChargerTool)
{
    if (tcpClientChargerTool == null)
    {
        throw new ArgumentNullException(nameof(tcpClientChargerTool));
    }
    this.tcpClientChargerTool = tcpClientChargerTool;
}
/// <summary>
/// Closes the channel when an idle event arrives; every other user event is
/// passed on to the next handler.
/// </summary>
/// <param name="context">Context of the channel that raised the event.</param>
/// <param name="evt">The user event.</param>
/// <remarks>
/// The close is started without blocking the event-loop thread, and it
/// targets the channel that actually idled (context.Channel) rather than the
/// client's current NettyChannel, which may already be a newer connection.
/// </remarks>
public override void UserEventTriggered(IChannelHandlerContext context, object evt)
{
    if (!(evt is IdleStateEvent))
    {
        // Not ours: let downstream handlers see it.
        context.FireUserEventTriggered(evt);
        return;
    }

    tcpClientChargerTool.ConnectLogEventHandle("开始处理读写超时事件");

    var idleChannel = context.Channel;
    if (idleChannel.Open)
    {
        // Log once the close has finished instead of waiting for it here.
        idleChannel.CloseAsync().ContinueWith(closeTask =>
        {
            if (closeTask.Exception != null)
            {
                tcpClientChargerTool.ConnectLogEventHandle(
                    $"Close failed: {closeTask.Exception.GetBaseException().Message}");
                return;
            }
            tcpClientChargerTool.ConnectLogEventHandle("关闭之前的连接");
        });
    }
}
/// <summary>
/// Logs the lost connection, forwards the inactive notification to the next
/// handler and starts a reconnect in the background.
/// </summary>
/// <param name="context">Context of the channel that became inactive.</param>
/// <remarks>
/// Deliberately not <c>async void</c>: an exception escaping an async void
/// method cannot be observed by the pipeline. ReconnectAsync handles its own
/// failures, so its task is intentionally not awaited.
/// </remarks>
public override void ChannelInactive(IChannelHandlerContext context)
{
    tcpClientChargerTool.ConnectLogEventHandle("Connection lost. Attempting to reconnect...");

    // Let handlers further down the pipeline see the state change as well.
    context.FireChannelInactive();

    // Fire and forget; runs synchronously up to its first incomplete await.
    ReconnectAsync(context);
}
/// <summary>
/// Asks the owning client to connect again and logs the outcome. If the
/// attempt throws, the failure is logged and another attempt is scheduled on
/// the channel's event loop five seconds later.
/// </summary>
/// <param name="context">Context whose event loop hosts the retry.</param>
private async Task ReconnectAsync(IChannelHandlerContext context)
{
    try
    {
        tcpClientChargerTool.ConnectLogEventHandle("开始重连");
        // Connect keeps retrying internally until a channel is open.
        await tcpClientChargerTool.Connect();
        tcpClientChargerTool.ConnectLogEventHandle("Reconnected to server.");
    }
    catch (Exception ex)
    {
        string failure = $"Reconnection failed: {ex.Message}";
        tcpClientChargerTool.ConnectLogEventHandle(failure);
        System.Diagnostics.Debug.WriteLine(failure);

        TimeSpan retryDelay = TimeSpan.FromSeconds(5);
        context.Channel.EventLoop.Schedule(() => ReconnectAsync(context), retryDelay);
    }
}
}
}