You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

866 lines
49KB

  1. using Etcdserverpb;
  2. using Google.Protobuf.WellKnownTypes;
  3. using HealthMonitor.Common;
  4. using HealthMonitor.Common.helper;
  5. using HealthMonitor.Model.Service.Mapper;
  6. using HealthMonitor.Service.Biz;
  7. using HealthMonitor.Service.Biz.db;
  8. using HealthMonitor.Service.Cache;
  9. using HealthMonitor.Service.Etcd;
  10. using HealthMonitor.Service.MessageQueue;
  11. using HealthMonitor.Service.Resolver.Interface;
  12. using HealthMonitor.Service.Sub;
  13. using HealthMonitor.Service.Sub.Topic.Model;
  14. using Microsoft.EntityFrameworkCore.Metadata;
  15. using Microsoft.Extensions.Logging;
  16. using Newtonsoft.Json;
  17. using Newtonsoft.Json.Linq;
  18. using SqlSugar;
  19. using System;
  20. using System.Collections.Generic;
  21. using System.Linq;
  22. using System.Net;
  23. using System.Text;
  24. using System.Threading.Tasks;
  25. using TelpoDataService.Util.Clients;
  26. using TelpoDataService.Util.Entities.GpsCard;
  27. using TelpoDataService.Util.Entities.GpsLocationHistory;
  28. using TelpoDataService.Util.Models;
  29. using TelpoDataService.Util.QueryObjects;
  30. namespace HealthMonitor.Service.Resolver
  31. {
  32. public class PregnancyHeartRateResolver : IResolver
  33. {
  34. private readonly ILogger<PregnancyHeartRateResolver> _logger;
  35. private readonly TDengineService _serviceTDengine;
  36. private readonly DeviceCacheManager _deviceCacheMgr;
  37. private readonly IotApiService _serviceIotApi;
  38. private readonly AsyncLocal<string> _messageId = new();
  39. private readonly AsyncLocal<HisGpsHeartRate> _msgData = new();
  40. private readonly HttpHelper _httpHelper = default!;
  41. private readonly EtcdService _serviceEtcd;
  42. private readonly GpsLocationHistoryAccessorClient<HisGpsFetalHeartRate> _hisFetalHeartApiClient;
  43. private readonly GpsLocationHistoryAccessorClient<HisGpsFetalMovement> _hisFetalMovementApiClient;
  44. private readonly FetalMovementNormalValueRangeCacheManager _mgrFetalMovementNormalValueRangeCache;
  45. private readonly MqProcessLogic _serviceMqProcess;
  46. private static int[] SCHEDULE_HOUR = new int[] { 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24 };
  47. public PregnancyHeartRateResolver(ILogger<PregnancyHeartRateResolver> logger,
  48. HttpHelper httpHelper, EtcdService serviceEtcd, DeviceCacheManager deviceCacheMgr,
  49. MqProcessLogic serviceMqProcess,
  50. IotApiService iotApiService, TDengineService serviceDengine, FetalMovementNormalValueRangeCacheManager fetalMovementNormalValueRangeCacheMgr,
  51. GpsLocationHistoryAccessorClient<HisGpsFetalHeartRate> hisFetalHeartApiClient,
  52. GpsLocationHistoryAccessorClient<HisGpsFetalMovement> hisFetalMovementApiClient
  53. )
  54. {
  55. _logger = logger;
  56. _httpHelper = httpHelper;
  57. _serviceEtcd = serviceEtcd;
  58. _serviceTDengine = serviceDengine;
  59. _deviceCacheMgr = deviceCacheMgr;
  60. _serviceIotApi = iotApiService;
  61. _serviceMqProcess = serviceMqProcess;
  62. _hisFetalHeartApiClient = hisFetalHeartApiClient;
  63. _hisFetalMovementApiClient = hisFetalMovementApiClient;
  64. _mgrFetalMovementNormalValueRangeCache = fetalMovementNormalValueRangeCacheMgr;
  65. }
  66. public void SetResolveInfo(PackageMsgModel msg)
  67. {
  68. var topicHmPregnancyHeartRate = JsonConvert.DeserializeObject<TopicHmPregnancyHeartRate>(msg.DetailData.ToString()!);
  69. _messageId.Value = msg.MessageId;
  70. _msgData.Value = new HisGpsHeartRate()
  71. {
  72. HeartRateId = topicHmPregnancyHeartRate!.PregnancyHeartRateId,
  73. MessageId = topicHmPregnancyHeartRate!.MessageId,
  74. Serialno = topicHmPregnancyHeartRate!.Serialno,
  75. HeartRate = topicHmPregnancyHeartRate.PregnancyHeartRate,
  76. LastUpdate = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(topicHmPregnancyHeartRate.LastUpdate) / 1000000),
  77. CreateTime = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(topicHmPregnancyHeartRate.CreateTime) / 1000000),
  78. Method = topicHmPregnancyHeartRate!.Method,
  79. IsDisplay = topicHmPregnancyHeartRate!.IsDisplay ? 1 : 0
  80. };
  81. }
  82. public override string ToString()
  83. {
  84. return $"{nameof(PregnancyHeartRateResolver)}[{_messageId.Value}]";
  85. }
  86. public async Task ExecuteMessageAsync()
  87. {
  88. var messageId = _messageId.Value;
  89. var heartRate = _msgData.Value!;
  90. try
  91. {
  92. var watchConfig = await _deviceCacheMgr.GetGpsDeviceWatchConfigCacheObjectBySerialNoAsync(heartRate.Serialno, "0067");
  93. var isFetalHeartEnable = watchConfig != null && (int)watchConfig["enabled"]! == 1;
  94. if (isFetalHeartEnable)
  95. {
  96. //_logger.LogInformation($"{heartRate.Serialno} 计算胎心胎动启动");
  97. #region 定时下发触发器(定时建模)
  98. var key = $"health_monitor/schedule_push/pregnancy_heart_rate/imei/{heartRate.Serialno}";
  99. var schedule_push = await _serviceEtcd.GetValAsync(key).ConfigureAwait(false);
  100. if (string.IsNullOrWhiteSpace(schedule_push))
  101. {
  102. // 注册首次下推
  103. var interval = 0;
  104. // 获取当前时间
  105. DateTime now = DateTime.Now;
  106. var rand = new Random();
  107. var pushSec = rand.Next(59);
  108. int pushMin = int.TryParse(heartRate.Serialno.AsSpan(heartRate.Serialno.Length - 1), out pushMin) ? pushMin : 10;
  109. // 计算距离下一个$interval天后的8点的时间间隔
  110. DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, 6, pushMin, pushSec).AddDays(interval);
  111. TimeSpan timeUntilNextRun = nextRunTime - now;
  112. if (timeUntilNextRun < TimeSpan.Zero)
  113. {
  114. timeUntilNextRun = timeUntilNextRun.Add(TimeSpan.FromDays(1));
  115. nextRunTime += TimeSpan.FromDays(1);
  116. }
  117. var ttl = (long)timeUntilNextRun.TotalSeconds;
  118. var data = new
  119. {
  120. imei = heartRate.Serialno,
  121. create_time = now.ToString("yyyy-MM-dd HH:mm:ss"),
  122. ttl,
  123. next_run_time = nextRunTime.ToString("yyyy-MM-dd HH:mm:ss")
  124. };
  125. var result = JsonConvert.SerializeObject(data);
  126. await _serviceEtcd.PutValAsync(key, result, ttl, false).ConfigureAwait(false);
  127. }
  128. #endregion
  129. //_logger.LogInformation($"{heartRate.Serialno} 触发定时建模");
  130. // 高频心率采样间隔 highFreqSampleInterval = highFreqSampleInterval 最小highFreqSampleInterval=60)
  131. var highFreqSampleInterval = (int)watchConfig!["highFreqSampleInterval"]! >= 60 ? (int)watchConfig!["highFreqSampleInterval"]!:60;
  132. // 触发高频监测的心率上限值
  133. var triggerHighFreqHigh = (int)watchConfig["triggerHighFreqHigh"]!;
  134. // 触发高频监测的心率下限值
  135. var triggerHighFreqLow = (int)watchConfig["triggerHighFreqLow"]!;
  136. // 暂时处理
  137. triggerHighFreqLow = triggerHighFreqLow > 60 && triggerHighFreqLow < 67 ? 60 : triggerHighFreqLow;
  138. //停止高频心率采样心率连续正常次数
  139. var stopHighFreqSampleCount = (int)watchConfig["stopHighFreqSampleCount"]!;
  140. // 高频心率采集时长 0 为持续采集,非零为高频心率的采集时长
  141. var highFreqSampleTimes = (int)watchConfig["highFreqSampleTimes"]!;
  142. // 告警上限阀值
  143. var upperAlarmThreshold = (int)watchConfig["upperAlarmThreshold"]!;
  144. // 告警下限阀值
  145. var lowerAlarmThreshold = (int)watchConfig["lowerAlarmThreshold"]!;
  146. // EDOC
  147. var edoc = DateTimeUtil.ToDateTime(watchConfig!["EDOC"]!.ToString());
  148. // interval (分钟) 固定15分钟
  149. var intervalFHR = 15;//int)watchConfig["interval"]!;
  150. var daysPhr = await _serviceTDengine.GetBySerialNoAsync<PregnancyHeartRateModel>(heartRate.Serialno, 7);
  151. var phr = daysPhr
  152. .Where(p => p.LastUpdate <= heartRate.LastUpdate)
  153. .ToList();
  154. var nonFreqPhr = GetNonFreqPregnancyHeartRate(phr, highFreqSampleInterval);
  155. if (nonFreqPhr.Count >= 30)
  156. {
  157. #region 定时计算胎动数据触发器两小时间隔开始
  158. var fetalMovementKey = $"health_monitor/schedule_push/cal_fetal_movement/imei/{heartRate.Serialno}";
  159. ///// 计算 0 点秒数
  160. var fetalMovementLastUpdate = (DateTime)heartRate.LastUpdate!;
  161. DateTime fmScheduleNow = DateTime.Now;
  162. // 小于两小时
  163. if (fmScheduleNow > fetalMovementLastUpdate && (fmScheduleNow - fetalMovementLastUpdate).TotalHours <= 2)
  164. {
  165. var rand = new Random();
  166. var pushSec = rand.Next(59);
  167. int pushMin = int.TryParse(heartRate.Serialno.AsSpan(heartRate.Serialno.Length - 1), out pushMin) ? pushMin : 10;
  168. var scheduleHourDiff = SCHEDULE_HOUR
  169. .Where(h => h > fetalMovementLastUpdate.Hour)
  170. .OrderBy(h => h - fetalMovementLastUpdate.Hour)
  171. .FirstOrDefault() - fetalMovementLastUpdate.Hour;
  172. var scheduleTime = fetalMovementLastUpdate.AddHours(scheduleHourDiff);
  173. DateTime nextRunTime = new(scheduleTime.Year, scheduleTime.Month, scheduleTime.Day, scheduleTime.Hour, pushMin, pushSec);
  174. TimeSpan timeUntilNextRun = nextRunTime - fmScheduleNow;
  175. var ttl = (long)timeUntilNextRun.TotalSeconds;
  176. await SetIntervalTriggerAsync(fetalMovementKey, heartRate.Serialno, ttl, heartRate);
  177. }
  178. #endregion
  179. #region 计算胎心数据(按心率时间LastUpdate)
  180. var commonPHR = await _serviceTDengine.GetLastAsync<PregnancyCommonHeartRateModel>(heartRate.Serialno);
  181. if (commonPHR == null)
  182. {
  183. // 处理孕妇业务,计算一般心率并下发
  184. commonPHR = await _serviceTDengine.InitPregnancyCommonHeartRateModeAsync(heartRate.Serialno, highFreqSampleInterval: highFreqSampleInterval);
  185. // 建模完成
  186. var flag = await _serviceIotApi.SetFetalConfig(heartRate.Serialno, 1, commonPHR!.MaxValue, commonPHR!.MinValue);
  187. _logger.LogInformation($"{heartRate.Serialno} 记录数量足够,建模完成");
  188. // 保存到TDengine数据库
  189. await _serviceTDengine.InsertAsync<PregnancyCommonHeartRateModel>("hm_pchr", commonPHR!);
  190. _logger.LogInformation($"保存TDengine完成");
  191. }
  192. // 获取最近的两个记录,并计算它们的 LastUpdate 时间差
  193. var firstTwoPhr = phr.OrderByDescending(i => i.LastUpdate).Take(2).Select(i => i.LastUpdate).ToList();
  194. var timeDiff = firstTwoPhr[0] - firstTwoPhr[1];
  195. // 如果需要,将时间差转换为秒
  196. var timeDiffInSeconds = timeDiff.TotalSeconds;
  197. // 高频统计结束时间
  198. var FreqStatsEnd = DateTime.Now;
  199. // 高频心率启动(在高频第二条才能判断)
  200. if (timeDiffInSeconds <= highFreqSampleInterval)
  201. {
  202. var phrFreqstatus = await _deviceCacheMgr.GetPregnancyHeartRateFreqStatusAsync(heartRate.Serialno);
  203. if (phrFreqstatus == null)
  204. {
  205. /// 设置高频状态
  206. _logger.LogInformation($"{heartRate.Serialno} 进入高频心率启动状态 timeDiffInSeconds {timeDiffInSeconds},highFreqSampleInterval:{highFreqSampleInterval}");
  207. // 设置高频状态
  208. _logger.LogInformation($"{heartRate.Serialno} phr.Count {phr.Count}");
  209. var freqFirstPhr = phr.OrderByDescending(i => i.LastUpdate)
  210. .Skip(1) //在高频第二条才能判断,所以要去除本条记录
  211. .First();
  212. await _deviceCacheMgr.SetPregnancyHeartRateFreqStatusAsync(heartRate.Serialno, freqFirstPhr);
  213. _logger.LogInformation($"{heartRate.Serialno} 设置高频状态");
  214. phrFreqstatus = await _deviceCacheMgr.GetPregnancyHeartRateFreqStatusAsync(heartRate.Serialno);
  215. // 通过续租 延时计算高频心率的胎心
  216. }
  217. /// phr PregnancyHeartRate 连续连续正常次数个值都是正常(大于等于triggerHighFreqLow,少于等于triggerHighFreqHig),
  218. /// 取连续正常次数正常值的平均值,推送到api/v1/open/OpenIot/SetFetalHeartRateConfig
  219. #region 检查高频状态是否连续12个心率值都是正常的
  220. // 获取最近连续正常次数个心率记录
  221. //_logger.LogInformation($"{heartRate.Serialno} 设置 stopHighFreqSampleCount {stopHighFreqSampleCount}");
  222. //var lastPhr = phr.OrderByDescending(i => i.LastUpdate).Take(stopHighFreqSampleCount).ToList();
  223. var lastPhr = phr.Where(i => i.LastUpdate >= phrFreqstatus!.LastUpdate)
  224. .OrderByDescending(i => i.LastUpdate).Take(stopHighFreqSampleCount).ToList();
  225. _logger.LogInformation($"{heartRate.Serialno} lastPhr.Count {lastPhr.Count},stopHighFreqSampleCount {stopHighFreqSampleCount}");
  226. _logger.LogInformation($"{heartRate.Serialno} count :{lastPhr.Count >= stopHighFreqSampleCount}");
  227. _logger.LogInformation($"{heartRate.Serialno} All {lastPhr.All(i => i.PregnancyHeartRate >= triggerHighFreqLow && i.PregnancyHeartRate <= triggerHighFreqHigh)}");
  228. // 检查是否连续12个值都是正常的
  229. if ((lastPhr.Count >= stopHighFreqSampleCount) &&
  230. lastPhr.All(i => i.PregnancyHeartRate >= triggerHighFreqLow && i.PregnancyHeartRate <= triggerHighFreqHigh)
  231. )
  232. {
  233. var avgPhr = lastPhr.Select(i => i.PregnancyHeartRate).Average();
  234. // 计算一般心率得到胎心系数
  235. //await SaveAndPushFreqFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr, DateTimeUtil.ConvertToTimeStamp(DateTime.Now).ToString());
  236. // 高频数据不建模
  237. FreqStatsEnd = (DateTime)heartRate.LastUpdate!;
  238. _logger.LogInformation($"{heartRate.Serialno} 高频状态已经持续{(FreqStatsEnd - phrFreqstatus!.LastUpdate).TotalSeconds} 秒,连续 {stopHighFreqSampleCount} 次采样心率正常,将下发指令");
  239. var freqSaveAndPushFetalHeartRate = await _deviceCacheMgr.GetBizIntervalAsync(heartRate.Serialno, "SaveAndPushFetalHeartRate");
  240. // 高频不停,15分钟内只下发一条
  241. if (string.IsNullOrEmpty(freqSaveAndPushFetalHeartRate))
  242. {
  243. await SaveAndPushFetalHeartRateAsync(heartRate, commonPHR, upperAlarmThreshold, lowerAlarmThreshold, avgPhr, DateTimeUtil.ConvertToTimeStamp(phrFreqstatus!.LastUpdate).ToString(), phrFreqstatus!.LastUpdate, FreqStatsEnd);
  244. // 删除高频状态的首条记录
  245. await _deviceCacheMgr.DelPregnancyHeartRateFreqStatusAsync(heartRate.Serialno);
  246. // 设置15分的SaveAndPushFetalHeartRate业务间隔
  247. await _deviceCacheMgr.SetBizIntervalAsync(heartRate.Serialno, "SaveAndPushFetalHeartRate");
  248. _logger.LogInformation($"{heartRate.Serialno} 连续 {stopHighFreqSampleCount} 次采样心率正常,结束高频心率状态, timeDiffInSeconds {timeDiffInSeconds},highFreqSampleInterval:{highFreqSampleInterval},高频状态持续{((DateTime)heartRate.LastUpdate - phrFreqstatus!.LastUpdate).TotalSeconds} 秒");
  249. }
  250. else
  251. {
  252. _logger.LogWarning($"{heartRate.Serialno} 连续 {stopHighFreqSampleCount} 次采样心率正常,设备端应该结束高频状态,但设备端没有结束高频,平台高频15分钟内已经下发过指令");
  253. }
  254. }
  255. else
  256. {
  257. _logger.LogInformation($"{heartRate.Serialno} 处于高频状态...");
  258. }
  259. #endregion
  260. }
  261. // 高频心率结束或常规心率
  262. else
  263. {
  264. var phrFreqstatus = await _deviceCacheMgr.GetPregnancyHeartRateFreqStatusAsync(heartRate.Serialno);
  265. // 高频心率结束
  266. if (phrFreqstatus != null)
  267. {
  268. /// 在highFreqSampleTimes=0一直异常(大于等于triggerHighFreqLow,少于等于triggerHighFreqHig),
  269. /// 取所有值的平均值,推送胎心数据到api/v1/open/OpenIot/SetFetalHeartRateConfig
  270. if (highFreqSampleTimes == 0)
  271. {
  272. var avgPhr = phr.OrderByDescending(i => i.LastUpdate)
  273. .Where(i => i.LastUpdate >= phrFreqstatus?.LastUpdate)
  274. .Skip(1) // 去除首条
  275. .Where(i => i.PregnancyHeartRate < triggerHighFreqLow || i.PregnancyHeartRate > triggerHighFreqHigh)
  276. .Select(i => i.PregnancyHeartRate).Average();
  277. // 推送胎心数据到 api/v1/open/OpenIot/SetFetalHeartRateConfig
  278. // 计算一般心率得到胎心系数
  279. // 高频数据不建模
  280. FreqStatsEnd = firstTwoPhr[1];
  281. _logger.LogInformation($"{heartRate.Serialno} 高频状态已经持续{(FreqStatsEnd - phrFreqstatus!.LastUpdate).TotalSeconds} 秒,highFreqSampleTimes={highFreqSampleTimes}秒,即将结束高频状态,将下发指令");
  282. //await SaveAndPushFreqFetalHeartRateAsync(heartRate, commonPHR, upperAlarmThreshold, lowerAlarmThreshold, avgPhr, DateTimeUtil.ConvertToTimeStamp(phrFreqstatus!.LastUpdate).ToString());
  283. await SaveAndPushFetalHeartRateAsync(heartRate, commonPHR, upperAlarmThreshold, lowerAlarmThreshold, avgPhr, DateTimeUtil.ConvertToTimeStamp(phrFreqstatus!.LastUpdate).ToString(), phrFreqstatus!.LastUpdate, FreqStatsEnd);
  284. }
  285. /// 在highFreqSampleTimes>0一直异常(大于等于triggerHighFreqLow,少于等于triggerHighFreqHig),
  286. /// 取所有值的平均值,推送胎心数据到api/v1/open/OpenIot/SetFetalHeartRateConfig
  287. if (highFreqSampleTimes > 0 && heartRate.LastUpdate >= (phrFreqstatus?.LastUpdate.AddSeconds(highFreqSampleTimes)))
  288. {
  289. // 获取高频心率数据个数
  290. var filterPhr = phr
  291. .Where(i => i.LastUpdate >= phrFreqstatus?.LastUpdate)
  292. .Skip(1)
  293. .ToList();
  294. _logger.LogInformation($"{heartRate.Serialno} 高频周期 {phrFreqstatus?.LastUpdate.ToString("yyyy-MM-dd HH:mm:ss")}--{firstTwoPhr[1].ToString("yyyy-MM-dd HH:mm:ss")} 产生高频心率数量 {filterPhr.Count} 条");
  295. // 高频心率数据大于stopHighFreqSampleCount/12个才计算胎心数据
  296. //
  297. if (filterPhr.Count > stopHighFreqSampleCount)
  298. {
  299. FreqStatsEnd = firstTwoPhr[1];
  300. var avgPhr = filterPhr
  301. .OrderByDescending(i => i.LastUpdate)
  302. .Take(stopHighFreqSampleCount) // 计算最后12条
  303. .Select(i => i.PregnancyHeartRate).Average();
  304. _logger.LogInformation($"{heartRate.Serialno} 高频状态已经持续{(FreqStatsEnd - phrFreqstatus!.LastUpdate).TotalSeconds} 秒,超过约定的 {highFreqSampleTimes} 秒,即将结束高频状态,将下发指令");
  305. //计算高频
  306. await SaveAndPushFetalHeartRateAsync(heartRate, commonPHR, upperAlarmThreshold, lowerAlarmThreshold, avgPhr, DateTimeUtil.ConvertToTimeStamp(phrFreqstatus!.LastUpdate).ToString(), phrFreqstatus!.LastUpdate, FreqStatsEnd);
  307. }
  308. else
  309. {
  310. _logger.LogInformation($"{heartRate.Serialno} 高频心率的数据不大于{stopHighFreqSampleCount}条,不进行高频数据的胎心计算");
  311. }
  312. // 删除高频状态的首条记录
  313. await _deviceCacheMgr.DelPregnancyHeartRateFreqStatusAsync(heartRate.Serialno);
  314. // 计算本次常规心率的胎心数据(高频结束后,实时处理)
  315. await CalculateNormalFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, intervalFHR, commonPHR);
  316. }
  317. ///不满足持续10分钟highFreqSampleTimes或出现时间倒叙
  318. ///
  319. else
  320. {
  321. // 高频结束后与常规的心率时间倒叙
  322. if ((firstTwoPhr[1] - phrFreqstatus!.LastUpdate).TotalSeconds < 0)
  323. {
  324. _logger.LogInformation($"{heartRate.Serialno} 高频结束出现时间倒叙,计算当条心率创建之前的高频心率");
  325. #region 计算当条心率创建之前的高频心率
  326. // 取得高频之后的所有数据
  327. var phrFlashBack = daysPhr.Where(p => p.LastUpdate >= phrFreqstatus!.LastUpdate)
  328. .OrderByDescending(i => i.LastUpdate);
  329. // 取得高频数据
  330. var freqCollection = phrFlashBack.ToList();
  331. //var freqCollection = new List<PregnancyHeartRateModel>();
  332. //PregnancyHeartRateModel? previousItem = null;
  333. //foreach (var item in phrFlashBack)
  334. //{
  335. // if (previousItem != null)
  336. // {
  337. // var timeNextDiff = (previousItem!.LastUpdate - item.LastUpdate).TotalSeconds;
  338. // if (timeNextDiff <= highFreqSampleInterval)
  339. // {
  340. // freqCollection.Add(item);
  341. // }
  342. // }
  343. // previousItem = item;
  344. //}
  345. _logger.LogInformation($"{heartRate.Serialno} 高频数据个数{freqCollection.Count},触发高频开始时间{phrFreqstatus!.LastUpdate}");
  346. if (freqCollection.Count > stopHighFreqSampleCount)
  347. {
  348. // 计算高频产生的胎心
  349. var avgPhr = freqCollection
  350. .OrderByDescending(i => i.LastUpdate)
  351. .Take(stopHighFreqSampleCount) // 计算最后12条
  352. .Select(i => i.PregnancyHeartRate).Average();
  353. await SaveAndPushFetalHeartRateAsync(heartRate, commonPHR, upperAlarmThreshold, lowerAlarmThreshold, avgPhr, DateTimeUtil.ConvertToTimeStamp(phrFreqstatus!.LastUpdate).ToString(), freqCollection.Last().LastUpdate, freqCollection.First().LastUpdate);
  354. }
  355. else
  356. {
  357. _logger.LogInformation($"{heartRate.Serialno} 时间倒叙触发计算高频心率的数据不足{stopHighFreqSampleCount}条,不进行胎心计算");
  358. }
  359. #endregion
  360. }
  361. else
  362. {
  363. _logger.LogInformation($"{heartRate.Serialno} 高频持续时间不足{highFreqSampleTimes},只持续{(firstTwoPhr[1] - phrFreqstatus!.LastUpdate).TotalSeconds} 秒");
  364. }
  365. // 删除高频状态的首条记录
  366. await _deviceCacheMgr.DelPregnancyHeartRateFreqStatusAsync(heartRate.Serialno);
  367. // 计算本次常规心率的胎心数据(高频结束后,实时处理)
  368. await CalculateNormalFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, intervalFHR, commonPHR);
  369. }
  370. // 删除高频状态的首条记录
  371. await _deviceCacheMgr.DelPregnancyHeartRateFreqStatusAsync(heartRate.Serialno);
  372. _logger.LogInformation($"{heartRate.Serialno} 超时结束高频心率状态 timeDiffInSeconds {timeDiffInSeconds},highFreqSampleInterval:{highFreqSampleInterval},高频状态持续{(firstTwoPhr[1] - phrFreqstatus!.LastUpdate).TotalSeconds} 秒");
  373. // 计算本次常规心率的胎心数据(高频结束后,实时处理)
  374. //await CalculateNormalFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, intervalFHR, commonPHR);
  375. //// 使用延后计算
  376. //var fhrScheduleKey = $"health_monitor/schedule_push/cal_fetal_heart_rate/imei/{heartRate.Serialno}";
  377. //var fhrScheduleTTL = 60;
  378. //await SetIntervalTriggerAsync(fhrScheduleKey, heartRate.Serialno, fhrScheduleTTL, heartRate);
  379. }
  380. // 常规心率(本次心率可能是高频心率的首条,所以要使用延后计算胎心率)
  381. else
  382. {
  383. // 计算本次常规心率的胎心数据
  384. //await CalculateNormalFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, intervalFHR, commonPHR);
  385. // 本次心率可能是高频心率的首条,所以要使用本次常规心率延后计算胎心率
  386. //var fhrScheduleKey = $"health_monitor/schedule_push/cal_fetal_heart_rate/imei/{heartRate.Serialno}";
  387. //var fhrScheduleTTL = 30;
  388. //await SetIntervalTriggerAsync(fhrScheduleKey, heartRate.Serialno, fhrScheduleTTL, heartRate);
  389. //_logger.LogInformation($"{heartRate.Serialno} 延时50秒,判断当前数据是否为高频首条");
  390. // 本次心率可能是高频心率的首条,所以要使用本次常规心率延后计算胎心率 查询30秒后是否有高频缓存
  391. Thread thread = new(async () =>
  392. {
  393. try
  394. {
  395. #region 休眠2秒
  396. var startTime = DateTime.Now;
  397. //var highFreqSampleInterval2 = (int)watchConfig!["highFreqSampleInterval"]!+5;
  398. var highFreqSampleInterval2 = highFreqSampleInterval;
  399. var during = TimeSpan.FromSeconds(highFreqSampleInterval2);
  400. while (true)
  401. {
  402. if (DateTime.Now - startTime > during)
  403. {
  404. break;
  405. }
  406. await Task.Delay(TimeSpan.FromSeconds(1));
  407. }
  408. #endregion
  409. var phrFreqstatus = await _deviceCacheMgr.GetPregnancyHeartRateFreqStatusAsync(heartRate.Serialno);
  410. _logger.LogInformation($"phrFreqstatus==null:{phrFreqstatus == null}");
  411. _logger.LogInformation($"phrFreqstatus.LastUpdate < heartRate.LastUpdate:{phrFreqstatus?.LastUpdate < heartRate.LastUpdate}");
  412. if (phrFreqstatus == null || phrFreqstatus.LastUpdate < heartRate.LastUpdate)
  413. {
  414. await CalculateNormalFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, intervalFHR, commonPHR);
  415. _logger.LogInformation($"{heartRate.Serialno} 计算常规心率");
  416. }
  417. }
  418. catch (Exception ex)
  419. {
  420. _logger.LogError($"处理延时计算异常:{ex.Message}, {ex.StackTrace}");
  421. }
  422. });
  423. thread.Start();
  424. }
  425. }
  426. #endregion
  427. }
  428. else
  429. {
  430. _logger.LogInformation($"{heartRate.Serialno} 记录不足30条,建模中");
  431. }
  432. }
  433. }
  434. catch (Exception ex)
  435. {
  436. _logger.LogError($"{heartRate.Serialno} 处理孕妇心率数据异常 \n{ex.Message}\n{ex.StackTrace}");
  437. }
  438. }
  439. /// <summary>
  440. /// 常规心率计算胎心数据
  441. /// </summary>
  442. /// <param name="heartRate"></param>
  443. /// <param name="upperAlarmThreshold"></param>
  444. /// <param name="lowerAlarmThreshold"></param>
  445. /// <param name="intervalFHR"></param>
  446. /// <param name="commonPHR"></param>
  447. /// <returns></returns>
  448. private async Task CalculateNormalFetalHeartRateAsync(HisGpsHeartRate heartRate, int upperAlarmThreshold, int lowerAlarmThreshold, int intervalFHR, PregnancyCommonHeartRateModel? commonPHR)
  449. {
  450. // 上15分钟的数据
  451. // 获取当前时间
  452. DateTime nowInterval = (DateTime)heartRate.LastUpdate!;
  453. if (nowInterval.Second > 0)
  454. {
  455. nowInterval = nowInterval.AddMinutes(1);
  456. }
  457. // 计算last_update到上一间隔的分钟数
  458. int minutesToSubtract = nowInterval.Minute % intervalFHR;
  459. // 计算上一间隔的时间
  460. DateTime previousInterval = nowInterval.AddMinutes(-minutesToSubtract).AddSeconds(-nowInterval.Second).AddMilliseconds(-nowInterval.Millisecond);
  461. // 使用 last_update 上一刻
  462. var sampleTimeFHR = DateTimeUtil.ConvertToTimeStamp(previousInterval).ToString();
  463. // 计算last_update到下一间隔的分钟数
  464. int minutesToAdd = intervalFHR - (nowInterval.Minute % intervalFHR);
  465. if (minutesToAdd == intervalFHR)
  466. {
  467. minutesToAdd = 0; // 如果已经是间隔,则不需要增加分钟
  468. }
  469. // 计算下一间隔的时间
  470. DateTime nextInterval = nowInterval.AddMinutes(minutesToAdd)
  471. .AddSeconds(-nowInterval.Second)
  472. .AddMilliseconds(-nowInterval.Millisecond);
  473. var daysPhr = await _serviceTDengine.GetBySerialNoAsync<PregnancyHeartRateModel>(heartRate.Serialno, 7);
  474. var normalPhrStatStartTime = nextInterval.AddMinutes(-intervalFHR);
  475. var normalPhrStatEndTime = nextInterval;
  476. _logger.LogInformation($"{heartRate.Serialno} 计算胎心数据, 周期:{normalPhrStatStartTime}-{normalPhrStatEndTime} ");
  477. var filteredPhr = daysPhr
  478. // 使用 last_update 下一刻
  479. .Where(i => i.LastUpdate <= normalPhrStatEndTime && i.LastUpdate >= normalPhrStatStartTime)
  480. .ToList();
  481. if (filteredPhr.Count == 0)
  482. {
  483. _logger.LogWarning($"{heartRate.Serialno} 周期:{normalPhrStatStartTime}-{normalPhrStatEndTime} 孕妇心率数据不足,{filteredPhr.Count}条记录");
  484. return;
  485. }
  486. var phrValue = filteredPhr.Count == 1
  487. ? filteredPhr.First().PregnancyHeartRate
  488. : filteredPhr.Average(i => i.PregnancyHeartRate);
  489. //await SaveAndPushFreqFetalHeartRateAsync(heartRate, commonPHR!, upperAlarmThreshold, lowerAlarmThreshold, phrValue, sampleTimeFHR);
  490. await SaveAndPushFetalHeartRateAsync(heartRate, commonPHR!, upperAlarmThreshold, lowerAlarmThreshold, phrValue, sampleTimeFHR, normalPhrStatStartTime, normalPhrStatEndTime);
  491. }
  492. /// <summary>
  493. ///
  494. /// </summary>
  495. /// <param name="heartRate"></param>
  496. /// <param name="commonPHR"></param>
  497. /// <param name="upperAlarmThreshold"></param>
  498. /// <param name="lowerAlarmThreshold"></param>
  499. /// <param name="phrValue"></param>
  500. /// <param name="sampleTime"></param>
  501. /// <param name="statStartTime"></param>
  502. /// <param name="statEndTime"></param>
  503. /// <returns></returns>
  504. private async Task SaveAndPushFetalHeartRateAsync(HisGpsHeartRate heartRate, PregnancyCommonHeartRateModel commonPHR, int upperAlarmThreshold, int lowerAlarmThreshold, double phrValue, string sampleTime, DateTime statStartTime, DateTime statEndTime)
  505. {
  506. // 计算胎心=孕妇心率*系数
  507. /**
  508. var fetalHeartRate = SafeType.SafeInt(phrValue * commonPHR?.StatModeAvgFprCoefficient!);
  509. fetalHeartRate = fetalHeartRate > 220 ? 220 : fetalHeartRate; // 胎心的最大值调整为220,超过都按该值220输出
  510. if (fetalHeartRate >= 220)
  511. {
  512. // 先使用最小系数计算
  513. var statMaxValueFprCoefficient = commonPHR?.StatMaxValueFprCoefficient!;
  514. var statMinValueFprCoefficient = commonPHR?.StatMinValueFprCoefficient!;
  515. var coefficient = statMaxValueFprCoefficient < statMinValueFprCoefficient ? statMaxValueFprCoefficient : statMinValueFprCoefficient;
  516. fetalHeartRate = SafeType.SafeInt(phrValue * coefficient);
  517. if (fetalHeartRate < 220)
  518. {
  519. _logger.LogWarning($"{heartRate.Serialno} 使用极值系数 {coefficient} ,建模数据可能出现异常,请检查");
  520. }
  521. else
  522. {
  523. fetalHeartRate = 220;
  524. _logger.LogWarning($"{heartRate.Serialno} 使用所有系数都不能放映实际,建模数据可能出现异常,请检查");
  525. }
  526. }
  527. */
  528. #region 胎心系数使用基于心率与中位数对比
  529. var coefficient = 0f;
  530. // 孕妇心率少于中位数,取StatMinValueFprCoefficient
  531. if (heartRate.HeartRate < commonPHR!.Mode)
  532. {
  533. coefficient = commonPHR.StatMinValueFprCoefficient!;
  534. _logger.LogInformation($"{heartRate.Serialno} 孕妇心率少于中位数,使用最小值系数 {coefficient}");
  535. }
  536. // 孕妇心率大于中位数,取StatMaxValueFprCoefficient与StatModeAvgFprCoefficient中少的那个
  537. else if (heartRate.HeartRate > commonPHR.Mode)
  538. {
  539. if (commonPHR.StatModeAvgFprCoefficient > commonPHR.StatMaxValueFprCoefficient)
  540. {
  541. coefficient = commonPHR.StatMaxValueFprCoefficient!;
  542. _logger.LogInformation($"{heartRate.Serialno} 孕妇心率大于中位数,使用最大值系数 {coefficient}");
  543. }
  544. else
  545. {
  546. coefficient = commonPHR.StatModeAvgFprCoefficient!;
  547. _logger.LogInformation($"{heartRate.Serialno} 孕妇心率大于中位数,使用均值系数 {coefficient}");
  548. }
  549. }
  550. else
  551. {
  552. coefficient = commonPHR.StatModeAvgFprCoefficient;
  553. _logger.LogInformation($"{heartRate.Serialno} 孕妇心率等于中位数,使用均值系数 {coefficient}");
  554. }
  555. #endregion
  556. var fetalHeartRate = SafeType.SafeInt(phrValue * coefficient);
  557. // 胎心的最大值调整为220,超过都按该值220输出
  558. fetalHeartRate = fetalHeartRate>= 220 ? 220 : fetalHeartRate;
  559. var isAbnormal = fetalHeartRate > upperAlarmThreshold ? 1 : (fetalHeartRate < lowerAlarmThreshold ? 2 : 0);
  560. var phrFreqstatus = await _deviceCacheMgr.GetPregnancyHeartRateFreqStatusAsync(heartRate.Serialno);
  561. if (phrFreqstatus == null) isAbnormal = 0;
  562. var statsusDesc = (phrFreqstatus == null) ? "常规" : "高频";
  563. _logger.LogInformation($"{heartRate.Serialno} 在 {statsusDesc} 状态,生成胎心值:{fetalHeartRate},统计周期:{statStartTime.ToString("yyyy-MM-dd HH:mm:ss")}----{statEndTime.ToString("yyyy-MM-dd HH:mm:ss")}");
  564. //if (!isFreq)
  565. //{
  566. // statStartTime = heartRate.LastUpdate;
  567. //
  568. //}
  569. // 保存到 数据服务 MySQL 数据库
  570. HisGpsFetalHeartRate gpsFetalHeartRate = new()
  571. {
  572. FetalHeartRateId = Guid.NewGuid().ToString("D"),
  573. PersonId = commonPHR!.PersonId,
  574. Serialno = heartRate.Serialno,
  575. HeartRate = fetalHeartRate,
  576. SampleTime = sampleTime.Length > 10 ? sampleTime.Substring(0, 10) : sampleTime,
  577. IsAbnormal = isAbnormal,
  578. StatStartTime = statStartTime,
  579. StatEndTime = statEndTime,//commonPHR.StatEndTime,
  580. CreateTime = DateTime.Now,
  581. Method = 1,
  582. IsDisplay = 1,
  583. DeviceKey = commonPHR!.DeviceKey
  584. };
  585. await _hisFetalHeartApiClient.AddAsync(gpsFetalHeartRate).ConfigureAwait(false);
  586. // 推送到api/v1/open/OpenIot/SetFetalHeartRateConfig
  587. // 推送最后一条常规心率计算的胎心数据到iot设备
  588. #region 推送最后一条常规心率计算的胎心数据到iot设备
  589. // 高频(<=12)-常规
  590. var lastPhr = await _serviceTDengine.GetLastAsync<PregnancyHeartRateModel>(heartRate.Serialno);
  591. if (lastPhr.MessageId == heartRate.MessageId && phrFreqstatus == null)
  592. {
  593. await _serviceIotApi.SetFetalHeartRateConfig(heartRate.Serialno, fetalHeartRate, sampleTime, isAbnormal);
  594. _logger.LogInformation($"{heartRate.Serialno} 推送最后一条常规心率计算的胎心数据到iot设备,高频(<=12)-常规");
  595. }
  596. // 高频(13)-常规-高频(13)
  597. if (phrFreqstatus != null)
  598. {
  599. var phr = await _serviceTDengine.GetBySerialNoAsync<PregnancyHeartRateModel>(heartRate.Serialno, 1);
  600. phr = phr.OrderByDescending(i => i.LastUpdate).ToList();
  601. // 获取高频数据
  602. var freqCollection = new List<PregnancyHeartRateModel>();
  603. PregnancyHeartRateModel? previousItem = null;
  604. foreach (var item in phr)
  605. {
  606. if (previousItem != null)
  607. {
  608. var timeNextDiff = (previousItem!.LastUpdate - item.LastUpdate).TotalSeconds;
  609. if (timeNextDiff <= 60)
  610. {
  611. freqCollection.Add(item);
  612. }
  613. }
  614. // 高频最后一条
  615. if (lastPhr.MessageId == item.MessageId)
  616. {
  617. freqCollection.Add(item);
  618. }
  619. previousItem = item;
  620. }
  621. //去除高频
  622. foreach (var item in freqCollection)
  623. {
  624. phr.Remove(item);
  625. }
  626. lastPhr = phr.FirstOrDefault();
  627. if (lastPhr?.MessageId == heartRate.MessageId)
  628. {
  629. await _serviceIotApi.SetFetalHeartRateConfig(heartRate.Serialno, fetalHeartRate, sampleTime, isAbnormal);
  630. _logger.LogInformation($"{heartRate.Serialno} 推送最后一条常规心率计算的胎心数据到iot设备,高频(13)-常规-高频(13)");
  631. }
  632. }
  633. #endregion
  634. var device = await _deviceCacheMgr.GetDeviceBySerialNoAsync(heartRate.Serialno).ConfigureAwait(false);
  635. var fhrMsgId = $"{heartRate.Serialno}-{sampleTime}-{Guid.NewGuid().ToString("D")[^3..]}";
  636. var fhrMsgTime = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(long.Parse(sampleTime.Length < 13 ? sampleTime.PadRight(13, '0') : sampleTime)).ToString("yyyy-MM-dd HH:mm:ss");
  637. // 胎心数据推送到第三方
  638. var topic = "topic.push.third";
  639. var fhrThridMsg = new
  640. {
  641. messageId = fhrMsgId,
  642. topic = topic,
  643. time = fhrMsgTime,
  644. data = new
  645. {
  646. imei = heartRate.Serialno,
  647. value = fetalHeartRate,
  648. isAbnormal,
  649. type = "fetalHeart"
  650. }
  651. };
  652. await _serviceMqProcess.ProcessIMEIEventMessageAsync(fhrMsgId, topic, 31, fhrThridMsg).ConfigureAwait(false);
  653. // 胎心数据推送到微信
  654. if (isAbnormal != 0)
  655. {
  656. topic = "topic.push.wx";
  657. var fhrMsg = new
  658. {
  659. messageId = fhrMsgId,
  660. topic = topic,
  661. time = fhrMsgTime,
  662. data = new
  663. {
  664. deviceId = device?.DeviceId,
  665. imei = heartRate.Serialno,
  666. alarmTypeId = 12,
  667. alarmDeviceName = heartRate.Serialno,
  668. alarmRemarks = JsonConvert.SerializeObject(new { fetalHeartValue = fetalHeartRate, isAbnormal = isAbnormal }),
  669. address = string.Empty,
  670. deviceKey = device?.DeviceId
  671. }
  672. };
  673. await _serviceMqProcess.ProcessIMEIEventMessageAsync(fhrMsgId, topic, fhrMsg).ConfigureAwait(false);
  674. }
  675. }
  676. private async Task SetIntervalTriggerAsync(string key, string imei, long interval, HisGpsHeartRate heartRate)
  677. {
  678. // var key = $"health_monitor/schedule_push/{type}/imei/{imei}";
  679. var schedulePush = await _serviceEtcd.GetValAsync(key).ConfigureAwait(false);
  680. if (string.IsNullOrWhiteSpace(schedulePush))
  681. {
  682. var now = DateTime.Now;
  683. var timeNextRun = now.Add(TimeSpan.FromSeconds(interval));
  684. var data = new
  685. {
  686. imei,
  687. create_time = now.ToString("yyyy-MM-dd HH:mm:ss"),
  688. ttl = interval,
  689. next_run_time = timeNextRun.ToString("yyyy-MM-dd HH:mm:ss"),
  690. trigger = heartRate,
  691. };
  692. var result = JsonConvert.SerializeObject(data);
  693. await _serviceEtcd.PutValAsync(key, result, interval, false).ConfigureAwait(false);
  694. }
  695. }
  696. /// <summary>
  697. /// 去除高频数据
  698. /// </summary>
  699. /// <param name="phr"></param>
  700. /// <param name="highFreqSampleInterva"></param>
  701. /// <returns></returns>
  702. private static List<PregnancyHeartRateModel> GetNonFreqPregnancyHeartRate(List<PregnancyHeartRateModel> phr,int highFreqSampleInterval)
  703. {
  704. phr = phr.OrderByDescending(i => i.LastUpdate).ToList();
  705. var result = new List<PregnancyHeartRateModel>();
  706. PregnancyHeartRateModel? previousItem = null;
  707. foreach (var item in phr)
  708. {
  709. if (previousItem != null)
  710. {
  711. var timeNextDiff =(previousItem!.LastUpdate - item.LastUpdate).TotalSeconds;
  712. if (timeNextDiff > highFreqSampleInterval)
  713. {
  714. result.Add(previousItem);
  715. }
  716. }
  717. previousItem = item;
  718. }
  719. // 添加上一个
  720. if (previousItem != null)
  721. {
  722. result.Add(previousItem);
  723. }
  724. return result;
  725. }
  726. /// <summary>
  727. /// 获取高频数据
  728. /// </summary>
  729. /// <param name="phr"></param>
  730. /// <param name="highFreqSampleInterval"></param>
  731. /// <returns></returns>
  732. private static List<PregnancyHeartRateModel> GetFreqPregnancyHeartRate(List<PregnancyHeartRateModel> phr, int highFreqSampleInterval)
  733. {
  734. phr = phr.OrderByDescending(i => i.LastUpdate).ToList();
  735. var freqCollection = new List<PregnancyHeartRateModel>();
  736. PregnancyHeartRateModel? previousItem = null;
  737. foreach (var item in phr)
  738. {
  739. if (previousItem != null)
  740. {
  741. var timeNextDiff = (previousItem.LastUpdate - item.LastUpdate).TotalSeconds;
  742. if (timeNextDiff <= highFreqSampleInterval)
  743. {
  744. freqCollection.Add(previousItem);
  745. }
  746. }
  747. previousItem = item;
  748. }
  749. // 检查最后一条是否高频
  750. if (previousItem != null && (phr.Last().LastUpdate - previousItem.LastUpdate).TotalSeconds <= highFreqSampleInterval)
  751. {
  752. freqCollection.Add(previousItem);
  753. }
  754. return freqCollection;
  755. }
  756. }
  757. }