using System.Text; using Autofac; using Autofac.Core; using Common.Util; using Entity.DbModel.Station; using HybirdFrameworkCore.Autofac; using HybirdFrameworkCore.Autofac.Attribute; using HybirdFrameworkDriver.Common; using log4net; using MQTTnet; using MQTTnet.Client; using MQTTnet.Client.Connecting; using MQTTnet.Client.Disconnecting; using MQTTnet.Client.Options; using MQTTnet.Client.Receiving; using MQTTnet.Formatter; using MQTTnet.Protocol; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using Repository.Station; using Service.Cloud.Handler; using Service.Cloud.Msg; using Service.Cloud.Msg.Cloud.Req; using Service.Cloud.Msg.Cloud.Resp; using Service.Cloud.Msg.Cloud.Resp.OutCharger; using Service.Cloud.Msg.Host.Req; using Service.Cloud.Msg.Host.Req.OutCharger; namespace Service.Cloud.Client; [Scope("SingleInstance")] public class CloudClient : IMqttClientConnectedHandler, IMqttApplicationMessageReceivedHandler, IMqttClientDisconnectedHandler { private static readonly ILog Log = LogManager.GetLogger(typeof(CloudClient)); public SwapOrderRepository SwapOrderRepository { get; set;} #region tcp param public string ServerIp { get; set; } public int ServerPort { get; set; } public string ClientId { get; set; } public string? Username { get; set; } public string? Password { get; set; } public int KeepalivePeriod { get; set; } = 3000; public int Timeout { get; set; } = 60; public string MqttVersion { get; set; } = "4.0.0"; public bool IsCleanSession { get; set; } = false; #endregion #region property public bool Connected { get; set; } public bool AutoReConnect { get; set; } public string StationNo { get; set; } public string SubTopic { get; set; } public string PubTopic { get; set; } public int Encrypt { get; set; } public string? AesKey { get; set; } public bool Authed { get; set; } #endregion #region Cmd msg cache public CarCanStart? CarCanStart { get; set; } /// /// /// public MsgPair CarAuth { get; set; } = new(); public MsgPair ChannelStatus { get; set; } = new(); public MsgPair AirConditioning { get; set; } = new(); public MsgPair ChargeRecord { get; set; } = new(); public MsgPair EndLog { get; set; } = new(); public MsgPair HostStatus { get; set; } = new(); public MsgPair RealTimeFault { get; set; } = new(); public MsgPair ReportingDevice { get; set; } = new(); public MsgPair Sign { get; set; } = new(); public MsgPair StartLog { get; set; } = new(); public MsgPair TemperatureHumidity { get; set; } = new(); public MsgPair UploadPowerChange { get; set; } = new(); public MsgPair VehicleData { get; set; } = new(); public MsgPair ChargeRecordUpLoad { get; set; } = new(); public MsgPair ChargeDevDataInfo { get; set; } = new(); public MsgPair PileEndCharge { get; set; } = new(); public MsgPair PileChargeRealtime { get; set; } = new(); public MsgPair PileRealtime { get; set; } = new(); #endregion #region basic private IMqttClient? MqttClient; private List handlers = new(); private static ushort _incrementId; private static ushort GetIncrementId() { if (_incrementId < 65535) { _incrementId += 1; } else { _incrementId = 1; } return _incrementId; } public void InitHandler() { var list = new List(); foreach (var reg in AppInfo.Container.ComponentRegistry.Registrations) foreach (var service in reg.Services) if (service is TypedService ts) if (MatchHandlers(ts)) list.Add(ts.ServiceType); foreach (var type in list) { var resolve = AppInfo.Container.Resolve(type); handlers.Add((IBaseHandler)resolve); } } private bool MatchHandlers(TypedService ts) { var interfaces = ts.ServiceType.GetInterfaces(); if (interfaces.Length > 0) foreach (var type in interfaces) { if (type.ToString().Contains("Service.Cloud.Handler")) { return true; } } return false; } public void Connect() { Log.Info($"begin connect cloud {ServerIp}:{ServerPort} with client={ClientId}"); if (MqttClient == null) { MqttClient = new MqttFactory().CreateMqttClient(); MqttClient.ConnectedHandler = this; MqttClient.ApplicationMessageReceivedHandler = this; MqttClient.DisconnectedHandler = this; } try { var task = MqttClient.ConnectAsync(BuildOptions()); MqttClientConnectResult result = task.Result; Log.Info($"connect cloud {result.ResultCode}"); if (result.ResultCode == MqttClientConnectResultCode.Success) { Connected = true; } else { Connected = false; if (AutoReConnect) { Thread.Sleep(5000); Connect(); } } } catch (Exception e) { Log.Error("connect cloud error", e); if (AutoReConnect) { Thread.Sleep(5000); Connect(); } } } /// /// 连接成功回调 /// /// /// public async Task HandleConnectedAsync(MqttClientConnectedEventArgs eventArgs) { await DoSubTopic(SubTopic); /*SendVehicleCertification(new VehicleCertification() { dt = DateTime.Now, en = 1, mode =1, rfid= "LZ5NB6D33RB000438", ty= 2, vi ="LZ5NB6D33RB000438" });*/ // SendSignIn(new SignIn() // { // sn = StaticStationInfo.StationNo, // st = "01", // ss = StaticStationInfo.StationStatus, // en = 1, // cn = 7 // }); } private async Task DoSubTopic(string topic) { List list = new List(); string[] topics = topic.Split(new char[] { ',' }); foreach (string str in topics) { MqttTopicFilter topicFilter = new MqttTopicFilter { Topic = str, QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce }; list.Add(topicFilter); } await MqttClient.SubscribeAsync(list.ToArray()); Log.Info($"subscribe {topic} success "); } public void Publish(T data) where T : ICmd { if (MqttClient.IsConnected) { Model model = new Model { Header = new Header() { cmd = data.GetCmd(), cipherFlag = Encrypt, id = GetIncrementId(), sid = StationNo, timeStamp = (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000 }, body = data }; model.dataSign = "";//SignData(model); var settings = new JsonSerializerSettings() { ReferenceLoopHandling = ReferenceLoopHandling.Ignore, DateFormatString = "yyyy-MM-dd HH:mm:ss", NullValueHandling = NullValueHandling.Ignore }; Log.Info($"send {JsonConvert.SerializeObject(model, settings)}"); var appMsg = new MqttApplicationMessage { Topic = PubTopic, Payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(model, settings)), QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce, Retain = false }; MqttClient.PublishAsync(appMsg); } } private string SignData(Model model) where T : ICmd { var settings = new JsonSerializerSettings() { ReferenceLoopHandling = ReferenceLoopHandling.Ignore, DateFormatString = "yyyy-MM-dd HH:mm:ss", NullValueHandling = NullValueHandling.Ignore }; string body = JsonConvert.SerializeObject(model.body, settings); return MD5Util.MD5Encrypt32(body + ":" + model.Header.timeStamp + ":" + model.Header.id).ToLower(); } /// /// 消息接收回调 /// /// /// public Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs eventArgs) { var message = eventArgs.ApplicationMessage; if (message.Topic != SubTopic) { return Task.CompletedTask; } if (message.Payload == null) { return Task.CompletedTask; } string s = Encoding.UTF8.GetString(message.Payload); if (string.IsNullOrWhiteSpace(s)) { return Task.CompletedTask; } Log.Info($"from cloud receive {s} "); JObject objResult = JObject.Parse(s); string headerStr = objResult["header"].ToString(); Header? header = JsonConvert.DeserializeObject
(headerStr); if (header == null) { return Task.CompletedTask; } foreach (IBaseHandler handler in handlers) { if (handler.CanHandle(header.cmd)) { string bodyStr = objResult["body"].ToString(); handler.Handle(bodyStr); break; } } return Task.CompletedTask; } /// /// 断开回调 /// /// /// public Task HandleDisconnectedAsync(MqttClientDisconnectedEventArgs eventArgs) { Log.Info("cloud disconnect"); Connected = false; if (AutoReConnect) { Connect(); } return Task.CompletedTask; } private IMqttClientOptions BuildOptions() { MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder().WithTcpServer(ServerIp, ServerPort).WithClientId(ClientId); if (!string.IsNullOrWhiteSpace(Username)) { builder.WithCredentials(Username, Encoding.UTF8.GetBytes(Password)); } if (IsCleanSession) { builder.WithCleanSession(); } builder.WithKeepAlivePeriod(TimeSpan.FromSeconds(KeepalivePeriod)) .WithCommunicationTimeout(TimeSpan.FromSeconds(Timeout)); switch (MqttVersion) { case "3.1.0": builder.WithProtocolVersion(MqttProtocolVersion.V310); break; case "5.0.16": builder.WithProtocolVersion(MqttProtocolVersion.V500); break; default: builder.WithProtocolVersion(MqttProtocolVersion.V311); break; } return builder.Build(); } #endregion #region 主动发送CMD public VehicleCertificationResp? SendVehicleCertification(VehicleCertification vehicleCertification, TimeSpan? timeSpan = null) { Log.Info(vehicleCertification); this.CarAuth.Req = vehicleCertification; this.Publish(vehicleCertification); return CarAuth.GetResp(timeSpan); } /// /// 换电站通道状态上报 /// /// /// /// public ChannelStatusReportingResp? SendChannelStatusReporting(ChannelStatusReporting channelStatusReporting, TimeSpan timeSpan) { this.ChannelStatus.Req = channelStatusReporting; this.Publish(channelStatusReporting); return ChannelStatus.GetResp(timeSpan); } public AirConditioningDataResp? SendAirConditioningData(AirConditioningData airConditioningData, TimeSpan timeSpan) { this.AirConditioning.Req = airConditioningData; this.Publish(airConditioningData); return AirConditioning.GetResp(timeSpan); } public ChargeRecordReportingResp? SendChargeRecordReporting(ChargeRecordReporting chargeRecord, TimeSpan timeSpan) { this.ChargeRecord.Req = chargeRecord; this.Publish(chargeRecord); return ChargeRecord.GetResp(timeSpan); } public EndLogMessageResp? SendEndLogMessage(EndLogMessage endLogMessage, TimeSpan timeSpan) { this.EndLog.Req = endLogMessage; this.Publish(endLogMessage); return EndLog.GetResp(timeSpan); } public HostStatusReportedResp? SendHostStatusReported(HostStatusReported hostStatusReported, TimeSpan timeSpan) { this.HostStatus.Req = hostStatusReported; this.Publish(hostStatusReported); return HostStatus.GetResp(timeSpan); } public RealTimeFaultInfoResp? SendRealTimeFaultInfo(RealTimeFaultInfo realTimeFaultInfo, TimeSpan timeSpan) { this.RealTimeFault.Req = realTimeFaultInfo; this.Publish(realTimeFaultInfo); return RealTimeFault.GetResp(timeSpan); } public ReportingDeviceListResp? SendReportingDeviceList(ReportingDeviceList reportingDeviceList, TimeSpan timeSpan) { this.ReportingDevice.Req = reportingDeviceList; this.Publish(reportingDeviceList); return ReportingDevice.GetResp(timeSpan); } public SignInResp? SendSignIn(SignIn signIn, TimeSpan? timeSpan=null) { this.Sign.Req = signIn; this.Publish(signIn); return Sign.GetResp(timeSpan); } public StartLogMessageResp? SendStartLogMessage(StartLogMessage startLogMessage, TimeSpan timeSpan) { this.StartLog.Req = startLogMessage; this.Publish(startLogMessage); return StartLog.GetResp(timeSpan); } public TemperatureHumidityDataResp? SendTemperatureHumidityData(TemperatureHumidityData temperatureHumidityData, TimeSpan timeSpan) { this.TemperatureHumidity.Req = temperatureHumidityData; this.Publish(temperatureHumidityData); return TemperatureHumidity.GetResp(timeSpan); } /// /// 上传换电订单 /// /// /// /// public UploadSwapOrderResp? SendUploadPowerChangeOrder(UploadSwapOrder uploadSwapOrder, TimeSpan? timeSpan = null) { this.UploadPowerChange.Req = uploadSwapOrder; this.Publish(uploadSwapOrder); return UploadPowerChange.GetResp(timeSpan); } public VehicleDataReportingResp? SendVehicleDataReporting(VehicleDataReporting vehicleDataReporting, TimeSpan timeSpan) { this.VehicleData.Req = vehicleDataReporting; this.Publish(vehicleDataReporting); return VehicleData.GetResp(timeSpan); } public ChargeRecordUploadRes? SendChargeRecordUpLoad(ChargeRecordUpLoad req, TimeSpan? timeSpan = null) { this.ChargeRecordUpLoad.Req = req; this.Publish(req); return ChargeRecordUpLoad.GetResp(timeSpan); } public ChargeDevDataInfoRes? SendChargeDevDataInfo(ChargeDevDataInfo req, TimeSpan? timeSpan = null) { this.ChargeDevDataInfo.Req = req; this.Publish(req); return ChargeDevDataInfo.GetResp(timeSpan); } public PileEndChargeResp? SendPileEndCharge(PileEndCharge req, TimeSpan? timeSpan = null) { this.PileEndCharge.Req = req; this.Publish(req); return PileEndCharge.GetResp(timeSpan); } public PileChargeRealtimeResp? SendPileChargeRealtime(PileChargeRealtime req, TimeSpan? timeSpan = null) { this.PileChargeRealtime.Req = req; this.Publish(req); return PileChargeRealtime.GetResp(timeSpan); } public PileRealtimeResp? SendPileRealtime(PileRealtime req, TimeSpan? timeSpan = null) { this.PileRealtime.Req = req; this.Publish(req); return PileRealtime.GetResp(timeSpan); } #endregion #region business func /// /// /// /// /// 1: 自动; 2: 人工手动 /// 超时等待 /// public bool PublishChargeOrder(List orders, int op, TimeSpan? timeSpan = null) { ChargeOrder chargeOrder = orders[0]; ChargeOrder lastChargeOrder = orders[^1]; string swapOrderSn = chargeOrder.SwapOrderSn; SwapOrder? swapOrder = SwapOrderRepository.QueryByClause(it => it.Sn == swapOrderSn); ChargeRecordUpLoad req = new ChargeRecordUpLoad() { chrsn = chargeOrder.CloudChargeOrder, son = swapOrder?.CloudSn, bid = chargeOrder.BatteryNo, st = chargeOrder.StartTime ?? DateTime.Now, et = lastChargeOrder.EndTime ?? DateTime.Now, ssoc = chargeOrder.StartSoc ?? 0, esoc = lastChargeOrder.StopSoc ?? 0, //ssoe = chargeOrder.soe //esoe dcce =0, acce =0, tp = 0, pp = 0, fp = 0, vp = 0, ct = 0, cn = orders.Count, sfs = op, vin = swapOrder?.VehicleVin, sfoc = 0, }; foreach (ChargeOrder order in orders) { req.dcce += Convert.ToSingle(order.StopDcElec ?? 0 - order.StartDcElec ?? 0); req.acce += Convert.ToSingle(order.StopAcElec ?? 0 - order.StartAcElec ?? 0); req.tp += Convert.ToSingle(order.SharpElecCount); req.pp += Convert.ToSingle(order.PeakElecCount); req.fp += Convert.ToSingle(order.FlatElecCount); req.vp += Convert.ToSingle(order.ValleyElecCount); req.ct += ((order.EndTime ?? DateTime.Now).Subtract(order.StartTime ?? DateTime.Now)).Minutes; } this.SendChargeRecordUpLoad(req); return true; } #endregion }