You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

338 lines
18KB

  1. using HealthMonitor.Common;
  2. using HealthMonitor.Common.helper;
  3. using HealthMonitor.Model.Service.Mapper;
  4. using HealthMonitor.Service.Biz;
  5. using HealthMonitor.Service.Biz.db;
  6. using HealthMonitor.Service.Cache;
  7. using HealthMonitor.Service.Etcd;
  8. using HealthMonitor.Service.Resolver.Interface;
  9. using HealthMonitor.Service.Sub;
  10. using HealthMonitor.Service.Sub.Topic.Model;
  11. using Microsoft.EntityFrameworkCore.Metadata;
  12. using Microsoft.Extensions.Logging;
  13. using Newtonsoft.Json;
  14. using System;
  15. using System.Collections.Generic;
  16. using System.Linq;
  17. using System.Text;
  18. using System.Threading.Tasks;
  19. using TelpoDataService.Util.Clients;
  20. using TelpoDataService.Util.Entities.GpsLocationHistory;
  21. namespace HealthMonitor.Service.Resolver
  22. {
  23. public class PregnancyHeartRateResolver : IResolver
  24. {
  // Logger used for the high-frequency state transition messages.
  private readonly ILogger<PregnancyHeartRateResolver> _logger;

  // HTTP helper supplied by DI; not referenced by the methods visible in this file.
  private readonly HttpHelper _httpHelper;

  // Etcd access used to register the TTL-based schedule keys.
  private readonly EtcdService _serviceEtcd;

  // Device cache: watch configuration and the high-frequency status record.
  private readonly DeviceCacheManager _deviceCacheMgr;

  // IoT API client used to push the computed fetal heart rate.
  private readonly IotApiService _serviceIotApi;

  // TDengine access: heart-rate history reads and common heart-rate model writes.
  private readonly TDengineService _serviceTDengine;

  // Data-service client used to persist fetal heart-rate records.
  private readonly GpsLocationHistoryAccessorClient<HisGpsFetalHeartRate> _hisFetalHeartApiClient;

  // Per-async-flow state filled by SetResolveInfo and consumed by ExecuteMessageAsync.
  private readonly AsyncLocal<string> _messageId = new();
  private readonly AsyncLocal<HisGpsHeartRate> _msgData = new();

  /// <summary>
  /// Creates the resolver and stores the injected collaborators.
  /// </summary>
  /// <param name="logger">Logger for this resolver.</param>
  /// <param name="httpHelper">HTTP helper instance.</param>
  /// <param name="serviceEtcd">Etcd service used for schedule keys.</param>
  /// <param name="deviceCacheMgr">Device cache manager.</param>
  /// <param name="iotApiService">IoT API service.</param>
  /// <param name="serviceDengine">TDengine service.</param>
  /// <param name="hisFetalHeartApiClient">Client for fetal heart-rate history records.</param>
  public PregnancyHeartRateResolver(
      ILogger<PregnancyHeartRateResolver> logger,
      HttpHelper httpHelper,
      EtcdService serviceEtcd,
      DeviceCacheManager deviceCacheMgr,
      IotApiService iotApiService,
      TDengineService serviceDengine,
      GpsLocationHistoryAccessorClient<HisGpsFetalHeartRate> hisFetalHeartApiClient)
  {
      // Assignments follow the parameter order.
      _logger = logger;
      _httpHelper = httpHelper;
      _serviceEtcd = serviceEtcd;
      _deviceCacheMgr = deviceCacheMgr;
      _serviceIotApi = iotApiService;
      _serviceTDengine = serviceDengine;
      _hisFetalHeartApiClient = hisFetalHeartApiClient;
  }
  /// <summary>
  /// Captures the incoming message for the current async flow: stores the message id and
  /// maps the deserialized <c>TopicHmPregnancyHeartRate</c> payload onto a
  /// <c>HisGpsHeartRate</c> record that <c>ExecuteMessageAsync</c> reads later.
  /// </summary>
  /// <param name="msg">Package message whose <c>DetailData</c> holds the JSON payload.</param>
  public void SetResolveInfo(PackageMsgModel msg)
  {
      var payload = JsonConvert.DeserializeObject<TopicHmPregnancyHeartRate>(msg.DetailData.ToString()!);
      _messageId.Value = msg.MessageId;

      // The raw values are divided by 1,000,000 before being handed to the
      // Unix-milliseconds converter.
      // NOTE(review): presumably the payload timestamps are in nanoseconds — confirm.
      var lastUpdateMs = SafeType.SafeInt64(payload!.LastUpdate) / 1000000;
      var createTimeMs = SafeType.SafeInt64(payload.CreateTime) / 1000000;

      var record = new HisGpsHeartRate()
      {
          HeartRateId = payload.PregnancyHeartRateId,
          MessageId = payload.MessageId,
          Serialno = payload.Serialno,
          HeartRate = payload.PregnancyHeartRate,
          LastUpdate = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(lastUpdateMs),
          CreateTime = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(createTimeMs),
          Method = payload.Method,
          // The boolean display flag is stored as 1/0.
          IsDisplay = payload.IsDisplay ? 1 : 0
      };
      _msgData.Value = record;
  }
  /// <summary>
  /// Returns the resolver type name followed by the current message id in square brackets.
  /// </summary>
  public override string ToString() =>
      $"{nameof(PregnancyHeartRateResolver)}[{_messageId.Value}]";
  /// <summary>
  /// Processes the pregnancy heart-rate sample captured by <c>SetResolveInfo</c>.
  /// </summary>
  /// <remarks>
  /// Nothing happens unless the device's watch configuration (looked up with code "0067")
  /// exists and has <c>enabled == 1</c>. When it does:
  /// <list type="number">
  /// <item>With at least 30 history records, the high-frequency sampling state is evaluated,
  /// which may save and push a derived fetal heart rate, and a TTL key for the periodic
  /// fetal heart-rate calculation is registered in etcd if absent.</item>
  /// <item>A TTL key for the scheduled modeling push is registered in etcd if absent.</item>
  /// </list>
  /// </remarks>
  /// <returns>A task that completes when processing has finished.</returns>
  public async Task ExecuteMessageAsync()
  {
      var heartRate = _msgData.Value!;
      var serialNo = heartRate.Serialno;

      var watchConfig = await _deviceCacheMgr.GetGpsDeviceWatchConfigCacheObjectBySerialNoAsync(serialNo, "0067");
      if (watchConfig == null || (int)watchConfig["enabled"]! != 1)
      {
          // Feature not configured or disabled for this device.
          return;
      }

      // NOTE(review): the second argument is presumably a look-back window in days — confirm.
      var phr = await _serviceTDengine.GetBySerialNoAsync<PregnancyHeartRateModel>(serialNo, 7);
      if (phr.Count >= 30)
      {
          #region High-frequency heart-rate evaluation
          // Gap between the two most recent records (by LastUpdate), in seconds.
          var latestTwoUpdates = phr.OrderByDescending(i => i.LastUpdate).Take(2).Select(i => i.LastUpdate).ToList();
          var timeDiffInSeconds = (latestTwoUpdates[0] - latestTwoUpdates[1]).TotalSeconds;

          // Sampling interval that identifies high-frequency mode.
          var highFreqSampleInterval = (int)watchConfig["highFreqSampleInterval"]!;
          // Upper / lower heart-rate bounds of the "normal" band.
          var triggerHighFreqHigh = (int)watchConfig["triggerHighFreqHigh"]!;
          var triggerHighFreqLow = (int)watchConfig["triggerHighFreqLow"]!;
          // Number of most recent samples that must all be normal to produce a result
          // while high-frequency sampling is running.
          var stopHighFreqSampleCount = (int)watchConfig["stopHighFreqSampleCount"]!;
          // High-frequency sampling duration in seconds; 0 means sample continuously.
          // Fixed: this was previously read from "stopHighFreqSampleCount" (copy-paste error).
          // NOTE(review): key name inferred from the variable name and sibling keys — confirm
          // against the device configuration schema.
          var highFreqSampleTimes = (int)watchConfig["highFreqSampleTimes"]!;
          // Alarm thresholds forwarded to the fetal heart-rate calculation.
          var upperAlarmThreshold = (int)watchConfig["upperAlarmThreshold"]!;
          var lowerAlarmThreshold = (int)watchConfig["lowerAlarmThreshold"]!;

          if (timeDiffInSeconds <= highFreqSampleInterval)
          {
              // High-frequency sampling is active.
              var phrFreqstatus = await _deviceCacheMgr.GetPregnancyHeartRateFreqStatusAsync(serialNo);
              if (phrFreqstatus == null)
              {
                  // First high-frequency sample seen: remember the newest record as the start marker.
                  _logger.LogInformation($"进入高频心率启动状态 timeDiffInSeconds {timeDiffInSeconds},highFreqSampleInterval:{highFreqSampleInterval}");
                  var freqFirstPhr = phr.OrderByDescending(i => i.Timestamp).First();
                  await _deviceCacheMgr.SetPregnancyHeartRateFreqStatusAsync(serialNo, freqFirstPhr);
              }

              // If the most recent stopHighFreqSampleCount samples are all inside the normal
              // band, push the fetal heart rate derived from their average.
              var recentRates = phr.OrderByDescending(i => i.Timestamp)
                  .Take(stopHighFreqSampleCount)
                  .Select(i => i.PregnancyHeartRate)
                  .ToList();
              // Count guard: Average() throws on an empty sequence (e.g. configured count of 0).
              if (recentRates.Count > 0
                  && recentRates.All(rate => rate >= triggerHighFreqLow && rate <= triggerHighFreqHigh))
              {
                  await SaveAndPushFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, recentRates.Average());
              }
          }
          else
          {
              // High-frequency sampling has ended (the gap exceeded the interval).
              var phrFreqstatus = await _deviceCacheMgr.GetPregnancyHeartRateFreqStatusAsync(serialNo);
              if (phrFreqstatus != null)
              {
                  // Hoisted out of the lambdas below so the null check above applies.
                  var freqStart = phrFreqstatus.Timestamp;

                  // Continuous mode (0) always evaluates; timed mode evaluates only once the
                  // configured duration has elapsed since the start marker.
                  var durationElapsed = highFreqSampleTimes > 0
                      && heartRate.LastUpdate >= phrFreqstatus.LastUpdate + TimeSpan.FromSeconds(highFreqSampleTimes);
                  if (highFreqSampleTimes == 0 || durationElapsed)
                  {
                      // NOTE(review): continuous mode sorts newest-first before Skip(1), timed
                      // mode keeps the storage order, so a different record may be skipped in
                      // each mode. Preserved as found — confirm which record is meant to be
                      // dropped.
                      var candidates = highFreqSampleTimes == 0
                          ? phr.OrderByDescending(i => i.Timestamp).AsEnumerable()
                          : phr.AsEnumerable();

                      // Abnormal samples (outside the normal band) since the start marker.
                      var abnormalRates = candidates
                          .Where(i => i.Timestamp >= freqStart)
                          .Skip(1)
                          .Where(i => i.PregnancyHeartRate < triggerHighFreqLow || i.PregnancyHeartRate > triggerHighFreqHigh)
                          .Select(i => i.PregnancyHeartRate)
                          .ToList();

                      // Average() throws InvalidOperationException on an empty sequence, so
                      // only push when at least one abnormal sample exists.
                      if (abnormalRates.Count > 0)
                      {
                          await SaveAndPushFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, abnormalRates.Average());
                      }
                  }

                  // Clear the start marker so the next burst begins a fresh window.
                  await _deviceCacheMgr.DelPregnancyHeartRateFreqStatusAsync(serialNo);
                  _logger.LogInformation($"结束高频心率状态 timeDiffInSeconds {timeDiffInSeconds},highFreqSampleInterval:{highFreqSampleInterval}");
              }
          }
          #endregion

          #region Fetal heart-rate calculation trigger (fires after "interval" seconds)
          var fetalKey = $"health_monitor/schedule_push/cal_fetal_heart_rate/imei/{serialNo}";
          var scheduleFetalPush = await _serviceEtcd.GetValAsync(fetalKey).ConfigureAwait(false);
          if (string.IsNullOrWhiteSpace(scheduleFetalPush))
          {
              var fetalInterval = (int)watchConfig["interval"]!;
              var fetalNow = DateTime.Now;
              var fetalTimeNextRun = fetalNow.Add(TimeSpan.FromSeconds(fetalInterval));
              // The key's TTL equals the interval; the property name is part of the JSON payload.
              var fetalTTL = fetalInterval;
              var data = new
              {
                  imei = serialNo,
                  create_time = fetalNow.ToString("yyyy-MM-dd HH:mm:ss"),
                  fetalTTL,
                  next_run_time = fetalTimeNextRun.ToString("yyyy-MM-dd HH:mm:ss")
              };
              var result = JsonConvert.SerializeObject(data);
              await _serviceEtcd.PutValAsync(fetalKey, result, fetalTTL, false).ConfigureAwait(false);
          }
          #endregion
      }

      await RegisterModelingPushTriggerAsync(serialNo);
  }

  /// <summary>
  /// Registers the etcd TTL key for the scheduled modeling push of a device, unless a
  /// non-blank value is already stored under that key.
  /// </summary>
  /// <remarks>
  /// DEBUG builds schedule the run at second 58 of the next minute. Other builds schedule it
  /// at 18:mm:ss local time, where mm is the last digit of the serial number (10 when it is
  /// not a digit) and ss is random, rolling over to the next day when that time has passed.
  /// </remarks>
  /// <param name="serialNo">Device serial number (IMEI) used in the key and payload.</param>
  private async Task RegisterModelingPushTriggerAsync(string serialNo)
  {
      var key = $"health_monitor/schedule_push/pregnancy_heart_rate/imei/{serialNo}";
      var schedule_push = await _serviceEtcd.GetValAsync(key).ConfigureAwait(false);
      if (!string.IsNullOrWhiteSpace(schedule_push))
      {
          // Already scheduled.
          return;
      }

      // Additional whole days to wait before the first run.
      var interval = 0;
      DateTime now = DateTime.Now;
#if DEBUG
      // Build the current minute at :58 and then add a minute. Passing now.Minute + 1 to the
      // constructor throws ArgumentOutOfRangeException when the current minute is 59.
      DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute, 58)
          .AddMinutes(1)
          .AddDays(interval);
      TimeSpan timeUntilNextRun = nextRunTime - now;
#else
      var rand = new Random();
      var pushSec = rand.Next(59);
      // Spread devices over minutes 0-9 by the last digit of the serial number.
      // The emptiness guard avoids AsSpan(-1) on an empty serial number.
      var pushMin = !string.IsNullOrEmpty(serialNo)
          && int.TryParse(serialNo.AsSpan(serialNo.Length - 1), out var lastDigit)
              ? lastDigit
              : 10;
      DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, 18, pushMin, pushSec).AddDays(interval);
      TimeSpan timeUntilNextRun = nextRunTime - now;
      if (timeUntilNextRun < TimeSpan.Zero)
      {
          // Today's slot has already passed; use tomorrow's.
          timeUntilNextRun = timeUntilNextRun.Add(TimeSpan.FromDays(1));
          nextRunTime += TimeSpan.FromDays(1);
      }
#endif
      var ttl = (long)timeUntilNextRun.TotalSeconds;
      var data = new
      {
          imei = serialNo,
          create_time = now.ToString("yyyy-MM-dd HH:mm:ss"),
          ttl,
          next_run_time = nextRunTime.ToString("yyyy-MM-dd HH:mm:ss")
      };
      var result = JsonConvert.SerializeObject(data);
      await _serviceEtcd.PutValAsync(key, result, ttl, false).ConfigureAwait(false);
  }
  /// <summary>
  /// Derives a fetal heart rate from an averaged maternal heart rate, stores it, and pushes
  /// it to the IoT API.
  /// </summary>
  /// <remarks>
  /// Does nothing when no common heart-rate model can be initialized for the device.
  /// Otherwise the model is written to TDengine (table "hm_pchr"), the fetal heart rate is
  /// computed as <paramref name="avgPhr"/> multiplied by the model's coefficient, a history
  /// record is added through the data-service client, and the value is sent to the IoT API.
  /// </remarks>
  /// <param name="heartRate">Current heart-rate record; only its serial number is read.</param>
  /// <param name="upperAlarmThreshold">Values above this are flagged with abnormal code 1.</param>
  /// <param name="lowerAlarmThreshold">Values below this are flagged with abnormal code 2.</param>
  /// <param name="avgPhr">Averaged pregnancy heart rate.</param>
  private async Task SaveAndPushFetalHeartRateAsync(HisGpsHeartRate heartRate, int upperAlarmThreshold, int lowerAlarmThreshold, double avgPhr)
  {
      var commonPHR = await _serviceTDengine.InitPregnancyCommonHeartRateModeAsync(heartRate.Serialno);
      if (commonPHR == null)
      {
          return;
      }

      // Persist the common heart-rate model to TDengine.
      await _serviceTDengine.InsertAsync<PregnancyCommonHeartRateModel>("hm_pchr", commonPHR);

      // Fetal heart rate = maternal heart rate * coefficient.
      var fetalHeartRate = SafeType.SafeInt(avgPhr * commonPHR?.StatModeAvgFprCoefficient!);
      var sampleTime = DateTimeUtil.ConvertToTimeStamp(DateTime.Now).ToString();

      // 1 = above the upper threshold, 2 = below the lower threshold, 0 = within range.
      int isAbnormal;
      if (fetalHeartRate > upperAlarmThreshold)
      {
          isAbnormal = 1;
      }
      else if (fetalHeartRate < lowerAlarmThreshold)
      {
          isAbnormal = 2;
      }
      else
      {
          isAbnormal = 0;
      }

      // Persist through the data-service client.
      var gpsFetalHeartRate = new HisGpsFetalHeartRate
      {
          FetalHeartRateId = Guid.NewGuid().ToString("D"),
          PersonId = commonPHR!.PersonId,
          Serialno = heartRate.Serialno,
          HeartRate = fetalHeartRate,
          SampleTime = sampleTime,
          IsAbnormal = isAbnormal,
          StatStartTime = commonPHR.StatStartTime,
          StatEndTime = commonPHR.StatEndTime,
          CreateTime = DateTime.Now,
          Method = 1,
          IsDisplay = 1,
          DeviceKey = commonPHR!.DeviceKey
      };
      await _hisFetalHeartApiClient.AddAsync(gpsFetalHeartRate).ConfigureAwait(false);

      // Push to api/v1/open/OpenIot/SetFetalHeartRateConfig.
      await _serviceIotApi.SetFetalHeartRateConfig(heartRate.Serialno, fetalHeartRate, sampleTime, isAbnormal);
  }
  286. }
  287. }