using HealthMonitor.Common;
using HealthMonitor.Common.helper;
using HealthMonitor.Service.Biz.db;
using HealthMonitor.Service.Cache;
using HealthMonitor.Service.Etcd;
using HealthMonitor.Service.Resolver.Interface;
using HealthMonitor.Service.Sub;
using HealthMonitor.Service.Sub.Topic.Model;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Linq;
using System.Text;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
using TDengineTMQ;
using TelpoDataService.Util.Entities.GpsCard;
using TelpoDataService.Util;
using TelpoDataService.Util.Entities.GpsLocationHistory;
using HealthMonitor.Service.Biz;
using HealthMonitor.Model.Service;
using Microsoft.Extensions.Options;
using HealthMonitor.Model.Config;
using HealthMonitor.Model.Service.Mapper;
using Mvccpb;

namespace HealthMonitor.Service.Resolver
{
    /// <summary>
    /// Resolver for blood-pressure measurement messages.
    /// <para>
    /// <see cref="SetResolveInfo"/> stores the incoming measurement, and
    /// <see cref="ExecuteMessageAsync"/> then validates the wearer, pushes the initial
    /// calibration (reference + increment values) when the person has no remarks yet,
    /// and registers a scheduled-push trigger key in etcd.
    /// </para>
    /// </summary>
    /// <remarks>
    /// NOTE(review): generic type arguments (on <c>AsyncLocal</c>, <c>IOptions</c>, <c>ILogger</c> and
    /// <c>JsonConvert.DeserializeObject</c>) are not visible in the reviewed copy of this file and are
    /// reproduced here exactly as shown - confirm against the original source before merging.
    /// </remarks>
    public class BloodpressResolver: IResolver
    {
        /// <summary>Factor applied to (reference - average) to obtain the display increment.</summary>
        private const decimal AvgOffset = 0.25M;

        /// <summary>Timestamp layout used in the TDengine INSERT statement.</summary>
        private const string SqlTimestampFormat = "yyyy-MM-dd HH:mm:ss.fff";

        /// <summary>Timestamp layout used in the etcd schedule payload.</summary>
        private const string ScheduleTimestampFormat = "yyyy-MM-dd HH:mm:ss";

        private readonly ILogger _logger;
        private readonly BoodPressResolverConfig _configBoodPressResolver;
        private readonly PersonCacheManager _personCacheMgr;
        private readonly TDengineService _serviceTDengine;
        private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager;
        private readonly HttpHelper _httpHelper = default!;
        private readonly GpsCardAccessorClient _gpsPersonApiClient;
        private readonly IotWebApiService _serviceIotWebApi;

        // Per-async-flow state: written by SetResolveInfo, read by ExecuteMessageAsync / ToString.
        private readonly AsyncLocal _messageId = new();
        private readonly AsyncLocal _msgData = new();

        private readonly EtcdService _serviceEtcd;

        /// <summary>
        /// Creates the resolver and stores the injected collaborators.
        /// </summary>
        /// <param name="serviceDengine">TDengine access used to store the push record.</param>
        /// <param name="bpRefValCacheManager">Cache providing blood-pressure reference values.</param>
        /// <param name="personCacheMgr">Cache providing the person bound to a device serial number.</param>
        /// <param name="httpHelper">HTTP helper (stored; not used by the code in this class).</param>
        /// <param name="gpsPersonApiClient">GPS person API client (stored; not used by the code in this class).</param>
        /// <param name="iotWebApiService">IoT web API used to update remarks and push calibration values.</param>
        /// <param name="serviceEtcd">etcd access used for the scheduled-push trigger key.</param>
        /// <param name="optionBoodPressResolver">Options wrapper whose <c>Value</c> is the resolver configuration.</param>
        /// <param name="logger">Logger.</param>
        public BloodpressResolver(
            TDengineService serviceDengine,
            BloodPressReferenceValueCacheManager bpRefValCacheManager,
            PersonCacheManager personCacheMgr,
            HttpHelper httpHelper,
            GpsCardAccessorClient gpsPersonApiClient,
            IotWebApiService iotWebApiService,
            EtcdService serviceEtcd,
            IOptions optionBoodPressResolver,
            ILogger logger)
        {
            _httpHelper = httpHelper;
            _serviceTDengine = serviceDengine;
            _bpRefValCacheManager = bpRefValCacheManager;
            _gpsPersonApiClient = gpsPersonApiClient;
            _serviceIotWebApi = iotWebApiService;
            _logger = logger;
            _personCacheMgr = personCacheMgr;
            _serviceEtcd = serviceEtcd;
            _configBoodPressResolver = optionBoodPressResolver.Value;
        }

        /// <summary>
        /// Deserializes the message detail payload and stores it, together with the message id,
        /// for the current async flow.
        /// </summary>
        /// <param name="msg">Incoming package message; <c>DetailData</c> carries the blood-pressure JSON.</param>
        public void SetResolveInfo(PackageMsgModel msg)
        {
            var topicHmBloodPress = JsonConvert.DeserializeObject(msg.DetailData.ToString()!);

            _messageId.Value = msg.MessageId;
            _msgData.Value = new HisGpsBloodPress()
            {
                BloodPressId = topicHmBloodPress!.BloodPressId,
                MessageId = topicHmBloodPress!.MessageId,
                Serialno = topicHmBloodPress!.Serialno,
                SystolicValue = topicHmBloodPress!.SystolicValue,
                DiastolicValue = topicHmBloodPress!.DiastolicValue,
                // The raw value is divided by 1,000,000 before being treated as Unix milliseconds.
                // NOTE(review): presumably the topic carries nanosecond timestamps - confirm.
                LastUpdate = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(topicHmBloodPress.LastUpdate) / 1000000),
                CreateTime = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(topicHmBloodPress.CreateTime) / 1000000),
                Method = topicHmBloodPress!.Method,
                IsDisplay = topicHmBloodPress!.IsDisplay ? 1 : 0
            };
        }

        /// <summary>Returns the resolver name followed by the current message id in brackets.</summary>
        public override string ToString() => $"{nameof(BloodpressResolver)}[{_messageId.Value}]";

        /// <summary>
        /// Processes the measurement stored by <see cref="SetResolveInfo"/>:
        /// validates the person (must exist, have a birth date, and be 2-120 years old),
        /// looks up the reference values, pushes the initial calibration when the person has no
        /// remarks, and finally registers the scheduled-push trigger in etcd.
        /// Any exception is logged and swallowed.
        /// </summary>
        public async Task ExecuteMessageAsync()
        {
            try
            {
                var bp = _msgData.Value!;

                #region Load and validate person
                var person = await _personCacheMgr.GetDeviceGpsPersonCacheBySerialNoAsync(bp.MessageId, bp.Serialno).ConfigureAwait(false);

                // The person must exist and carry a birth date.
                if (person == null || person.Person.BornDate == null)
                {
                    _logger.LogWarning($"{bp.Serialno}--{bp.MessageId} 验证个人信息,找不到个人信息,跳过此消息");
                    return;
                }

                // Age is the difference of calendar years only; accepted range is 2-120.
                var age = SafeType.SafeInt(DateTime.Today.Year - person.Person.BornDate.Value.Year);
                if (age < 2 || age > 120)
                {
                    _logger.LogWarning($"{bp.Serialno}--{bp.MessageId} 验证年龄,不在范围 (2 - 120)岁,跳过此消息");
                    return;
                }

                var gender = person.Person.Gender == true ? 1 : 2;
                var isHypertension = SafeType.SafeBool(person.Person.Ishypertension!);
                #endregion

                #region Look up the regular reference (calibration) values
                var bpRef = await _bpRefValCacheManager.GetBloodPressReferenceValueAsync(age, gender, isHypertension).ConfigureAwait(false);
                if (bpRef is null)
                {
                    // Previously this surfaced as an anonymous NullReferenceException in the catch block;
                    // the method still stops here, but with an explicit reason in the log.
                    _logger.LogWarning($"{bp.Serialno}--{bp.MessageId} 找不到血压标定参考值,跳过此消息");
                    return;
                }

                int systolicRefValue = bpRef.Systolic;
                int diastolicRefValue = bpRef.Diastolic;
                #endregion

                _logger.LogInformation($"{bp.Serialno} -- Person 值:{JsonConvert.SerializeObject(person)}");
                _logger.LogInformation($"{bp.Serialno} -- Person Remarks 值:{person.Person.Remarks}");

                if (string.IsNullOrWhiteSpace(person.Person.Remarks))
                {
                    _logger.LogInformation($"{bp.Serialno},设备解绑后绑定,首次手工测量了血压值,下发年龄标定值和增量值(测量值=平均值计算得出的)");
                    await PushInitialCalibrationAsync(bp, systolicRefValue, diastolicRefValue).ConfigureAwait(false);
                }

                // (Temporarily disabled) regular increment calculation for persons that already have remarks.
                // The disabled logic did the following:
                //   1. Read the last row of stb_hm_bp_push_ref_inc_value for this serial number.
                //   2. If that row was all zeros (device reset) use the age-based reference values,
                //      otherwise reuse the reference values of the last push.
                //   3. Query stb_hm_bloodpress between the last push time and this measurement's LastUpdate;
                //      stop when fewer than 2 records exist.
                //   4. Compute max/min and the average after removing one min/max and out-of-reference
                //      values; fall back to the measured value when the average is 0.
                //   5. increment = (reference - average) * 0.25, accumulated on top of the last pushed increment.
                //
                // (Temporarily disabled) the statistics row for each measurement, which was inserted into
                // health_monitor.hm_bp_stats_inc_{last two chars of serial} (super table stb_hm_bloodpress_stats_inc).

                #region Scheduled-push trigger
                await RegisterSchedulePushAsync(bp.Serialno).ConfigureAwait(false);
                #endregion
            }
            catch (Exception ex)
            {
                _logger.LogError($"解析血压出错, {ex.Message}\n{ex.StackTrace}");
            }
        }

        /// <summary>
        /// First-measurement path: treats the measured values as the average, derives the increments,
        /// stores them in the person remarks, optionally pushes them to the device through the IoT API,
        /// and records a successful push in TDengine. Each step runs only if the previous one succeeded.
        /// </summary>
        private async Task PushInitialCalibrationAsync(HisGpsBloodPress bp, int systolicRefValue, int diastolicRefValue)
        {
            // The measured value stands in for the average on the first measurement.
            int systolicAvg = bp.SystolicValue;
            int diastolicAvg = bp.DiastolicValue;

            // increment = (reference - average) * offset, truncated toward zero.
            int systolicInc = (int)((systolicRefValue - systolicAvg) * AvgOffset);
            int diastolicInc = (int)((diastolicRefValue - diastolicAvg) * AvgOffset);

            // Persist reference/increment values in the person remarks.
            var remarksUpdated = await _serviceIotWebApi
                .UpdatePersonRemarksAsync(bp.Serialno, systolicRefValue, diastolicRefValue, systolicInc, diastolicInc)
                .ConfigureAwait(false);
            if (!remarksUpdated)
            {
                return;
            }

            _logger.LogInformation($"{nameof(BloodpressResolver)} 开启血压标定值下发: {_configBoodPressResolver.EnableBPRefPush}");

            // Pushing calibration values to the device is controlled by configuration.
            if (!_configBoodPressResolver.EnableBPRefPush)
            {
                return;
            }

            BloodPressCalibrationConfigModel bpIncData = new()
            {
                Imei = bp.Serialno,
                SystolicRefValue = systolicRefValue,   // systolic reference value; 0 means "not in effect"
                DiastolicRefValue = diastolicRefValue, // diastolic reference value; 0 means "not in effect"
                SystolicIncValue = systolicInc,        // systolic display increment; 0 means "not in effect"
                DiastolicIncValue = diastolicInc       // diastolic display increment; 0 means "not in effect"
            };

            var pushedToIot = await _serviceIotWebApi.SetBloodPressCalibrationConfigAsync(bpIncData).ConfigureAwait(false);
            if (!pushedToIot)
            {
                return;
            }

            // Record the push: statistics window runs from the measurement time to now.
            var startTime = (DateTime)bp.LastUpdate!;
            var endTime = DateTime.Now;

            var sql = BuildPushRecordInsertSql(
                bp.Serialno,
                systolicRefValue, diastolicRefValue,
                systolicInc, diastolicInc,
                systolicAvg, diastolicAvg,
                AvgOffset, AvgOffset,
                startTime, endTime);
            _serviceTDengine.ExecuteInsertSQL(sql);
        }

        /// <summary>
        /// Builds the auto-create INSERT for the push record. The sub-table name and tag are the last
        /// two characters of the serial number. All numbers and timestamps are rendered with the
        /// invariant culture so the statement does not depend on the host's regional settings
        /// (decimal separator, minus sign, calendar, time separator).
        /// </summary>
        /// <remarks>
        /// NOTE(review): <paramref name="serialNo"/> is embedded in the SQL text as-is; confirm that
        /// serial numbers are validated upstream. Serial numbers shorter than 2 characters throw here.
        /// </remarks>
        private static string BuildPushRecordInsertSql(
            string serialNo,
            int systolicRefValue, int diastolicRefValue,
            int systolicInc, int diastolicInc,
            int systolicAvg, int diastolicAvg,
            decimal systolicAvgOffset, decimal diastolicAvgOffset,
            DateTime startTime, DateTime endTime)
        {
            var invariant = System.Globalization.CultureInfo.InvariantCulture;
            var tableSuffix = serialNo.Substring(serialNo.Length - 2);
            var startText = startTime.ToString(SqlTimestampFormat, invariant);
            var endText = endTime.ToString(SqlTimestampFormat, invariant);

            var values = new[]
            {
                $"'{endText}'",                          // row timestamp
                $"'{serialNo}'",
                systolicRefValue.ToString(invariant),
                diastolicRefValue.ToString(invariant),
                systolicInc.ToString(invariant),
                diastolicInc.ToString(invariant),
                bool.TrueString,                         // same text the original "{true}" produced
                systolicAvg.ToString(invariant),
                diastolicAvg.ToString(invariant),
                systolicAvgOffset.ToString(invariant),
                diastolicAvgOffset.ToString(invariant),
                $"'{startText}'",
                $"'{endText}'"
            };

            return $"INSERT INTO health_monitor.hm_bp_push_ref_inc_value_{tableSuffix} "
                 + "USING health_monitor.stb_hm_bp_push_ref_inc_value "
                 + $"TAGS ('{tableSuffix}') "
                 + $"VALUES({string.Join(",", values)})";
        }

        /// <summary>
        /// Registers the scheduled-push trigger for a device when no value exists under its etcd key.
        /// The stored JSON holds the imei, creation time, ttl (seconds) and next run time; the same ttl
        /// is passed to <c>PutValAsync</c>.
        /// </summary>
        private async Task RegisterSchedulePushAsync(string serialNo)
        {
            var key = $"health_moniter/schedule_push/imei/{serialNo}";
            var existing = await _serviceEtcd.GetValAsync(key).ConfigureAwait(false);
            if (!string.IsNullOrWhiteSpace(existing))
            {
                // A trigger is already registered for this device.
                return;
            }

            var invariant = System.Globalization.CultureInfo.InvariantCulture;
            DateTime now = DateTime.Now;
            DateTime nextRunTime = ComputeNextRunTime(now);

            // Whole seconds until the next run.
            // NOTE(review): this is 0 when less than one second remains - confirm how PutValAsync treats ttl = 0.
            var ttl = (long)(nextRunTime - now).TotalSeconds;

            var data = new
            {
                imei = serialNo,
                create_time = now.ToString(ScheduleTimestampFormat, invariant),
                ttl,
                next_run_time = nextRunTime.ToString(ScheduleTimestampFormat, invariant)
            };
            var result = JsonConvert.SerializeObject(data);

            await _serviceEtcd.PutValAsync(key, result, ttl, false).ConfigureAwait(false);
        }

        /// <summary>
        /// Computes the next trigger time relative to <paramref name="now"/>.
        /// DEBUG builds: second 58 of the next minute.
        /// Other builds: 19:59:58 today, or tomorrow when that moment has already passed.
        /// </summary>
        private static DateTime ComputeNextRunTime(DateTime now)
        {
#if DEBUG
            // AddMinutes rolls over hour/day boundaries; passing "Minute + 1" to the constructor
            // throws ArgumentOutOfRangeException at minute 59.
            return new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute, 58).AddMinutes(1);
#else
            var nextRunTime = new DateTime(now.Year, now.Month, now.Day, 19, 59, 58);
            if (nextRunTime < now)
            {
                // Today's slot has passed; wait for tomorrow's.
                nextRunTime = nextRunTime.AddDays(1);
            }
            return nextRunTime;
#endif
        }
    }
}