断线重连

master
smartwyy 7 months ago
parent 1e06e06259
commit 8eb803cbd4

@ -1,9 +1,6 @@
using System.Net;
using Autofac;
using DotNetty.Codecs; using DotNetty.Codecs;
using DotNetty.Handlers.Timeout; using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Channels; using DotNetty.Transport.Channels;
using HybirdFrameworkCore.Autofac;
using HybirdFrameworkDriver.Session; using HybirdFrameworkDriver.Session;
using log4net; using log4net;
@ -15,6 +12,13 @@ public class ClientListenerHandler<TH, TD, TE> : ChannelHandlerAdapter where TH
{ {
private static readonly ILog Log = LogManager.GetLogger(typeof(ClientListenerHandler<TH, TD, TE>)); private static readonly ILog Log = LogManager.GetLogger(typeof(ClientListenerHandler<TH, TD, TE>));
public TcpClient<TH, TD, TE> Client { get; set; }
public ClientListenerHandler(TcpClient<TH, TD, TE> client)
{
Client = client;
}
public override void ChannelRegistered(IChannelHandlerContext context) public override void ChannelRegistered(IChannelHandlerContext context)
{ {
base.ChannelRegistered(context); base.ChannelRegistered(context);
@ -37,15 +41,13 @@ public class ClientListenerHandler<TH, TD, TE> : ChannelHandlerAdapter where TH
public override void ChannelInactive(IChannelHandlerContext context) public override void ChannelInactive(IChannelHandlerContext context)
{ {
base.ChannelInactive(context); base.ChannelInactive(context);
var ioSession = SessionMgr.GetSession(context.Channel.Id.ToString());
SessionMgr.UnregisterSession(context.Channel); SessionMgr.UnregisterSession(context.Channel);
Log.Info("inactive " + context.Channel); Log.Info("inactive " + context.Channel);
//处理重连
TcpClient<TH, TD, TE> tcpClient = AppInfo.Container.Resolve<TcpClient<TH, TD, TE>>(); context.Channel.CloseAsync().Wait();
IPEndPoint channelRemoteAddress = (IPEndPoint)ioSession.Channel.RemoteAddress; context.Channel.CloseCompletion.Wait();
tcpClient.InitBootstrap(channelRemoteAddress.Address.ToString(), channelRemoteAddress.Port);
tcpClient.Connect(); new Thread(new ThreadStart(Client.Connect)).Start();
} }
public override void UserEventTriggered(IChannelHandlerContext context, object evt) public override void UserEventTriggered(IChannelHandlerContext context, object evt)

@ -13,20 +13,23 @@ using log4net;
namespace HybirdFrameworkDriver.TcpClient; namespace HybirdFrameworkDriver.TcpClient;
public class TcpClient<TH, TD, TE> where TH : IChannelHandler where TD: ByteToMessageDecoder,new() where TE: ChannelHandlerAdapter, new() public class TcpClient<TH, TD, TE> where TH : IChannelHandler
where TD : ByteToMessageDecoder, new()
where TE : ChannelHandlerAdapter, new()
{ {
private Bootstrap? _bootstrap; private Bootstrap? _bootstrap;
public IChannel Channel { get; set; } public IChannel Channel { get; set; }
public bool Connected { get; set; } = false; public bool Connected { get; set; } = false;
public string Host { get; set; } public string Host { get; set; }
public int Port { get; set; } public int Port { get; set; }
private static readonly ILog Log = LogManager.GetLogger(typeof(TcpClient<TH, TD, TE>));
public void InitBootstrap(string host, int port) private static readonly ILog Log = LogManager.GetLogger(typeof(TcpClient<TH, TD, TE>));
public void InitBootstrap(string host, int port, Action? channelInactiveHandler = null)
{ {
Host = host; Host = host;
Port = port; Port = port;
@ -37,7 +40,7 @@ public class TcpClient<TH, TD, TE> where TH : IChannelHandler where TD: ByteToMe
.Option(ChannelOption.TcpNodelay, true) .Option(ChannelOption.TcpNodelay, true)
.Handler(new ActionChannelInitializer<ISocketChannel>(channel => .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
{ {
var clientListenerHandler = new ClientListenerHandler<TH, TD, TE>(); var clientListenerHandler = new ClientListenerHandler<TH, TD, TE>(this);
IChannelPipeline pipeline = channel.Pipeline; IChannelPipeline pipeline = channel.Pipeline;
// 监听器 // 监听器
@ -66,9 +69,8 @@ public class TcpClient<TH, TD, TE> where TH : IChannelHandler where TD: ByteToMe
private void ResolveHandler(IChannelPipeline pipeline) private void ResolveHandler(IChannelPipeline pipeline)
{ {
List<Type> list = new List<Type>(); List<Type> list = new List<Type>();
foreach (IComponentRegistration reg in AppInfo.Container.ComponentRegistry.Registrations) foreach (IComponentRegistration reg in AppInfo.Container.ComponentRegistry.Registrations)
{ {
foreach (Service service in reg.Services) foreach (Service service in reg.Services)
@ -87,9 +89,9 @@ public class TcpClient<TH, TD, TE> where TH : IChannelHandler where TD: ByteToMe
foreach (var type in list) foreach (var type in list)
{ {
object resolve = AppInfo.Container.Resolve(type); object resolve = AppInfo.Container.Resolve(type);
handlers.Add((TH) resolve); handlers.Add((TH)resolve);
} }
handlers.Sort((handler, msgHandler) => handlers.Sort((handler, msgHandler) =>
{ {
OrderAttribute? orderAttribute1 = handler.GetType().GetCustomAttribute<OrderAttribute>(); OrderAttribute? orderAttribute1 = handler.GetType().GetCustomAttribute<OrderAttribute>();
@ -125,24 +127,24 @@ public class TcpClient<TH, TD, TE> where TH : IChannelHandler where TD: ByteToMe
public void Connect() public void Connect()
{ {
Connected = false; Connected = false;
int num = 1; Log.Info($"begin to connect {Host}:{Port}");
while (!Connected)
{
Task<IChannel> task = _bootstrap!.ConnectAsync(new IPEndPoint(IPAddress.Parse(Host), Port));
Channel = task.Result; while (Connected)
Connected = Channel.Open; {
try
if (Connected) {
Task<IChannel> task = _bootstrap!.ConnectAsync(new IPEndPoint(IPAddress.Parse(Host), Port));
Channel = task.Result;
Connected = Channel.Open;
}
catch (Exception e)
{ {
break; Log.Info($"connect {Host}:{Port} {e}");
} }
Thread.Sleep(5000); Log.Info($"connect {Host}:{Port} {Connected}");
num++; Thread.Sleep(1000);
} }
} }
} }
Loading…
Cancel
Save