You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

148 lines
4.0 KiB

7 months ago
using System.Net;
using System.Reflection;
using Autofac;
using Autofac.Core;
using DotNetty.Codecs;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using HybirdFrameworkCore.Autofac;
using HybirdFrameworkCore.Autofac.Attribute;
using log4net;
namespace HybirdFrameworkDriver.TcpClient;
/// <summary>
/// Generic DotNetty TCP client. Builds a channel pipeline from a project-defined
/// listener handler, an idle-state handler, a decoder (<typeparamref name="TD"/>),
/// an encoder (<typeparamref name="TE"/>) and every container-registered handler
/// whose service type implements <typeparamref name="TH"/>.
/// </summary>
/// <typeparam name="TH">Handler interface used to discover message handlers in the Autofac container.</typeparam>
/// <typeparam name="TD">Decoder type; a new instance is created for every channel.</typeparam>
/// <typeparam name="TE">Encoder type; a new instance is created for every channel.</typeparam>
public class TcpClient<TH, TD, TE> where TH : IChannelHandler where TD: ByteToMessageDecoder,new() where TE: ChannelHandlerAdapter, new()
{
    /// <summary>Seconds without inbound data before the idle-state handler reports a reader-idle event.</summary>
    private const int ReaderIdleSeconds = 30;

    /// <summary>Pause between two connection attempts, in milliseconds.</summary>
    private const int ReconnectDelayMs = 5000;

    private Bootstrap? _bootstrap;

    /// <summary>Channel returned by the most recent successful connection attempt in <see cref="Connect"/>.</summary>
    public IChannel Channel { get; set; }

    /// <summary>Set by <see cref="Connect"/>: true once the connected channel reported itself open.</summary>
    public bool Connected { get; set; } = false;

    /// <summary>Remote host as an IP address literal (it is parsed with <see cref="IPAddress.Parse(string)"/>).</summary>
    public string Host { get; set; }

    /// <summary>Remote TCP port.</summary>
    public int Port { get; set; }

    private static readonly ILog Log = LogManager.GetLogger(typeof(TcpClient<TH, TD, TE>));

    /// <summary>
    /// Stores the remote endpoint and prepares the bootstrap and its channel pipeline.
    /// Must be called before <see cref="Connect"/>.
    /// </summary>
    /// <param name="host">Remote IP address literal.</param>
    /// <param name="port">Remote TCP port.</param>
    public void InitBootstrap(string host, int port)
    {
        Host = host;
        Port = port;

        // NOTE(review): every call creates a new event loop group and nothing in this
        // class shuts it down - confirm whether the owner is expected to do that.
        _bootstrap = new Bootstrap();
        _bootstrap
            .Group(new MultithreadEventLoopGroup())
            .Channel<TcpSocketChannel>()
            .Option(ChannelOption.TcpNodelay, true)
            .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
            {
                var clientListenerHandler = new ClientListenerHandler<TH, TD, TE>();
                IChannelPipeline pipeline = channel.Pipeline;

                // Listener.
                // NOTE(review): it is placed before the idle-state handler, so it presumably
                // does not receive the idle events fired by that handler - confirm this is intended.
                pipeline.AddLast(clientListenerHandler);

                // Triggers a read timeout (reader-idle) event.
                pipeline.AddLast("idleStateHandler", new IdleStateHandler(ReaderIdleSeconds, 0, 0));

                // Codecs and message handlers.
                ResolveDecode(pipeline);
                ResolveEncode(pipeline);
                ResolveHandler(pipeline);
            }));
    }

    /// <summary>Appends a fresh encoder instance to the pipeline.</summary>
    private void ResolveEncode(IChannelPipeline pipeline)
    {
        pipeline.AddLast(new TE());
    }

    /// <summary>Appends a fresh decoder instance to the pipeline.</summary>
    private void ResolveDecode(IChannelPipeline pipeline)
    {
        pipeline.AddLast(new TD());
    }

    /// <summary>
    /// Resolves every container registration whose typed service implements
    /// <typeparamref name="TH"/>, sorts the handlers ascending by their
    /// <see cref="OrderAttribute"/> (0 when the attribute is absent) and appends
    /// them to the pipeline in that order.
    /// </summary>
    private void ResolveHandler(IChannelPipeline pipeline)
    {
        // Phase 1: collect the matching service types.
        List<Type> handlerTypes = new List<Type>();
        foreach (IComponentRegistration registration in AppInfo.Container.ComponentRegistry.Registrations)
        {
            foreach (Service service in registration.Services)
            {
                if (service is TypedService typedService && MatchHandlers(typedService))
                {
                    handlerTypes.Add(typedService.ServiceType);
                }
            }
        }

        // Phase 2: resolve one instance per collected type.
        List<TH> handlers = new List<TH>();
        foreach (Type handlerType in handlerTypes)
        {
            handlers.Add((TH) AppInfo.Container.Resolve(handlerType));
        }

        handlers.Sort((left, right) => GetOrder(left).CompareTo(GetOrder(right)));

        foreach (TH handler in handlers)
        {
            pipeline.AddLast((IChannelHandler)handler);
        }
    }

    /// <summary>Reads the handler's <see cref="OrderAttribute"/> value, defaulting to 0 when it has none.</summary>
    private static int GetOrder(TH handler)
    {
        return handler.GetType().GetCustomAttribute<OrderAttribute>()?.Order ?? 0;
    }

    /// <summary>
    /// Tells whether the service type lists <typeparamref name="TH"/> among its implemented interfaces.
    /// </summary>
    private bool MatchHandlers(TypedService ts)
    {
        return Array.Exists(ts.ServiceType.GetInterfaces(), candidate => candidate == typeof(TH));
    }

    /// <summary>
    /// Blocks the calling thread until a connection to <see cref="Host"/>:<see cref="Port"/>
    /// is established, retrying every <see cref="ReconnectDelayMs"/> milliseconds.
    /// A failed attempt is logged and retried instead of aborting the loop.
    /// </summary>
    /// <exception cref="InvalidOperationException"><see cref="InitBootstrap"/> has not been called.</exception>
    public void Connect()
    {
        Connected = false;

        if (_bootstrap is null)
        {
            throw new InvalidOperationException("InitBootstrap must be called before Connect.");
        }

        // The endpoint does not change between attempts, so build it once.
        IPEndPoint endPoint = new IPEndPoint(IPAddress.Parse(Host), Port);
        int attempt = 1;

        while (!Connected)
        {
            try
            {
                // Task.Result wraps a connection failure in an AggregateException;
                // without the catch below the retry loop would never get to retry.
                Task<IChannel> task = _bootstrap.ConnectAsync(endPoint);
                Channel = task.Result;
                Connected = Channel.Open;
            }
            catch (AggregateException ex)
            {
                Log.Warn($"Connect to {Host}:{Port} failed (attempt {attempt}), retrying in {ReconnectDelayMs} ms", ex.GetBaseException());
            }

            if (Connected)
            {
                break;
            }

            Thread.Sleep(ReconnectDelayMs);
            attempt++;
        }
    }
}