同步框架

zw
smartwyy 6 months ago
parent 3df11fc9b1
commit 256840279b

@ -1,5 +1,4 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using DotNetty.Buffers;
using DotNetty.Transport.Channels; using DotNetty.Transport.Channels;
using log4net; using log4net;
@ -25,7 +24,7 @@ public class IoSession
public ConcurrentDictionary<string, object> BusinessMap { get; } = new(); public ConcurrentDictionary<string, object> BusinessMap { get; } = new();
public void Send(IByteBuffer buffer) public void Send(Object buffer)
{ {
Channel.WriteAndFlushAsync(buffer); Channel.WriteAndFlushAsync(buffer);
} }

@ -60,6 +60,18 @@ public class SessionMgr
Dictionary.AddOrUpdate(channel.Id.ToString(), ioSession, (k, oldSession) => ioSession); Dictionary.AddOrUpdate(channel.Id.ToString(), ioSession, (k, oldSession) => ioSession);
} }
public static void ChangeSessionKey(IoSession ioSession, string newKey)
{
var oldKey = ioSession.Key;
if (oldKey != null)
{
Dictionary.Remove(oldKey, out IoSession? session);
}
ioSession.Key = newKey;
Dictionary.AddOrUpdate(newKey, ioSession, (k, oldSession) => ioSession);
}
public static void RegisterModbusSession(string key, ModbusSession ioSession) public static void RegisterModbusSession(string key, ModbusSession ioSession)
{ {
@ -83,9 +95,9 @@ public class SessionMgr
} }
public static void Broadcast(IByteBuffer buffer, ConcurrentDictionary<string, IoSession> dictionary) public static void Broadcast(Object buffer)
{ {
foreach (var session in dictionary.Values) session.Send(buffer); foreach (var session in Dictionary.Values) session.Send(buffer);
} }
public static object GetAttr(IoSession session, string key) public static object GetAttr(IoSession session, string key)

@ -13,6 +13,8 @@ public class ServerListenerHandler<TH, TD, TE> : ChannelHandlerAdapter where TH
{ {
private static readonly ILog Log = LogManager.GetLogger(typeof(ServerListenerHandler<TH, TD, TE>)); private static readonly ILog Log = LogManager.GetLogger(typeof(ServerListenerHandler<TH, TD, TE>));
public Action? ChannelInActiveAction { get; set; }
public override void ChannelRegistered(IChannelHandlerContext context) public override void ChannelRegistered(IChannelHandlerContext context)
{ {
base.ChannelRegistered(context); base.ChannelRegistered(context);
@ -35,6 +37,11 @@ public class ServerListenerHandler<TH, TD, TE> : ChannelHandlerAdapter where TH
public override void ChannelInactive(IChannelHandlerContext context) public override void ChannelInactive(IChannelHandlerContext context)
{ {
base.ChannelInactive(context); base.ChannelInactive(context);
if (ChannelInActiveAction != null)
{
ChannelInActiveAction();
}
SessionMgr.UnregisterSession(context.Channel); SessionMgr.UnregisterSession(context.Channel);
Log.Info("inactive " + context.Channel); Log.Info("inactive " + context.Channel);
} }

@ -33,8 +33,11 @@ public class TcpServer<TH, TD, TE> : IDisposable where TH : IChannelHandler
public LogLevel? LogLevel { get; set; } public LogLevel? LogLevel { get; set; }
public Action? ChannelInActiveAction { get; set; }
public TcpServer() public TcpServer()
{ {
if (LogLevel != null) if (LogLevel != null)
{ {
InternalLoggerFactory.DefaultFactory.AddProvider(new Log4NetProvider()); InternalLoggerFactory.DefaultFactory.AddProvider(new Log4NetProvider());
@ -49,12 +52,12 @@ public class TcpServer<TH, TD, TE> : IDisposable where TH : IChannelHandler
.Handler(new LoggingHandler()) .Handler(new LoggingHandler())
.ChildHandler(new ActionChannelInitializer<ISocketChannel>(channel => .ChildHandler(new ActionChannelInitializer<ISocketChannel>(channel =>
{ {
var serverListenerHandler = new ServerListenerHandler<TH, TD, TE>(); var serverListenerHandler = new ServerListenerHandler<TH, TD, TE>
var pipeline = channel.Pipeline;
if (LogLevel != null)
{ {
pipeline.AddLast(new LoggingHandler(LogLevel.Value)); ChannelInActiveAction = ChannelInActiveAction
} };
var pipeline = channel.Pipeline;
pipeline.AddLast(new LoggingHandler());
pipeline.AddLast(serverListenerHandler); pipeline.AddLast(serverListenerHandler);
pipeline.AddLast(new IdleStateHandler(0, 0, 180)); //检测空闲连接 pipeline.AddLast(new IdleStateHandler(0, 0, 180)); //检测空闲连接
//业务handler 这里是实际处理业务的Handler //业务handler 这里是实际处理业务的Handler

Loading…
Cancel
Save