Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

440 linhas
17KB

  1. using GpsCardGatewayPosition.Model.Config;
  2. using GpsCardGatewayPosition.Model.Enum;
  3. using GpsCardGatewayPosition.Model.IoT;
  4. using GpsCardGatewayPosition.Model.Templates;
  5. using GpsCardGatewayPosition.Service.MqProducer.Model;
  6. using GpsCardGatewayPosition.Service.Resolver.Property.Dto;
  7. using Microsoft.Extensions.Logging;
  8. using Microsoft.Extensions.Options;
  9. using Newtonsoft.Json;
  10. using System;
  11. using System.Collections.Generic;
  12. using System.Linq;
  13. using System.Security.Claims;
  14. using System.Text;
  15. using System.Threading.Tasks;
  16. using TelpoDataService.Util.Entities.GpsLocationHistory;
  17. namespace GpsCardGatewayPosition.Service.MqProducer
  18. {
  19. public class MqProcessLogic
  20. {
  /// <summary>Producer used to create message headers and publish every envelope built by this class.</summary>
  private readonly MessageProducer _producer;

  /// <summary>Logger that receives the publish traces.</summary>
  private readonly ILogger<MqProcessLogic> _logger;

  /// <summary>Service switches (EnablePushWx, EnablePushFast, EnablePushThird, ...) checked before publishing.</summary>
  private readonly ServiceAccessConfig _configServiceAccess;

  /// <summary>
  /// Creates the logic object from its injected dependencies.
  /// </summary>
  /// <param name="optConfigServiceAccess">Options wrapper whose <c>Value</c> is stored as the service switches.</param>
  /// <param name="producer">Message producer used for all publish calls.</param>
  /// <param name="logger">Logger for this class.</param>
  public MqProcessLogic(IOptions<ServiceAccessConfig> optConfigServiceAccess, MessageProducer producer, ILogger<MqProcessLogic> logger)
  {
      (_configServiceAccess, _producer, _logger) = (optConfigServiceAccess.Value, producer, logger);
  }
  /// <summary>
  /// Publishes a device alarm to the Wx topic and, when the fast channel is enabled,
  /// republishes the same payload to the Fast topic.
  /// </summary>
  /// <param name="alarm">Alarm record supplying the device id, serial number, type id, device name, remarks, address and device UTC time.</param>
  /// <returns>A task that completes once all publish calls have finished.</returns>
  public async Task ProcessWxAlarmAsync(HisGpsAlarm alarm)
  {
      if (!_configServiceAccess.EnablePushWx) return;

      // DeviceUtcTime is nullable: fall back to the current UTC time rather than letting
      // .Value throw InvalidOperationException. The message time is the UTC value shifted by +8 hours.
      DateTime time = (alarm.DeviceUtcTime ?? DateTime.UtcNow).AddHours(8);
      string timeString = time.ToString("yyyy-MM-dd HH:mm:ss");

      var ls = new List<TopicModel>
      {
          new TopicModel()
          {
              Topic = MqTopic.Wx,
              Headers = _producer.CreateHeader(new Dictionary<string, int>
              {
                  { MqHeader.DataType, (int)MqDataType.AlarmInfo },
              })
          }
      };

      // The message id is generated here from the current local timestamp.
      var messageId = string.Format("{0:yyyyMMddHHmmssffff}", DateTime.Now);
      var model = new
      {
          deviceId = alarm.DeviceId,
          imei = alarm.Serialno,
          alarmTypeId = alarm.TypeId,
          alarmDeviceName = alarm.DeviceName,
          alarmRemarks = alarm.Remarks,
          address = alarm.Address,
      };

      await _producer.ProduceAsync(ls, new
      {
          messageId,
          topic = string.Join(",", ls.Select(e => e.Topic)),
          time = timeString,
          data = model
      });

      // Fast channel: reuse the same topic entries (headers included), retargeted to the Fast topic.
      if (!_configServiceAccess.EnablePushFast) return;
      ls.ForEach(x => x.Topic = MqTopic.Fast);
      await ProcessFastTopicAsync(messageId, timeString, ls, model);
  }
  /// <summary>
  /// Publishes a temperature reading: an alarm-shaped summary goes to the Wx topic, the reading
  /// itself goes to the Third and Healthy topics, and, when the fast channel is enabled, both
  /// payloads are republished to the Fast topic.
  /// </summary>
  /// <param name="messageId">Message id copied into every published envelope.</param>
  /// <param name="temp">Temperature reading; when its TempTime is null the current local time is used.</param>
  /// <param name="type">Location type; <c>LocationType.LBS</c> blanks the address in the Wx payload.</param>
  /// <param name="method">Measurement method; <c>MethodType.Period</c> selects the Temperature1Info data-type header.</param>
  /// <returns>A task that completes once all publish calls have finished.</returns>
  public async Task ProcessWxTemperatureAsync(string messageId, TemperatureInfoModel temp, LocationType type = default, MethodType method = default)
  {
      if (!_configServiceAccess.EnablePushWx) return;

      // The former 30-minute validity limit on temperature messages is no longer applied.
      DateTime time = temp.TempTime ?? DateTime.Now;
      string timeString = time.ToString("yyyy-MM-dd HH:mm:ss");

      // Data-type header: periodic measurements use Temperature1Info, everything else TemperatureInfo.
      int mqDataType = method == MethodType.Period
          ? (int)MqDataType.Temperature1Info
          : (int)MqDataType.TemperatureInfo;

      var ls = new List<TopicModel>
      {
          new TopicModel()
          {
              Topic = MqTopic.Wx,
              Headers = _producer.CreateHeader(new Dictionary<string, int>
              {
                  { MqHeader.DataType, mqDataType },
              })
          }
      };

      // The address is blanked when the reading has no address or the location came from LBS.
      var address = (type == LocationType.LBS || string.IsNullOrEmpty(temp.Address))
          ? string.Empty
          : string.Join(",", new string[] { temp.Province, temp.City, temp.District, temp.Address });

      var lsData = new
      {
          deviceId = temp.DeviceId,
          imei = temp.Imei,
          alarmTypeId = (int)AlarmTypes.Temperature,
          alarmDeviceName = temp.DeviceName,
          alarmRemarks = temp.Temperature?.ToString("f1"),
          address,
          deviceKey = temp.DeviceKey
      };

      await _producer.ProduceAsync(ls, new
      {
          messageId = messageId,
          topic = string.Join(",", ls.Select(e => e.Topic)),
          time = timeString,
          data = lsData
      });
      _logger.LogInformation($"ProcessWxTemperature测温:设备{temp.Imei}|tempId:{temp.TempId},推送到" + string.Join(", ", ls.Select(e => e.Topic)) + "| MqTopic.Wx-ls:" + JsonConvert.SerializeObject(lsData));

      // Third-party push: the reading itself goes to the Third and Healthy topics.
      // NOTE(review): unlike the other Third-topic publishers in this class, this push is not
      // gated by EnablePushThird — confirm that is intentional.
      var ls2 = new List<TopicModel>
      {
          new TopicModel()
          {
              Topic = MqTopic.Third,
              Headers = _producer.CreateHeader(new Dictionary<string, int>
              {
                  { MqHeader.DataType, mqDataType },
              })
          },
          new TopicModel()
          {
              Topic = MqTopic.Healthy,
              Headers = _producer.CreateHeader(new Dictionary<string, int>
              {
                  { MqHeader.DataType, mqDataType },
              })
          }
      };

      await _producer.ProduceAsync(ls2, new
      {
          messageId = messageId,
          topic = string.Join(",", ls2.Select(e => e.Topic)),
          time = timeString,
          data = temp
      });
      // Report the topics of ls2 (the list that was just published), not those of ls.
      _logger.LogInformation($"ProcessWxTemperature测温:设备{temp.Imei}|tempId:{temp.TempId},推送到" + string.Join(", ", ls2.Select(e => e.Topic)) + "|MqTopic.Third-ls2:" + JsonConvert.SerializeObject(temp));

      // Fast channel: republish both payloads with the existing topic entries retargeted to Fast.
      if (!_configServiceAccess.EnablePushFast) return;

      ls.ForEach(x => x.Topic = MqTopic.Fast);
      await ProcessFastTopicAsync(messageId, timeString, ls, lsData);

      // NOTE(review): ls2 holds two entries, so after retargeting the list names the Fast topic
      // twice. Whether that causes two publishes depends on MessageProducer.ProduceAsync — confirm.
      ls2.ForEach(x => x.Topic = MqTopic.Fast);

      // Anonymous projection that keeps the already-agreed property names for the fast channel.
      var ls2Data = new
      {
          deviceId = temp.DeviceId,
          deviceName = temp.DeviceName,
          tempId = temp.TempId,
          tempTime = temp.TempTime,
          imei = temp.Imei,
          temperature = temp.Temperature,
          province = temp.Province,
          city = temp.City,
          district = temp.District,
          address = temp.Address
      };
      await ProcessFastTopicAsync(messageId, timeString, ls2, ls2Data);
  }
  /// <summary>
  /// Publishes an SOS alarm to the Third topic and, when the fast channel is enabled,
  /// republishes it to the Fast topic.
  /// </summary>
  /// <param name="messageId">Message id copied into the published envelope.</param>
  /// <param name="model">SOS alarm payload; its <c>imei</c> is written to the log.</param>
  /// <param name="date">Time string copied into the envelope's <c>time</c> field.</param>
  /// <returns>A task that completes once all publish calls have finished.</returns>
  public async Task ProcessAlarmSosAsync(string messageId, SoSTemplates model, string date)
  {
      if (!_configServiceAccess.EnablePushThird)
      {
          return;
      }

      // Headers mark the message as alarm data of the SOS alarm type.
      var sosHeaderValues = new Dictionary<string, int>
      {
          { MqHeader.DataType, (int)MqDataType.AlarmInfo },
          { MqHeader.AlarmTypes, (int)AlarmType.SosAlarm }
      };
      var topics = new List<TopicModel>
      {
          new TopicModel
          {
              Topic = MqTopic.Third,
              Headers = _producer.CreateHeader(sosHeaderValues)
          }
      };

      var envelope = new
      {
          messageId,
          topic = string.Join(",", topics.Select(t => t.Topic)),
          time = date,
          data = model
      };
      await _producer.ProduceAsync(topics, envelope);

      string publishedTo = string.Join(", ", topics.Select(t => t.Topic));
      _logger.LogInformation($"设备{model.imei},推送到" + publishedTo);

      // Fast channel: the same topic entry (headers included) is retargeted to the Fast topic.
      if (_configServiceAccess.EnablePushFast)
      {
          foreach (var entry in topics)
          {
              entry.Topic = MqTopic.Fast;
          }
          await ProcessFastTopicAsync(messageId, date, topics, model);
      }
  }
  /// <summary>
  /// Publishes <paramref name="model"/> to the supplied topic entries and logs the device,
  /// the topics and the serialized payload.
  /// </summary>
  /// <param name="messageId">Message id copied into the published envelope.</param>
  /// <param name="date">Time string copied into the envelope's <c>time</c> field.</param>
  /// <param name="lsFast">Topic entries to publish to; callers in this class retarget them to the Fast topic first.</param>
  /// <param name="model">Payload; it is read dynamically and must expose an <c>imei</c> member for the log line.</param>
  /// <returns>A task that completes once the publish call has finished.</returns>
  private async Task ProcessFastTopicAsync(string messageId, string date, List<TopicModel> lsFast, dynamic model)
  {
      // The caller is responsible for checking EnablePushFast before calling this method.
      var envelope = new
      {
          messageId,
          topic = string.Join(",", lsFast.Select(t => t.Topic)),
          time = date,
          data = model
      };
      await _producer.ProduceAsync(lsFast, envelope);

      // model is dynamic, so string.Format binds at run time; the explicit string type on the
      // local keeps the LogInformation extension-method call statically bound.
      string logStr = string.Format(
          "设备{0},推送到{1}|{2}",
          model.imei,
          string.Join(", ", lsFast.Select(t => t.Topic)),
          JsonConvert.SerializeObject(model));
      _logger.LogInformation(logStr);
  }
  /// <summary>
  /// Publishes a fence location message to the Fence topic when fence pushing is enabled.
  /// </summary>
  /// <param name="model">Fence location payload; its <c>imei</c> is written to the log.</param>
  /// <param name="date">Time string copied into the envelope's <c>time</c> field.</param>
  /// <param name="messageId">Message id copied into the published envelope.</param>
  /// <returns>A task that completes once the publish call has finished.</returns>
  public async Task ProcessFencePlusAsync(FenceLocationPlus model, string date, string messageId)
  {
      if (!_configServiceAccess.EnablePushFence)
      {
          return;
      }

      // Header marks the message as position data.
      var fenceHeaderValues = new Dictionary<string, int>
      {
          { MqHeader.DataType, (int)MqDataType.PositionInfo },
      };
      var topics = new List<TopicModel>
      {
          new TopicModel
          {
              Topic = MqTopic.Fence,
              Headers = _producer.CreateHeader(fenceHeaderValues)
          }
      };

      var envelope = new
      {
          messageId,
          topic = MqTopic.Fence,
          time = date,
          data = model
      };
      await _producer.ProduceAsync(topics, envelope);

      string publishedTo = string.Join(", ", topics.Select(t => t.Topic));
      _logger.LogInformation($"设备{model.imei},推送到" + publishedTo);
  }
  /// <summary>
  /// Publishes a position message to every enabled position topic (Third and/or LocationMonitor).
  /// Nothing is published when neither switch is enabled.
  /// </summary>
  /// <param name="messageId">Message id copied into the published envelope.</param>
  /// <param name="model">Position payload, published as-is in the envelope's <c>data</c> field.</param>
  /// <param name="date">Time string copied into the envelope's <c>time</c> field.</param>
  /// <returns>A task that completes once the publish call has finished.</returns>
  public async Task ProcessPositionAsync(string messageId, LocationDatas model, string date)
  {
      List<TopicModel> ls = BuildPositionTopics();
      if (ls.Count == 0) return;

      await _producer.ProduceAsync(ls, new
      {
          messageId = messageId,
          topic = string.Join(",", ls.Select(e => e.Topic)),
          time = date,
          data = model
      });
      _logger.LogInformation($"设备{model.imei},推送到" + string.Join(", ", ls.Select(e => e.Topic)));
  }

  /// <summary>
  /// Publishes a position message that also carries the Wi-Fi MAC list to every enabled
  /// position topic (Third and/or LocationMonitor). Nothing is published when neither switch is enabled.
  /// </summary>
  /// <param name="messageId">Message id copied into the published envelope.</param>
  /// <param name="model">Position payload; selected fields are copied into the envelope's <c>data</c> field.</param>
  /// <param name="date">Time string copied into the envelope's <c>time</c> field.</param>
  /// <param name="wifiMacs">Wi-Fi MAC list appended to the published data as <c>wifiMacs</c>.</param>
  /// <returns>A task that completes once the publish call has finished.</returns>
  public async Task ProcessPositionAsync(string messageId, LocationDatas model, string date, string wifiMacs)
  {
      List<TopicModel> ls = BuildPositionTopics();
      if (ls.Count == 0) return;

      await _producer.ProduceAsync(ls, new
      {
          messageId = messageId,
          topic = string.Join(",", ls.Select(e => e.Topic)),
          time = date,
          // Explicit projection of the position fields, with the Wi-Fi list added at the end.
          data = new
          {
              model.imei,
              model.locationType,
              model.address,
              model.altitude,
              model.baiduLatitude,
              model.baiduLongitude,
              model.gaodeLatitude,
              model.gaodeLongitude,
              model.originalLatitude,
              model.originalLongitude,
              model.postcode,
              model.hashParam,
              model.radius,
              model.province,
              model.city,
              model.district,
              wifiMacs
          }
      });
      _logger.LogInformation($"设备{model.imei},推送到" + string.Join(", ", ls.Select(e => e.Topic)));
  }

  /// <summary>
  /// Builds the topic entries shared by both ProcessPositionAsync overloads: Third when
  /// EnablePushThird is set, LocationMonitor when EnableLocationMonitor is set, each with a
  /// position-data header. Returns an empty list when neither switch is set.
  /// </summary>
  private List<TopicModel> BuildPositionTopics()
  {
      List<TopicModel> topics = new();
      if (_configServiceAccess.EnablePushThird)
      {
          topics.Add(new TopicModel()
          {
              Topic = MqTopic.Third,
              Headers = _producer.CreateHeader(new Dictionary<string, int>
              {
                  { MqHeader.DataType, (int)MqDataType.PositionInfo },
              })
          });
      }
      if (_configServiceAccess.EnableLocationMonitor)
      {
          topics.Add(new TopicModel()
          {
              Topic = MqTopic.LocationMonitor,
              Headers = _producer.CreateHeader(new Dictionary<string, int>
              {
                  { MqHeader.DataType, (int)MqDataType.PositionInfo },
              })
          });
      }
      return topics;
  }
  371. #region topics.storage.iot.postion
  /// <summary>
  /// Publishes an IoT position message to the IotPosition topic, stamped with the current local time.
  /// </summary>
  /// <param name="messageId">Message id copied into the published envelope and the log line.</param>
  /// <param name="imei">Device IMEI, used only in the log line.</param>
  /// <param name="msg">Payload published as-is in the envelope's <c>data</c> field.</param>
  /// <returns>A task that completes once the publish call has finished.</returns>
  public async Task ProcessIotPositionAsync(string messageId, string imei, object msg)
  {
      // Header marks the message as IoT position data.
      var iotHeaderValues = new Dictionary<string, int>
      {
          { MqHeader.DataType, (int)MqDataType.IotPositionInfo }
      };
      var topics = new List<TopicModel>
      {
          new TopicModel
          {
              Topic = MqTopic.IotPosition,
              Headers = _producer.CreateHeader(iotHeaderValues)
          }
      };

      var envelope = new
      {
          messageId,
          topic = string.Join(",", topics.Select(t => t.Topic)),
          time = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"),
          data = msg
      };
      await _producer.ProduceAsync(topics, envelope);

      string publishedTo = string.Join(", ", topics.Select(t => t.Topic));
      _logger.LogInformation($"设备{imei},推送IOT定位原文消息 {messageId} 到" + publishedTo);
  }
  397. #endregion
  398. }
  399. }