万佳安设备数据
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

385 lines
19KB

  1. using Confluent.Kafka;
  2. using Google.Protobuf.WellKnownTypes;
  3. using Microsoft.Extensions.Options;
  4. using Newtonsoft.Json;
  5. using System.ComponentModel.DataAnnotations;
  6. using TelpoDataService.Util.Clients;
  7. using TelpoDataService.Util.Entities.GpsLocationHistory;
  8. using TelpoDataService.Util.Models;
  9. using TelpoDataService.Util.QueryObjects;
  10. using TelpoPush.WanJiaAn.Worker.Common;
  11. using TelpoPush.WanJiaAn.Worker.Models.Config;
  12. using TelpoPush.WanJiaAn.Worker.Models.Enum;
  13. using TelpoPush.WanJiaAn.Worker.Models.MqTemplates;
  14. using TelpoPush.WanJiaAn.Worker.Models.PushTemplates;
  15. using TelpoPush.WanJiaAn.Worker.Service.Cache;
  16. using TelpoPush.WanJiaAn.Worker.Service.Mq;
  17. namespace TelpoPush.WanJiaAn.Worker.Handlers
  18. {
  19. public class WanJiaAnProcess
  20. {
  21. private readonly static object _syncLocker = new object();
  22. private readonly IHostEnvironment _env;
  23. private readonly ILogger<WanJiaAnProcess> _logger;
  24. private readonly HttpHelperAsync _httpHelper;
  25. private readonly RedisUtil _redis;
  26. private readonly MqProcessMessage _serviceMqProcess;
  27. private readonly WanJiaAnConfig _WanJiaAnConfig;
  28. private readonly GpsLocationHistoryAccessorClient<WanjiaanAlarmEvent> _messageWanJiaAnClient;
  29. public WanJiaAnProcess(
  30. IHostEnvironment env,
  31. ILogger<WanJiaAnProcess> logger,
  32. HttpHelperAsync httpHelper,
  33. RedisUtil redis,
  34. MqProcessMessage serviceMqProcess,
  35. IOptions<WanJiaAnConfig> WanJiaAnConfig,
  36. GpsLocationHistoryAccessorClient<WanjiaanAlarmEvent> messageWanJiaAnClient
  37. )
  38. {
  39. _env = env;
  40. _logger = logger;
  41. _httpHelper = httpHelper;
  42. _redis = redis;
  43. _WanJiaAnConfig = WanJiaAnConfig.Value;
  44. _serviceMqProcess = serviceMqProcess;
  45. _messageWanJiaAnClient = messageWanJiaAnClient;
  46. }
  47. public async Task SendWanJiaAn(string? message, string topic, Headers headers)
  48. {
  49. #region 数据初始验证
  50. bool isHandle = true;
  51. BaseModel model = null;
  52. string imei = "";
  53. if (!string.IsNullOrEmpty(message))
  54. {
  55. model = JsonConvert.DeserializeObject<BaseModel>(message);
  56. if (model != null)
  57. {
  58. var Jo = JsonConvert.DeserializeObject<Dictionary<string, object>>(message);
  59. if (Jo.ContainsKey("device_id"))
  60. imei = Jo["device_id"].ToString();
  61. if (string.IsNullOrEmpty(imei))
  62. {
  63. _logger.LogInformation($"[数据信息不完整] device_id信息不存在:{message}");
  64. isHandle = false;
  65. }
  66. //else
  67. // await _redis.GetGpsDevice(imei);
  68. }
  69. else
  70. {
  71. _logger.LogInformation($"[数据信息不完整] 数据解析异常:{message}");
  72. isHandle = false;
  73. }
  74. }
  75. else
  76. {
  77. _logger.LogInformation($"[数据信息不完整] message数据异常:{message}");
  78. isHandle = false;
  79. }
  80. #endregion
  81. if (isHandle)
  82. {
  83. lock (_syncLocker)
  84. {
  85. #region 注释 Headers 解析
  86. //Headers 解析
  87. //HeadersDto headersDto = new HeadersDto();
  88. //try
  89. //{
  90. // foreach (var item in headers)
  91. // {
  92. // if (item.Key == KafkaHeader.DataType)
  93. // headersDto.DataType = BitConverter.ToInt32(item.GetValueBytes(), 0);
  94. // else if (item.Key == KafkaHeader.AlarmType)
  95. // headersDto.AlarmType = BitConverter.ToInt32(item.GetValueBytes(), 0);
  96. // else if (item.Key == KafkaHeader.OperType)
  97. // headersDto.OperType = BitConverter.ToInt32(item.GetValueBytes(), 0);
  98. // }
  99. //}
  100. //catch (Exception ex)
  101. //{
  102. // _logger.LogError($"当前工作线程Headers异常,{ex.Message}|{ex.Source}|{ex.StackTrace}");
  103. //}
  104. #endregion
  105. try
  106. {
  107. #region 注释
  108. //string dataType = headersDto.DataType != null ? "_" + headersDto.DataType : "";
  109. //string alarmType = headersDto.AlarmType != null ? "_" + headersDto.AlarmType : "";
  110. //string operType = headersDto.OperType != null ? "_" + headersDto.OperType : "";
  111. //string key = dataType + alarmType + operType;
  112. //var dataStatus = _redis.IsDateStatus(model, imei, key).Result;
  113. //过滤
  114. //if (headersDto.DataType == (int)MqDataType.TemperatureInfo
  115. // || headersDto.DataType == (int)MqDataType.Temperature1Info
  116. // || headersDto.DataType == (int)MqDataType.BindDevice
  117. // || headersDto.DataType == (int)MqDataType.WanJiaAnInfo
  118. // || headersDto.DataType == (int)MqDataType.HeartRateInfo
  119. // || headersDto.DataType == (int)MqDataType.HeartRate1Info
  120. // || headersDto.DataType == (int)MqDataType.Spo2Info
  121. // || headersDto.DataType == (int)MqDataType.Spo21Info
  122. // || headersDto.DataType == (int)MqDataType.BloodPressInfo
  123. // || headersDto.DataType == (int)MqDataType.BloodPress1Info
  124. // || headersDto.DataType == (int)MqDataType.SportResult
  125. // || headersDto.DataType == (int)MqDataType.BloodSugar
  126. // )
  127. // dataStatus.isPush = true;
  128. //dataStatus.isPush = true;
  129. //if (dataStatus.isPush)
  130. //{
  131. //switch (topic)
  132. //{
  133. // case "topic.push.third":
  134. // switch (headersDto.DataType)
  135. // {
  136. // case (int)MqDataType.AlarmInfo: //报警消息
  137. // break;
  138. // case (int)MqDataType.TemperatureInfo: //体温消息
  139. // break;
  140. // case (int)MqDataType.WanJiaAnInfo: //定位消息
  141. // // DataServicePusWanJiaAn(model, imei);
  142. // break;
  143. // case (int)MqDataType.StepInfo: //步数消息
  144. // break;
  145. // case (int)MqDataType.BatteryLevelInfo: //电量消息
  146. // break;
  147. // case (int)MqDataType.DeviceCallLog: //设备通话记录
  148. // break;
  149. // case (int)MqDataType.DeviceSmsLog: //设备短信记录
  150. // break;
  151. // case (int)MqDataType.DeviceConfigInfo: //设备配置信息
  152. // break;
  153. // case (int)MqDataType.Status: //设备状态(offline,online)
  154. // break;
  155. // case (int)MqDataType.Active: //设备激活状态
  156. // break;
  157. // case (int)MqDataType.reply: //指令回调
  158. // break;
  159. // case (int)MqDataType.Weather: //天气查询
  160. // break;
  161. // case (int)MqDataType.ReadMsg: //短消息阅读
  162. // break;
  163. // case (int)MqDataType.StudyAINotifyStatusUpload: //学习能力状态
  164. // break;
  165. // case (int)MqDataType.HeartRateInfo: //心率
  166. // break;
  167. // case (int)MqDataType.HeartRate1Info: //周期性心率
  168. // break;
  169. // case (int)MqDataType.Spo2Info: //血氧
  170. // break;
  171. // case (int)MqDataType.Spo21Info: //周期性血氧
  172. // break;
  173. // case (int)MqDataType.Temperature1Info: //周期性报体温数据
  174. // break;
  175. // case (int)MqDataType.DrownReportInfo: //防溺水告警
  176. // break;
  177. // case (int)MqDataType.WearStatusInfo: //手表未佩戴告警
  178. // break;
  179. // case (int)MqDataType.BloodPressInfo: //血压
  180. // break;
  181. // case (int)MqDataType.BloodPress1Info: //周期性血压
  182. // break;
  183. // case (int)MqDataType.PsychInfo: //心理监测
  184. // break;
  185. // case (int)MqDataType.AiCallResult: //AI呼叫结果回调
  186. // case (int)MqDataType.CrossBorder: //越界上报(围栏进出告警)
  187. // break;
  188. // case (int)MqDataType.SportResult: //运动数据上报
  189. // break;
  190. // case (int)MqDataType.BindDevice: //绑定业务
  191. // break;
  192. // case (int)MqDataType.BloodSugar: //血糖业务
  193. // break;
  194. // default:
  195. // break;
  196. // }
  197. // break;
  198. // default:
  199. // break;
  200. //}
  201. //}
  202. //else
  203. // _logger.LogInformation($"数据未处理(历史数据):{JsonConvert.SerializeObject(dataStatus)}");
  204. #endregion
  205. switch (topic)
  206. {
  207. case "topic.wanjiaan.push.telpo":
  208. DataServicePusWanJiaAn(model);
  209. break;
  210. default:
  211. break;
  212. }
  213. }
  214. catch (Exception ex)
  215. {
  216. _logger.LogError($"当前工作线程异常: {ex.Message}|{ex.Source}|{ex.StackTrace}");
  217. }
  218. }
  219. }
  220. }
  221. //位置
  222. public async Task DataServicePusWanJiaAn(BaseModel model)
  223. {
  224. try
  225. {
  226. if (model.data != null)
  227. {
  228. var alarmEvent = JsonConvert.DeserializeObject<MqAlarmEventTemplate>(model.data.ToString());
  229. if (alarmEvent?.alarm_event != null)
  230. {
  231. var data = alarmEvent?.alarm_event;
  232. if (data?.event_type == 2 || data?.event_type == 52)//2:哭声检测;52:遮脸提醒
  233. {
  234. string tagRemark = "";
  235. if (data.report_type == 1)
  236. tagRemark = "事件开始";
  237. else if (data.report_type == 2)
  238. tagRemark = "事件结束";
  239. else if (data.report_type == 3)
  240. tagRemark = "资源上报";
  241. string tag = data?.event_type == 2 ? "哭声检测" : "遮脸提醒";
  242. string eventId = $"{data?.event_id}_{data?.report_type}";
  243. var wanjiaanAlarm = new WanjiaanAlarmEvent()
  244. {
  245. EventId = eventId,
  246. DeviceId = model.device_id,
  247. UserId = model.user_id,
  248. EventTime = TimeHelper.ConvertToLocalDateTime(model.event_time),
  249. EventType = data.event_type,
  250. ReportType = data.report_type,
  251. EventStart = TimeHelper.ConvertToLocalDateTime(data.event_start),
  252. ImageUrl = data.image,
  253. VideoUrl = data.url,
  254. Remark = $"{tagRemark}:{JsonConvert.SerializeObject(model)};",
  255. CreateTime = DateTime.Now,
  256. };
  257. if (!string.IsNullOrEmpty(data.event_end))
  258. wanjiaanAlarm.EventEnd = TimeHelper.ConvertToLocalDateTime(data.event_end);
  259. if (!string.IsNullOrEmpty(data.video_start))
  260. wanjiaanAlarm.VideoStart = TimeHelper.ConvertToLocalDateTime(data.video_start);
  261. if (!string.IsNullOrEmpty(data.video_end))
  262. wanjiaanAlarm.VideoEnd = TimeHelper.ConvertToLocalDateTime(data.video_end);
  263. await _messageWanJiaAnClient.AddAsync(wanjiaanAlarm);//数据保存
  264. //推送 业务
  265. //var messageObj = new
  266. //{
  267. // messageId = eventId,
  268. // topic = "topic.push.third",
  269. // time = TimeHelper.ToDateTimeStr(wanjiaanAlarm.EventTime),
  270. // data = new
  271. // {
  272. // imei = wanjiaanAlarm.DeviceId,
  273. // wanjiaanAlarm.UserId,
  274. // wanjiaanAlarm.EventType,
  275. // wanjiaanAlarm.ReportType,
  276. // wanjiaanAlarm.EventStart,
  277. // wanjiaanAlarm.EventEnd,
  278. // wanjiaanAlarm.ImageUrl,
  279. // wanjiaanAlarm.VideoUrl,
  280. // wanjiaanAlarm.VideoStart,
  281. // wanjiaanAlarm.VideoEnd
  282. // }
  283. //};
  284. //var header = new Dictionary<string, int> { { "DataType", 32 } };
  285. //await _serviceMqProcess.ProcessThirdhServer(wanjiaanAlarm.DeviceId, messageObj, header, $"{tag}({tagRemark})");
  286. #region 注释
  287. //var alarmEventDb = await _redis.GetWanjiaanAlarmEvent(eventId);
  288. //if (alarmEventDb != null)
  289. // wanjiaanAlarm.Remark = alarmEventDb.Remark + wanjiaanAlarm.Remark;
  290. //await _redis.SetWanjiaanAlarmEvent(wanjiaanAlarm.EventId, wanjiaanAlarm);
  291. //if (data.report_type == 3)
  292. //{
  293. // await _messageWanJiaAnClient.AddAsync(wanjiaanAlarm);
  294. // await _redis.DelWanjiaanAlarmEvent(wanjiaanAlarm.EventId);
  295. //}
  296. //var param = new GeneralParam
  297. //{
  298. // Filters = new List<QueryFilterCondition> {
  299. // new QueryFilterCondition {
  300. // Key = nameof(WanjiaanAlarmEvent.DeviceId),
  301. // Value = device_id,
  302. // ValueType = QueryValueTypeEnum.String,
  303. // Operator = QueryOperatorEnum.Equal
  304. // }, new QueryFilterCondition {
  305. // Key = nameof(WanjiaanAlarmEvent.EventId),
  306. // Value = event_id,
  307. // ValueType = QueryValueTypeEnum.String,
  308. // Operator = QueryOperatorEnum.Equal
  309. // }
  310. //}
  311. //};
  312. //var alarmEventDb =_messageWanJiaAnClient.GetFirst(param);
  313. //if (alarmEventDb != null)
  314. //{
  315. // alarmEventDb.UserId = user_id;
  316. // alarmEventDb.EventTime = event_time;
  317. // alarmEventDb.EventType = event_type;
  318. // alarmEventDb.ReportType = report_type;
  319. // alarmEventDb.EventStart = event_start;
  320. // alarmEventDb.EventEnd = wanjiaanAlarm.EventEnd;
  321. // alarmEventDb.ImageUrl = image_url;
  322. // alarmEventDb.VideoUrl = video_url;
  323. // alarmEventDb.VideoStart = wanjiaanAlarm.VideoStart;
  324. // alarmEventDb.VideoEnd = wanjiaanAlarm.VideoEnd;
  325. // alarmEventDb.Remark = alarmEventDb.Remark + remark;
  326. // _messageWanJiaAnClient.Update(alarmEventDb);
  327. // await _redis.SetWanjiaanAlarmEvent(event_id, alarmEventDb);
  328. //}
  329. //else
  330. //{
  331. // _messageWanJiaAnClient.Add(wanjiaanAlarm);
  332. // await _redis.SetWanjiaanAlarmEvent(event_id, wanjiaanAlarm);
  333. //}
  334. #endregion
  335. }
  336. }
  337. }
  338. }
  339. catch (Exception ex)
  340. {
  341. string s = ex.Message;
  342. }
  343. }
  344. }
  345. }