using dotnet_etcd;
using Etcdserverpb;
using Google.Protobuf.WellKnownTypes;
using HealthMonitor.Common;
using HealthMonitor.Common.helper;
using HealthMonitor.Core.Common.Extensions;
using HealthMonitor.Model.Config;
using HealthMonitor.Model.Service;
using HealthMonitor.Model.Service.Mapper;
using HealthMonitor.Service.Biz;
using HealthMonitor.Service.Biz.db;
using HealthMonitor.Service.Cache;
using HealthMonitor.Service.Etcd;
using HealthMonitor.Service.Sub;
using Microsoft.AspNetCore.Mvc.RazorPages;
using Microsoft.EntityFrameworkCore.Metadata.Internal;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System.Reflection;
using System.Threading.Channels;
using TDengineDriver;
using TDengineTMQ;

namespace HealthMonitor.WebApi
{
    /// <summary>
    /// Hosted background service that (1) keeps the TDengine subscription listener running and
    /// (2) watches the etcd prefix <c>health_moniter/schedule_push</c>. When a watched key is
    /// deleted and no value remains under it, the worker recomputes the blood-pressure increment
    /// for the device named in the key, pushes it through the IoT web API, stores a push record
    /// in TDengine and writes the key back to etcd with a new TTL.
    /// </summary>
    public class Worker : BackgroundService
    {
        private readonly ILogger _logger;
        private readonly IServiceProvider _services;
        private readonly TDengineDataSubcribe _tdEngineDataSubcribe;
        private readonly PackageProcess _processor;
        private readonly TDengineService _serviceTDengine;
        private readonly EtcdService _serviceEtcd;
        private readonly HttpHelper _httpHelper = default!;
        private readonly IotWebApiService _serviceIotWebApi;
        private readonly BoodPressResolverConfig _configBoodPressResolver;
        private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager;
        private readonly PersonCacheManager _personCacheMgr;

        // Linked to the host token in StartAsync; cancelled in StopAsync to stop the worker loops.
        private CancellationTokenSource _tokenSource = default!;

        /// <summary>
        /// Stores the injected collaborators.
        /// </summary>
        // NOTE(review): in this copy of the file the generic arguments of ILogger and IOptions were
        // missing. IOptions<BoodPressResolverConfig> is required by the ".Value" assignment below;
        // ILogger<Worker> is assumed from the usual DI convention — confirm against version control.
        public Worker(
            ILogger<Worker> logger,
            IServiceProvider services,
            PersonCacheManager personCacheMgr,
            BloodPressReferenceValueCacheManager bpRefValCacheManager,
            IotWebApiService iotWebApiService,
            IOptions<BoodPressResolverConfig> optionBoodPressResolver,
            PackageProcess processor,
            TDengineDataSubcribe tdEngineDataSubcribe,
            TDengineService serviceDengine,
            HttpHelper httpHelper,
            EtcdService serviceEtcd)
        {
            _logger = logger;
            _tdEngineDataSubcribe = tdEngineDataSubcribe;
            _services = services;
            _serviceIotWebApi = iotWebApiService;
            _processor = processor;
            _serviceEtcd = serviceEtcd;
            _serviceTDengine = serviceDengine;
            _httpHelper = httpHelper;
            _configBoodPressResolver = optionBoodPressResolver.Value;
            _bpRefValCacheManager = bpRefValCacheManager;
            _personCacheMgr = personCacheMgr;
        }

        /// <summary>
        /// Creates the worker's cancellation source (linked to <paramref name="cancellationToken"/>)
        /// and then lets the base class start <see cref="ExecuteAsync"/>.
        /// </summary>
        public override Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("------StartAsync");
            _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            // Explicit consumer creation (_tdEngineDataSubcribe.CreateConsumer) is disabled in this file.
            return base.StartAsync(cancellationToken);
        }

        /// <summary>
        /// Cancels the worker's token so the background loops exit, then delegates to the base class.
        /// </summary>
        public override Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("------StopAsync");
            // Null-safe: StopAsync may be invoked even when StartAsync never ran.
            _tokenSource?.Cancel();
            // Explicit consumer shutdown (_tdEngineDataSubcribe.CloseConsumer) is disabled in this file.
            return base.StopAsync(cancellationToken);
        }

        /// <summary>
        /// Starts the TDengine listener loop and the etcd watch, both fire-and-forget, and returns
        /// a one second delay task bound to the worker token.
        /// </summary>
        /// <param name="stoppingToken">Host shutdown token; passed to the etcd watch task.</param>
        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            TaskFactory factory = new(_tokenSource.Token);

            // The original also started a second long-running task whose loop body
            // (_processor.ResolveAsync / _tdEngineDataSubcribe.ProcessMsg) was commented out, so it
            // only busy-spun on a dedicated thread until cancellation. That task is removed.

            factory.StartNew(() =>
            {
                _logger.LogInformation("------_tdEngineDataSubcribe");
                while (!_tokenSource.IsCancellationRequested)
                {
                    // NOTE(review): looks like leftover debug output — confirm before removing.
                    Console.WriteLine("test");
                    _tdEngineDataSubcribe.BeginListen(_tokenSource.Token);
                }
            }, TaskCreationOptions.LongRunning);

            // etcd watch; the WatchEvents(WatchResponse) overload is selected by the callback type.
            Task.Run(
                () => _serviceEtcd.WacthKeysWithPrefixResponseAsync($"health_moniter/schedule_push", WatchEvents),
                stoppingToken);

            return Task.Delay(1000, _tokenSource.Token);
        }

        /// <summary>
        /// Watch callback for the <see cref="WatchEvent"/> array form: prints every "Delete" event
        /// to the console and ignores all other event types.
        /// </summary>
        private void WatchEvents(WatchEvent[] response)
        {
            foreach (WatchEvent e1 in response)
            {
                switch (e1.Type.ToString())
                {
                    case "Delete":
                        // The TTL expired; this overload only reports it.
                        Console.WriteLine($"--- {e1.Key}:{e1.Value}:{e1.Type}");
                        break;
                }
            }
        }

        /// <summary>
        /// Watch callback for raw etcd responses. "Put" events are only echoed to the console.
        /// "Delete" events are echoed and then handled asynchronously by
        /// <see cref="HandleScheduleExpiredAsync"/>; the returned task is intentionally not awaited
        /// because this callback is synchronous.
        /// </summary>
        private void WatchEvents(WatchResponse response)
        {
            foreach (var e in response.Events)
            {
                try
                {
                    switch (e.Type.ToString())
                    {
                        case "Put":
                            Console.BackgroundColor = ConsoleColor.Blue;
                            Console.WriteLine($"--- {e.Type}--{e.Kv.Key.ToStringUtf8()}--{e.Kv.Value.ToStringUtf8()}---{DateTime.Now}");
                            Console.BackgroundColor = ConsoleColor.Black;
                            break;

                        case "Delete":
                            Console.BackgroundColor = ConsoleColor.Green;
                            Console.WriteLine($"--- {e.Type}--{e.Kv.Key.ToStringUtf8()}--{e.Kv.Value.ToStringUtf8()}---{DateTime.Now}");
                            // Restore the background like the "Put" branch does.
                            Console.BackgroundColor = ConsoleColor.Black;

                            // Fire-and-forget; the helper catches and logs its own exceptions.
                            _ = HandleScheduleExpiredAsync(e.Kv.Key.ToStringUtf8());
                            break;
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, $"{nameof(WatchEvents)},出错: |{ex.Message}|{ex.StackTrace}");
                }
            }
        }

        /// <summary>
        /// Handles an expired (deleted) schedule key: recomputes the blood-pressure increment for
        /// the device, pushes it, records the push and re-arms the schedule.
        /// </summary>
        /// <param name="key">
        /// The deleted etcd key. The 4th '/'-separated segment is used as the device IMEI
        /// (a commented-out line in the original builds it as health_moniter/schedule_push/imei/{serialno}).
        /// </param>
        /// <remarks>
        /// Never throws: every exception (including a key with fewer than four segments) is logged.
        /// Each early <c>return</c> corresponds to a "skip this device" exit of the original switch.
        /// </remarks>
        private async Task HandleScheduleExpiredAsync(string key)
        {
            try
            {
                var imeiDel = key.Split('/')[3];

                // Only act when nothing is stored under the key any more.
                var scheduledValue = await _serviceEtcd.GetValAsync(key).ConfigureAwait(false);
                if (!string.IsNullOrWhiteSpace(scheduledValue))
                {
                    return;
                }

                int systolicRefValue;
                int diastolicRefValue;

                // Offset factor applied to (reference - average).
                var avgOffset = 0.25M;
                var systolicAvgOffset = avgOffset;
                var diastolicAvgOffset = avgOffset;

                // Increments sent by the previous push (0 when there is none).
                int lastPushSystolicInc = 0;
                int lastPushDiastolicInc = 0;

                // Start of the statistics window; refined below.
                DateTime statStartTime = DateTime.Now;
                var startTime = DateTime.Now;

                // Latest blood-pressure row of the device.
                var condition = $"serialno='{imeiDel}' order by last_update desc";
                var field = "last_row(*)";
                var lastHmBpResponse = await _serviceTDengine.ExecuteSelectRestResponseAsync("stb_hm_bloodpress", condition, field);
                // NOTE(review): the generic type argument of DeserializeObject is missing in this copy
                // of the file and cannot be inferred here; the call is reproduced verbatim (also in
                // the two calls further down) — restore the type argument from version control.
                var lastHmBpParser = JsonConvert.DeserializeObject>(lastHmBpResponse!);
                var lastHmBp = lastHmBpParser?.Select().FirstOrDefault();

                // Only data newer than 7 days counts; a missing row also lands here.
                if (!(lastHmBp?.Timestamp.AddDays(7) > DateTime.Now))
                {
                    _logger.LogInformation($"向{imeiDel}统计数据已经失效");
                    return;
                }

                // Latest push record of the device.
                condition = $"serialno='{imeiDel}' order by ts desc";
                var lastPushResponse = await _serviceTDengine.ExecuteSelectRestResponseAsync("stb_hm_bp_push_ref_inc_value", condition, field);
                if (lastPushResponse == null)
                {
                    _logger.LogInformation($"{imeiDel}--没有下发记录");
                    return;
                }

                var lastPushParser = JsonConvert.DeserializeObject>(lastPushResponse);
                var lastPush = lastPushParser!.Select().FirstOrDefault();

                if (lastPush != null)
                {
                    // A previous push exists: continue from its reference values and increments.
                    systolicRefValue = lastPush.SystolicRefValue;
                    diastolicRefValue = lastPush.DiastolicRefValue;
                    lastPushSystolicInc = lastPush.SystolicIncValue;
                    lastPushDiastolicInc = lastPush.DiastolicIncValue;

                    condition = Sql($"ts between '{lastPush.Timestamp:yyyy-MM-dd HH:mm:ss.fff}' and '{startTime:yyyy-MM-dd HH:mm:ss.fff}' ")
                        + $"and serialno='{imeiDel}' "
                        + $"and is_display = true";

                    // The window starts at the previous push.
                    statStartTime = lastPush.Timestamp;
                }
                else
                {
                    // No previous push (legacy data): derive the reference values from the person profile.
                    var person = await _personCacheMgr.GetDeviceGpsPersonCacheBySerialNoAsync(Guid.NewGuid().ToString(), imeiDel).ConfigureAwait(false);
                    if (person == null || person?.Person.BornDate == null)
                    {
                        _logger.LogWarning($"{nameof(Worker)}--{imeiDel} 验证个人信息,找不到个人信息,跳过此消息");
                        return;
                    }

                    // Age is the difference of calendar years and must be within 2..120.
                    var age = SafeType.SafeInt(DateTime.Today.Year - person?.Person.BornDate!.Value.Year!);
                    if (age < 2 || age > 120)
                    {
                        _logger.LogWarning($"{nameof(Worker)}--{imeiDel} 验证年龄,不在范围 (2 - 120)岁,跳过此消息");
                        return;
                    }

                    var gender = person?.Person.Gender == true ? 1 : 2;
                    var isHypertension = SafeType.SafeBool(person?.Person.Ishypertension!);

                    var bpRef = await _bpRefValCacheManager.GetBloodPressReferenceValueAsync(age, gender, isHypertension);
                    systolicRefValue = bpRef!.Systolic;
                    diastolicRefValue = bpRef!.Diastolic;
                    lastPushSystolicInc = 0;
                    lastPushDiastolicInc = 0;

                    condition = Sql($"ts <= '{startTime:yyyy-MM-dd HH:mm:ss.fff}' ")
                        + $"and serialno='{imeiDel}' "
                        + $"and is_display = true";
                }

                var hmBpResponse = await _serviceTDengine.ExecuteSelectRestResponseAsync("stb_hm_bloodpress", condition);
                var hmBpParser = JsonConvert.DeserializeObject>(hmBpResponse!);

                // 1. At least 5 samples are required.
                if (hmBpParser!.Rows < 5)
                {
                    _logger.LogInformation($"{imeiDel} -- {nameof(Worker)} --{nameof(WatchEvents)} -- 统计定时下发,计算增量值的数据条目不足:{hmBpParser!.Rows} < 5");
                    _logger.LogInformation($"{imeiDel} -- {nameof(Worker)} --{nameof(WatchEvents)} 没有足够的数据样本,不会定时下发");
                    return;
                }

                // Without a previous push the window starts at the earliest sample.
                if (lastPush == null)
                {
                    var firstHmBp = hmBpParser?.Select(i => i).OrderBy(i => i.Timestamp).FirstOrDefault();
                    statStartTime = firstHmBp!.Timestamp;
                }

                // Element 0 is used as the systolic average, element 1 as the diastolic average.
                var avgs = _serviceTDengine.AverageAfterRemovingOneMinMaxRef(hmBpParser!);
                decimal systolicAvg = avgs[0];
                decimal diastolicAvg = avgs[1];

                // 2. A zero systolic average means no increment can be computed.
                if (systolicAvg.Equals(0))
                {
                    _logger.LogInformation($"{imeiDel}--{nameof(Worker)}--计算平均值"
                        + $"\n currentSystolicAvg:{systolicAvg} -- lastPushSystolicInc:{lastPushSystolicInc}"
                        + $"\n currentDiastolicInc:{diastolicAvg} -- lastPushDiastolicInc:{lastPushDiastolicInc}");
                    _logger.LogInformation($"{imeiDel}--{nameof(Worker)} 没有足够的数据样本计算平均值,不会定时下发");
                    return;
                }

                // Stop adjusting once (reference - average) drops below 4 for either value.
                if (systolicRefValue - systolicAvg < 4)
                {
                    _logger.LogInformation($"systolic 收缩压 {imeiDel}除最大值和最小值后的平均值:{systolicAvg}与标定值:{systolicRefValue}的差值(标定值-平均值)少于4后,不再进行增量值调整");
                    return;
                }

                if (diastolicRefValue - diastolicAvg < 4)
                {
                    _logger.LogInformation($"diastolic 舒张压 {imeiDel}除最大值和最小值后的平均值:{diastolicAvg}与标定值:{diastolicRefValue}的差值(标定值-平均值)少于4后,不再进行增量值调整");
                    return;
                }

                // increment = (reference - average) * offset, truncated to int.
                var currentSystolicInc = (int)((systolicRefValue - systolicAvg) * systolicAvgOffset);
                var currentDiastolicInc = (int)((diastolicRefValue - diastolicAvg) * diastolicAvgOffset);

                // The device receives the accumulated increment (previous + current).
                int systolicInc = currentSystolicInc + lastPushSystolicInc;
                int diastolicInc = currentDiastolicInc + lastPushDiastolicInc;

                _logger.LogInformation($"{imeiDel}--{nameof(Worker)}--计算增量值"
                    + $"\n {imeiDel} -- systolicAvg:{systolicAvg}-- systolicInc:{systolicInc}-- currentSystolicInc:{currentSystolicInc} -- lastPushSystolicInc:{lastPushSystolicInc}"
                    + $"\n {imeiDel} -- diastolicAvg:{diastolicAvg}-- diastolicInc:{diastolicInc} --currentDiastolicInc:{currentDiastolicInc} -- lastPushDiastolicInc:{lastPushDiastolicInc}");
                _logger.LogInformation($"{imeiDel}--{nameof(Worker)}-- 定时校准,发给设备的绝对增量值=(上次绝对增量值+新数据的增量值)");
                _logger.LogInformation($"{nameof(Worker)} 开启血压标定值下发: {_configBoodPressResolver.EnableBPRefPush}");

                if (!_configBoodPressResolver.EnableBPRefPush)
                {
                    return;
                }

                // Per the original field comments, a value of 0 means "not in effect" on the device.
                BloodPressCalibrationConfigModel bpIncData = new()
                {
                    Imei = imeiDel,
                    SystolicRefValue = SafeType.SafeInt(systolicRefValue),
                    DiastolicRefValue = SafeType.SafeInt(diastolicRefValue),
                    SystolicIncValue = SafeType.SafeInt(systolicInc),
                    DiastolicIncValue = SafeType.SafeInt(diastolicInc)
                };

                var response = await _serviceIotWebApi.SetBloodPressCalibrationConfig2Async(bpIncData).ConfigureAwait(false);
                if (!response.Flag)
                {
                    _logger.LogInformation($"错误响应,没有下推数据:{response.Message}");
                    return;
                }

                // Store the push record. The sub-table suffix and tag are the last two IMEI characters.
                // Values are formatted with the invariant culture so decimals always use '.' and the
                // VALUES list cannot be broken by a comma decimal separator.
                // NOTE(review): the IMEI comes from the etcd key and is concatenated into SQL text;
                // no parameterized API is visible in this file — confirm keys are trusted.
                var tableSuffix = imeiDel.Substring(imeiDel.Length - 2);
                var sql = $"INSERT INTO health_monitor.hm_bp_push_ref_inc_value_{tableSuffix} "
                    + $"USING health_monitor.stb_hm_bp_push_ref_inc_value "
                    + $"TAGS ('{tableSuffix}') "
                    + $"VALUES("
                    + Sql($"'{startTime:yyyy-MM-dd HH:mm:ss.fff}',")
                    + $"'{imeiDel}',"
                    + Sql($"{bpIncData.SystolicRefValue},")
                    + Sql($"{bpIncData.DiastolicRefValue},")
                    + Sql($"{bpIncData.SystolicIncValue},")
                    + Sql($"{bpIncData.DiastolicIncValue},")
                    + $"{false},"
                    + Sql($"{systolicAvg},")
                    + Sql($"{diastolicAvg},")
                    + Sql($"{systolicAvgOffset},")
                    + Sql($"{diastolicAvgOffset},")
                    + Sql($"'{statStartTime:yyyy-MM-dd HH:mm:ss.fff}',")
                    + Sql($"'{startTime:yyyy-MM-dd HH:mm:ss.fff}'")
                    + $")";
                _serviceTDengine.ExecuteInsertSQL(sql);

                await ScheduleNextPushAsync(key, imeiDel, startTime).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"{nameof(WatchEvents)},出错: |{ex.Message}|{ex.StackTrace}");
            }
        }

        /// <summary>
        /// Re-arms the schedule by writing <paramref name="key"/> back to etcd with a TTL (seconds)
        /// equal to the time until the next run minus the time already spent since
        /// <paramref name="startTime"/>.
        /// </summary>
        /// <param name="key">etcd key to write.</param>
        /// <param name="deviceImei">IMEI stored in the JSON payload.</param>
        /// <param name="startTime">When processing of the expired key began.</param>
        /// <remarks>
        /// DEBUG builds schedule two minutes after the current minute; other builds schedule
        /// 20:00 of the following day.
        /// </remarks>
        private async Task ScheduleNextPushAsync(string key, string deviceImei, DateTime startTime)
        {
            var endTime = DateTime.Now;
            DateTime now = DateTime.Now;

#if DEBUG
            var interval = 0;
            // AddMinutes instead of "now.Minute + 2" in the constructor: the latter throws
            // ArgumentOutOfRangeException whenever the current minute is 58 or 59.
            DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute, 0)
                .AddMinutes(2)
                .AddDays(interval);
            TimeSpan rollover = TimeSpan.FromMinutes(1);
#else
            // Every $interval day(s) at 20:00.
            var interval = 1;
            DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, 20, 0, 0).AddDays(interval);
            TimeSpan rollover = TimeSpan.FromDays(1);
#endif

            TimeSpan timeUntilNextRun = nextRunTime - now;

            // Kept from the original as a safety net. NOTE(review): with the constructions above
            // nextRunTime is always later than now, so this branch looks unreachable.
            if (timeUntilNextRun < TimeSpan.Zero)
            {
                timeUntilNextRun = timeUntilNextRun.Add(rollover);
                nextRunTime += timeUntilNextRun;
            }

            long ttl = (long)((timeUntilNextRun.TotalMilliseconds - (endTime - startTime).TotalMilliseconds) / 1000);
            var data = new
            {
                imei = deviceImei,
                create_time = now.ToString("yyyy-MM-dd HH:mm:ss"),
                ttl,
                next_run_time = nextRunTime.ToString("yyyy-MM-dd HH:mm:ss")
            };
            var result = JsonConvert.SerializeObject(data);
            await _serviceEtcd.PutValAsync(key, result, ttl, false).ConfigureAwait(false);
        }

        /// <summary>
        /// Formats an interpolated SQL fragment with the invariant culture.
        /// </summary>
        private static string Sql(FormattableString fragment) => FormattableString.Invariant(fragment);
    }
}