using System.Reflection;
using Autofac;
using Autofac.Core;
using DotNetty.Codecs;
using DotNetty.Common.Internal.Logging;
using DotNetty.Handlers.Logging;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using HybirdFrameworkCore.Autofac;
using HybirdFrameworkCore.Autofac.Attribute;
using log4net;
using Microsoft.Extensions.Logging;
using LogLevel = DotNetty.Handlers.Logging.LogLevel;
namespace HybirdFrameworkDriver.TcpServer;
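/// <summary>
/// DotNetty-based TCP server. Every accepted channel gets a pipeline made of a TE encoder,
/// a TD decoder and the TH handlers that are registered in the Autofac container.
/// </summary>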
public class TcpServer
: IDisposable where TH : IChannelHandler
where TD : ByteToMessageDecoder, new()
where TE : ChannelHandlerAdapter, new()
{
// NOTE(review): the constraints above name TH, TD and TE, but no type-parameter list is visible on the
// class declaration, and several generic arguments further down look stripped as well
// (typeof(...), Channel(), new List(), GetCustomAttribute()). Confirm against the original file.

// log4net logger used for this server's lifecycle messages.
private static readonly ILog Log = LogManager.GetLogger(typeof(TcpServer | ));
// The event loop groups and the bootstrap are static fields, not per-instance state.
// InitBootstrap() overwrites all three on every call.
// NOTE(review): nothing shuts down a previously assigned group before it is overwritten — confirm
// that InitBootstrap()/Start() is only ever called once.
private static MultithreadEventLoopGroup? bossGroup;
private static MultithreadEventLoopGroup? workerGroup;
private static ServerBootstrap? bootstrap;
// Port handed to the most recent Start() call; 9000 until Start() has been called.
private int _port = 9000;
/// <summary>
/// When non-null, InitBootstrap() registers a Log4NetProvider with DotNetty's internal logger factory.
/// The value itself is not read anywhere else in this class.
/// </summary>
public LogLevel? LogLevel { get; set; }
/// <summary>
/// Callback copied onto the ServerListenerHandler created for each accepted channel.
/// Presumably invoked when a channel becomes inactive — confirm in ServerListenerHandler.
/// </summary>
public Action? ChannelInActiveAction { get; set; }
/// <summary>
/// Creates fresh boss/worker event loop groups and a ServerBootstrap, and configures the pipeline
/// that is built for each accepted channel: logging handler, listener handler, idle-state handler,
/// then the TE encoder, the TD decoder and the TH handlers resolved from the container.
/// </summary>
public void InitBootstrap()
{
// Only hook log4net into DotNetty's internal logging when a log level was configured.
if (LogLevel != null)
{
InternalLoggerFactory.DefaultFactory.AddProvider(new Log4NetProvider());
}
bossGroup = new MultithreadEventLoopGroup();
workerGroup = new MultithreadEventLoopGroup();
bootstrap = new ServerBootstrap();
// NOTE(review): Option() is applied on the bootstrap itself; if keep-alive is meant for the accepted
// connections, ChildOption() may be the intended call — confirm.
bootstrap
.Group(bossGroup, workerGroup) // set the boss and worker event loop groups
.Channel() // set the channel type to TCP socket
.Option(ChannelOption.SoKeepalive, true) // keep connections alive
.Handler(new LoggingHandler())
.ChildHandler(new ActionChannelInitializer(channel =>
{
// This initializer body runs for each channel passed to it, so every channel gets its own
// listener handler carrying the callback configured at that moment.
var serverListenerHandler = new ServerListenerHandler
{
ChannelInActiveAction = ChannelInActiveAction
};
var pipeline = channel.Pipeline;
pipeline.AddLast(new LoggingHandler());
pipeline.AddLast(serverListenerHandler);
// Arguments are (reader idle, writer idle, all idle) in seconds per the DotNetty constructor:
// only the combined 180 s idle check is enabled.
// NOTE(review): serverListenerHandler sits before this handler in the pipeline, so it presumably
// does not receive the idle events raised here — confirm that is intended.
pipeline.AddLast(new IdleStateHandler(0, 0, 180)); // detect idle connections
// Business handlers: these are the handlers that actually process the application messages.
ResolveEncode(pipeline);
ResolveDecode(pipeline);
ResolveHandler(pipeline);
}));
}
/// <summary>
/// Releases netty: logs the call, then blocks until the boss group and afterwards the worker group
/// have completed ShutdownGracefullyAsync(). Groups that were never created are skipped.
/// </summary>
public void Dispose()
{
Log.Info(this + " Dispose");
// NOTE(review): .Wait() blocks the calling thread and surfaces failures as AggregateException.
bossGroup?.ShutdownGracefullyAsync().Wait();
workerGroup?.ShutdownGracefullyAsync().Wait();
}
/// <summary>
/// Appends a newly constructed TE encoder to the end of the pipeline.
/// </summary>
private void ResolveEncode(IChannelPipeline pipeline)
{
pipeline.AddLast(new TE());
}
/// <summary>
/// Appends a newly constructed TD decoder to the end of the pipeline.
/// </summary>
private void ResolveDecode(IChannelPipeline pipeline)
{
pipeline.AddLast(new TD());
}
/// <summary>
/// Finds every typed service in the Autofac container whose service type implements TH,
/// resolves it, sorts the resolved handlers by the Order value of the attribute on their runtime type
/// (0 when the attribute is absent) and appends them to the pipeline in ascending order.
/// </summary>
private void ResolveHandler(IChannelPipeline pipeline)
{
// Collect the service types of all matching registrations.
var list = new List();
foreach (var reg in AppInfo.Container.ComponentRegistry.Registrations)
foreach (var service in reg.Services)
if (service is TypedService ts)
if (MatchHandlers(ts))
list.Add(ts.ServiceType);
// Resolve one handler per collected type; the cast throws if the resolved object is not a TH.
var handlers = new List();
foreach (var type in list)
{
var resolve = AppInfo.Container.Resolve(type);
handlers.Add((TH)resolve);
}
// Ascending by Order; handlers without the attribute are treated as order 0.
// NOTE(review): the attribute type argument of GetCustomAttribute is not visible here — confirm.
handlers.Sort((handler, msgHandler) =>
{
var orderAttribute1 = handler.GetType().GetCustomAttribute();
var orderAttribute2 = msgHandler.GetType().GetCustomAttribute();
var h1Order = orderAttribute1?.Order ?? 0;
var h2Order = orderAttribute2?.Order ?? 0;
return h1Order.CompareTo(h2Order);
});
foreach (var msgHandler in handlers) pipeline.AddLast(msgHandler);
}
/// <summary>
/// Returns true when one of the interfaces reported by the service type is exactly TH
/// (compared with type equality), otherwise false.
/// </summary>
private bool MatchHandlers(TypedService ts)
{
var interfaces = ts.ServiceType.GetInterfaces();
if (interfaces.Length > 0)
foreach (var type in interfaces)
if (type == typeof(TH))
return true;
return false;
}
/// <summary>
/// Remembers the port, (re)builds the bootstrap via InitBootstrap() and asks it to bind to that port.
/// </summary>
/// <param name="port">TCP port to listen on.</param>
public void Start(int port)
{
_port = port;
Log.Info(" Start Listen");
InitBootstrap();
// The task returned by BindAsync is neither awaited nor inspected, so the message below is logged
// without knowing whether the bind has completed or failed.
// NOTE(review): consider observing the task so bind errors are reported.
Task? channel = bootstrap?.BindAsync(_port);
Log.Info($"netty success listen {_port}");
}
}
| | |