using System.Text;
using Autofac;
using Autofac.Core;
using Common.Util;
using Entity.DbModel.Station;
using HybirdFrameworkCore.Autofac;
using HybirdFrameworkCore.Autofac.Attribute;
using HybirdFrameworkDriver.Common;
using log4net;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Formatter;
using MQTTnet.Protocol;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Repository.Station;
using Service.Cloud.Handler;
using Service.Cloud.Msg;
using Service.Cloud.Msg.Cloud.Req;
using Service.Cloud.Msg.Cloud.Resp;
using Service.Cloud.Msg.Cloud.Resp.OutCharger;
using Service.Cloud.Msg.Host.Req;
using Service.Cloud.Msg.Host.Req.OutCharger;
namespace Service.Cloud.Client;
[Scope("SingleInstance")]
public class CloudClient : IMqttClientConnectedHandler, IMqttApplicationMessageReceivedHandler,
IMqttClientDisconnectedHandler
{
private static readonly ILog Log = LogManager.GetLogger(typeof(CloudClient));
public SwapOrderRepository SwapOrderRepository { get; set;}
#region tcp param
public string ServerIp { get; set; }
public int ServerPort { get; set; }
public string ClientId { get; set; }
public string? Username { get; set; }
public string? Password { get; set; }
public int KeepalivePeriod { get; set; } = 3000;
public int Timeout { get; set; } = 60;
public string MqttVersion { get; set; } = "4.0.0";
public bool IsCleanSession { get; set; } = false;
#endregion
#region property
public bool Connected { get; set; }
public bool AutoReConnect { get; set; }
public string StationNo { get; set; }
public string SubTopic { get; set; }
public string PubTopic { get; set; }
public int Encrypt { get; set; }
public string? AesKey { get; set; }
public bool Authed { get; set; }
#endregion
#region Cmd msg cache
public CarCanStart? CarCanStart { get; set; }
///
///
///
public MsgPair CarAuth { get; set; } = new();
public MsgPair BatData { get; set; } = new();
public MsgPair ChannelStatus { get; set; } = new();
public MsgPair AirConditioning { get; set; } = new();
public MsgPair ChargeRecord { get; set; } = new();
public MsgPair EndLog { get; set; } = new();
public MsgPair HostStatus { get; set; } = new();
public MsgPair RealTimeFault { get; set; } = new();
public MsgPair ReportingDevice { get; set; } = new();
public MsgPair Sign { get; set; } = new();
public MsgPair StartLog { get; set; } = new();
public MsgPair TemperatureHumidity { get; set; } = new();
public MsgPair UploadPowerChange { get; set; } = new();
public MsgPair VehicleData { get; set; } = new();
public MsgPair ChargeRecordUpLoad { get; set; } = new();
public MsgPair ChargeDevDataInfo { get; set; } = new();
public MsgPair PileEndCharge { get; set; } = new();
public MsgPair PileChargeRealtime { get; set; } = new();
public MsgPair PileRealtime { get; set; } = new();
#endregion
#region basic
private IMqttClient? MqttClient;
private List handlers = new();
private static ushort _incrementId;
private static ushort GetIncrementId()
{
if (_incrementId < 65535)
{
_incrementId += 1;
}
else
{
_incrementId = 1;
}
return _incrementId;
}
public void InitHandler()
{
var list = new List();
foreach (var reg in AppInfo.Container.ComponentRegistry.Registrations)
foreach (var service in reg.Services)
if (service is TypedService ts)
if (MatchHandlers(ts))
list.Add(ts.ServiceType);
foreach (var type in list)
{
var resolve = AppInfo.Container.Resolve(type);
handlers.Add((IBaseHandler)resolve);
}
}
private bool MatchHandlers(TypedService ts)
{
var interfaces = ts.ServiceType.GetInterfaces();
if (interfaces.Length > 0)
foreach (var type in interfaces)
{
if (type.ToString().Contains("Service.Cloud.Handler"))
{
return true;
}
}
return false;
}
public void Connect()
{
Log.Info($"begin connect cloud {ServerIp}:{ServerPort} with client={ClientId}");
if (MqttClient == null)
{
MqttClient = new MqttFactory().CreateMqttClient();
MqttClient.ConnectedHandler = this;
MqttClient.ApplicationMessageReceivedHandler = this;
MqttClient.DisconnectedHandler = this;
}
try
{
var task = MqttClient.ConnectAsync(BuildOptions());
MqttClientConnectResult result = task.Result;
Log.Info($"connect cloud {result.ResultCode}");
if (result.ResultCode == MqttClientConnectResultCode.Success)
{
Connected = true;
}
else
{
Connected = false;
if (AutoReConnect)
{
Thread.Sleep(5000);
Connect();
}
}
}
catch (Exception e)
{
Log.Error("connect cloud error", e);
if (AutoReConnect)
{
Thread.Sleep(5000);
Connect();
}
}
}
///
/// 连接成功回调
///
///
///
public async Task HandleConnectedAsync(MqttClientConnectedEventArgs eventArgs)
{
await DoSubTopic(SubTopic);
/*SendVehicleCertification(new VehicleCertification()
{
dt = DateTime.Now,
en = 1,
mode =1,
rfid= "LZ5NB6D33RB000438",
ty= 2,
vi ="LZ5NB6D33RB000438"
});*/
// SendSignIn(new SignIn()
// {
// sn = StaticStationInfo.StationNo,
// st = "01",
// ss = StaticStationInfo.StationStatus,
// en = 1,
// cn = 7
// });
}
private async Task DoSubTopic(string topic)
{
List list = new List();
string[] topics = topic.Split(new char[] { ',' });
foreach (string str in topics)
{
MqttTopicFilter topicFilter = new MqttTopicFilter
{
Topic = str,
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce
};
list.Add(topicFilter);
}
await MqttClient.SubscribeAsync(list.ToArray());
Log.Info($"subscribe {topic} success ");
}
public void Publish(T data) where T : ICmd
{
if (MqttClient.IsConnected)
{
Model model = new Model
{
Header = new Header()
{
cmd = data.GetCmd(),
cipherFlag = Encrypt,
id = GetIncrementId(),
sid = StationNo,
timeStamp = (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000
},
body = data
};
model.dataSign = "";//SignData(model);
var settings = new JsonSerializerSettings()
{
ReferenceLoopHandling = ReferenceLoopHandling.Ignore,
DateFormatString = "yyyy-MM-dd HH:mm:ss",
NullValueHandling = NullValueHandling.Ignore
};
Log.Info($"send {JsonConvert.SerializeObject(model, settings)}");
var appMsg = new MqttApplicationMessage
{
Topic = PubTopic,
Payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(model, settings)),
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
Retain = false
};
MqttClient.PublishAsync(appMsg);
}
}
private string SignData(Model model) where T : ICmd
{
var settings = new JsonSerializerSettings()
{
ReferenceLoopHandling = ReferenceLoopHandling.Ignore,
DateFormatString = "yyyy-MM-dd HH:mm:ss",
NullValueHandling = NullValueHandling.Ignore
};
string body = JsonConvert.SerializeObject(model.body, settings);
return MD5Util.MD5Encrypt32(body + ":" + model.Header.timeStamp + ":" + model.Header.id).ToLower();
}
///
/// 消息接收回调
///
///
///
public Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs eventArgs)
{
var message = eventArgs.ApplicationMessage;
if (message.Topic != SubTopic)
{
return Task.CompletedTask;
}
if (message.Payload == null)
{
return Task.CompletedTask;
}
string s = Encoding.UTF8.GetString(message.Payload);
if (string.IsNullOrWhiteSpace(s))
{
return Task.CompletedTask;
}
Log.Info($"from cloud receive {s} ");
JObject objResult = JObject.Parse(s);
string headerStr = objResult["header"].ToString();
Header? header = JsonConvert.DeserializeObject(headerStr);
if (header == null)
{
return Task.CompletedTask;
}
foreach (IBaseHandler handler in handlers)
{
if (handler.CanHandle(header.cmd))
{
string bodyStr = objResult["body"].ToString();
handler.Handle(bodyStr);
break;
}
}
return Task.CompletedTask;
}
///
/// 断开回调
///
///
///
public Task HandleDisconnectedAsync(MqttClientDisconnectedEventArgs eventArgs)
{
Log.Info("cloud disconnect");
Connected = false;
if (AutoReConnect)
{
Connect();
}
return Task.CompletedTask;
}
private IMqttClientOptions BuildOptions()
{
MqttClientOptionsBuilder builder =
new MqttClientOptionsBuilder().WithTcpServer(ServerIp, ServerPort).WithClientId(ClientId);
if (!string.IsNullOrWhiteSpace(Username))
{
builder.WithCredentials(Username, Encoding.UTF8.GetBytes(Password));
}
if (IsCleanSession)
{
builder.WithCleanSession();
}
builder.WithKeepAlivePeriod(TimeSpan.FromSeconds(KeepalivePeriod))
.WithCommunicationTimeout(TimeSpan.FromSeconds(Timeout));
switch (MqttVersion)
{
case "3.1.0":
builder.WithProtocolVersion(MqttProtocolVersion.V310);
break;
case "5.0.16":
builder.WithProtocolVersion(MqttProtocolVersion.V500);
break;
default:
builder.WithProtocolVersion(MqttProtocolVersion.V311);
break;
}
return builder.Build();
}
#endregion
#region 主动发送CMD
public VehicleCertificationResp? SendVehicleCertification(VehicleCertification vehicleCertification,
TimeSpan? timeSpan = null)
{
Log.Info(vehicleCertification);
this.CarAuth.Req = vehicleCertification;
this.Publish(vehicleCertification);
return CarAuth.GetResp(timeSpan);
}
public BatDataInfoRes? SendBatDataInfo(BatDataInfo batDataInfo,
TimeSpan? timeSpan = null)
{
this.BatData.Req = batDataInfo;
this.Publish(batDataInfo);
return BatData.GetResp(timeSpan);
}
///
/// 换电站通道状态上报
///
///
///
///
public ChannelStatusReportingResp? SendChannelStatusReporting(ChannelStatusReporting channelStatusReporting,
TimeSpan timeSpan)
{
this.ChannelStatus.Req = channelStatusReporting;
this.Publish(channelStatusReporting);
return ChannelStatus.GetResp(timeSpan);
}
public AirConditioningDataResp? SendAirConditioningData(AirConditioningData airConditioningData,
TimeSpan timeSpan)
{
this.AirConditioning.Req = airConditioningData;
this.Publish(airConditioningData);
return AirConditioning.GetResp(timeSpan);
}
public ChargeRecordReportingResp? SendChargeRecordReporting(ChargeRecordReporting chargeRecord,
TimeSpan timeSpan)
{
this.ChargeRecord.Req = chargeRecord;
this.Publish(chargeRecord);
return ChargeRecord.GetResp(timeSpan);
}
public EndLogMessageResp? SendEndLogMessage(EndLogMessage endLogMessage,
TimeSpan timeSpan)
{
this.EndLog.Req = endLogMessage;
this.Publish(endLogMessage);
return EndLog.GetResp(timeSpan);
}
public HostStatusReportedResp? SendHostStatusReported(HostStatusReported hostStatusReported,
TimeSpan timeSpan)
{
this.HostStatus.Req = hostStatusReported;
this.Publish(hostStatusReported);
return HostStatus.GetResp(timeSpan);
}
public RealTimeFaultInfoResp? SendRealTimeFaultInfo(RealTimeFaultInfo realTimeFaultInfo,
TimeSpan timeSpan)
{
this.RealTimeFault.Req = realTimeFaultInfo;
this.Publish(realTimeFaultInfo);
return RealTimeFault.GetResp(timeSpan);
}
public ReportingDeviceListResp? SendReportingDeviceList(ReportingDeviceList reportingDeviceList,
TimeSpan timeSpan)
{
this.ReportingDevice.Req = reportingDeviceList;
this.Publish(reportingDeviceList);
return ReportingDevice.GetResp(timeSpan);
}
public SignInResp? SendSignIn(SignIn signIn,
TimeSpan? timeSpan=null)
{
this.Sign.Req = signIn;
this.Publish(signIn);
return Sign.GetResp(timeSpan);
}
public StartLogMessageResp? SendStartLogMessage(StartLogMessage startLogMessage,
TimeSpan timeSpan)
{
this.StartLog.Req = startLogMessage;
this.Publish(startLogMessage);
return StartLog.GetResp(timeSpan);
}
public TemperatureHumidityDataResp? SendTemperatureHumidityData(TemperatureHumidityData temperatureHumidityData,
TimeSpan timeSpan)
{
this.TemperatureHumidity.Req = temperatureHumidityData;
this.Publish(temperatureHumidityData);
return TemperatureHumidity.GetResp(timeSpan);
}
///
/// 上传换电订单
///
///
///
///
public UploadSwapOrderResp? SendUploadPowerChangeOrder(UploadSwapOrder uploadSwapOrder, TimeSpan? timeSpan = null)
{
this.UploadPowerChange.Req = uploadSwapOrder;
this.Publish(uploadSwapOrder);
return UploadPowerChange.GetResp(timeSpan);
}
public VehicleDataReportingResp? SendVehicleDataReporting(VehicleDataReporting vehicleDataReporting,
TimeSpan timeSpan)
{
this.VehicleData.Req = vehicleDataReporting;
this.Publish(vehicleDataReporting);
return VehicleData.GetResp(timeSpan);
}
public ChargeRecordUploadRes? SendChargeRecordUpLoad(ChargeRecordUpLoad req, TimeSpan? timeSpan = null)
{
this.ChargeRecordUpLoad.Req = req;
this.Publish(req);
return ChargeRecordUpLoad.GetResp(timeSpan);
}
public ChargeDevDataInfoRes? SendChargeDevDataInfo(ChargeDevDataInfo req, TimeSpan? timeSpan = null)
{
this.ChargeDevDataInfo.Req = req;
this.Publish(req);
return ChargeDevDataInfo.GetResp(timeSpan);
}
public PileEndChargeResp? SendPileEndCharge(PileEndCharge req, TimeSpan? timeSpan = null)
{
this.PileEndCharge.Req = req;
this.Publish(req);
return PileEndCharge.GetResp(timeSpan);
}
public PileChargeRealtimeResp? SendPileChargeRealtime(PileChargeRealtime req, TimeSpan? timeSpan = null)
{
this.PileChargeRealtime.Req = req;
this.Publish(req);
return PileChargeRealtime.GetResp(timeSpan);
}
public PileRealtimeResp? SendPileRealtime(PileRealtime req, TimeSpan? timeSpan = null)
{
this.PileRealtime.Req = req;
this.Publish(req);
return PileRealtime.GetResp(timeSpan);
}
#endregion
#region business func
///
///
///
///
/// 1: 自动; 2: 人工手动
/// 超时等待
///
public bool PublishChargeOrder(List orders, int op, TimeSpan? timeSpan = null)
{
ChargeOrder chargeOrder = orders[0];
ChargeOrder lastChargeOrder = orders[^1];
string swapOrderSn = chargeOrder.SwapOrderSn;
SwapOrder? swapOrder = SwapOrderRepository.QueryByClause(it => it.Sn == swapOrderSn);
ChargeRecordUpLoad req = new ChargeRecordUpLoad()
{
chrsn = chargeOrder.CloudChargeOrder,
son = swapOrder?.CloudSn,
bid = chargeOrder.BatteryNo,
st = chargeOrder.StartTime ?? DateTime.Now,
et = lastChargeOrder.EndTime ?? DateTime.Now,
ssoc = chargeOrder.StartSoc ?? 0,
esoc = lastChargeOrder.StopSoc ?? 0,
//ssoe = chargeOrder.soe
//esoe
dcce =0,
acce =0,
tp = 0,
pp = 0,
fp = 0,
vp = 0,
ct = 0,
cn = orders.Count,
sfs = op,
vin = swapOrder?.VehicleVin,
sfoc = 0,
};
foreach (ChargeOrder order in orders)
{
req.dcce += Convert.ToSingle(order.StopDcElec ?? 0 - order.StartDcElec ?? 0);
req.acce += Convert.ToSingle(order.StopAcElec ?? 0 - order.StartAcElec ?? 0);
req.tp += Convert.ToSingle(order.SharpElecCount);
req.pp += Convert.ToSingle(order.PeakElecCount);
req.fp += Convert.ToSingle(order.FlatElecCount);
req.vp += Convert.ToSingle(order.ValleyElecCount);
req.ct += ((order.EndTime ?? DateTime.Now).Subtract(order.StartTime ?? DateTime.Now)).Minutes;
}
this.SendChargeRecordUpLoad(req);
return true;
}
#endregion
}