You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

454 line
18KB

  1. using GpsCardGatewayPosition.Model.Config;
  2. using GpsCardGatewayPosition.Model.Enum;
  3. using GpsCardGatewayPosition.Model.IoT;
  4. using GpsCardGatewayPosition.Model.Templates;
  5. using GpsCardGatewayPosition.Service.MqProducer.Model;
  6. using GpsCardGatewayPosition.Service.Resolver.Property.Dto;
  7. using Microsoft.Extensions.Logging;
  8. using Microsoft.Extensions.Options;
  9. using Newtonsoft.Json;
  10. using System;
  11. using System.Collections.Generic;
  12. using System.Linq;
  13. using System.Security.Claims;
  14. using System.Text;
  15. using System.Threading.Tasks;
  16. using TelpoDataService.Util.Entities.GpsLocationHistory;
  17. namespace GpsCardGatewayPosition.Service.MqProducer
  18. {
  19. public class MqProcessLogic
  20. {
  /// <summary>Producer through which every message built by this class is published.</summary>
  private readonly MessageProducer _producer;

  /// <summary>Logger used for push diagnostics.</summary>
  private readonly ILogger<MqProcessLogic> _logger;

  /// <summary>Service-access switches (EnablePushWx, EnablePushFast, ...) consulted before each push.</summary>
  private readonly ServiceAccessConfig _configServiceAccess;

  /// <summary>
  /// Creates the processor from its injected dependencies.
  /// </summary>
  /// <param name="optConfigServiceAccess">Options wrapper whose <c>Value</c> supplies the service-access switches.</param>
  /// <param name="producer">Message producer used for all publishes.</param>
  /// <param name="logger">Logger for this class.</param>
  public MqProcessLogic(IOptions<ServiceAccessConfig> optConfigServiceAccess, MessageProducer producer, ILogger<MqProcessLogic> logger)
  {
      (_configServiceAccess, _producer, _logger) = (optConfigServiceAccess.Value, producer, logger);
  }
  /// <summary>
  /// Publishes an alarm to the Wx topic and, when enabled, re-publishes the same payload
  /// to the fast-channel topic.
  /// </summary>
  /// <remarks>
  /// Nothing is sent when <c>EnablePushWx</c> is off. The fast-channel copy is sent only when
  /// <c>EnablePushFast</c> is also on.
  /// </remarks>
  /// <param name="alarm">Alarm record supplying the device, type, remark and address fields of the payload.</param>
  /// <returns>A task that completes once every enabled push has been awaited.</returns>
  public async Task ProcessWxAlarmAsync(HisGpsAlarm alarm)
  {
      if (!_configServiceAccess.EnablePushWx) return;

      // The device time is shifted by 8 hours before formatting (presumably UTC -> UTC+8 — confirm).
      // When the device time is missing, fall back to the local clock, the same fallback
      // ProcessWxTemperatureAsync uses, instead of throwing InvalidOperationException on .Value.
      DateTime time = alarm.DeviceUtcTime.HasValue
          ? alarm.DeviceUtcTime.Value.AddHours(8)
          : DateTime.Now;
      string timeString = time.ToString("yyyy-MM-dd HH:mm:ss");

      List<TopicModel> ls = new List<TopicModel>
      {
          new TopicModel()
          {
              Topic = MqTopic.Wx,
              Headers = _producer.CreateHeader(new Dictionary<string, int>
              {
                  { MqHeader.DataType, (int)MqDataType.AlarmInfo },
              })
          }
      };

      // The id is generated here (not supplied by the caller) and shared by the Wx and fast-channel pushes.
      var messageId = string.Format("{0:yyyyMMddHHmmssffff}", DateTime.Now);
      var model = new
      {
          deviceId = alarm.DeviceId,
          imei = alarm.Serialno,
          alarmTypeId = alarm.TypeId,
          alarmDeviceName = alarm.DeviceName,
          alarmRemarks = alarm.Remarks,
          address = alarm.Address,
      };

      await _producer.ProduceAsync(ls, new
      {
          messageId,
          topic = string.Join(",", ls.Select(e => e.Topic)),
          time = timeString,
          data = model
      });

      #region Fast-channel topic
      if (!_configServiceAccess.EnablePushFast) return;
      // Reuse the same TopicModel instances (and their headers), retargeted at the fast topic.
      ls.ForEach(x => x.Topic = MqTopic.Fast);
      await ProcessFastTopicAsync(messageId, timeString, ls, model);
      #endregion
  }
  /// <summary>
  /// Publishes a temperature reading to the Wx topic, then to the Third and Healthy topics, and
  /// finally (when enabled) re-publishes both payloads to the fast-channel topic.
  /// </summary>
  /// <remarks>
  /// Nothing is sent when <c>EnablePushWx</c> is off or when the temperature is below 34 or above 41.
  /// </remarks>
  /// <param name="messageId">Message id copied into every published envelope.</param>
  /// <param name="temp">Temperature reading; it is also sent unchanged as the payload of the Third/Healthy push.</param>
  /// <param name="type">Location type; <c>LocationType.LBS</c> blanks the address in the Wx payload.</param>
  /// <param name="method">Measurement method; <c>MethodType.Period</c> switches the data-type header to <c>Temperature1Info</c>.</param>
  /// <returns>A task that completes once every enabled push has been awaited.</returns>
  public async Task ProcessWxTemperatureAsync(string messageId, TemperatureInfoModel temp, LocationType type = default, MethodType method = default)
  {
      if (!_configServiceAccess.EnablePushWx) return;

      // Drop readings below 34 or above 41.
      // NOTE(review): Temperature looks nullable (see ?.ToString below); if so, a null reading
      // is not filtered here because both comparisons evaluate to false — confirm this is intended.
      if (temp.Temperature < 34 || temp.Temperature > 41) return;

      // The former 30-minute validity limit on temperature messages has been removed,
      // so late readings are still pushed.
      DateTime time = temp.TempTime ?? DateTime.Now;
      string timeString = time.ToString("yyyy-MM-dd HH:mm:ss");

      // Kafka data-type header: periodic measurements use Temperature1Info, everything else TemperatureInfo.
      int mqDataType = (int)MqDataType.TemperatureInfo;
      if (method == MethodType.Period) mqDataType = (int)MqDataType.Temperature1Info;

      List<TopicModel> ls = new List<TopicModel>
      {
          new TopicModel()
          {
              Topic = MqTopic.Wx,
              Headers = _producer.CreateHeader(new Dictionary<string, int>
              {
                  { MqHeader.DataType, mqDataType },
              })
          }
      };

      // The address is blanked when the reading has no address or when the location type is LBS.
      bool hideAddress = string.IsNullOrEmpty(temp.Address) || type == LocationType.LBS;
      var address = hideAddress
          ? string.Empty
          : string.Join(",", new string[] { temp.Province, temp.City, temp.District, temp.Address });

      var lsData = new
      {
          deviceId = temp.DeviceId,
          imei = temp.Imei,
          alarmTypeId = (int)AlarmTypes.Temperature,
          alarmDeviceName = temp.DeviceName,
          alarmRemarks = temp.Temperature?.ToString("f1"),
          address,
          deviceKey = temp.DeviceKey
      };

      await _producer.ProduceAsync(ls, new
      {
          messageId = messageId,
          topic = string.Join(",", ls.Select(e => e.Topic)),
          time = timeString,
          data = lsData
      });
      _logger.LogInformation($"ProcessWxTemperature测温:设备{temp.Imei}|tempId:{temp.TempId},推送到" + string.Join(", ", ls.Select(e => e.Topic)) + "| MqTopic.Wx-ls:" + JsonConvert.SerializeObject(lsData));

      #region Third-party push (added by Yang Lei)
      List<TopicModel> ls2 = new List<TopicModel>
      {
          new TopicModel()
          {
              Topic = MqTopic.Third,
              Headers = _producer.CreateHeader(new Dictionary<string, int>
              {
                  { MqHeader.DataType, mqDataType },
              })
          },
          new TopicModel()
          {
              Topic = MqTopic.Healthy,
              Headers = _producer.CreateHeader(new Dictionary<string, int>
              {
                  { MqHeader.DataType, mqDataType },
              })
          }
      };

      await _producer.ProduceAsync(ls2, new
      {
          messageId = messageId,
          topic = string.Join(",", ls2.Select(e => e.Topic)),
          time = timeString,
          data = temp
      });
      #endregion

      // Log the topics of ls2 (Third/Healthy): this line reports the third-party push, not the Wx one.
      _logger.LogInformation($"ProcessWxTemperature测温:设备{temp.Imei}|tempId:{temp.TempId},推送到" + string.Join(", ", ls2.Select(e => e.Topic)) + "|MqTopic.Third-ls2:" + JsonConvert.SerializeObject(temp));

      #region Fast-channel topic
      if (!_configServiceAccess.EnablePushFast) return;

      // Reuse the Wx topic entry (and its headers), retargeted at the fast topic.
      ls.ForEach(x => x.Topic = MqTopic.Fast);
      await ProcessFastTopicAsync(messageId, timeString, ls, lsData);

      // NOTE(review): both ls2 entries now point at MqTopic.Fast, so depending on how ProduceAsync
      // handles the list this payload may reach the fast topic twice — confirm this is intended.
      ls2.ForEach(x => x.Topic = MqTopic.Fast);

      // Keep the property names that consumers of this payload already rely on.
      var ls2Data = new
      {
          deviceId = temp.DeviceId,
          deviceName = temp.DeviceName,
          tempId = temp.TempId,
          tempTime = temp.TempTime,
          imei = temp.Imei,
          temperature = temp.Temperature,
          province = temp.Province,
          city = temp.City,
          district = temp.District,
          address = temp.Address
      };
      await ProcessFastTopicAsync(messageId, timeString, ls2, ls2Data);
      #endregion
  }
  /// <summary>
  /// Publishes an SOS alarm to the Third topic and, when enabled, re-publishes it to the
  /// fast-channel topic.
  /// </summary>
  /// <remarks>
  /// Nothing is sent when <c>EnablePushThird</c> is off. The fast-channel copy additionally
  /// requires <c>EnablePushFast</c>.
  /// </remarks>
  /// <param name="messageId">Message id copied into the published envelope.</param>
  /// <param name="model">SOS payload, sent as the envelope's <c>data</c>.</param>
  /// <param name="date">Already-formatted timestamp, sent as the envelope's <c>time</c>.</param>
  /// <returns>A task that completes once every enabled push has been awaited.</returns>
  public async Task ProcessAlarmSosAsync(string messageId, SoSTemplates model, string date)
  {
      if (!_configServiceAccess.EnablePushThird)
      {
          return;
      }

      // Headers mark the message as an alarm of the SOS kind.
      var sosHeaders = _producer.CreateHeader(new Dictionary<string, int>
      {
          { MqHeader.DataType, (int)MqDataType.AlarmInfo },
          { MqHeader.AlarmTypes, (int)AlarmType.SosAlarm }
      });
      var targets = new List<TopicModel>
      {
          new TopicModel { Topic = MqTopic.Third, Headers = sosHeaders }
      };

      var envelope = new
      {
          messageId = messageId,
          topic = string.Join(",", targets.Select(t => t.Topic)),
          time = date,
          data = model
      };
      await _producer.ProduceAsync(targets, envelope);

      string pushedTo = string.Join(", ", targets.Select(t => t.Topic));
      _logger.LogInformation($"设备{model.imei},推送到{pushedTo}");

      #region Fast-channel topic
      if (!_configServiceAccess.EnablePushFast)
      {
          return;
      }

      // Reuse the same topic entry (and its headers), retargeted at the fast topic.
      foreach (var target in targets)
      {
          target.Topic = MqTopic.Fast;
      }
      await ProcessFastTopicAsync(messageId, date, targets, model);
      #endregion
  }
  /// <summary>
  /// Publishes <paramref name="model"/> to the topics in <paramref name="lsFast"/> and logs the push.
  /// </summary>
  /// <remarks>
  /// The <c>EnablePushFast</c> switch is not checked here; callers check it before calling.
  /// </remarks>
  /// <param name="messageId">Message id copied into the published envelope.</param>
  /// <param name="date">Already-formatted timestamp, sent as the envelope's <c>time</c>.</param>
  /// <param name="lsFast">Topic entries to publish to.</param>
  /// <param name="model">Payload; it must expose an <c>imei</c> member, which is read dynamically for the log line.</param>
  /// <returns>A task that completes once the publish has been awaited.</returns>
  private async Task ProcessFastTopicAsync(string messageId, string date, List<TopicModel> lsFast, dynamic model)
  {
      var envelope = new
      {
          messageId = messageId,
          topic = string.Join(",", lsFast.Select(t => t.Topic)),
          time = date,
          data = model
      };
      await _producer.ProduceAsync(lsFast, envelope);

      // imei is resolved at run time because the payload type differs per caller.
      object imei = model.imei;
      string pushedTo = string.Join(", ", lsFast.Select(t => t.Topic));
      string json = JsonConvert.SerializeObject((object)model);
      _logger.LogInformation($"设备{imei},推送到{pushedTo}|{json}");
  }
  /// <summary>
  /// Publishes a fence location message to the Fence topic.
  /// </summary>
  /// <remarks>Nothing is sent when <c>EnablePushFence</c> is off.</remarks>
  /// <param name="model">Fence location payload, sent as the envelope's <c>data</c>.</param>
  /// <param name="date">Already-formatted timestamp, sent as the envelope's <c>time</c>.</param>
  /// <param name="messageId">Message id copied into the published envelope.</param>
  /// <returns>A task that completes once the publish has been awaited.</returns>
  public async Task ProcessFencePlusAsync(FenceLocationPlus model, string date, string messageId)
  {
      if (!_configServiceAccess.EnablePushFence)
      {
          return;
      }

      // Header marks the message as position data.
      var fenceHeaders = _producer.CreateHeader(new Dictionary<string, int>
      {
          { MqHeader.DataType, (int)MqDataType.PositionInfo },
      });
      var targets = new List<TopicModel>
      {
          new TopicModel { Topic = MqTopic.Fence, Headers = fenceHeaders }
      };

      var envelope = new
      {
          messageId = messageId,
          topic = MqTopic.Fence,
          time = date,
          data = model
      };
      await _producer.ProduceAsync(targets, envelope);

      string pushedTo = string.Join(", ", targets.Select(t => t.Topic));
      _logger.LogInformation($"设备{model.imei},推送到{pushedTo}");
  }
  276. //public async Task ProcessPositionAsync(string messageId, LocationDatas model, string date)
  277. //{
  278. // List<TopicModel> ls = new List<TopicModel>();
  279. // if (_configServiceAccess.EnablePushThird)
  280. // {
  281. // ls.Add(new TopicModel()
  282. // {
  283. // Topic = MqTopic.Third,
  284. // Headers = _producer.CreateHeader(new Dictionary<string, int>
  285. // {
  286. // {MqHeader.DataType,(int)MqDataType.PositionInfo },
  287. // })
  288. // });
  289. // }
  290. // if (_configServiceAccess.EnableLocationMonitor)
  291. // {
  292. // ls.Add(new TopicModel()
  293. // {
  294. // Topic = MqTopic.LocationMonitor,
  295. // Headers = _producer.CreateHeader(new Dictionary<string, int>
  296. // {
  297. // { MqHeader.DataType,(int)MqDataType.PositionInfo},
  298. // })
  299. // });
  300. // }
  301. // if (ls.Count == 0) return;
  302. // await _producer.ProduceAsync(ls, new
  303. // {
  304. // messageId = messageId, //string.Format("{0:yyyyMMddHHmmssffff}", DateTime.Now),
  305. // topic = string.Join(",", ls.Select(e => e.Topic)), //MqTopic.Storage + "," + MqTopic.Third,
  306. // time = date,
  307. // data = model
  308. // });
  309. // _logger.LogInformation($"设备{model.imei},推送到" + string.Join(", ", ls.Select(e => e.Topic)));
  310. //}
  /// <summary>
  /// Publishes a position message, including the Wi-Fi MAC list, to the PushPosition and/or
  /// LocationMonitor topics, depending on which switches are enabled.
  /// </summary>
  /// <remarks>
  /// Returns without publishing when neither <c>EnablePushPosition</c> nor
  /// <c>EnableLocationMonitor</c> is on.
  /// </remarks>
  /// <param name="messageId">Message id copied into the published envelope.</param>
  /// <param name="model">Location data whose fields are copied into the payload.</param>
  /// <param name="date">Already-formatted timestamp, sent as the envelope's <c>time</c>.</param>
  /// <param name="wifiMacs">Wi-Fi MAC list appended to the payload; defaults to an empty string.</param>
  /// <returns>A task that completes once the publish has been awaited.</returns>
  public async Task ProcessPositionAsync(string messageId, LocationDatas model, string date, string wifiMacs = "")
  {
      var targets = new List<TopicModel>();

      if (_configServiceAccess.EnablePushPosition)
      {
          var pushPosition = new TopicModel();
          pushPosition.Topic = MqTopic.PushPosition;
          pushPosition.Headers = _producer.CreateHeader(new Dictionary<string, int>
          {
              { MqHeader.DataType, (int)MqDataType.PositionInfo },
          });
          targets.Add(pushPosition);
      }

      if (_configServiceAccess.EnableLocationMonitor)
      {
          var locationMonitor = new TopicModel();
          locationMonitor.Topic = MqTopic.LocationMonitor;
          locationMonitor.Headers = _producer.CreateHeader(new Dictionary<string, int>
          {
              { MqHeader.DataType, (int)MqDataType.PositionInfo },
          });
          targets.Add(locationMonitor);
      }

      // No destination enabled: nothing to publish.
      if (!targets.Any())
      {
          return;
      }

      // Property names and their order are kept exactly as the payload has always been emitted.
      var payload = new
      {
          imei = model.imei,
          locationType = model.locationType,
          address = model.address,
          altitude = model.altitude,
          baiduLatitude = model.baiduLatitude,
          baiduLongitude = model.baiduLongitude,
          gaodeLatitude = model.gaodeLatitude,
          gaodeLongitude = model.gaodeLongitude,
          originalLatitude = model.originalLatitude,
          originalLongitude = model.originalLongitude,
          postcode = model.postcode,
          hashParam = model.hashParam,
          radius = model.radius,
          province = model.province,
          city = model.city,
          district = model.district,
          wifiMacs = wifiMacs
      };

      await _producer.ProduceAsync(targets, new
      {
          messageId = messageId,
          topic = string.Join(",", targets.Select(t => t.Topic)),
          time = date,
          data = payload
      });

      string pushedTo = string.Join(", ", targets.Select(t => t.Topic));
      _logger.LogInformation($"设备{model.imei},推送到{pushedTo}");
  }
  384. #region topics.storage.iot.postion
  /// <summary>
  /// Publishes a raw IoT position message to the IotPosition topic.
  /// </summary>
  /// <remarks>No configuration switch is checked; the message is always published.</remarks>
  /// <param name="messageId">Message id copied into the published envelope and the log line.</param>
  /// <param name="imei">Device IMEI; used only in the log line.</param>
  /// <param name="msg">Payload object, sent unchanged as the envelope's <c>data</c>.</param>
  /// <returns>A task that completes once the publish has been awaited.</returns>
  public async Task ProcessIotPositionAsync(string messageId, string imei, object msg)
  {
      // Header marks the message as IoT position data.
      var iotHeaders = _producer.CreateHeader(new Dictionary<string, int>
      {
          { MqHeader.DataType, (int)MqDataType.IotPositionInfo }
      });
      var targets = new List<TopicModel>
      {
          new TopicModel { Topic = MqTopic.IotPosition, Headers = iotHeaders }
      };

      // The envelope time is the local clock at publish time, not a device-supplied timestamp.
      var envelope = new
      {
          messageId = messageId,
          topic = string.Join(",", targets.Select(t => t.Topic)),
          time = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"),
          data = msg
      };
      await _producer.ProduceAsync(targets, envelope);

      string pushedTo = string.Join(", ", targets.Select(t => t.Topic));
      _logger.LogInformation($"设备{imei},推送IOT定位原文消息 {messageId} 到{pushedTo}");
  }
  410. #endregion
  411. }
  412. }