|
- using GpsCardGatewayPosition.Model.Config;
- using GpsCardGatewayPosition.Model.Enum;
- using GpsCardGatewayPosition.Model.IoT;
- using GpsCardGatewayPosition.Model.Templates;
- using GpsCardGatewayPosition.Service.MqProducer.Model;
- using GpsCardGatewayPosition.Service.Resolver.Property.Dto;
- using Microsoft.Extensions.Logging;
- using Microsoft.Extensions.Options;
- using Newtonsoft.Json;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Security.Claims;
- using System.Text;
- using System.Threading.Tasks;
- using TelpoDataService.Util.Entities.GpsLocationHistory;
-
- namespace GpsCardGatewayPosition.Service.MqProducer
- {
- public class MqProcessLogic
- {
- private readonly MessageProducer _producer;
- private readonly ILogger<MqProcessLogic> _logger;
- private readonly ServiceAccessConfig _configServiceAccess;
-
- /// <summary>
- /// Builds the processor from the service-access options, the message producer and a logger.
- /// </summary>
- /// <param name="optConfigServiceAccess">Options wrapper; its <c>Value</c> is read once here and cached.</param>
- /// <param name="producer">Producer used by the methods of this class to create headers and publish messages.</param>
- /// <param name="logger">Logger used for push diagnostics.</param>
- public MqProcessLogic(
-     IOptions<ServiceAccessConfig> optConfigServiceAccess,
-     MessageProducer producer,
-     ILogger<MqProcessLogic> logger)
- {
-     // Single tuple assignment; the right-hand side is evaluated left to right.
-     (_configServiceAccess, _producer, _logger) = (optConfigServiceAccess.Value, producer, logger);
- }
- /// <summary>
- /// Publishes an alarm to <c>MqTopic.Wx</c> and, when the fast channel is enabled,
- /// publishes the same payload again to <c>MqTopic.Fast</c>.
- /// Does nothing when <c>EnablePushWx</c> is off.
- /// </summary>
- /// <param name="alarm">Alarm record; its device, type, remarks and address fields form the payload.</param>
- /// <returns>A task that completes when every enabled publish has been awaited.</returns>
- public async Task ProcessWxAlarmAsync(HisGpsAlarm alarm)
- {
-     if (!_configServiceAccess.EnablePushWx) return;
-
-     // DeviceUtcTime is nullable: reading .Value on an empty timestamp throws
-     // InvalidOperationException and the alarm is never published. Fall back to the
-     // current UTC time, then apply the same fixed +8 hour shift as before.
-     DateTime time = (alarm.DeviceUtcTime ?? DateTime.UtcNow).AddHours(8);
-     string timeString = time.ToString("yyyy-MM-dd HH:mm:ss");
-
-     List<TopicModel> ls = new List<TopicModel>
-     {
-         new TopicModel()
-         {
-             Topic = MqTopic.Wx,
-             Headers = _producer.CreateHeader(new Dictionary<string, int>
-             {
-                 { MqHeader.DataType, (int)MqDataType.AlarmInfo },
-             })
-         }
-     };
-
-     // The message id is generated locally from the current local time; it is shared
-     // by the regular push and the fast-channel push below.
-     var messageId = string.Format("{0:yyyyMMddHHmmssffff}", DateTime.Now);
-     var model = new
-     {
-         deviceId = alarm.DeviceId,
-         imei = alarm.Serialno,
-         alarmTypeId = alarm.TypeId,
-         alarmDeviceName = alarm.DeviceName,
-         alarmRemarks = alarm.Remarks,
-         address = alarm.Address,
-     };
-
-     await _producer.ProduceAsync(ls, new
-     {
-         messageId,
-         topic = string.Join(",", ls.Select(e => e.Topic)),
-         time = timeString,
-         data = model
-     });
-
-     #region Fast-channel topic
-     if (!_configServiceAccess.EnablePushFast) return;
-     // Reuse the same topic entries (and their headers), retargeted to the fast topic.
-     ls.ForEach(x => x.Topic = MqTopic.Fast);
-     await ProcessFastTopicAsync(messageId, timeString, ls, model);
-     #endregion
- }
- /// <summary>
- /// Publishes a temperature reading in three steps: an alarm-shaped payload to
- /// <c>MqTopic.Wx</c>, the raw reading to <c>MqTopic.Third</c> and <c>MqTopic.Healthy</c>,
- /// and, when the fast channel is enabled, both payloads to <c>MqTopic.Fast</c>.
- /// Does nothing when <c>EnablePushWx</c> is off.
- /// </summary>
- /// <param name="messageId">Message id copied into every published envelope.</param>
- /// <param name="temp">Temperature reading to publish.</param>
- /// <param name="type">Location type; for <c>LocationType.LBS</c> the address in the Wx payload is blanked.</param>
- /// <param name="method">Measurement method; <c>MethodType.Period</c> selects the <c>Temperature1Info</c> data type.</param>
- /// <returns>A task that completes when every enabled publish has been awaited.</returns>
- public async Task ProcessWxTemperatureAsync(string messageId, TemperatureInfoModel temp, LocationType type = default, MethodType method = default)
- {
-     if (!_configServiceAccess.EnablePushWx) return;
-
-     // The former 30-minute freshness limit on temperature messages was removed on purpose.
-     DateTime time = temp.TempTime ?? DateTime.Now;
-     string timeString = time.ToString("yyyy-MM-dd HH:mm:ss");
-
-     // Data-type header value: periodic readings use Temperature1Info, everything else TemperatureInfo.
-     int mqDataType = method == MethodType.Period
-         ? (int)MqDataType.Temperature1Info
-         : (int)MqDataType.TemperatureInfo;
-
-     List<TopicModel> ls = new List<TopicModel>
-     {
-         new TopicModel()
-         {
-             Topic = MqTopic.Wx,
-             Headers = _producer.CreateHeader(new Dictionary<string, int>
-             {
-                 { MqHeader.DataType, mqDataType },
-             })
-         }
-     };
-
-     // The address is blanked when the reading has no address or the location type is LBS;
-     // otherwise it is "province,city,district,address".
-     string address = (type == LocationType.LBS || string.IsNullOrEmpty(temp.Address))
-         ? string.Empty
-         : string.Join(",", new string[] { temp.Province, temp.City, temp.District, temp.Address });
-
-     var lsData = new
-     {
-         deviceId = temp.DeviceId,
-         imei = temp.Imei,
-         alarmTypeId = (int)AlarmTypes.Temperature,
-         alarmDeviceName = temp.DeviceName,
-         alarmRemarks = temp.Temperature?.ToString("f1"),
-         address,
-         deviceKey = temp.DeviceKey
-     };
-
-     await _producer.ProduceAsync(ls, new
-     {
-         messageId = messageId,
-         topic = string.Join(",", ls.Select(e => e.Topic)),
-         time = timeString,
-         data = lsData
-     });
-     _logger.LogInformation($"ProcessWxTemperature测温:设备{temp.Imei}|tempId:{temp.TempId},推送到" + string.Join(", ", ls.Select(e => e.Topic)) + "| MqTopic.Wx-ls:" + JsonConvert.SerializeObject(lsData));
-
-     #region Third-party push
-     List<TopicModel> ls2 = new List<TopicModel>
-     {
-         new TopicModel()
-         {
-             Topic = MqTopic.Third,
-             Headers = _producer.CreateHeader(new Dictionary<string, int>
-             {
-                 { MqHeader.DataType, mqDataType },
-             })
-         },
-         new TopicModel()
-         {
-             Topic = MqTopic.Healthy,
-             Headers = _producer.CreateHeader(new Dictionary<string, int>
-             {
-                 { MqHeader.DataType, mqDataType },
-             })
-         }
-     };
-     await _producer.ProduceAsync(ls2, new
-     {
-         messageId = messageId,
-         topic = string.Join(",", ls2.Select(e => e.Topic)),
-         time = timeString,
-         data = temp
-     });
-     #endregion
-     // Fix: this line reports the third-party push, so it must list the topics of ls2
-     // (Third/Healthy); it previously listed the Wx topic from ls.
-     _logger.LogInformation($"ProcessWxTemperature测温:设备{temp.Imei}|tempId:{temp.TempId},推送到" + string.Join(", ", ls2.Select(e => e.Topic)) + "|MqTopic.Third-ls2:" + JsonConvert.SerializeObject(temp));
-
-     #region Fast-channel topic
-     if (!_configServiceAccess.EnablePushFast) return;
-     ls.ForEach(x => x.Topic = MqTopic.Fast);
-     await ProcessFastTopicAsync(messageId, timeString, ls, lsData);
-
-     // NOTE(review): ls2 holds two entries and both are retargeted to the fast topic, so the
-     // payload below is presumably published once per entry - confirm against
-     // MessageProducer.ProduceAsync whether the duplicate is intended.
-     ls2.ForEach(x => x.Topic = MqTopic.Fast);
-     // Keeps the already-agreed property set for compatibility.
-     var ls2Data = new
-     {
-         deviceId = temp.DeviceId,
-         deviceName = temp.DeviceName,
-         tempId = temp.TempId,
-         tempTime = temp.TempTime,
-         imei = temp.Imei,
-         temperature = temp.Temperature,
-         province = temp.Province,
-         city = temp.City,
-         district = temp.District,
-         address = temp.Address
-     };
-     await ProcessFastTopicAsync(messageId, timeString, ls2, ls2Data);
-     #endregion
- }
- /// <summary>
- /// Publishes an SOS alarm to <c>MqTopic.Third</c> and, when the fast channel is enabled,
- /// publishes it again to <c>MqTopic.Fast</c>. Does nothing when <c>EnablePushThird</c> is off.
- /// </summary>
- /// <param name="messageId">Message id copied into the published envelope.</param>
- /// <param name="model">SOS payload, sent as the envelope's <c>data</c>.</param>
- /// <param name="date">Timestamp string, sent as the envelope's <c>time</c>.</param>
- /// <returns>A task that completes when every enabled publish has been awaited.</returns>
- public async Task ProcessAlarmSosAsync(string messageId, SoSTemplates model, string date)
- {
-     if (!_configServiceAccess.EnablePushThird)
-     {
-         return;
-     }
-
-     var sosHeaders = _producer.CreateHeader(new Dictionary<string, int>
-     {
-         { MqHeader.DataType, (int)MqDataType.AlarmInfo },
-         { MqHeader.AlarmTypes, (int)AlarmType.SosAlarm }
-     });
-     var targets = new List<TopicModel>
-     {
-         new TopicModel { Topic = MqTopic.Third, Headers = sosHeaders }
-     };
-
-     var envelope = new
-     {
-         messageId,
-         topic = string.Join(",", targets.Select(t => t.Topic)),
-         time = date,
-         data = model
-     };
-     await _producer.ProduceAsync(targets, envelope);
-
-     string pushedTo = string.Join(", ", targets.Select(t => t.Topic));
-     _logger.LogInformation($"设备{model.imei},推送到" + pushedTo);
-
-     #region Fast-channel topic
-     if (!_configServiceAccess.EnablePushFast)
-     {
-         return;
-     }
-
-     // Same entries and headers, retargeted to the fast topic.
-     foreach (var target in targets)
-     {
-         target.Topic = MqTopic.Fast;
-     }
-     await ProcessFastTopicAsync(messageId, date, targets, model);
-     #endregion
- }
- /// <summary>
- /// Publishes <paramref name="model"/> to the given topic entries and logs the push.
- /// Callers check <c>EnablePushFast</c> themselves; this method does not.
- /// </summary>
- /// <param name="messageId">Message id copied into the published envelope.</param>
- /// <param name="date">Timestamp string, sent as the envelope's <c>time</c>.</param>
- /// <param name="lsFast">Topic entries to publish to.</param>
- /// <param name="model">Payload; it must expose an <c>imei</c> member, which is read for the log line.</param>
- /// <returns>A task that completes when the publish has been awaited.</returns>
- private async Task ProcessFastTopicAsync(string messageId, string date, List<TopicModel> lsFast, dynamic model)
- {
-     var envelope = new
-     {
-         messageId,
-         topic = string.Join(",", lsFast.Select(t => t.Topic)),
-         time = date,
-         data = model
-     };
-     await _producer.ProduceAsync(lsFast, envelope);
-
-     // Resolve the dynamic member once, then format with statically bound calls.
-     object deviceImei = model.imei;
-     string pushedTo = string.Join(", ", lsFast.Select(t => t.Topic));
-     string payloadJson = JsonConvert.SerializeObject((object)model);
-     _logger.LogInformation(string.Format("设备{0},推送到{1}|{2}", deviceImei, pushedTo, payloadJson));
- }
-
- /// <summary>
- /// Publishes a fence location to <c>MqTopic.Fence</c>. Does nothing when
- /// <c>EnablePushFence</c> is off.
- /// </summary>
- /// <param name="model">Fence location payload, sent as the envelope's <c>data</c>.</param>
- /// <param name="date">Timestamp string, sent as the envelope's <c>time</c>.</param>
- /// <param name="messageId">Message id copied into the published envelope.</param>
- /// <returns>A task that completes when the publish has been awaited.</returns>
- public async Task ProcessFencePlusAsync(FenceLocationPlus model, string date, string messageId)
- {
-     if (!_configServiceAccess.EnablePushFence)
-     {
-         return;
-     }
-
-     var fenceTopic = new TopicModel
-     {
-         Topic = MqTopic.Fence,
-         Headers = _producer.CreateHeader(new Dictionary<string, int>
-         {
-             { MqHeader.DataType, (int)MqDataType.PositionInfo },
-         })
-     };
-     var targets = new List<TopicModel> { fenceTopic };
-
-     // The envelope's topic field is the fence topic itself rather than a joined list.
-     var envelope = new
-     {
-         messageId,
-         topic = MqTopic.Fence,
-         time = date,
-         data = model
-     };
-     await _producer.ProduceAsync(targets, envelope);
-
-     string pushedTo = string.Join(", ", targets.Select(t => t.Topic));
-     _logger.LogInformation($"设备{model.imei},推送到" + pushedTo);
- }
- /// <summary>
- /// Publishes a position to <c>MqTopic.Third</c> and/or <c>MqTopic.LocationMonitor</c>,
- /// depending on <c>EnablePushThird</c> and <c>EnableLocationMonitor</c>.
- /// Does nothing when neither switch is on.
- /// </summary>
- /// <param name="messageId">Message id copied into the published envelope.</param>
- /// <param name="model">Position payload, sent as the envelope's <c>data</c>.</param>
- /// <param name="date">Timestamp string, sent as the envelope's <c>time</c>.</param>
- /// <returns>A task that completes when the publish has been awaited.</returns>
- public async Task ProcessPositionAsync(string messageId, LocationDatas model, string date)
- {
-     // Every target carries the same single data-type header; a fresh dictionary is built per target.
-     Dictionary<string, int> PositionHeaderValues() => new Dictionary<string, int>
-     {
-         { MqHeader.DataType, (int)MqDataType.PositionInfo },
-     };
-
-     var targets = new List<TopicModel>();
-
-     if (_configServiceAccess.EnablePushThird)
-     {
-         var third = new TopicModel();
-         third.Topic = MqTopic.Third;
-         third.Headers = _producer.CreateHeader(PositionHeaderValues());
-         targets.Add(third);
-     }
-
-     if (_configServiceAccess.EnableLocationMonitor)
-     {
-         var monitor = new TopicModel();
-         monitor.Topic = MqTopic.LocationMonitor;
-         monitor.Headers = _producer.CreateHeader(PositionHeaderValues());
-         targets.Add(monitor);
-     }
-
-     if (targets.Count == 0)
-     {
-         return;
-     }
-
-     var envelope = new
-     {
-         messageId,
-         topic = string.Join(",", targets.Select(t => t.Topic)),
-         time = date,
-         data = model
-     };
-     await _producer.ProduceAsync(targets, envelope);
-
-     string pushedTo = string.Join(", ", targets.Select(t => t.Topic));
-     _logger.LogInformation($"设备{model.imei},推送到" + pushedTo);
- }
- /// <summary>
- /// Publishes a position together with the Wi-Fi list to <c>MqTopic.Third</c> and/or
- /// <c>MqTopic.LocationMonitor</c>, depending on <c>EnablePushThird</c> and
- /// <c>EnableLocationMonitor</c>. Does nothing when neither switch is on.
- /// </summary>
- /// <param name="messageId">Message id copied into the published envelope.</param>
- /// <param name="model">Position whose fields are copied into the envelope's <c>data</c>.</param>
- /// <param name="date">Timestamp string, sent as the envelope's <c>time</c>.</param>
- /// <param name="wifiMacs">Wi-Fi list, added to the payload as <c>wifiMacs</c>.</param>
- /// <returns>A task that completes when the publish has been awaited.</returns>
- public async Task ProcessPositionAsync(string messageId, LocationDatas model, string date, string wifiMacs)
- {
-     var targets = new List<TopicModel>();
-
-     if (_configServiceAccess.EnablePushThird)
-     {
-         targets.Add(new TopicModel
-         {
-             Topic = MqTopic.Third,
-             Headers = _producer.CreateHeader(new Dictionary<string, int>
-             {
-                 { MqHeader.DataType, (int)MqDataType.PositionInfo },
-             })
-         });
-     }
-
-     if (_configServiceAccess.EnableLocationMonitor)
-     {
-         targets.Add(new TopicModel
-         {
-             Topic = MqTopic.LocationMonitor,
-             Headers = _producer.CreateHeader(new Dictionary<string, int>
-             {
-                 { MqHeader.DataType, (int)MqDataType.PositionInfo },
-             })
-         });
-     }
-
-     if (targets.Count == 0)
-     {
-         return;
-     }
-
-     // Flattened copy of the position fields plus the Wi-Fi list.
-     var positionWithWifi = new
-     {
-         model.imei,
-         model.locationType,
-         model.address,
-         model.altitude,
-         model.baiduLatitude,
-         model.baiduLongitude,
-         model.gaodeLatitude,
-         model.gaodeLongitude,
-         model.originalLatitude,
-         model.originalLongitude,
-         model.postcode,
-         model.hashParam,
-         model.radius,
-         model.province,
-         model.city,
-         model.district,
-         wifiMacs
-     };
-
-     await _producer.ProduceAsync(targets, new
-     {
-         messageId,
-         topic = string.Join(",", targets.Select(t => t.Topic)),
-         time = date,
-         data = positionWithWifi
-     });
-
-     string pushedTo = string.Join(", ", targets.Select(t => t.Topic));
-     _logger.LogInformation($"设备{model.imei},推送到" + pushedTo);
- }
-
- #region topics.storage.iot.postion
- /// <summary>
- /// Publishes an IoT position message to <c>MqTopic.IotPosition</c>, stamped with the
- /// current local time, and logs the push. No configuration switch is checked here.
- /// </summary>
- /// <param name="messageId">Message id copied into the published envelope and the log line.</param>
- /// <param name="imei">Device IMEI; used only in the log line.</param>
- /// <param name="msg">Payload, sent as the envelope's <c>data</c>.</param>
- /// <returns>A task that completes when the publish has been awaited.</returns>
- public async Task ProcessIotPositionAsync(string messageId, string imei, object msg)
- {
-     var iotTopic = new TopicModel
-     {
-         Topic = MqTopic.IotPosition,
-         Headers = _producer.CreateHeader(new Dictionary<string, int>
-         {
-             { MqHeader.DataType, (int)MqDataType.IotPositionInfo }
-         })
-     };
-     var targets = new List<TopicModel> { iotTopic };
-
-     var envelope = new
-     {
-         messageId,
-         topic = string.Join(",", targets.Select(t => t.Topic)),
-         time = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"),
-         data = msg
-     };
-     await _producer.ProduceAsync(targets, envelope);
-
-     string pushedTo = string.Join(", ", targets.Select(t => t.Topic));
-     _logger.LogInformation($"设备{imei},推送IOT定位原文消息 {messageId} 到" + pushedTo);
- }
-
- #endregion
- }
- }
|