using System.Text;
using Autofac;
using Autofac.Core;
using HybirdFrameworkCore.Autofac;
using HybirdFrameworkCore.Autofac.Attribute;
using HybirdFrameworkDriver.Common;
using log4net;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Publishing;
using MQTTnet.Client.Receiving;
using MQTTnet.Formatter;
using MQTTnet.Protocol;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using Newtonsoft.Json.Linq;
using Service.Cloud.Handler;
using Service.Cloud.Msg;
using Service.Cloud.Msg.Cloud.Req;
using Service.Cloud.Msg.Cloud.Resp;
using Service.Cloud.Msg.Host.Req;

namespace Service.Cloud.Client;

/// <summary>
/// MQTT client for the cloud connection. It connects to the configured broker,
/// subscribes to <see cref="SubTopic"/>, publishes commands to <see cref="PubTopic"/>
/// and dispatches received messages to the registered <see cref="IBaseHandler"/>s.
/// </summary>
[Scope("SingleInstance")]
public class CloudClient : IMqttClientConnectedHandler, IMqttApplicationMessageReceivedHandler,
    IMqttClientDisconnectedHandler
{
    private static readonly ILog Log = LogManager.GetLogger(typeof(CloudClient));

    /// <summary>Delay between two connection attempts when <see cref="AutoReConnect"/> is set.</summary>
    private const int ReconnectDelayMs = 5000;

    #region tcp param

    public string ServerIp { get; set; }
    public int ServerPort { get; set; }
    public string ClientId { get; set; }
    public string? Username { get; set; }
    public string? Password { get; set; }

    /// <summary>MQTT keep-alive period, in seconds.</summary>
    public int KeepalivePeriod { get; set; } = 30;

    /// <summary>MQTT communication timeout, in seconds.</summary>
    public int Timeout { get; set; } = 60;

    /// <summary>
    /// Protocol version selector: "3.1.0" selects V310, "5.0.16" selects V500,
    /// any other value selects V311 (see <see cref="BuildOptions"/>).
    /// </summary>
    public string Version { get; set; } = "5.0.16";

    public bool IsCleanSession { get; set; } = false;

    #endregion

    #region property

    public bool Connected { get; set; }
    public bool AutoReConnect { get; set; }
    public string StationNo { get; set; }

    /// <summary>Topic(s) to subscribe to; several topics may be separated by ','.</summary>
    public string SubTopic { get; set; }

    public string PubTopic { get; set; }
    public int Encrypt { get; set; }
    public string? AesKey { get; set; }
    public bool Authed { get; set; }

    #endregion

    #region Cmd msg cache

    public CarCanStart? CarCanStart { get; set; }

    /// <summary>
    /// Request/response pair for the vehicle certification command
    /// (used by <see cref="SendVehicleCertification"/>).
    /// </summary>
    // NOTE(review): generic arguments inferred from SendVehicleCertification — confirm against MsgPair's declaration.
    public MsgPair<VehicleCertification, VehicleCertificationResp> CarAuth { get; set; } = new();

    #endregion

    #region basic

    private IMqttClient? MqttClient;

    private List<IBaseHandler> handlers = new();

    private static ushort _incrementId;

    /// <summary>
    /// Returns the next message id, cycling through 1..65535 (0 is never returned).
    /// </summary>
    private static ushort GetIncrementId()
    {
        if (_incrementId < ushort.MaxValue)
        {
            _incrementId += 1;
        }
        else
        {
            _incrementId = 1;
        }

        return _incrementId;
    }

    /// <summary>
    /// Resolves every container registration whose service type matches
    /// <see cref="MatchHandlers"/> and stores it as a message handler.
    /// </summary>
    public void InitHandler()
    {
        var handlerTypes = new List<Type>();
        foreach (var registration in AppInfo.Container.ComponentRegistry.Registrations)
        {
            foreach (var service in registration.Services)
            {
                if (service is TypedService typed && MatchHandlers(typed))
                {
                    handlerTypes.Add(typed.ServiceType);
                }
            }
        }

        foreach (var type in handlerTypes)
        {
            var resolved = AppInfo.Container.Resolve(type);
            handlers.Add((IBaseHandler)resolved);
        }
    }

    /// <summary>
    /// Returns true when the service type implements at least one interface whose
    /// full name contains "Service.Cloud.Handler".
    /// </summary>
    private bool MatchHandlers(TypedService ts)
    {
        foreach (var type in ts.ServiceType.GetInterfaces())
        {
            if (type.ToString().Contains("Service.Cloud.Handler"))
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>
    /// Connects to the broker, blocking the calling thread. When
    /// <see cref="AutoReConnect"/> is set, failed attempts are retried every
    /// <see cref="ReconnectDelayMs"/> milliseconds until one succeeds.
    /// </summary>
    public void Connect()
    {
        if (MqttClient == null)
        {
            MqttClient = new MqttFactory().CreateMqttClient();
            MqttClient.ConnectedHandler = this;
            MqttClient.ApplicationMessageReceivedHandler = this;
            MqttClient.DisconnectedHandler = this;
        }

        // Retry in a loop rather than by recursion: with a long outage the recursive
        // form grows the call stack by one frame per attempt.
        while (true)
        {
            Log.Info($"begin connect cloud {ServerIp}:{ServerPort} with client={ClientId}");
            try
            {
                // GetAwaiter().GetResult() surfaces the real exception instead of an AggregateException.
                MqttClientConnectResult result =
                    MqttClient.ConnectAsync(BuildOptions()).GetAwaiter().GetResult();
                Log.Info($"connect cloud {result.ResultCode}");

                Connected = result.ResultCode == MqttClientConnectResultCode.Success;
                if (Connected)
                {
                    return;
                }
            }
            catch (Exception e)
            {
                Connected = false;
                Log.Error("connect cloud error", e);
            }

            if (!AutoReConnect)
            {
                return;
            }

            Thread.Sleep(ReconnectDelayMs);
        }
    }

    /// <summary>
    /// Connected callback: subscribes to <see cref="SubTopic"/> and publishes a sign-in message.
    /// </summary>
    /// <param name="eventArgs">Connection event data (unused).</param>
    public async Task HandleConnectedAsync(MqttClientConnectedEventArgs eventArgs)
    {
        await DoSubTopic(SubTopic);
        Publish(new SignIn());
    }

    /// <summary>
    /// Splits a comma-separated topic list, trimming entries and dropping empty ones.
    /// </summary>
    private static string[] SplitTopics(string? topics)
    {
        if (string.IsNullOrWhiteSpace(topics))
        {
            return Array.Empty<string>();
        }

        return topics.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
    }

    /// <summary>
    /// Subscribes (QoS 0) to every topic in the comma-separated <paramref name="topic"/> list.
    /// </summary>
    private async Task DoSubTopic(string topic)
    {
        string[] topics = SplitTopics(topic);
        if (topics.Length == 0)
        {
            Log.Warn("no topic configured, skip subscribe");
            return;
        }

        var filters = new List<MqttTopicFilter>(topics.Length);
        foreach (string name in topics)
        {
            filters.Add(new MqttTopicFilter
            {
                Topic = name,
                QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
            });
        }

        await MqttClient.SubscribeAsync(filters.ToArray());
        Log.Info($"subscribe {topic} success ");
    }

    /// <summary>
    /// Wraps <paramref name="data"/> in a <c>Model</c> with a freshly numbered header,
    /// signs it and publishes it (QoS 0, not retained) to <see cref="PubTopic"/>.
    /// Blocks until the publish call completes.
    /// </summary>
    /// <typeparam name="T">Command type.</typeparam>
    /// <param name="data">Command to send; becomes the message body.</param>
    /// <exception cref="InvalidOperationException">The client has not been created yet (call <see cref="Connect"/> first).</exception>
    public void Publish<T>(T data) where T : ICmd
    {
        if (MqttClient == null)
        {
            throw new InvalidOperationException("cloud client is not connected, call Connect() first");
        }

        Model<T> model = new Model<T>
        {
            Header = new Header()
            {
                cmd = data.GetCmd(),
                chipherFlag = Encrypt,
                id = GetIncrementId(),
                sid = StationNo,
                // NOTE(review): Millisecond is only the 0-999 component of the current time,
                // not an epoch timestamp — confirm what the cloud side expects.
                timeStamp = DateTime.Now.Millisecond
            },
            body = data
        };
        model.dataSign = SignData(model);

        // Serialise once and reuse the text for both the payload and the log line.
        string json = JsonConvert.SerializeObject(model);
        var appMsg = new MqttApplicationMessage
        {
            Topic = PubTopic,
            Payload = Encoding.UTF8.GetBytes(json),
            QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce,
            Retain = false
        };

        Task<MqttClientPublishResult> task = MqttClient.PublishAsync(appMsg);
        var result = task.Result;
        if (result.ReasonCode == MqttClientPublishReasonCode.Success)
        {
            Log.Info($"send {json} success");
        }
        else
        {
            Log.Warn($"send {json} failed: {result.ReasonCode}");
        }
    }

    /// <summary>
    /// Builds the data-sign string "&lt;body json&gt;:&lt;timeStamp&gt;:&lt;id&gt;", with dates in the
    /// body formatted as "yyyy-MM-dd HH:mm:ss".
    /// </summary>
    private string SignData<T>(Model<T> model) where T : ICmd
    {
        IsoDateTimeConverter timeConverter = new IsoDateTimeConverter
        {
            DateTimeFormat = "yyyy-MM-dd HH:mm:ss"
        };
        string body = JsonConvert.SerializeObject(model.body, timeConverter);
        return body + ":" + model.Header.timeStamp + ":" + model.Header.id;
    }

    /// <summary>
    /// Returns true when <paramref name="topic"/> equals one of the topics configured in
    /// <see cref="SubTopic"/>. The comparison is exact; wildcard filters are not expanded.
    /// </summary>
    private bool IsSubscribedTopic(string topic)
    {
        return Array.IndexOf(SplitTopics(SubTopic), topic) >= 0;
    }

    /// <summary>
    /// Message-received callback: parses the JSON payload and passes its "body" to the
    /// first handler that accepts the header's cmd. Messages on other topics, empty
    /// payloads and malformed messages are ignored.
    /// </summary>
    /// <param name="eventArgs">Received message data.</param>
    public Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs eventArgs)
    {
        var message = eventArgs.ApplicationMessage;
        if (!IsSubscribedTopic(message.Topic) || message.Payload == null)
        {
            return Task.CompletedTask;
        }

        string text = Encoding.UTF8.GetString(message.Payload);
        if (string.IsNullOrWhiteSpace(text))
        {
            return Task.CompletedTask;
        }

        Log.Info($"from cloud receive {text} ");

        JObject root;
        Header? header;
        try
        {
            root = JObject.Parse(text);
            JToken? headerToken = root["header"];
            header = headerToken == null
                ? null
                : JsonConvert.DeserializeObject<Header>(headerToken.ToString());
        }
        catch (JsonException e)
        {
            // A malformed message must not throw out of the receive callback.
            Log.Error("parse cloud message error", e);
            return Task.CompletedTask;
        }

        if (header == null)
        {
            return Task.CompletedTask;
        }

        foreach (IBaseHandler handler in handlers)
        {
            if (!handler.CanHandle(header.cmd))
            {
                continue;
            }

            JToken? bodyToken = root["body"];
            if (bodyToken == null)
            {
                Log.Warn($"cloud message cmd={header.cmd} has no body, ignored");
            }
            else
            {
                handler.Handle(bodyToken.ToString());
            }

            break;
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Disconnected callback: clears <see cref="Connected"/> and, when
    /// <see cref="AutoReConnect"/> is set, runs <see cref="Connect"/> on a thread-pool task.
    /// </summary>
    /// <param name="eventArgs">Disconnect event data (unused).</param>
    public Task HandleDisconnectedAsync(MqttClientDisconnectedEventArgs eventArgs)
    {
        Log.Info("cloud disconnect");
        Connected = false;
        return AutoReConnect ? Task.Run(Connect) : Task.CompletedTask;
    }

    /// <summary>
    /// Builds the MQTT client options from the connection properties of this instance.
    /// </summary>
    private IMqttClientOptions BuildOptions()
    {
        MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder()
            .WithTcpServer(ServerIp, ServerPort)
            .WithClientId(ClientId);

        if (!string.IsNullOrWhiteSpace(Username))
        {
            // Password is optional; GetBytes(null) would throw, so send an empty password instead.
            builder.WithCredentials(Username, Encoding.UTF8.GetBytes(Password ?? string.Empty));
        }

        if (IsCleanSession)
        {
            builder.WithCleanSession();
        }

        builder.WithKeepAlivePeriod(TimeSpan.FromSeconds(KeepalivePeriod))
            .WithCommunicationTimeout(TimeSpan.FromSeconds(Timeout));

        switch (Version)
        {
            case "3.1.0":
                builder.WithProtocolVersion(MqttProtocolVersion.V310);
                break;
            case "5.0.16":
                builder.WithProtocolVersion(MqttProtocolVersion.V500);
                break;
            default:
                builder.WithProtocolVersion(MqttProtocolVersion.V311);
                break;
        }

        return builder.Build();
    }

    #endregion

    #region actively sent CMD

    /// <summary>
    /// Publishes a vehicle certification request and waits for the response stored in
    /// <see cref="CarAuth"/>.
    /// </summary>
    /// <param name="vehicleCertification">Request to send.</param>
    /// <param name="timeSpan">Wait limit passed to <c>CarAuth.GetResp</c>.</param>
    /// <returns>The value returned by <c>CarAuth.GetResp</c>; may be null.</returns>
    public VehicleCertificationResp? SendVehicleCertification(VehicleCertification vehicleCertification,
        TimeSpan timeSpan)
    {
        this.CarAuth.Req = vehicleCertification;
        this.Publish(vehicleCertification);
        return CarAuth.GetResp(timeSpan);
    }

    #endregion
}