|
|
using DotNetty.Buffers;
|
|
|
using DotNetty.Codecs;
|
|
|
using DotNetty.Codecs.Protobuf;
|
|
|
using DotNetty.Common.Concurrency;
|
|
|
using DotNetty.Handlers.Timeout;
|
|
|
using DotNetty.Transport.Bootstrapping;
|
|
|
using DotNetty.Transport.Channels;
|
|
|
using DotNetty.Transport.Channels.Sockets;
|
|
|
using System;
|
|
|
using System.Collections;
|
|
|
using System.Collections.Generic;
|
|
|
using System.Diagnostics;
|
|
|
using System.Linq;
|
|
|
using System.Net;
|
|
|
using System.Net.Sockets;
|
|
|
using System.Text;
|
|
|
using System.Threading;
|
|
|
using System.Threading.Channels;
|
|
|
using System.Threading.Tasks;
|
|
|
|
|
|
namespace Module.Socket.Tool
|
|
|
{
|
|
|
public class TcpClientChargerTool
|
|
|
{
|
|
|
#region 定义锁
|
|
|
|
|
|
private object lockObj = new object(); //线程同步锁
|
|
|
|
|
|
#endregion 定义锁
|
|
|
|
|
|
#region 字段属性
|
|
|
|
|
|
private string _client_ip;
|
|
|
|
|
|
/// <summary>
|
|
|
/// 客户端IP地址
|
|
|
/// </summary>
|
|
|
public string F_ClientIP
|
|
|
{
|
|
|
get { return _client_ip; }
|
|
|
set
|
|
|
{
|
|
|
lock (lockObj)
|
|
|
{
|
|
|
_client_ip = value;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private int _client_port;
|
|
|
|
|
|
/// <summary>
|
|
|
/// 客户端端口号
|
|
|
/// </summary>
|
|
|
public int F_ClientPort
|
|
|
{
|
|
|
get { return _client_port; }
|
|
|
set
|
|
|
{
|
|
|
lock (lockObj)
|
|
|
{
|
|
|
_client_port = value;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public event EventHandler<ViewLogEventArgs> ConnectedStatusChanged;
|
|
|
|
|
|
public event EventHandler<TcpDataSendEventArgs> DataSended;
|
|
|
|
|
|
public event EventHandler<ViewLogEventArgs> ConnectLogEvent;
|
|
|
|
|
|
#endregion
|
|
|
|
|
|
private AutoResetEvent ChannelInitilizedEvent = new AutoResetEvent(false);
|
|
|
private Bootstrap SocketBootstrap = new Bootstrap();
|
|
|
private MultithreadEventLoopGroup WorkGroup = new MultithreadEventLoopGroup();
|
|
|
public volatile bool Connected = false;
|
|
|
public volatile IChannel NettyChannel;
|
|
|
|
|
|
public Channel<byte[]> messageQueue;
|
|
|
|
|
|
#region 类结构体
|
|
|
|
|
|
public TcpClientChargerTool()
|
|
|
{
|
|
|
UnboundedChannelOptions options = new UnboundedChannelOptions();
|
|
|
options.SingleReader = true;
|
|
|
messageQueue = Channel.CreateUnbounded<byte[]>(options);
|
|
|
}
|
|
|
|
|
|
public TcpClientChargerTool(string ipAddr, int port)
|
|
|
{
|
|
|
UnboundedChannelOptions options = new UnboundedChannelOptions();
|
|
|
options.SingleReader = true;
|
|
|
messageQueue = Channel.CreateUnbounded<byte[]>(options);
|
|
|
_client_ip = ipAddr;
|
|
|
_client_port = port;
|
|
|
}
|
|
|
|
|
|
#endregion 类结构体
|
|
|
|
|
|
private void InitBootstrap()
|
|
|
{
|
|
|
IByteBuffer delimiter = Unpooled.CopiedBuffer(new byte[] { 0x68 ,0xEE});
|
|
|
|
|
|
|
|
|
SocketBootstrap = new Bootstrap();
|
|
|
|
|
|
SocketBootstrap.Group(WorkGroup)
|
|
|
.Channel<TcpSocketChannel>()
|
|
|
.Option(ChannelOption.TcpNodelay, true)
|
|
|
.Option(ChannelOption.SoKeepalive, true)
|
|
|
.Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
|
|
|
{
|
|
|
IChannelPipeline pipeline = channel.Pipeline;
|
|
|
// 在管道中添加 DelimiterBasedFrameDecoder,指定分隔符
|
|
|
//pipeline.AddLast(new DelimiterBasedFrameDecoder(2048, delimiter));
|
|
|
|
|
|
//pipeline.AddLast(new CustomFrameDecoder1());
|
|
|
//pipeline.AddLast(new CustomFrameDecoder2(new IByteBuffer[]{ delimiter }, false,false ));//效果不好
|
|
|
//pipeline.AddLast(new CustomFrameDecoder3());
|
|
|
pipeline.AddLast(new CustomFrameDecoder4(new IByteBuffer[] { delimiter }, false, false));//效果不好
|
|
|
|
|
|
|
|
|
|
|
|
pipeline.AddLast("idleStateHandler", new IdleStateHandler(30, 0, 0)); // 触发读取超时
|
|
|
pipeline.AddLast(new ReconnectHandler(this));
|
|
|
pipeline.AddLast(new ClientHandler(this));
|
|
|
}));
|
|
|
}
|
|
|
|
|
|
|
|
|
public async Task Connect()
|
|
|
{
|
|
|
await DoConnect();
|
|
|
}
|
|
|
|
|
|
public void Disconnect()
|
|
|
{
|
|
|
WorkGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));
|
|
|
}
|
|
|
|
|
|
public async Task Write(byte[] message)
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
await NettyChannel.WriteAndFlushAsync(Unpooled.WrappedBuffer(message));
|
|
|
}
|
|
|
catch (Exception e)
|
|
|
{
|
|
|
Console.WriteLine($"{_client_ip} Error writing message: {e.Message}");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private async Task DoConnect()
|
|
|
{
|
|
|
Connected = false;
|
|
|
SetConnectStatusEvent("连接初始化");
|
|
|
|
|
|
do
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
InitBootstrap();
|
|
|
var clientChannel =
|
|
|
await SocketBootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(_client_ip), _client_port));
|
|
|
NettyChannel = clientChannel;
|
|
|
if (clientChannel.Open)
|
|
|
{
|
|
|
Connected = true;
|
|
|
SetConnectStatusEvent("连接成功");
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
Connected = false;
|
|
|
SetConnectStatusEvent("连接失败");
|
|
|
await Task.Delay(5 * 1000);
|
|
|
}
|
|
|
|
|
|
|
|
|
//ChannelInitilizedEvent.Set();
|
|
|
|
|
|
await Task.Delay(200);
|
|
|
}
|
|
|
catch (Exception ce)
|
|
|
{
|
|
|
SetConnectStatusEvent(ce.StackTrace);
|
|
|
ConnectLogEventHandle( ce.StackTrace);
|
|
|
Debug.Print("连接失败:" + ce.StackTrace);
|
|
|
Debug.Print("connect fail ,reconnect after 5 seconds..." + _client_ip);
|
|
|
await Task.Delay(5 * 1000);
|
|
|
}
|
|
|
} while (!Connected);
|
|
|
}
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
/// 设置连接状态字符串
|
|
|
/// </summary>
|
|
|
/// <param name="status">连接状态字符串。比如:“连接成功”</param>
|
|
|
private void SetConnectStatusEvent(string status)
|
|
|
{
|
|
|
string strClient = "";
|
|
|
if (NettyChannel != null && NettyChannel.RemoteAddress != null)
|
|
|
{
|
|
|
strClient = NettyChannel.RemoteAddress.ToString();
|
|
|
}
|
|
|
|
|
|
string strTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + " ";
|
|
|
string strConnectLog = strTime + "客户端" + strClient + "与服务端" + _client_ip.ToString() + ":" +
|
|
|
_client_port.ToString() + " " + status + "!";
|
|
|
|
|
|
EventHandler<ViewLogEventArgs> ConnectedHandler;
|
|
|
ConnectedHandler = ConnectedStatusChanged;
|
|
|
if (ConnectedHandler != null)
|
|
|
{
|
|
|
ConnectedHandler.Invoke(this,
|
|
|
new ViewLogEventArgs() { ConnectedStatus = Connected, IsContent = strConnectLog });
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
internal void ConnectLogEventHandle(string message)
|
|
|
{
|
|
|
if (ConnectLogEvent != null)
|
|
|
{
|
|
|
ConnectLogEvent.Invoke(this, new ViewLogEventArgs() { IsContent = message });
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
public class CustomFrameDecoder : ByteToMessageDecoder
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
|
|
|
{
|
|
|
// 寻找消息头的位置
|
|
|
int headerIndex = IndexOf(input, 0x68);
|
|
|
|
|
|
if (headerIndex == -1)
|
|
|
{
|
|
|
// 没有找到消息头 等待消息头
|
|
|
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
// 跳过 消息头之前的数据
|
|
|
if (headerIndex > 0)
|
|
|
{
|
|
|
input.SkipBytes(headerIndex);
|
|
|
|
|
|
}
|
|
|
|
|
|
//if (input.GetByte(0) != 0x68)
|
|
|
//{
|
|
|
// input.SkipBytes(1);
|
|
|
// return;
|
|
|
//}
|
|
|
|
|
|
//if (input.GetByte(1) != 0xEE)
|
|
|
//{
|
|
|
// input.SkipBytes(1);
|
|
|
// return;
|
|
|
//}
|
|
|
|
|
|
// 如果长度不足 继续等待
|
|
|
if (input.ReadableBytes < 3)
|
|
|
{
|
|
|
// 数据不足,等待更多数据
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
int length = (input.GetUnsignedShortLE(headerIndex + 2)) + 4;
|
|
|
|
|
|
|
|
|
// 检查剩余可读字节数是否足够构成完整的帧
|
|
|
if (input.ReadableBytes < length)
|
|
|
{
|
|
|
// 数据不足,等待更多数据
|
|
|
return;
|
|
|
}
|
|
|
byte[] bytes = new byte[length];
|
|
|
input.ReadBytes(bytes);
|
|
|
// 添加到输出列表
|
|
|
output.Add(bytes);
|
|
|
}
|
|
|
|
|
|
|
|
|
private readonly PacketParser packetParser = new PacketParser();
|
|
|
|
|
|
|
|
|
|
|
|
//protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output) {
|
|
|
// var outputBufferList = new List<byte[]>();
|
|
|
// var resultByte = new byte[input.ReadableBytes];
|
|
|
// input.ReadBytes(resultByte);
|
|
|
// packetParser.TryParsing(ref resultByte, ref outputBufferList);
|
|
|
// output.AddRange(outputBufferList);
|
|
|
// input.Clear();
|
|
|
//}
|
|
|
private int IndexOf(IByteBuffer buffer, byte value)
|
|
|
{
|
|
|
int readerIndex = buffer.ReaderIndex;
|
|
|
int writerIndex = buffer.WriterIndex;
|
|
|
for (int i = readerIndex; i < writerIndex; i++)
|
|
|
{
|
|
|
if (buffer.GetByte(i) == value)
|
|
|
{
|
|
|
return i - readerIndex;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
|
public class PacketParser
|
|
|
{
|
|
|
private readonly List<byte[]> _bufferList = new List<byte[]>();
|
|
|
|
|
|
public void TryParsing(ref byte[] inBytes, ref List<byte[]> outBytes)
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
_bufferList.Add(inBytes);
|
|
|
var tempBuffer = new byte[_bufferList.Sum(item => item.Length)];
|
|
|
|
|
|
var size = 0;
|
|
|
foreach (var item in _bufferList)
|
|
|
{
|
|
|
item.CopyTo(tempBuffer, size);
|
|
|
size += item.Length;
|
|
|
}
|
|
|
|
|
|
if (tempBuffer.Length < 4) return;
|
|
|
var packetLen = BitConverter.ToUInt16(tempBuffer, 2);
|
|
|
|
|
|
if (tempBuffer.Length < (4 + packetLen))
|
|
|
{
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
if (tempBuffer.Length == (4 + packetLen))
|
|
|
{
|
|
|
_bufferList.Clear();
|
|
|
outBytes.Add(tempBuffer);
|
|
|
}
|
|
|
|
|
|
if (tempBuffer.Length > (4 + packetLen))
|
|
|
{
|
|
|
var left = new byte[4 + packetLen];
|
|
|
Array.Copy(tempBuffer, 0, left, 0, left.Length);
|
|
|
var right = new byte[tempBuffer.Length - left.Length];
|
|
|
Array.Copy(tempBuffer, left.Length, right, 0, right.Length);
|
|
|
_bufferList.Clear();
|
|
|
outBytes.Add(left);
|
|
|
TryParsing(ref right, ref outBytes);
|
|
|
}
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
{
|
|
|
Debug.Print(ex.ToString());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
public class ClientHandler : SimpleChannelInboundHandler<byte[]>
|
|
|
{
|
|
|
private readonly TcpClientChargerTool tcpClientChargerTool;
|
|
|
|
|
|
public ClientHandler(TcpClientChargerTool tcpClientChargerTool)
|
|
|
{
|
|
|
this.tcpClientChargerTool = tcpClientChargerTool;
|
|
|
}
|
|
|
public static string BytesToHexStr(byte[] data)
|
|
|
{
|
|
|
StringBuilder sb = new StringBuilder(data.Length * 3);
|
|
|
foreach (byte b in data)
|
|
|
sb.Append(Convert.ToString(b, 16).PadLeft(2, '0') + " ");
|
|
|
return sb.ToString().ToUpper();
|
|
|
}
|
|
|
|
|
|
//protected override void ChannelRead0(IChannelHandlerContext context, IByteBuffer message)
|
|
|
//{
|
|
|
// byte[] bytes = new byte[message.ReadableBytes ];
|
|
|
// message.ReadBytes(bytes);
|
|
|
|
|
|
// Log.WriteLog($"RECV:{BytesToHexStr(bytes)}", "ChargerC001" );
|
|
|
|
|
|
|
|
|
// tcpClientChargerTool.messageQueue.Writer.WriteAsync(bytes.Skip(1).ToArray()).AsTask().Wait();
|
|
|
|
|
|
// //byte[] bytes = new byte[message.ReadableBytes];
|
|
|
// //message.ReadBytes(bytes);
|
|
|
// //tcpClientChargerTool.messageQueue.Enqueue(bytes);
|
|
|
//}
|
|
|
|
|
|
protected override void ChannelRead0(IChannelHandlerContext context, byte[] message)
|
|
|
{
|
|
|
//Log.WriteLog($"RECV:{BytesToHexStr(message)}", "ChargerC001");
|
|
|
// byte[] bytes = new byte[message.ReadableBytes + 1];
|
|
|
//bytes[0] = 0x68;
|
|
|
//message.ReadBytes(bytes, 1, message.ReadableBytes);
|
|
|
message = message.Skip(1).ToArray();
|
|
|
tcpClientChargerTool.messageQueue.Writer.WriteAsync(message).AsTask().Wait();
|
|
|
|
|
|
//byte[] bytes = new byte[message.ReadableBytes];
|
|
|
//message.ReadBytes(bytes);
|
|
|
//tcpClientChargerTool.messageQueue.Enqueue(bytes);
|
|
|
}
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
class ReconnectHandler : ChannelHandlerAdapter
|
|
|
{
|
|
|
private readonly TcpClientChargerTool tcpClientChargerTool;
|
|
|
|
|
|
public ReconnectHandler(TcpClientChargerTool tcpClientChargerTool)
|
|
|
{
|
|
|
this.tcpClientChargerTool = tcpClientChargerTool;
|
|
|
}
|
|
|
|
|
|
public override void UserEventTriggered(IChannelHandlerContext context, object evt)
|
|
|
{
|
|
|
if (evt is IdleStateEvent)
|
|
|
{
|
|
|
// 处理超时事件
|
|
|
tcpClientChargerTool.ConnectLogEventHandle("开始处理读写超时事件");
|
|
|
if (tcpClientChargerTool.NettyChannel.Open)
|
|
|
{
|
|
|
// 如果存在有效连接,关闭之前的连接
|
|
|
tcpClientChargerTool.NettyChannel.CloseAsync().Wait();
|
|
|
tcpClientChargerTool.NettyChannel.CloseCompletion.Wait(); // 等待关闭完成
|
|
|
tcpClientChargerTool.ConnectLogEventHandle("关闭之前的连接");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
public override async void ChannelInactive(IChannelHandlerContext context)
|
|
|
{
|
|
|
tcpClientChargerTool.ConnectLogEventHandle("Connection lost. Attempting to reconnect...");
|
|
|
await ReconnectAsync(context);
|
|
|
}
|
|
|
|
|
|
|
|
|
private async Task ReconnectAsync(IChannelHandlerContext context)
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
tcpClientChargerTool.ConnectLogEventHandle("开始重连");
|
|
|
await tcpClientChargerTool.Connect(); // 这里需要异步连接方法
|
|
|
tcpClientChargerTool.ConnectLogEventHandle("Reconnected to server.");
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
{
|
|
|
tcpClientChargerTool.ConnectLogEventHandle($"Reconnection failed: {ex.Message}");
|
|
|
System.Diagnostics.Debug.WriteLine($"Reconnection failed: {ex.Message}");
|
|
|
context.Channel.EventLoop.Schedule(() => ReconnectAsync(context), TimeSpan.FromSeconds(5));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|