You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

MqProcessLogic.cs 18KB

11 kuukautta sitten
10 kuukautta sitten
11 kuukautta sitten
11 kuukautta sitten
11 kuukautta sitten
11 kuukautta sitten
11 kuukautta sitten
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453
  1. using GpsCardGatewayPosition.Model.Config;
  2. using GpsCardGatewayPosition.Model.Enum;
  3. using GpsCardGatewayPosition.Model.IoT;
  4. using GpsCardGatewayPosition.Model.Templates;
  5. using GpsCardGatewayPosition.Service.MqProducer.Model;
  6. using GpsCardGatewayPosition.Service.Resolver.Property.Dto;
  7. using Microsoft.Extensions.Logging;
  8. using Microsoft.Extensions.Options;
  9. using Newtonsoft.Json;
  10. using System;
  11. using System.Collections.Generic;
  12. using System.Linq;
  13. using System.Security.Claims;
  14. using System.Text;
  15. using System.Threading.Tasks;
  16. using TelpoDataService.Util.Entities.GpsLocationHistory;
  17. namespace GpsCardGatewayPosition.Service.MqProducer
  18. {
  19. public class MqProcessLogic
  20. {
  21. private readonly MessageProducer _producer;
  22. private readonly ILogger<MqProcessLogic> _logger;
  23. private readonly ServiceAccessConfig _configServiceAccess;
  24. public MqProcessLogic(IOptions<ServiceAccessConfig> optConfigServiceAccess,MessageProducer producer, ILogger<MqProcessLogic> logger)
  25. {
  26. _configServiceAccess = optConfigServiceAccess.Value;
  27. _producer = producer;
  28. _logger = logger;
  29. }
  30. public async Task ProcessWxAlarmAsync(HisGpsAlarm alarm)
  31. {
  32. if (!_configServiceAccess.EnablePushWx) return;
  33. DateTime time = alarm.DeviceUtcTime.Value.AddHours(8);
  34. string timeString = time.ToString("yyyy-MM-dd HH:mm:ss");
  35. List<TopicModel> ls = new List<TopicModel>();
  36. ls.Add(new TopicModel()
  37. {
  38. Topic = MqTopic.Wx,
  39. Headers = _producer.CreateHeader(new Dictionary<string, int>
  40. {
  41. {MqHeader.DataType,(int)MqDataType.AlarmInfo },
  42. })
  43. });
  44. var messageId = string.Format("{0:yyyyMMddHHmmssffff}", DateTime.Now);
  45. var model = new
  46. {
  47. deviceId = alarm.DeviceId,
  48. imei = alarm.Serialno,
  49. alarmTypeId = alarm.TypeId,
  50. alarmDeviceName = alarm.DeviceName,
  51. alarmRemarks = alarm.Remarks,
  52. address = alarm.Address,
  53. };
  54. await _producer.ProduceAsync(ls, new
  55. {
  56. //messageId = string.Format("{0:yyyyMMddHHmmssffff}", DateTime.Now),
  57. messageId,
  58. topic = string.Join(",", ls.Select(e => e.Topic)), //MqTopic.Wx,
  59. time = timeString,
  60. //data = new
  61. //{
  62. // deviceId = alarm.DeviceId,
  63. // imei = alarm.Serialno,
  64. // alarmTypeId = alarm.TypeId,
  65. // alarmDeviceName = alarm.DeviceName,
  66. // alarmRemarks = alarm.Remarks,
  67. // address = alarm.Address,
  68. //}
  69. data = model
  70. });
  71. #region 快速通道 Topic
  72. if (!_configServiceAccess.EnablePushFast) return;
  73. ls.ForEach(x => x.Topic = MqTopic.Fast);
  74. await ProcessFastTopicAsync(messageId, timeString, ls, model);
  75. #endregion
  76. }
  77. public async Task ProcessWxTemperatureAsync(string messageId, TemperatureInfoModel temp, LocationType type = default, MethodType method = default)
  78. {
  79. if (!_configServiceAccess.EnablePushWx) return;
  80. // 小于34过滤 或者 大于41过滤
  81. if (temp.Temperature < 34 || temp.Temperature > 41) return;
  82. DateTime time = temp.TempTime ?? DateTime.Now;
  83. //取消测温30分钟有效性限制
  84. //if (DateTime.Now.Subtract(time).TotalMinutes > 30) //不管哪种测温消息,超过30分钟上报的均不处理
  85. //{
  86. // _logger.LogError($"测温消息[{messageId}]30分钟超时,不处理");
  87. // return;
  88. //}
  89. string timeString = time.ToString("yyyy-MM-dd HH:mm:ss");
  90. List<TopicModel> ls = new List<TopicModel>();
  91. // kafka数据类型
  92. int mqDataType = (int)MqDataType.TemperatureInfo;
  93. //if (method == MethodType.Manual) mqDataType = (int)MqDataType.TemperatureInfo;
  94. if (method == MethodType.Period) mqDataType = (int)MqDataType.Temperature1Info;
  95. ls.Add(new TopicModel()
  96. {
  97. Topic = MqTopic.Wx,
  98. Headers = _producer.CreateHeader(new Dictionary<string, int>
  99. {
  100. // {MqHeader.DataType,(int)MqDataType.TemperatureInfo },
  101. {MqHeader.DataType,mqDataType },
  102. })
  103. });
  104. var address = string.Join(",", new string[] { temp.Province, temp.City, temp.District, temp.Address });
  105. // 空地址 address设置为空
  106. if (string.IsNullOrEmpty(temp.Address))
  107. {
  108. address = string.Empty;
  109. }
  110. // LBS地址 address设置为空
  111. if (type == LocationType.LBS)
  112. {
  113. address = string.Empty;
  114. }
  115. var lsData = new
  116. {
  117. deviceId = temp.DeviceId,
  118. imei = temp.Imei,
  119. alarmTypeId = (int)AlarmTypes.Temperature,
  120. alarmDeviceName = temp.DeviceName,
  121. alarmRemarks = temp.Temperature?.ToString("f1"),
  122. //address =string.Join(",", new string[] { temp.Province, temp.City, temp.District, temp.Address }),
  123. // address = (type == LocationType.LBS || string.IsNullOrEmpty(temp.Address)) ? string.Empty : string.Join(",", new string[] { temp.Province, temp.City, temp.District, temp.Address }),
  124. address,
  125. deviceKey = temp.DeviceKey
  126. };
  127. await _producer.ProduceAsync(ls, new
  128. {
  129. messageId = messageId, //string.Format("{0:yyyyMMddHHmmssffff}", DateTime.Now),
  130. topic = string.Join(",", ls.Select(e => e.Topic)), //MqTopic.Wx,
  131. time = timeString,
  132. data = lsData
  133. });
  134. _logger.LogInformation($"ProcessWxTemperature测温:设备{temp.Imei}|tempId:{temp.TempId},推送到" + string.Join(", ", ls.Select(e => e.Topic)) + "| MqTopic.Wx-ls:" + JsonConvert.SerializeObject(lsData));
  135. #region 第三方推送(杨雷新增)
  136. List<TopicModel> ls2 = new List<TopicModel>();
  137. //int mqDataType = (int)MqDataType.TemperatureInfo;
  138. ////if (method == MethodType.Manual) mqDataType = (int)MqDataType.TemperatureInfo;
  139. //if (method == MethodType.Period) mqDataType = (int)MqDataType.Temperature1Info;
  140. ls2.Add(new TopicModel()
  141. {
  142. Topic = MqTopic.Third,
  143. Headers = _producer.CreateHeader(new Dictionary<string, int>
  144. {
  145. //{MqHeader.DataType,(int)MqDataType.TemperatureInfo },
  146. {MqHeader.DataType,mqDataType},
  147. })
  148. });
  149. ls2.Add(new TopicModel()
  150. {
  151. Topic = MqTopic.Healthy,
  152. Headers = _producer.CreateHeader(new Dictionary<string, int>
  153. {
  154. //{MqHeader.DataType,(int)MqDataType.TemperatureInfo },
  155. {MqHeader.DataType,mqDataType},
  156. })
  157. });
  158. await _producer.ProduceAsync(ls2, new
  159. {
  160. messageId = messageId,
  161. topic = string.Join(",", ls2.Select(e => e.Topic)),
  162. time = timeString,
  163. data = temp
  164. });
  165. #endregion
  166. _logger.LogInformation($"ProcessWxTemperature测温:设备{temp.Imei}|tempId:{temp.TempId},推送到" + string.Join(", ", ls.Select(e => e.Topic)) + "|MqTopic.Third-ls2:" + JsonConvert.SerializeObject(temp));
  167. #region 快速通道 Topic
  168. if (!_configServiceAccess.EnablePushFast) return;
  169. ls.ForEach(x => x.Topic = MqTopic.Fast);
  170. await ProcessFastTopicAsync(messageId, timeString, ls, lsData);
  171. ls2.ForEach(x => x.Topic = MqTopic.Fast);
  172. // 兼容已定属性
  173. var ls2Data = new
  174. {
  175. deviceId = temp.DeviceId,
  176. deviceName = temp.DeviceName,
  177. tempId = temp.TempId,
  178. tempTime = temp.TempTime,
  179. imei = temp.Imei,
  180. temperature = temp.Temperature,
  181. province = temp.Province,
  182. city = temp.City,
  183. district = temp.District,
  184. address = temp.Address
  185. };
  186. await ProcessFastTopicAsync(messageId, timeString, ls2, ls2Data);
  187. #endregion
  188. }
  189. public async Task ProcessAlarmSosAsync(string messageId, SoSTemplates model, string date)
  190. {
  191. if (!_configServiceAccess.EnablePushThird) return;
  192. List<TopicModel> ls = new List<TopicModel>();
  193. ls.Add(new TopicModel()
  194. {
  195. Topic = MqTopic.Third,
  196. Headers = _producer.CreateHeader(new Dictionary<string, int>
  197. {
  198. {MqHeader.DataType,(int)MqDataType.AlarmInfo },
  199. {MqHeader.AlarmTypes,(int)AlarmType.SosAlarm }
  200. })
  201. });
  202. await _producer.ProduceAsync(ls, new
  203. {
  204. messageId = messageId, //string.Format("{0:yyyyMMddHHmmssffff}", DateTime.Now),
  205. topic = string.Join(",", ls.Select(e => e.Topic)), //MqTopic.Storage + "," + MqTopic.Third,
  206. time = date,
  207. data = model
  208. });
  209. _logger.LogInformation($"设备{model.imei},推送到" + string.Join(", ", ls.Select(e => e.Topic)));
  210. #region 快速通道 Topic
  211. if (!_configServiceAccess.EnablePushFast) return;
  212. ls.ForEach(x => x.Topic = MqTopic.Fast);
  213. await ProcessFastTopicAsync(messageId, date, ls, model);
  214. #endregion
  215. /**
  216. * // 快速通道
  217. //if (!_configServiceAccess.EnablePushFast) return;
  218. //List<TopicModel> lsFast = new List<TopicModel>
  219. //{
  220. // new TopicModel()
  221. // {
  222. // Topic = MqTopic.Fast,
  223. // Headers = _producer.CreateHeader(new Dictionary<string, int>
  224. // {
  225. // {MqHeader.DataType,(int)MqDataType.AlarmInfo },
  226. // {MqHeader.AlarmTypes,(int)AlarmType.SosAlarm }
  227. // })
  228. // }
  229. //};
  230. //_producer.ProduceAsync(lsFast, new
  231. //{
  232. // messageId = messageId, //string.Format("{0:yyyyMMddHHmmssffff}", DateTime.Now),
  233. // topic = string.Join(",", lsFast.Select(e => e.Topic)), //MqTopic.Storage + "," + MqTopic.Third,
  234. // time = date,
  235. // data = model
  236. //});
  237. //_logger.LogInformation($"设备{model.imei},推送到" + string.Join(", ", lsFast.Select(e => e.Topic)) + "|"+JsonConvert.SerializeObject(model));
  238. */
  239. }
  240. private async Task ProcessFastTopicAsync(string messageId, string date, List<TopicModel> lsFast, dynamic model)
  241. {
  242. // if (!_configServiceAccess.EnablePushFast) return;
  243. await _producer.ProduceAsync(lsFast, new
  244. {
  245. messageId = messageId, //string.Format("{0:yyyyMMddHHmmssffff}", DateTime.Now),
  246. topic = string.Join(",", lsFast.Select(e => e.Topic)), //MqTopic.Storage + "," + MqTopic.Third,
  247. time = date,
  248. data = model
  249. });
  250. string logStr = string.Format("设备{0},推送到{1}|{2}", model.imei, string.Join(", ", lsFast.Select(e => e.Topic)), JsonConvert.SerializeObject(model));
  251. _logger.LogInformation(logStr);
  252. }
  253. public async Task ProcessFencePlusAsync(FenceLocationPlus model, string date, string messageId)
  254. {
  255. if (!_configServiceAccess.EnablePushFence) return;
  256. List<TopicModel> ls = new()
  257. {
  258. new TopicModel()
  259. {
  260. Topic = MqTopic.Fence,
  261. Headers = _producer.CreateHeader(new Dictionary<string, int>
  262. {
  263. {MqHeader.DataType,(int)MqDataType.PositionInfo },
  264. })
  265. }
  266. };
  267. await _producer.ProduceAsync(ls, new
  268. {
  269. messageId = messageId,
  270. topic = MqTopic.Fence,
  271. time = date,
  272. data = model
  273. });
  274. _logger.LogInformation($"设备{model.imei},推送到" + string.Join(", ", ls.Select(e => e.Topic)));
  275. }
  276. //public async Task ProcessPositionAsync(string messageId, LocationDatas model, string date)
  277. //{
  278. // List<TopicModel> ls = new List<TopicModel>();
  279. // if (_configServiceAccess.EnablePushThird)
  280. // {
  281. // ls.Add(new TopicModel()
  282. // {
  283. // Topic = MqTopic.Third,
  284. // Headers = _producer.CreateHeader(new Dictionary<string, int>
  285. // {
  286. // {MqHeader.DataType,(int)MqDataType.PositionInfo },
  287. // })
  288. // });
  289. // }
  290. // if (_configServiceAccess.EnableLocationMonitor)
  291. // {
  292. // ls.Add(new TopicModel()
  293. // {
  294. // Topic = MqTopic.LocationMonitor,
  295. // Headers = _producer.CreateHeader(new Dictionary<string, int>
  296. // {
  297. // { MqHeader.DataType,(int)MqDataType.PositionInfo},
  298. // })
  299. // });
  300. // }
  301. // if (ls.Count == 0) return;
  302. // await _producer.ProduceAsync(ls, new
  303. // {
  304. // messageId = messageId, //string.Format("{0:yyyyMMddHHmmssffff}", DateTime.Now),
  305. // topic = string.Join(",", ls.Select(e => e.Topic)), //MqTopic.Storage + "," + MqTopic.Third,
  306. // time = date,
  307. // data = model
  308. // });
  309. // _logger.LogInformation($"设备{model.imei},推送到" + string.Join(", ", ls.Select(e => e.Topic)));
  310. //}
  311. /// <summary>
  312. /// 含有wifi列表
  313. /// </summary>
  314. /// <param name="messageId"></param>
  315. /// <param name="model"></param>
  316. /// <param name="date"></param>
  317. /// <param name="wifiMacs"></param>
  318. /// <returns></returns>
  319. public async Task ProcessPositionAsync(string messageId, LocationDatas model, string date, string wifiMacs = "")
  320. {
  321. List<TopicModel> ls = new();
  322. //if (_configServiceAccess.EnablePushThird)
  323. //{
  324. // ls.Add(new TopicModel()
  325. // {
  326. // Topic = MqTopic.Third,
  327. // Headers = _producer.CreateHeader(new Dictionary<string, int>
  328. // {
  329. // {MqHeader.DataType,(int)MqDataType.PositionInfo },
  330. // })
  331. // });
  332. //}
  333. if (_configServiceAccess.EnablePushPosition)
  334. {
  335. ls.Add(new TopicModel()
  336. {
  337. Topic = MqTopic.PushPosition,
  338. Headers = _producer.CreateHeader(new Dictionary<string, int>
  339. {
  340. {MqHeader.DataType,(int)MqDataType.PositionInfo },
  341. })
  342. });
  343. }
  344. if (_configServiceAccess.EnableLocationMonitor)
  345. {
  346. ls.Add(new TopicModel()
  347. {
  348. Topic = MqTopic.LocationMonitor,
  349. Headers = _producer.CreateHeader(new Dictionary<string, int>
  350. {
  351. { MqHeader.DataType,(int)MqDataType.PositionInfo},
  352. })
  353. });
  354. }
  355. if (ls.Count == 0) return;
  356. await _producer.ProduceAsync(ls, new
  357. {
  358. messageId = messageId, //string.Format("{0:yyyyMMddHHmmssffff}", DateTime.Now),
  359. topic = string.Join(",", ls.Select(e => e.Topic)), //MqTopic.Storage + "," + MqTopic.Third,
  360. time = date,
  361. data = new
  362. {
  363. model.imei,
  364. model.locationType,
  365. model.address,
  366. model.altitude,
  367. model.baiduLatitude,
  368. model.baiduLongitude,
  369. model.gaodeLatitude,
  370. model.gaodeLongitude,
  371. model.originalLatitude,
  372. model.originalLongitude,
  373. model.postcode,
  374. model.hashParam,
  375. model.radius,
  376. model.province,
  377. model.city,
  378. model.district,
  379. wifiMacs
  380. }
  381. });
  382. _logger.LogInformation($"设备{model.imei},推送到" + string.Join(", ", ls.Select(e => e.Topic)));
  383. }
  384. #region topics.storage.iot.postion
  385. public async Task ProcessIotPositionAsync(string messageId, string imei,
  386. // PackageMsgModel msg
  387. object msg
  388. )
  389. {
  390. List<TopicModel> ls = new List<TopicModel>
  391. {
  392. new TopicModel()
  393. {
  394. Topic = MqTopic.IotPosition,
  395. Headers = _producer.CreateHeader(new Dictionary<string, int>
  396. {
  397. {MqHeader.DataType,(int)MqDataType.IotPositionInfo }
  398. })
  399. }
  400. };
  401. await _producer.ProduceAsync(ls, new
  402. {
  403. messageId = messageId, //string.Format("{0:yyyyMMddHHmmssffff}", DateTime.Now),
  404. topic = string.Join(",", ls.Select(e => e.Topic)),
  405. time = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"),
  406. data = msg
  407. });
  408. _logger.LogInformation($"设备{imei},推送IOT定位原文消息 {messageId} 到" + string.Join(", ", ls.Select(e => e.Topic)));
  409. }
  410. #endregion
  411. }
  412. }