using GpsCardGatewayPosition.Model.Config;
using GpsCardGatewayPosition.Model.Enum;
using GpsCardGatewayPosition.Model.IoT;
using GpsCardGatewayPosition.Model.Templates;
using GpsCardGatewayPosition.Service.MqProducer.Model;
using GpsCardGatewayPosition.Service.Resolver.Property.Dto;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Claims;
using System.Text;
using System.Threading.Tasks;
using TelpoDataService.Util.Entities.GpsLocationHistory;

namespace GpsCardGatewayPosition.Service.MqProducer
{
    /// <summary>
    /// Builds message envelopes (<c>messageId</c>, <c>topic</c>, <c>time</c>, <c>data</c>)
    /// and hands them to <see cref="MessageProducer"/> for the topics enabled in
    /// <see cref="ServiceAccessConfig"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): header dictionaries are typed as <c>Dictionary&lt;string, int&gt;</c>
    /// based on the explicit <c>(int)</c> casts of every value — confirm against the
    /// signature of <c>MessageProducer.CreateHeader</c>.
    /// </remarks>
    public class MqProcessLogic
    {
        private readonly MessageProducer _producer;
        // NOTE(review): category type assumed to be this class — confirm against DI registration.
        private readonly ILogger<MqProcessLogic> _logger;
        private readonly ServiceAccessConfig _configServiceAccess;

        /// <summary>
        /// Creates the logic object from the service-access options, the message producer and a logger.
        /// </summary>
        /// <param name="optConfigServiceAccess">Options wrapper whose <c>Value</c> supplies the push switches.</param>
        /// <param name="producer">Producer used to create headers and publish messages.</param>
        /// <param name="logger">Logger used for the informational push traces.</param>
        public MqProcessLogic(
            IOptions<ServiceAccessConfig> optConfigServiceAccess,
            MessageProducer producer,
            ILogger<MqProcessLogic> logger)
        {
            _configServiceAccess = optConfigServiceAccess.Value;
            _producer = producer;
            _logger = logger;
        }

        /// <summary>
        /// Publishes an alarm on the Wx topic and, when <c>EnablePushFast</c> is set,
        /// publishes the same payload again on the Fast topic.
        /// Does nothing when <c>EnablePushWx</c> is false.
        /// </summary>
        /// <param name="alarm">Alarm record whose device, type, remarks and address are forwarded.</param>
        public async Task ProcessWxAlarmAsync(HisGpsAlarm alarm)
        {
            if (!_configServiceAccess.EnablePushWx)
            {
                return;
            }

            // The device time is shifted by +8 hours before formatting
            // (presumably UTC -> China Standard Time — confirm).
            // A missing device time previously threw on .Value; fall back to the current
            // UTC time, in the same spirit as the TempTime fallback used for temperature messages.
            DateTime time = (alarm.DeviceUtcTime ?? DateTime.UtcNow).AddHours(8);
            string timeString = time.ToString("yyyy-MM-dd HH:mm:ss");

            List<TopicModel> ls = new List<TopicModel>
            {
                new TopicModel()
                {
                    Topic = MqTopic.Wx,
                    Headers = _producer.CreateHeader(new Dictionary<string, int>
                    {
                        { MqHeader.DataType, (int)MqDataType.AlarmInfo },
                    })
                }
            };

            var messageId = string.Format("{0:yyyyMMddHHmmssffff}", DateTime.Now);
            var model = new
            {
                deviceId = alarm.DeviceId,
                imei = alarm.Serialno,
                alarmTypeId = alarm.TypeId,
                alarmDeviceName = alarm.DeviceName,
                alarmRemarks = alarm.Remarks,
                address = alarm.Address,
            };

            await _producer.ProduceAsync(ls, new
            {
                messageId,
                topic = string.Join(",", ls.Select(e => e.Topic)),
                time = timeString,
                data = model
            });

            // Fast-channel topic: reuse the same topic entries (and their headers), retargeted to Fast.
            if (!_configServiceAccess.EnablePushFast)
            {
                return;
            }

            ls.ForEach(x => x.Topic = MqTopic.Fast);
            await ProcessFastTopicAsync(messageId, timeString, ls, model);
        }

        /// <summary>
        /// Publishes a temperature reading on the Wx topic, then on the Third and Healthy topics,
        /// and finally (when <c>EnablePushFast</c> is set) on the Fast topic.
        /// Does nothing when <c>EnablePushWx</c> is false.
        /// </summary>
        /// <param name="messageId">Message id placed in every envelope.</param>
        /// <param name="temp">Temperature reading to forward.</param>
        /// <param name="type">Location type; for <c>LocationType.LBS</c> the address is sent empty.</param>
        /// <param name="method">Measurement method; <c>MethodType.Period</c> selects the
        /// <c>Temperature1Info</c> data type, anything else uses <c>TemperatureInfo</c>.</param>
        public async Task ProcessWxTemperatureAsync(
            string messageId,
            TemperatureInfoModel temp,
            LocationType type = default,
            MethodType method = default)
        {
            if (!_configServiceAccess.EnablePushWx)
            {
                return;
            }

            // The former 30-minute validity limit on temperature messages has been removed,
            // so readings are forwarded regardless of their age.
            DateTime time = temp.TempTime ?? DateTime.Now;
            string timeString = time.ToString("yyyy-MM-dd HH:mm:ss");

            // Kafka data type carried in the message header.
            int mqDataType = method == MethodType.Period
                ? (int)MqDataType.Temperature1Info
                : (int)MqDataType.TemperatureInfo;

            List<TopicModel> ls = new List<TopicModel>
            {
                new TopicModel()
                {
                    Topic = MqTopic.Wx,
                    Headers = _producer.CreateHeader(new Dictionary<string, int>
                    {
                        { MqHeader.DataType, mqDataType },
                    })
                }
            };

            // An empty address or an LBS location is sent with an empty address string.
            var address = (type == LocationType.LBS || string.IsNullOrEmpty(temp.Address))
                ? string.Empty
                : string.Join(",", new string[] { temp.Province, temp.City, temp.District, temp.Address });

            var lsData = new
            {
                deviceId = temp.DeviceId,
                imei = temp.Imei,
                alarmTypeId = (int)AlarmTypes.Temperature,
                alarmDeviceName = temp.DeviceName,
                alarmRemarks = temp.Temperature?.ToString("f1"),
                address,
                deviceKey = temp.DeviceKey
            };

            await _producer.ProduceAsync(ls, new
            {
                messageId = messageId,
                topic = string.Join(",", ls.Select(e => e.Topic)),
                time = timeString,
                data = lsData
            });
            _logger.LogInformation($"ProcessWxTemperature测温:设备{temp.Imei}|tempId:{temp.TempId},推送到" + string.Join(", ", ls.Select(e => e.Topic)) + "| MqTopic.Wx-ls:" + JsonConvert.SerializeObject(lsData));

            // Third-party push (added by Yang Lei): the raw reading goes to Third and Healthy.
            List<TopicModel> ls2 = new List<TopicModel>
            {
                new TopicModel()
                {
                    Topic = MqTopic.Third,
                    Headers = _producer.CreateHeader(new Dictionary<string, int>
                    {
                        { MqHeader.DataType, mqDataType },
                    })
                },
                new TopicModel()
                {
                    Topic = MqTopic.Healthy,
                    Headers = _producer.CreateHeader(new Dictionary<string, int>
                    {
                        { MqHeader.DataType, mqDataType },
                    })
                }
            };

            await _producer.ProduceAsync(ls2, new
            {
                messageId = messageId,
                topic = string.Join(",", ls2.Select(e => e.Topic)),
                time = timeString,
                data = temp
            });
            // Report the topics this push actually went to (ls2); the log previously listed ls.
            _logger.LogInformation($"ProcessWxTemperature测温:设备{temp.Imei}|tempId:{temp.TempId},推送到" + string.Join(", ", ls2.Select(e => e.Topic)) + "|MqTopic.Third-ls2:" + JsonConvert.SerializeObject(temp));

            // Fast-channel topic.
            if (!_configServiceAccess.EnablePushFast)
            {
                return;
            }

            ls.ForEach(x => x.Topic = MqTopic.Fast);
            await ProcessFastTopicAsync(messageId, timeString, ls, lsData);

            ls2.ForEach(x => x.Topic = MqTopic.Fast);
            // Keep the member names already agreed for this payload (camelCase).
            var ls2Data = new
            {
                deviceId = temp.DeviceId,
                deviceName = temp.DeviceName,
                tempId = temp.TempId,
                tempTime = temp.TempTime,
                imei = temp.Imei,
                temperature = temp.Temperature,
                province = temp.Province,
                city = temp.City,
                district = temp.District,
                address = temp.Address
            };
            await ProcessFastTopicAsync(messageId, timeString, ls2, ls2Data);
        }

        /// <summary>
        /// Publishes an SOS alarm on the Third topic and, when <c>EnablePushFast</c> is set,
        /// again on the Fast topic. Does nothing when <c>EnablePushThird</c> is false.
        /// </summary>
        /// <param name="messageId">Message id placed in the envelope.</param>
        /// <param name="model">SOS payload.</param>
        /// <param name="date">Pre-formatted time string placed in the envelope.</param>
        public async Task ProcessAlarmSosAsync(string messageId, SoSTemplates model, string date)
        {
            if (!_configServiceAccess.EnablePushThird)
            {
                return;
            }

            List<TopicModel> ls = new List<TopicModel>
            {
                new TopicModel()
                {
                    Topic = MqTopic.Third,
                    Headers = _producer.CreateHeader(new Dictionary<string, int>
                    {
                        { MqHeader.DataType, (int)MqDataType.AlarmInfo },
                        { MqHeader.AlarmTypes, (int)AlarmType.SosAlarm }
                    })
                }
            };

            await _producer.ProduceAsync(ls, new
            {
                messageId = messageId,
                topic = string.Join(",", ls.Select(e => e.Topic)),
                time = date,
                data = model
            });
            _logger.LogInformation($"设备{model.imei},推送到" + string.Join(", ", ls.Select(e => e.Topic)));

            // Fast-channel topic.
            if (!_configServiceAccess.EnablePushFast)
            {
                return;
            }

            ls.ForEach(x => x.Topic = MqTopic.Fast);
            await ProcessFastTopicAsync(messageId, date, ls, model);
        }

        /// <summary>
        /// Publishes <paramref name="model"/> to the given topic entries and logs the push
        /// together with the serialized payload. The caller decides whether the fast channel is enabled.
        /// </summary>
        /// <param name="messageId">Message id placed in the envelope.</param>
        /// <param name="date">Pre-formatted time string placed in the envelope.</param>
        /// <param name="lsFast">Topic entries to publish to.</param>
        /// <param name="model">Payload; it must expose an <c>imei</c> member, which is read for the log line.</param>
        private async Task ProcessFastTopicAsync(string messageId, string date, List<TopicModel> lsFast, dynamic model)
        {
            await _producer.ProduceAsync(lsFast, new
            {
                messageId = messageId,
                topic = string.Join(",", lsFast.Select(e => e.Topic)),
                time = date,
                data = model
            });

            string logStr = string.Format("设备{0},推送到{1}|{2}", model.imei, string.Join(", ", lsFast.Select(e => e.Topic)), JsonConvert.SerializeObject(model));
            _logger.LogInformation(logStr);
        }

        /// <summary>
        /// Publishes a fence location on the Fence topic. Does nothing when <c>EnablePushFence</c> is false.
        /// </summary>
        /// <param name="model">Fence location payload.</param>
        /// <param name="date">Pre-formatted time string placed in the envelope.</param>
        /// <param name="messageId">Message id placed in the envelope.</param>
        public async Task ProcessFencePlusAsync(FenceLocationPlus model, string date, string messageId)
        {
            if (!_configServiceAccess.EnablePushFence)
            {
                return;
            }

            List<TopicModel> ls = new()
            {
                new TopicModel()
                {
                    Topic = MqTopic.Fence,
                    Headers = _producer.CreateHeader(new Dictionary<string, int>
                    {
                        { MqHeader.DataType, (int)MqDataType.PositionInfo },
                    })
                }
            };

            await _producer.ProduceAsync(ls, new
            {
                messageId = messageId,
                topic = MqTopic.Fence,
                time = date,
                data = model
            });
            _logger.LogInformation($"设备{model.imei},推送到" + string.Join(", ", ls.Select(e => e.Topic)));
        }

        /// <summary>
        /// Publishes a position on the Third and/or LocationMonitor topics, depending on
        /// <c>EnablePushThird</c> and <c>EnableLocationMonitor</c>. Does nothing when neither is enabled.
        /// </summary>
        /// <param name="messageId">Message id placed in the envelope.</param>
        /// <param name="model">Position payload, forwarded as-is.</param>
        /// <param name="date">Pre-formatted time string placed in the envelope.</param>
        public async Task ProcessPositionAsync(string messageId, LocationDatas model, string date)
        {
            List<TopicModel> ls = BuildPositionTopics();
            if (ls.Count == 0)
            {
                return;
            }

            await _producer.ProduceAsync(ls, new
            {
                messageId = messageId,
                topic = string.Join(",", ls.Select(e => e.Topic)),
                time = date,
                data = model
            });
            _logger.LogInformation($"设备{model.imei},推送到" + string.Join(", ", ls.Select(e => e.Topic)));
        }

        /// <summary>
        /// Same as the three-argument overload, but the payload is a projection of
        /// <paramref name="model"/> that additionally carries the Wi-Fi MAC list.
        /// </summary>
        /// <param name="messageId">Message id placed in the envelope.</param>
        /// <param name="model">Position whose fields are copied into the payload.</param>
        /// <param name="date">Pre-formatted time string placed in the envelope.</param>
        /// <param name="wifiMacs">Wi-Fi MAC list appended to the payload as <c>wifiMacs</c>.</param>
        public async Task ProcessPositionAsync(string messageId, LocationDatas model, string date, string wifiMacs)
        {
            List<TopicModel> ls = BuildPositionTopics();
            if (ls.Count == 0)
            {
                return;
            }

            await _producer.ProduceAsync(ls, new
            {
                messageId = messageId,
                topic = string.Join(",", ls.Select(e => e.Topic)),
                time = date,
                data = new
                {
                    model.imei,
                    model.locationType,
                    model.address,
                    model.altitude,
                    model.baiduLatitude,
                    model.baiduLongitude,
                    model.gaodeLatitude,
                    model.gaodeLongitude,
                    model.originalLatitude,
                    model.originalLongitude,
                    model.postcode,
                    model.hashParam,
                    model.radius,
                    model.province,
                    model.city,
                    model.district,
                    wifiMacs
                }
            });
            _logger.LogInformation($"设备{model.imei},推送到" + string.Join(", ", ls.Select(e => e.Topic)));
        }

        /// <summary>
        /// Builds the topic entries for a position message: Third when <c>EnablePushThird</c> is set,
        /// then LocationMonitor when <c>EnableLocationMonitor</c> is set. May return an empty list.
        /// </summary>
        private List<TopicModel> BuildPositionTopics()
        {
            List<TopicModel> topics = new List<TopicModel>();

            if (_configServiceAccess.EnablePushThird)
            {
                topics.Add(new TopicModel()
                {
                    Topic = MqTopic.Third,
                    Headers = _producer.CreateHeader(new Dictionary<string, int>
                    {
                        { MqHeader.DataType, (int)MqDataType.PositionInfo },
                    })
                });
            }

            if (_configServiceAccess.EnableLocationMonitor)
            {
                topics.Add(new TopicModel()
                {
                    Topic = MqTopic.LocationMonitor,
                    Headers = _producer.CreateHeader(new Dictionary<string, int>
                    {
                        { MqHeader.DataType, (int)MqDataType.PositionInfo },
                    })
                });
            }

            return topics;
        }

        #region topics.storage.iot.postion

        /// <summary>
        /// Publishes the original IoT positioning message on the IotPosition topic,
        /// stamped with the current local time.
        /// </summary>
        /// <param name="messageId">Message id placed in the envelope and in the log line.</param>
        /// <param name="imei">Device IMEI, used only for the log line.</param>
        /// <param name="msg">Payload forwarded as the envelope's <c>data</c>.</param>
        public async Task ProcessIotPositionAsync(string messageId, string imei, object msg)
        {
            List<TopicModel> ls = new List<TopicModel>
            {
                new TopicModel()
                {
                    Topic = MqTopic.IotPosition,
                    Headers = _producer.CreateHeader(new Dictionary<string, int>
                    {
                        { MqHeader.DataType, (int)MqDataType.IotPositionInfo }
                    })
                }
            };

            await _producer.ProduceAsync(ls, new
            {
                messageId = messageId,
                topic = string.Join(",", ls.Select(e => e.Topic)),
                time = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"),
                data = msg
            });
            _logger.LogInformation($"设备{imei},推送IOT定位原文消息 {messageId} 到" + string.Join(", ", ls.Select(e => e.Topic)));
        }

        #endregion
    }
}