万佳安设备数据
Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

293 linhas
13KB

  1. using Confluent.Kafka;
  2. using Microsoft.Extensions.Options;
  3. using Newtonsoft.Json;
  4. using TelpoPush.WanJiaAn.Worker.Common;
  5. using TelpoPush.WanJiaAn.Worker.Models.Config;
  6. using TelpoPush.WanJiaAn.Worker.Models.Enum;
  7. using TelpoPush.WanJiaAn.Worker.Models.MqTemplates;
  8. using TelpoPush.WanJiaAn.Worker.Models.PushTemplates;
  9. using TelpoPush.WanJiaAn.Worker.Service.Cache;
  10. using TelpoPush.WanJiaAn.Worker.Service.Mq;
  11. namespace TelpoPush.WanJiaAn.Worker.Handlers
  12. {
  13. public class WanJiaAnProcess
  14. {
  15. private readonly static object _syncLocker = new object();
  16. private readonly IHostEnvironment _env;
  17. private readonly ILogger<WanJiaAnProcess> _logger;
  18. private readonly HttpHelperAsync _httpHelper;
  19. private readonly RedisUtil _redis;
  20. private readonly MqProcessMessage _serviceMqProcess;
  21. private readonly WanJiaAnConfig _WanJiaAnConfig;
  22. public WanJiaAnProcess(
  23. IHostEnvironment env,
  24. ILogger<WanJiaAnProcess> logger,
  25. HttpHelperAsync httpHelper,
  26. RedisUtil redis,
  27. MqProcessMessage serviceMqProcess,
  28. IOptions<WanJiaAnConfig> WanJiaAnConfig
  29. )
  30. {
  31. _env = env;
  32. _logger = logger;
  33. _httpHelper = httpHelper;
  34. _redis = redis;
  35. _WanJiaAnConfig = WanJiaAnConfig.Value;
  36. _serviceMqProcess = serviceMqProcess;
  37. }
  38. public async Task SendWanJiaAn(string? message, string topic, Headers headers)
  39. {
  40. #region 数据初始验证
  41. bool isHandle = true;
  42. BaseModel model = null;
  43. string imei = "";
  44. if (!string.IsNullOrEmpty(message))
  45. {
  46. model = JsonConvert.DeserializeObject<BaseModel>(message);
  47. if (model != null)
  48. {
  49. var Jo = JsonConvert.DeserializeObject<Dictionary<string, object>>(model.data.ToString());
  50. if (Jo.ContainsKey("device_id"))
  51. imei = Jo["device_id"].ToString();
  52. if (string.IsNullOrEmpty(imei))
  53. {
  54. _logger.LogInformation($"[数据信息不完整] imei信息不存在:{message}");
  55. isHandle = false;
  56. }
  57. else
  58. await _redis.GetGpsDevice(imei);
  59. }
  60. else
  61. {
  62. _logger.LogInformation($"[数据信息不完整] 数据解析异常:{message}");
  63. isHandle = false;
  64. }
  65. }
  66. else
  67. {
  68. _logger.LogInformation($"[数据信息不完整] message数据异常:{message}");
  69. isHandle = false;
  70. }
  71. #endregion
  72. if (isHandle)
  73. {
  74. lock (_syncLocker)
  75. {
  76. //Headers 解析
  77. HeadersDto headersDto = new HeadersDto();
  78. try
  79. {
  80. foreach (var item in headers)
  81. {
  82. if (item.Key == KafkaHeader.DataType)
  83. headersDto.DataType = BitConverter.ToInt32(item.GetValueBytes(), 0);
  84. else if (item.Key == KafkaHeader.AlarmType)
  85. headersDto.AlarmType = BitConverter.ToInt32(item.GetValueBytes(), 0);
  86. else if (item.Key == KafkaHeader.OperType)
  87. headersDto.OperType = BitConverter.ToInt32(item.GetValueBytes(), 0);
  88. }
  89. }
  90. catch (Exception ex)
  91. {
  92. _logger.LogError($"当前工作线程Headers异常,{ex.Message}|{ex.Source}|{ex.StackTrace}");
  93. }
  94. try
  95. {
  96. #region 注释
  97. //string dataType = headersDto.DataType != null ? "_" + headersDto.DataType : "";
  98. //string alarmType = headersDto.AlarmType != null ? "_" + headersDto.AlarmType : "";
  99. //string operType = headersDto.OperType != null ? "_" + headersDto.OperType : "";
  100. //string key = dataType + alarmType + operType;
  101. //var dataStatus = _redis.IsDateStatus(model, imei, key).Result;
  102. //过滤
  103. //if (headersDto.DataType == (int)MqDataType.TemperatureInfo
  104. // || headersDto.DataType == (int)MqDataType.Temperature1Info
  105. // || headersDto.DataType == (int)MqDataType.BindDevice
  106. // || headersDto.DataType == (int)MqDataType.WanJiaAnInfo
  107. // || headersDto.DataType == (int)MqDataType.HeartRateInfo
  108. // || headersDto.DataType == (int)MqDataType.HeartRate1Info
  109. // || headersDto.DataType == (int)MqDataType.Spo2Info
  110. // || headersDto.DataType == (int)MqDataType.Spo21Info
  111. // || headersDto.DataType == (int)MqDataType.BloodPressInfo
  112. // || headersDto.DataType == (int)MqDataType.BloodPress1Info
  113. // || headersDto.DataType == (int)MqDataType.SportResult
  114. // || headersDto.DataType == (int)MqDataType.BloodSugar
  115. // )
  116. // dataStatus.isPush = true;
  117. //dataStatus.isPush = true;
  118. //if (dataStatus.isPush)
  119. //{
  120. //switch (topic)
  121. //{
  122. // case "topic.push.third":
  123. // switch (headersDto.DataType)
  124. // {
  125. // case (int)MqDataType.AlarmInfo: //报警消息
  126. // break;
  127. // case (int)MqDataType.TemperatureInfo: //体温消息
  128. // break;
  129. // case (int)MqDataType.WanJiaAnInfo: //定位消息
  130. // // DataServicePusWanJiaAn(model, imei);
  131. // break;
  132. // case (int)MqDataType.StepInfo: //步数消息
  133. // break;
  134. // case (int)MqDataType.BatteryLevelInfo: //电量消息
  135. // break;
  136. // case (int)MqDataType.DeviceCallLog: //设备通话记录
  137. // break;
  138. // case (int)MqDataType.DeviceSmsLog: //设备短信记录
  139. // break;
  140. // case (int)MqDataType.DeviceConfigInfo: //设备配置信息
  141. // break;
  142. // case (int)MqDataType.Status: //设备状态(offline,online)
  143. // break;
  144. // case (int)MqDataType.Active: //设备激活状态
  145. // break;
  146. // case (int)MqDataType.reply: //指令回调
  147. // break;
  148. // case (int)MqDataType.Weather: //天气查询
  149. // break;
  150. // case (int)MqDataType.ReadMsg: //短消息阅读
  151. // break;
  152. // case (int)MqDataType.StudyAINotifyStatusUpload: //学习能力状态
  153. // break;
  154. // case (int)MqDataType.HeartRateInfo: //心率
  155. // break;
  156. // case (int)MqDataType.HeartRate1Info: //周期性心率
  157. // break;
  158. // case (int)MqDataType.Spo2Info: //血氧
  159. // break;
  160. // case (int)MqDataType.Spo21Info: //周期性血氧
  161. // break;
  162. // case (int)MqDataType.Temperature1Info: //周期性报体温数据
  163. // break;
  164. // case (int)MqDataType.DrownReportInfo: //防溺水告警
  165. // break;
  166. // case (int)MqDataType.WearStatusInfo: //手表未佩戴告警
  167. // break;
  168. // case (int)MqDataType.BloodPressInfo: //血压
  169. // break;
  170. // case (int)MqDataType.BloodPress1Info: //周期性血压
  171. // break;
  172. // case (int)MqDataType.PsychInfo: //心理监测
  173. // break;
  174. // case (int)MqDataType.AiCallResult: //AI呼叫结果回调
  175. // case (int)MqDataType.CrossBorder: //越界上报(围栏进出告警)
  176. // break;
  177. // case (int)MqDataType.SportResult: //运动数据上报
  178. // break;
  179. // case (int)MqDataType.BindDevice: //绑定业务
  180. // break;
  181. // case (int)MqDataType.BloodSugar: //血糖业务
  182. // break;
  183. // default:
  184. // break;
  185. // }
  186. // break;
  187. // default:
  188. // break;
  189. //}
  190. //}
  191. //else
  192. // _logger.LogInformation($"数据未处理(历史数据):{JsonConvert.SerializeObject(dataStatus)}");
  193. #endregion
  194. switch (topic)
  195. {
  196. case "topic.wanjiaan.push.telpo":
  197. switch (headersDto.DataType)
  198. {
  199. case (int)MqDataType.WanJiaAnInfo: //定位消息
  200. DataServicePusWanJiaAn(model, imei);
  201. break;
  202. default:
  203. break;
  204. }
  205. break;
  206. default:
  207. break;
  208. }
  209. }
  210. catch (Exception ex)
  211. {
  212. _logger.LogError($"当前工作线程异常: {ex.Message}|{ex.Source}|{ex.StackTrace}");
  213. }
  214. }
  215. }
  216. }
  217. //位置
  218. public async Task DataServicePusWanJiaAn(BaseModel model,string imei)
  219. {
  220. if (model.data != null)
  221. {
  222. var alarmEvent = JsonConvert.DeserializeObject<MqAlarmEventTemplate>(model.data.ToString());
  223. if (alarmEvent?.alarm_event != null)
  224. {
  225. var data = alarmEvent?.alarm_event;
  226. if (data?.event_type == 2 || data?.event_type == 52)//2:哭声检测;52:遮脸提醒
  227. {
  228. string event_id = data.event_id;
  229. string device_id = model.device_id;
  230. string user_id = model.user_id;
  231. int event_type = data.event_type;
  232. int report_type = data.report_type;
  233. DateTime event_time = TimeHelper.ConvertToLocalDateTime(model.event_time);
  234. DateTime event_start = TimeHelper.ConvertToLocalDateTime(data.event_start);
  235. if (!string.IsNullOrEmpty(data.event_end))
  236. {
  237. DateTime event_end = TimeHelper.ConvertToLocalDateTime(data.event_end);
  238. }
  239. if (!string.IsNullOrEmpty(data.video_start))
  240. {
  241. DateTime video_start = TimeHelper.ConvertToLocalDateTime(data.video_start);
  242. }
  243. if (!string.IsNullOrEmpty(data.video_end))
  244. {
  245. DateTime video_end = TimeHelper.ConvertToLocalDateTime(data.video_end);
  246. }
  247. string image_url = data.image;
  248. string video_url = data.url;
  249. string remark = JsonConvert.SerializeObject(model);
  250. DateTime create_time = DateTime.Now;
  251. var obj = new { };
  252. }
  253. }
  254. }
  255. }
  256. }
  257. }