云平台统一处理

zw
smartwyy 6 months ago
parent 44e2907ddc
commit 9442d9515c

@ -1,14 +1,19 @@
// See https://aka.ms/new-console-template for more information
using ConsoleStarter;
using log4net;
using log4net.Config;
using Newtonsoft.Json;
internal class Program
{
private static readonly ILog Log = LogManager.GetLogger(typeof(Program));
public static void Main(string[] args)
{
XmlConfigurator.ConfigureAndWatch(new FileInfo(AppDomain.CurrentDomain.BaseDirectory + @"\log4net.xml"));
var exportDb = new ExportDb();
exportDb.Export();
string str = "{\n\"header\":{\n\"cmd\":\"amtBat\",\n\"id\":1,\n\"sid\":\"xxxx\"\n},\n\"body\":{\n\"sn\": \"xxxx\",\n\"cn\": \"xxxx\",\n\"bn\": \"xxxx\",\n\"bm\": \"xxxx\",\n\"at\": \"2020-11-20 18:23:06\",\n\"am\": 30\n}\n}";
Log.Info(str);
}
}

@ -17,6 +17,7 @@
<PackageReference Include="DotNetty.Transport" Version="0.7.5"/>
<PackageReference Include="HslCommunication" Version="11.1.1"/>
<PackageReference Include="log4net" Version="2.0.15"/>
<PackageReference Include="Microsoft.Extensions.Logging.Log4Net.AspNetCore" Version="8.0.0" />
</ItemGroup>
<ItemGroup>

@ -3,6 +3,8 @@ using System.Reflection;
using Autofac;
using Autofac.Core;
using DotNetty.Codecs;
using DotNetty.Common.Internal.Logging;
using DotNetty.Handlers.Logging;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
@ -10,6 +12,8 @@ using DotNetty.Transport.Channels.Sockets;
using HybirdFrameworkCore.Autofac;
using HybirdFrameworkCore.Autofac.Attribute;
using log4net;
using Microsoft.Extensions.Logging;
using LogLevel = DotNetty.Handlers.Logging.LogLevel;
namespace HybirdFrameworkDriver.TcpClient;
@ -27,20 +31,36 @@ public class TcpClient<TH, TD, TE> where TH : IChannelHandler
public string Host { get; set; }
public int Port { get; set; }
public LogLevel? LogLevel { get; set; }
public void InitBootstrap(string host, int port, Action? channelInactiveHandler = null)
{
Host = host;
Port = port;
if (LogLevel != null)
{
InternalLoggerFactory.DefaultFactory.AddProvider(new Log4NetProvider());
}
_bootstrap = new Bootstrap();
_bootstrap
.Group(new MultithreadEventLoopGroup())
.Channel<TcpSocketChannel>()
.Option(ChannelOption.TcpNodelay, true)
.Handler(new LoggingHandler())
.Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
{
var clientListenerHandler = new ClientListenerHandler<TH, TD, TE>(this);
var pipeline = channel.Pipeline;
if (LogLevel != null)
{
pipeline.AddLast(new LoggingHandler(LogLevel.Value));
}
// 监听器
pipeline.AddLast(clientListenerHandler);
pipeline.AddLast("idleStateHandler", new IdleStateHandler(30, 0, 0)); // 触发读取超时

@ -2,6 +2,7 @@
using Autofac;
using Autofac.Core;
using DotNetty.Codecs;
using DotNetty.Common.Internal.Logging;
using DotNetty.Handlers.Logging;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
@ -10,6 +11,8 @@ using DotNetty.Transport.Channels.Sockets;
using HybirdFrameworkCore.Autofac;
using HybirdFrameworkCore.Autofac.Attribute;
using log4net;
using Microsoft.Extensions.Logging;
using LogLevel = DotNetty.Handlers.Logging.LogLevel;
namespace HybirdFrameworkDriver.TcpServer;
@ -28,8 +31,14 @@ public class TcpServer<TH, TD, TE> : IDisposable where TH : IChannelHandler
private int _port = 9000;
public LogLevel? LogLevel { get; set; }
public TcpServer()
{
if (LogLevel != null)
{
InternalLoggerFactory.DefaultFactory.AddProvider(new Log4NetProvider());
}
bossGroup = new MultithreadEventLoopGroup();
workerGroup = new MultithreadEventLoopGroup();
bootstrap = new ServerBootstrap();
@ -42,7 +51,10 @@ public class TcpServer<TH, TD, TE> : IDisposable where TH : IChannelHandler
{
var serverListenerHandler = new ServerListenerHandler<TH, TD, TE>();
var pipeline = channel.Pipeline;
pipeline.AddLast(new LoggingHandler(""));
if (LogLevel != null)
{
pipeline.AddLast(new LoggingHandler(LogLevel.Value));
}
pipeline.AddLast(serverListenerHandler);
pipeline.AddLast(new IdleStateHandler(0, 0, 180)); //检测空闲连接
//业务handler 这里是实际处理业务的Handler

@ -1,12 +1,34 @@
using System.Text;
using Autofac;
using Autofac.Core;
using HybirdFrameworkCore.Autofac;
using HybirdFrameworkCore.Autofac.Attribute;
using log4net;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Publishing;
using MQTTnet.Client.Receiving;
using MQTTnet.Formatter;
using MQTTnet.Protocol;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using Newtonsoft.Json.Linq;
using Service.Cloud.Handler;
using Service.Cloud.Msg;
using Service.Cloud.Msg.Host.Req;
namespace Service.Cloud.Client;
public class CloudClient
[Scope("SingleInstance")]
public class CloudClient : IMqttClientConnectedHandler, IMqttApplicationMessageReceivedHandler,
IMqttClientDisconnectedHandler
{
#region client param
private static readonly ILog Log = LogManager.GetLogger(typeof(CloudClient));
#region tcp param
public string ServerIp { get; set; }
public int ServerPort { get; set; }
@ -20,8 +42,251 @@ public class CloudClient
#endregion
#region property
public bool Connected { get; set; }
public bool AutoReConnect { get; set; }
public string StationNo { get; set; }
public string SubTopic { get; set; }
public string PubTopic { get; set; }
public int Encrypt { get; set; }
public string? AesKey { get; set; }
#endregion
private IMqttClient? MqttClient;
private List<IBaseHandler> handlers = new List<IBaseHandler>();
private static ushort _incrementId;
private static ushort GetIncrementId()
{
if (_incrementId < 65535)
{
_incrementId += 1;
}
else
{
_incrementId = 1;
}
return _incrementId;
}
public void InitHandler()
{
var list = new List<Type>();
foreach (var reg in AppInfo.Container.ComponentRegistry.Registrations)
foreach (var service in reg.Services)
if (service is TypedService ts)
if (MatchHandlers(ts))
list.Add(ts.ServiceType);
foreach (var type in list)
{
var resolve = AppInfo.Container.Resolve(type);
handlers.Add((IBaseHandler)resolve);
}
}
private bool MatchHandlers(TypedService ts)
{
var interfaces = ts.ServiceType.GetInterfaces();
if (interfaces.Length > 0)
foreach (var type in interfaces)
{
if (type.ToString().Contains("Service.Cloud.Handler"))
{
return true;
}
}
return false;
}
public void Connect()
{
Log.Info($"begin connect cloud {ServerIp}:{ServerPort} with client={ClientId}");
if (MqttClient == null)
{
MqttClient = new MqttFactory().CreateMqttClient();
MqttClient.ConnectedHandler = this;
MqttClient.ApplicationMessageReceivedHandler = this;
MqttClient.DisconnectedHandler = this;
}
try
{
var task = MqttClient.ConnectAsync(BuildOptions());
MqttClientConnectResult result = task.Result;
Log.Info($"connect cloud {result.ResultCode}");
if (result.ResultCode == MqttClientConnectResultCode.Success)
{
Connected = true;
}
else
{
Connected = false;
if (AutoReConnect)
{
Connect();
}
}
}
catch (Exception e)
{
Log.Error("connect cloud error", e);
if (AutoReConnect)
{
Connect();
}
}
}
/// <summary>
/// 连接成功回调
/// </summary>
/// <param name="eventArgs"></param>
/// <returns></returns>
public async Task HandleConnectedAsync(MqttClientConnectedEventArgs eventArgs)
{
await DoSubTopic(SubTopic);
Publish(new SignIn());
}
private async Task DoSubTopic(string topic)
{
List<MqttTopicFilter> list = new List<MqttTopicFilter>();
string[] topics = topic.Split(new char[] { ',' });
foreach (string str in topics)
{
MqttTopicFilter topicFilter = new MqttTopicFilter
{
Topic = str,
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
};
list.Add(topicFilter);
}
await MqttClient.SubscribeAsync(list.ToArray());
Log.Info($"subscribe {topic} success ");
}
public void Publish<T>(T data) where T : ICmd
{
Model<T> model = new Model<T>
{
Header = new Header()
{
cmd = data.GetCmd(),
chipherFlag = Encrypt,
id = GetIncrementId(),
sid = StationNo,
timeStamp = DateTime.Now.Millisecond
},
body = data
};
model.dataSign = SignData(model);
var appMsg = new MqttApplicationMessage
{
Topic = PubTopic,
Payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(model)),
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce,
Retain = false
};
Task<MqttClientPublishResult> task = MqttClient.PublishAsync(appMsg);
var result = task.Result;
if (result.ReasonCode == MqttClientPublishReasonCode.Success)
{
Log.Info($"send {JsonConvert.SerializeObject(model)} success");
}
}
private string SignData<T>(Model<T> model) where T : ICmd
{
IsoDateTimeConverter timeConverter = new IsoDateTimeConverter();
timeConverter.DateTimeFormat = "yyyy-MM-dd HH:mm:ss";
string body = JsonConvert.SerializeObject(model.body, timeConverter);
return body + ":" + model.Header.timeStamp + ":" + model.Header.id;
}
/// <summary>
/// 消息接收回调
/// </summary>
/// <param name="eventArgs"></param>
/// <returns></returns>
public Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs eventArgs)
{
var message = eventArgs.ApplicationMessage;
if (message.Topic != SubTopic)
{
return Task.CompletedTask;
}
if (message.Payload == null)
{
return Task.CompletedTask;
}
string s = Encoding.UTF8.GetString(message.Payload);
if (string.IsNullOrWhiteSpace(s))
{
return Task.CompletedTask;
}
Log.Info($"from cloud receive {s} ");
JObject objResult = JObject.Parse(s);
string headerStr = objResult["header"].ToString();
Header? header = JsonConvert.DeserializeObject<Header>(headerStr);
if (header == null)
{
return Task.CompletedTask;
}
foreach (IBaseHandler handler in handlers)
{
if (handler.CanHandle(header.cmd))
{
string bodyStr = objResult["body"].ToString();
handler.Handle(bodyStr);
break;
}
}
return Task.CompletedTask;
}
/// <summary>
/// 断开回调
/// </summary>
/// <param name="eventArgs"></param>
/// <returns></returns>
public Task HandleDisconnectedAsync(MqttClientDisconnectedEventArgs eventArgs)
{
Log.Info("cloud disconnect");
Connected = false;
if (AutoReConnect)
{
return Task.Run(Connect);
}
return Task.CompletedTask;
}
private MqttClientOptions BuildOptions()
private IMqttClientOptions BuildOptions()
{
MqttClientOptionsBuilder builder =
new MqttClientOptionsBuilder().WithTcpServer(ServerIp, ServerPort).WithClientId(ClientId);
@ -35,7 +300,8 @@ public class CloudClient
builder.WithCleanSession();
}
builder.WithKeepAlivePeriod(TimeSpan.FromSeconds(KeepalivePeriod)).WithTimeout(TimeSpan.FromSeconds(Timeout));
builder.WithKeepAlivePeriod(TimeSpan.FromSeconds(KeepalivePeriod))
.WithCommunicationTimeout(TimeSpan.FromSeconds(Timeout));
switch (Version)
{
case "3.1.0":

@ -0,0 +1,17 @@
using Autofac;
using HybirdFrameworkCore.Autofac;
using HybirdFrameworkCore.Autofac.Attribute;
namespace Service.Cloud.Client;
[Scope("SingleInstance")]
public class CloudClientMgr
{
public static CloudClient? CloudClient { get; set; }
public static void Init()
{
CloudClient = AppInfo.Container.Resolve<CloudClient>();
CloudClient.InitHandler();
}
}

@ -6,6 +6,8 @@ public class CloudConst
public static readonly string signIn = "signIn";
public static readonly string signInRes = "signInRes";
public static readonly string amtBat = "amtBat";
public static readonly string amtBatRes = "amtBatRes";
#endregion
}

@ -0,0 +1,20 @@
using HybirdFrameworkCore.Autofac.Attribute;
using Newtonsoft.Json;
using Service.Cloud.Common;
using Service.Cloud.Msg.Cloud.Req;
namespace Service.Cloud.Handler;
[Scope("InstancePerDependency")]
public class AmtBatHandler : IBaseHandler
{
public void Handle(string t)
{
AmtBat? amtBat = JsonConvert.DeserializeObject<AmtBat>(t);
}
public bool CanHandle(string cmd)
{
return CloudConst.amtBat == cmd;
}
}

@ -1,11 +1,8 @@
namespace Service.Cloud.Handler;
public interface IBaseHandler<in T>
public interface IBaseHandler
{
public void Handler(T t);
public bool CanHandle(string cmd);
public bool CanHandle(object obj)
{
return obj.GetType() == typeof(T);
}
public void Handle(string t);
}

@ -1,13 +1,23 @@
using HybirdFrameworkCore.Autofac.Attribute;
using Newtonsoft.Json;
using Service.Cloud.Common;
using Service.Cloud.Msg;
using Service.Cloud.Msg.Cloud.Resp;
namespace Service.Cloud.Handler;
[Scope("InstancePerDependency")]
public class SignInRespHandler : IBaseHandler<Model<SignInResp>>
public class SignInRespHandler : IBaseHandler
{
public void Handler(Model<SignInResp> t)
public void Handle(string t)
{
SignInResp? signInResp = JsonConvert.DeserializeObject<SignInResp>(t);
}
public bool CanHandle(string cmd)
{
return CloudConst.signInRes == cmd;
}
}

@ -0,0 +1,17 @@
using Service.Cloud.Common;
namespace Service.Cloud.Msg.Cloud.Req;
public class AmtBat : ICmd
{
public string sn { get; set; }
public string cn { get; set; }
public string bn { get; set; }
public int bm { get; set; }
public DateTime at { get; set; }
public int am { get; set; }
public string GetCmd()
{
return CloudConst.amtBat;
}
}

@ -1,6 +1,6 @@
using Service.Cloud.Common;
namespace Service.Cloud.Msg;
namespace Service.Cloud.Msg.Host.Req;
public class SignIn : ICmd
{

@ -0,0 +1,13 @@
using Service.Cloud.Common;
namespace Service.Cloud.Msg.Host.Resp;
public class AmtBatRes : ICmd
{
public int rs { get; set; }
public string GetCmd()
{
return CloudConst.amtBatRes;
}
}

@ -14,7 +14,8 @@
<PackageReference Include="DotNetty.Handlers" Version="0.7.5"/>
<PackageReference Include="DotNetty.Transport" Version="0.7.5"/>
<PackageReference Include="log4net" Version="2.0.15"/>
<PackageReference Include="MQTTnet.AspNetCore" Version="4.3.5.1141" />
<PackageReference Include="MQTTnet.AspNetCore" Version="3.1.2" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="SqlSugarCore" Version="5.1.4.115"/>
<PackageReference Include="System.ServiceModel.Duplex" Version="4.8.1" />
<PackageReference Include="System.ServiceModel.Http" Version="4.8.1" />
@ -29,9 +30,4 @@
<ProjectReference Include="..\Repository\Repository.csproj" />
</ItemGroup>
<ItemGroup>
<Folder Include="Cloud\Msg\Cloud\Req\" />
<Folder Include="Cloud\Msg\Host\Resp\" />
</ItemGroup>
</Project>

@ -0,0 +1,7 @@
namespace Service.Station;
public class StationParamConst
{
public static readonly string StationNo = "Station.StationNo";
public static readonly string StationName = "Station.StationName";
}

@ -158,11 +158,11 @@ namespace Service.System
/// <summary>
///
/// </summary>
/// <param name="key">GroupCode#code</param>
/// <param name="key">GroupCode.code</param>
/// <returns></returns>
public string? Get(string key)
{
string[] keys = key.Split("#");
string[] keys = key.Split('.');
if (keys.Length !=2)
{
throw new InvalidParameterException("配置数据key格式错误");
@ -180,11 +180,11 @@ namespace Service.System
/// <summary>
///
/// </summary>
/// <param name="key">GroupCode#code</param>
/// <param name="key">GroupCode.code</param>
/// <param name="value"></param>
public void Set(string key, string value)
{
string[] keys = key.Split("#");
string[] keys = key.Split('.');
if (keys.Length !=2)
{
throw new InvalidParameterException("配置数据key格式错误");

@ -1,3 +1,4 @@
using System.Text;
using Autofac;
using Autofac.Extensions.DependencyInjection;
using Entity.Dto.Resp;
@ -8,10 +9,9 @@ using HybirdFrameworkCore.Redis;
using Mapster;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.IdentityModel.Tokens;
using Microsoft.OpenApi.Models;
using Service.Cloud.Client;
using SqlSugar;
using SqlSugar.IOC;
using System.Text;
var builder = WebApplication.CreateBuilder(args);
@ -142,4 +142,6 @@ app.MapControllers();
AppInfo.Container = app.Services.GetAutofacRoot();
CloudClientMgr.Init();
app.Run();
Loading…
Cancel
Save