using HealthMonitor.Common;
using HealthMonitor.Common.helper;
using HealthMonitor.Model.Service.Mapper;
using HealthMonitor.Service.Biz;
using HealthMonitor.Service.Biz.db;
using HealthMonitor.Service.Cache;
using HealthMonitor.Service.Etcd;
using HealthMonitor.Service.Resolver.Interface;
using HealthMonitor.Service.Sub;
using HealthMonitor.Service.Sub.Topic.Model;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using SqlSugar;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using TelpoDataService.Util.Clients;
using TelpoDataService.Util.Entities.GpsLocationHistory;

namespace HealthMonitor.Service.Resolver
{
    /// <summary>
    /// Resolver for pregnancy heart-rate messages. <see cref="SetResolveInfo"/> captures the
    /// incoming message; <see cref="ExecuteMessageAsync"/> then evaluates the high-frequency
    /// sampling state, pushes a derived fetal heart rate when the conditions are met, and
    /// registers the etcd keys used as scheduled-push triggers.
    /// </summary>
    public class PregnancyHeartRateResolver : IResolver
    {
        /// <summary>Watch-config type code whose "enabled" flag gates all processing in this resolver.</summary>
        private const string FetalHeartWatchConfigCode = "0067";

        /// <summary>Minimum number of stored samples before high-frequency evaluation runs.</summary>
        private const int MinSampleCount = 30;

        /// <summary>TTL, in seconds, of the fetal heart-rate calculation trigger key.</summary>
        private const int FetalHeartCalcIntervalSeconds = 60 * 15;

        /// <summary>Minute used for daily triggers when the serial number does not end in a digit.</summary>
        private const int DefaultPushMinute = 10;

        // NOTE(review): generic type arguments appear to have been lost from this file's text
        // (a non-generic AsyncLocal does not exist). ILogger may originally have been
        // ILogger<PregnancyHeartRateResolver>; it is left as found so the constructor
        // signature stays unchanged - confirm against version control.
        private readonly ILogger _logger;
        private readonly TDengineService _serviceTDengine;
        private readonly DeviceCacheManager _deviceCacheMgr;
        private readonly IotApiService _serviceIotApi;

        // NOTE(review): <string> is inferred from msg.MessageId being stored here - confirm.
        private readonly AsyncLocal<string> _messageId = new();
        private readonly AsyncLocal<HisGpsHeartRate> _msgData = new();
        private readonly HttpHelper _httpHelper = default!;
        private readonly EtcdService _serviceEtcd;
        private readonly GpsLocationHistoryAccessorClient _hisFetalHeartApiClient;

        public PregnancyHeartRateResolver(ILogger logger,
            HttpHelper httpHelper,
            EtcdService serviceEtcd,
            DeviceCacheManager deviceCacheMgr,
            IotApiService iotApiService,
            TDengineService serviceDengine,
            GpsLocationHistoryAccessorClient hisFetalHeartApiClient)
        {
            _logger = logger;
            _httpHelper = httpHelper;
            _serviceEtcd = serviceEtcd;
            _serviceTDengine = serviceDengine;
            _deviceCacheMgr = deviceCacheMgr;
            _serviceIotApi = iotApiService;
            _hisFetalHeartApiClient = hisFetalHeartApiClient;
        }

        /// <summary>
        /// Deserialises the message detail and stores it, together with the message id,
        /// in async-local state for a later <see cref="ExecuteMessageAsync"/> call.
        /// </summary>
        /// <param name="msg">Incoming package message; its DetailData is parsed as JSON.</param>
        /// <exception cref="ArgumentException">DetailData deserialises to null.</exception>
        public void SetResolveInfo(PackageMsgModel msg)
        {
            // NOTE(review): the type argument is inferred from the original variable name and
            // the imported Topic.Model namespace - confirm the exact type.
            var topic = JsonConvert.DeserializeObject<TopicHmPregnancyHeartRate>(msg.DetailData.ToString()!)
                ?? throw new ArgumentException("DetailData did not deserialize to a pregnancy heart-rate payload.", nameof(msg));

            _messageId.Value = msg.MessageId;
            _msgData.Value = new HisGpsHeartRate()
            {
                HeartRateId = topic.PregnancyHeartRateId,
                MessageId = topic.MessageId,
                Serialno = topic.Serialno,
                HeartRate = topic.PregnancyHeartRate,
                // The raw values are divided by 1,000,000 before being read as Unix milliseconds
                // (presumably they arrive in nanoseconds - confirm with the producer).
                LastUpdate = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(topic.LastUpdate) / 1000000),
                CreateTime = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(topic.CreateTime) / 1000000),
                Method = topic.Method,
                IsDisplay = topic.IsDisplay ? 1 : 0
            };
        }

        public override string ToString()
        {
            return $"{nameof(PregnancyHeartRateResolver)}[{_messageId.Value}]";
        }

        /// <summary>
        /// Processes the message captured by <see cref="SetResolveInfo"/>. Nothing happens unless
        /// the device's watch config exists and has "enabled" equal to 1. With at least
        /// <see cref="MinSampleCount"/> stored samples it evaluates the high-frequency state and
        /// registers the fetal heart-rate and fetal-movement triggers; the modeling trigger is
        /// registered regardless of the sample count.
        /// </summary>
        /// <exception cref="InvalidOperationException">No message has been set in the current async flow.</exception>
        public async Task ExecuteMessageAsync()
        {
            var heartRate = _msgData.Value
                ?? throw new InvalidOperationException($"{nameof(SetResolveInfo)} must be called before {nameof(ExecuteMessageAsync)}.");
            var serialno = heartRate.Serialno;

            var watchConfig = await _deviceCacheMgr.GetGpsDeviceWatchConfigCacheObjectBySerialNoAsync(serialno, FetalHeartWatchConfigCode);
            if (watchConfig == null || (int)watchConfig["enabled"]! != 1)
            {
                return;
            }

            // NOTE(review): the meaning of the second argument (7) is defined by TDengineService.
            var phr = await _serviceTDengine.GetBySerialNoAsync(serialno, 7);
            if (phr.Count >= MinSampleCount)
            {
                // Gap between the two most recently updated samples, in seconds.
                var latestTwoUpdates = phr.OrderByDescending(i => i.LastUpdate).Take(2).Select(i => i.LastUpdate).ToList();
                var timeDiffInSeconds = (latestTwoUpdates[0] - latestTwoUpdates[1]).TotalSeconds;

                // High-frequency sampling interval.
                var highFreqSampleInterval = (int)watchConfig["highFreqSampleInterval"]!;
                // Upper / lower heart-rate bounds that trigger high-frequency monitoring.
                var triggerHighFreqHigh = (int)watchConfig["triggerHighFreqHigh"]!;
                var triggerHighFreqLow = (int)watchConfig["triggerHighFreqLow"]!;
                // Number of consecutive normal readings that stops high-frequency sampling.
                var stopHighFreqSampleCount = (int)watchConfig["stopHighFreqSampleCount"]!;
                // High-frequency sampling duration: 0 means continuous, non-zero is the duration.
                // Fixed: this previously read "stopHighFreqSampleCount" (copy/paste of the line above).
                var highFreqSampleTimes = (int)watchConfig["highFreqSampleTimes"]!;
                // Alarm thresholds applied to the derived fetal heart rate.
                var upperAlarmThreshold = (int)watchConfig["upperAlarmThreshold"]!;
                var lowerAlarmThreshold = (int)watchConfig["lowerAlarmThreshold"]!;

                if (timeDiffInSeconds <= highFreqSampleInterval)
                {
                    // Samples are arriving at high-frequency pace.
                    var phrFreqstatus = await _deviceCacheMgr.GetPregnancyHeartRateFreqStatusAsync(serialno);
                    if (phrFreqstatus == null)
                    {
                        _logger.LogInformation($"进入高频心率启动状态 timeDiffInSeconds {timeDiffInSeconds},highFreqSampleInterval:{highFreqSampleInterval}");

                        // Remember the newest sample as the start of the high-frequency period.
                        var freqFirstPhr = phr.OrderByDescending(i => i.Timestamp).First();
                        await _deviceCacheMgr.SetPregnancyHeartRateFreqStatusAsync(serialno, freqFirstPhr);
                    }

                    // When the latest stopHighFreqSampleCount samples are all within
                    // [triggerHighFreqLow, triggerHighFreqHigh], push their average.
                    var lastPhr = phr.OrderByDescending(i => i.Timestamp).Take(stopHighFreqSampleCount).ToList();

                    // The Count guard keeps Average() from throwing on an empty list, which
                    // All() would otherwise let through when the configured count is <= 0.
                    if (lastPhr.Count > 0
                        && lastPhr.All(i => i.PregnancyHeartRate >= triggerHighFreqLow && i.PregnancyHeartRate <= triggerHighFreqHigh))
                    {
                        var avgPhr = lastPhr.Select(i => i.PregnancyHeartRate).Average();
                        await SaveAndPushFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr);
                    }
                }
                else
                {
                    // Samples are no longer arriving at high-frequency pace.
                    var phrFreqstatus = await _deviceCacheMgr.GetPregnancyHeartRateFreqStatusAsync(serialno);
                    if (phrFreqstatus != null)
                    {
                        var isContinuousSampling = highFreqSampleTimes == 0;
                        var isTimedSamplingElapsed = highFreqSampleTimes > 0
                            && heartRate.LastUpdate >= phrFreqstatus.LastUpdate + TimeSpan.FromSeconds(highFreqSampleTimes);

                        if (isContinuousSampling || isTimedSamplingElapsed)
                        {
                            var startTimestamp = phrFreqstatus.Timestamp;
                            var sinceStart = phr.Where(i => i.Timestamp >= startTimestamp);

                            // NOTE(review): behaviour preserved from the original - continuous mode
                            // sorts newest-first before Skip(1) while timed mode keeps the storage
                            // order, so the two modes drop a different "first" record. Confirm
                            // which record is meant to be excluded.
                            if (isContinuousSampling)
                            {
                                sinceStart = sinceStart.OrderByDescending(i => i.Timestamp);
                            }

                            var abnormalRates = sinceStart
                                .Skip(1)
                                .Where(i => i.PregnancyHeartRate < triggerHighFreqLow || i.PregnancyHeartRate > triggerHighFreqHigh)
                                .Select(i => i.PregnancyHeartRate)
                                .ToList();

                            // Average() throws on an empty sequence, so only push when there is data.
                            if (abnormalRates.Count > 0)
                            {
                                await SaveAndPushFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, abnormalRates.Average());
                            }
                            else
                            {
                                _logger.LogWarning("No abnormal samples since high-frequency start for {Serialno}; fetal heart-rate push skipped", serialno);
                            }
                        }

                        // Clear the stored start record of the high-frequency period.
                        await _deviceCacheMgr.DelPregnancyHeartRateFreqStatusAsync(serialno);
                        _logger.LogInformation($"结束高频心率状态 timeDiffInSeconds {timeDiffInSeconds},highFreqSampleInterval:{highFreqSampleInterval}");
                    }
                }

                // Trigger for the scheduled fetal heart-rate calculation.
                var fetalKey = $"health_monitor/schedule_push/cal_fetal_heart_rate/imei/{serialno}";
                await SetIntervalTriggerAsync(fetalKey, serialno, FetalHeartCalcIntervalSeconds);

                // Trigger for the scheduled fetal-movement calculation (shortly after midnight).
                await RegisterFetalMovementTriggerAsync(serialno);
            }

            // Trigger for the scheduled push (scheduled modeling).
            await RegisterModelingTriggerAsync(serialno);
        }

        /// <summary>
        /// Registers the fetal-movement calculation trigger so that its key expires at the next
        /// 00:mm:ss, where mm comes from the serial number and ss is random.
        /// </summary>
        private async Task RegisterFetalMovementTriggerAsync(string serialno)
        {
            var fetalMovementKey = $"health_monitor/schedule_push/cal_fetal_movement/imei/{serialno}";
            var pushSec = new Random().Next(59); // 0..58, as before
            var (_, timeUntilNextRun) = GetNextDailyRun(DateTime.Now, 0, GetPushMinute(serialno), pushSec);

            await SetIntervalTriggerAsync(fetalMovementKey, serialno, (long)timeUntilNextRun.TotalSeconds);
        }

        /// <summary>
        /// Registers the pregnancy heart-rate push trigger when its etcd key is absent. DEBUG
        /// builds schedule it for second 58 of the next minute; other builds for the next
        /// 18:mm:ss, where mm comes from the serial number and ss is random.
        /// </summary>
        private async Task RegisterModelingTriggerAsync(string serialno)
        {
            var key = $"health_monitor/schedule_push/pregnancy_heart_rate/imei/{serialno}";
            var schedulePush = await _serviceEtcd.GetValAsync(key).ConfigureAwait(false);
            if (!string.IsNullOrWhiteSpace(schedulePush))
            {
                return;
            }

            var now = DateTime.Now;
#if DEBUG
            // AddMinutes handles the roll-over at minute 59; "now.Minute + 1" passed 60 to the
            // DateTime constructor and threw ArgumentOutOfRangeException.
            var nextMinute = now.AddMinutes(1);
            var nextRunTime = new DateTime(nextMinute.Year, nextMinute.Month, nextMinute.Day, nextMinute.Hour, nextMinute.Minute, 58);
            var timeUntilNextRun = nextRunTime - now;
#else
            var pushSec = new Random().Next(59); // 0..58, as before
            var (nextRunTime, timeUntilNextRun) = GetNextDailyRun(now, 18, GetPushMinute(serialno), pushSec);
#endif
            var ttl = (long)timeUntilNextRun.TotalSeconds;
            var data = new
            {
                imei = serialno,
                create_time = now.ToString("yyyy-MM-dd HH:mm:ss"),
                ttl,
                next_run_time = nextRunTime.ToString("yyyy-MM-dd HH:mm:ss")
            };
            var result = JsonConvert.SerializeObject(data);
            await _serviceEtcd.PutValAsync(key, result, ttl, false).ConfigureAwait(false);
        }

        /// <summary>
        /// Returns the last character of the serial number as a minute (0-9), or
        /// <see cref="DefaultPushMinute"/> when the serial number is empty or does not end in a digit.
        /// </summary>
        private static int GetPushMinute(string serialno)
        {
            if (string.IsNullOrEmpty(serialno))
            {
                return DefaultPushMinute;
            }

            return int.TryParse(serialno.AsSpan(serialno.Length - 1), out var minute) ? minute : DefaultPushMinute;
        }

        /// <summary>
        /// Computes the next occurrence of hour:minute:second relative to <paramref name="now"/>:
        /// today when that time has not passed yet, otherwise tomorrow.
        /// </summary>
        /// <returns>The next run time and the delay from <paramref name="now"/> until then.</returns>
        private static (DateTime NextRun, TimeSpan Delay) GetNextDailyRun(DateTime now, int hour, int minute, int second)
        {
            var nextRun = new DateTime(now.Year, now.Month, now.Day, hour, minute, second);
            if (nextRun < now)
            {
                nextRun = nextRun.AddDays(1);
            }

            return (nextRun, nextRun - now);
        }

        /// <summary>
        /// Derives a fetal heart rate from the averaged pregnancy heart rate, stores it and
        /// pushes it to the IoT API. Does nothing when no common heart-rate model is available.
        /// </summary>
        /// <param name="heartRate">Current message; only its Serialno is read.</param>
        /// <param name="upperAlarmThreshold">Values above this are flagged with IsAbnormal = 1.</param>
        /// <param name="lowerAlarmThreshold">Values below this are flagged with IsAbnormal = 2.</param>
        /// <param name="avgPhr">Averaged pregnancy heart rate that is multiplied by the model coefficient.</param>
        private async Task SaveAndPushFetalHeartRateAsync(HisGpsHeartRate heartRate, int upperAlarmThreshold, int lowerAlarmThreshold, double avgPhr)
        {
            var commonPHR = await _serviceTDengine.InitPregnancyCommonHeartRateModeAsync(heartRate.Serialno);
            if (commonPHR == null)
            {
                return;
            }

            // Persist the model to TDengine.
            await _serviceTDengine.InsertAsync("hm_pchr", commonPHR);

            // Fetal heart rate = pregnancy heart rate * coefficient.
            var fetalHeartRate = SafeType.SafeInt(avgPhr * commonPHR.StatModeAvgFprCoefficient);
            var now = DateTime.Now;
            var sampleTime = DateTimeUtil.ConvertToTimeStamp(now).ToString();
            var isAbnormal = fetalHeartRate > upperAlarmThreshold ? 1 : (fetalHeartRate < lowerAlarmThreshold ? 2 : 0);

            // Persist the record through the history data-service client.
            HisGpsFetalHeartRate gpsFetalHeartRate = new()
            {
                FetalHeartRateId = Guid.NewGuid().ToString("D"),
                PersonId = commonPHR.PersonId,
                Serialno = heartRate.Serialno,
                HeartRate = fetalHeartRate,
                SampleTime = sampleTime,
                IsAbnormal = isAbnormal,
                StatStartTime = commonPHR.StatStartTime,
                StatEndTime = commonPHR.StatEndTime,
                CreateTime = now,
                Method = 1,
                IsDisplay = 1,
                DeviceKey = commonPHR.DeviceKey
            };
            await _hisFetalHeartApiClient.AddAsync(gpsFetalHeartRate).ConfigureAwait(false);

            // Push to the IoT API.
            await _serviceIotApi.SetFetalHeartRateConfig(heartRate.Serialno, fetalHeartRate, sampleTime, isAbnormal);
        }

        /// <summary>
        /// Writes a trigger record under <paramref name="key"/> with a TTL of
        /// <paramref name="interval"/> seconds, unless the key already holds a value.
        /// </summary>
        private async Task SetIntervalTriggerAsync(string key, string imei, long interval)
        {
            var schedulePush = await _serviceEtcd.GetValAsync(key).ConfigureAwait(false);
            if (!string.IsNullOrWhiteSpace(schedulePush))
            {
                return;
            }

            var now = DateTime.Now;
            var timeNextRun = now.Add(TimeSpan.FromSeconds(interval));
            var data = new
            {
                imei,
                create_time = now.ToString("yyyy-MM-dd HH:mm:ss"),
                ttl = interval,
                next_run_time = timeNextRun.ToString("yyyy-MM-dd HH:mm:ss")
            };
            var result = JsonConvert.SerializeObject(data);
            await _serviceEtcd.PutValAsync(key, result, interval, false).ConfigureAwait(false);
        }
    }
}