您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

651 行
41KB

  1. using dotnet_etcd;
  2. using Etcdserverpb;
  3. using Google.Protobuf.WellKnownTypes;
  4. using HealthMonitor.Common;
  5. using HealthMonitor.Common.helper;
  6. using HealthMonitor.Core.Common.Extensions;
  7. using HealthMonitor.Model.Config;
  8. using HealthMonitor.Model.Service;
  9. using HealthMonitor.Model.Service.Mapper;
  10. using HealthMonitor.Service.Biz;
  11. using HealthMonitor.Service.Biz.db;
  12. using HealthMonitor.Service.Cache;
  13. using HealthMonitor.Service.Etcd;
  14. using HealthMonitor.Service.Sub;
  15. using Microsoft.AspNetCore.Mvc.RazorPages;
  16. using Microsoft.EntityFrameworkCore.Metadata.Internal;
  17. using Microsoft.Extensions.Options;
  18. using Newtonsoft.Json;
  19. using Newtonsoft.Json.Linq;
  20. using System.Reflection;
  21. using System.Threading.Channels;
  22. using TDengineDriver;
  23. using TDengineTMQ;
  24. using TelpoDataService.Util.Clients;
  25. using TelpoDataService.Util.Entities.GpsLocationHistory;
  26. namespace HealthMonitor.WebApi
  27. {
  28. public class Worker : BackgroundService
  29. {
  30. private readonly ILogger<Worker> _logger;
  31. private readonly IServiceProvider _services;
  32. private readonly TDengineDataSubcribe _tdEngineDataSubcribe;
  33. private readonly PackageProcess _processor;
  34. private readonly TDengineService _serviceTDengine;
  35. private readonly EtcdService _serviceEtcd;
  36. private readonly HttpHelper _httpHelper = default!;
  37. private readonly IotApiService _serviceIotApi;
  38. private readonly BoodPressResolverConfig _configBoodPressResolver;
  39. private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager;
  40. private readonly PersonCacheManager _personCacheMgr;
  41. private readonly DeviceCacheManager _deviceCacheMgr;
  42. private readonly GpsLocationHistoryAccessorClient<HisGpsFetalHeartRate> _hisFetalHeartApiClient;
  43. private CancellationTokenSource _tokenSource = default!;
  44. public Worker(ILogger<Worker> logger, IServiceProvider services, PersonCacheManager personCacheMgr,
  45. BloodPressReferenceValueCacheManager bpRefValCacheManager, IotApiService IotApiService,
  46. IOptions<BoodPressResolverConfig> optionBoodPressResolver, PackageProcess processor,
  47. TDengineDataSubcribe tdEngineDataSubcribe, TDengineService serviceDengine,
  48. GpsLocationHistoryAccessorClient<HisGpsFetalHeartRate> hisFetalHeartApiClient,
  49. HttpHelper httpHelper, EtcdService serviceEtcd, DeviceCacheManager deviceCacheMgr)
  50. {
  51. _logger = logger;
  52. _tdEngineDataSubcribe = tdEngineDataSubcribe;
  53. _services = services;
  54. _serviceIotApi = IotApiService;
  55. _processor = processor;
  56. _serviceEtcd = serviceEtcd;
  57. _serviceTDengine = serviceDengine;
  58. _httpHelper = httpHelper;
  59. _configBoodPressResolver = optionBoodPressResolver.Value;
  60. _bpRefValCacheManager = bpRefValCacheManager;
  61. _personCacheMgr = personCacheMgr;
  62. _deviceCacheMgr = deviceCacheMgr;
  63. _hisFetalHeartApiClient = hisFetalHeartApiClient;
  64. }
  65. public override Task StartAsync(CancellationToken cancellationToken)
  66. {
  67. //_logger.LogInformation("------StartAsync");
  68. _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
  69. return base.StartAsync(cancellationToken);
  70. }
  71. public override Task StopAsync(CancellationToken cancellationToken)
  72. {
  73. //_logger.LogInformation("------StopAsync");
  74. _tokenSource.Cancel(); //停止工作线程
  75. return base.StopAsync(cancellationToken);
  76. }
  77. protected override async Task ExecuteAsync(CancellationToken stoppingToken)
  78. {
  79. var tasks = new[]
  80. {
  81. Task.Run(async () =>
  82. {
  83. _logger.LogInformation("解析器启动");
  84. while (!stoppingToken.IsCancellationRequested)
  85. {
  86. await _processor.ResolveAsync().ConfigureAwait(false);
  87. }
  88. }, stoppingToken),
  89. Task.Run(() =>
  90. {
  91. _logger.LogInformation("TDengine 订阅启动");
  92. while (!stoppingToken.IsCancellationRequested)
  93. {
  94. _tdEngineDataSubcribe.BeginListen(stoppingToken);
  95. }
  96. }, stoppingToken),
  97. Task.Run(() =>
  98. _serviceEtcd.WacthKeysWithPrefixResponseAsync("health_monitor/schedule_push", WatchEvents),
  99. stoppingToken)
  100. };
  101. await Task.WhenAll(tasks);
  102. }
  103. private void WatchEvents(WatchResponse response)
  104. {
  105. response.Events.ToList<Mvccpb.Event>().ForEach(async e =>
  106. {
  107. try
  108. {
  109. switch (e.Type.ToString())
  110. {
  111. case "Put":
  112. // 获取时间点计算TTL
  113. Console.BackgroundColor = ConsoleColor.Blue;
  114. Console.WriteLine($"--- {e.Type}--{e.Kv.Key.ToStringUtf8()}--{e.Kv.Value.ToStringUtf8()}---{DateTime.Now}");
  115. Console.BackgroundColor = ConsoleColor.Black;
  116. break;
  117. case "Delete":
  118. // TTL到了重新计算TTL,下发
  119. //Console.BackgroundColor = ConsoleColor.Green;
  120. //Console.WriteLine($"--- {e.Type}--{e.Kv.Key.ToStringUtf8()}--{e.Kv.Value.ToStringUtf8()}---{DateTime.Now}");
  121. // var key = $"health_monitor/schedule_push/imei/{bp.Serialno}";
  122. var key = e.Kv.Key.ToStringUtf8();
  123. var imeiDel = key.Split('/')[^1];
  124. var schedule_push = await _serviceEtcd.GetValAsync(key).ConfigureAwait(false);
  125. if (string.IsNullOrWhiteSpace(schedule_push))
  126. {
  127. if (key.Contains("pregnancy_heart_rate"))
  128. {
  129. // 处理孕妇业务,计算一般心率并下发
  130. var commonPHR= await _serviceTDengine.InitPregnancyCommonHeartRateModeAsync(imeiDel);
  131. // 设置胎心监测参数
  132. if (commonPHR == null)
  133. {
  134. // 建模中
  135. var flag= await _serviceIotApi.SetFetalConfig(imeiDel);
  136. _logger.LogInformation($"{imeiDel} 建模建模中");
  137. }
  138. else
  139. {
  140. // 建模完成
  141. var flag = await _serviceIotApi.SetFetalConfig(imeiDel,1,commonPHR.MaxValue,commonPHR.MinValue);
  142. _logger.LogInformation($"{imeiDel} 建模完成");
  143. // 保存到TDengine数据库
  144. await _serviceTDengine.InsertAsync<PregnancyCommonHeartRateModel>("hm_pchr", commonPHR);
  145. _logger.LogInformation($"保存TDengine完成");
  146. //
  147. }
  148. #region 注册定时下发
  149. var startTime = DateTime.Now;
  150. // 注册下次下推
  151. var endTime = DateTime.Now;
  152. #if DEBUG
  153. //long ttl = (long)((60 * 1000-(endTime-startTime).TotalMilliseconds)/1000);
  154. //await _serviceEtcd.PutValAsync(key, imeiDel,ttl, false).ConfigureAwait(false);
  155. var interval = 0;
  156. // 获取当前时间
  157. DateTime now = DateTime.Now;
  158. // 计算距离下一个$interval天后的8点的时间间隔
  159. DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute + 2, 0).AddDays(interval);
  160. TimeSpan timeUntilNextRun = nextRunTime - now;
  161. // 如果当前时间已经超过了8点,将等待到明天后的8点
  162. if (timeUntilNextRun < TimeSpan.Zero)
  163. {
  164. timeUntilNextRun = timeUntilNextRun.Add(TimeSpan.FromMinutes(1));
  165. nextRunTime += timeUntilNextRun;
  166. }
  167. //long ttl = timeUntilNextRun.Milliseconds/1000;
  168. long ttl = (long)((timeUntilNextRun.TotalMilliseconds - (endTime - startTime).TotalMilliseconds) / 1000);
  169. var data = new
  170. {
  171. imei = imeiDel,
  172. create_time = now.ToString("yyyy-MM-dd HH:mm:ss"),
  173. ttl,
  174. next_run_time = nextRunTime.ToString("yyyy-MM-dd HH:mm:ss")
  175. };
  176. var result = JsonConvert.SerializeObject(data);
  177. await _serviceEtcd.PutValAsync(key, result, ttl, false).ConfigureAwait(false);
  178. #else
  179. // 每$interval天,晚上8点
  180. var interval = 1;
  181. // 获取当前时间
  182. DateTime now = DateTime.Now;
  183. var rand = new Random();
  184. var pushSec = rand.Next(59);
  185. int pushMin = int.TryParse(imeiDel.AsSpan(imeiDel.Length - 1), out pushMin) ? pushMin : 10;
  186. // 计算距离下一个$interval天后的8点的时间间隔
  187. DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, 18, pushMin, pushSec).AddDays(interval);
  188. TimeSpan timeUntilNextRun = nextRunTime - now;
  189. // 如果当前时间已经超过了8点,将等待到明天后的8点
  190. if (timeUntilNextRun < TimeSpan.Zero)
  191. {
  192. timeUntilNextRun = timeUntilNextRun.Add(TimeSpan.FromDays(1));
  193. nextRunTime += timeUntilNextRun;
  194. }
  195. // var ttl = timeUntilNextRun.TotalMilliseconds;
  196. long ttl = (long)((timeUntilNextRun.TotalMilliseconds-(endTime-startTime).TotalMilliseconds)/1000);
  197. var data = new
  198. {
  199. imei = imeiDel,
  200. create_time = now.ToString("yyyy-MM-dd HH:mm:ss"),
  201. ttl,
  202. next_run_time = nextRunTime.ToString("yyyy-MM-dd HH:mm:ss")
  203. };
  204. var result = JsonConvert.SerializeObject(data);
  205. await _serviceEtcd.PutValAsync(key, result, ttl, false).ConfigureAwait(false);
  206. #endif
  207. #endregion
  208. }
  209. // health_monitor/schedule_push/cal_fetal_heart_rate/imei/
  210. else if (key.Contains("health_monitor/schedule_push/cal_fetal_heart_rate/imei/"))
  211. {
  212. var watchConfig = await _deviceCacheMgr.GetGpsDeviceWatchConfigCacheObjectBySerialNoAsync(imeiDel, "0067");
  213. var isFetalHeartEnable = watchConfig != null && (int)watchConfig["enabled"]! == 1;
  214. if (isFetalHeartEnable)
  215. {
  216. // 处理胎心计算业务,计算一般心率获取胎心系数
  217. var commonPHR = await _serviceTDengine.InitPregnancyCommonHeartRateModeAsync(imeiDel);
  218. if (commonPHR != null)
  219. {
  220. # region 计算胎心数据
  221. // 告警上限阀值
  222. var upperAlarmThreshold = (int)watchConfig!["upperAlarmThreshold"]!;
  223. // 告警下限阀值
  224. var lowerAlarmThreshold = (int)watchConfig["lowerAlarmThreshold"]!;
  225. var lastPhr = await _serviceTDengine.GetLastAsync<PregnancyHeartRateModel>();
  226. // 计算胎心=孕妇心率*系数
  227. var fetalHeartRate = SafeType.SafeInt(lastPhr.PregnancyHeartRate * commonPHR?.StatModeAvgFprCoefficient!);
  228. var sampleTime = DateTimeUtil.ConvertToTimeStamp(DateTime.Now).ToString();
  229. var isAbnormal = fetalHeartRate > upperAlarmThreshold ? 1 : (fetalHeartRate < lowerAlarmThreshold ? 2 : 0);
  230. HisGpsFetalHeartRate gpsFetalHeartRate = new()
  231. {
  232. FetalHeartRateId = Guid.NewGuid().ToString("D"),
  233. PersonId = commonPHR!.PersonId,
  234. Serialno = imeiDel,
  235. HeartRate = fetalHeartRate,
  236. SampleTime = sampleTime,
  237. IsAbnormal = isAbnormal,
  238. StatStartTime = commonPHR.StatStartTime,
  239. StatEndTime = commonPHR.StatEndTime,
  240. CreateTime = DateTime.Now,
  241. Method = 1,
  242. IsDisplay = 1,
  243. DeviceKey = commonPHR!.DeviceKey
  244. };
  245. // 保存到 数据服务 MySQL 数据库
  246. await _hisFetalHeartApiClient.AddAsync(gpsFetalHeartRate).ConfigureAwait(false);
  247. // 推送到api/v1/open/OpenIot/SetFetalHeartRateConfig
  248. await _serviceIotApi.SetFetalHeartRateConfig(imeiDel, fetalHeartRate, sampleTime, isAbnormal);
  249. #endregion
  250. #region 注册定时计算胎心数据触发器 {interval} 秒后
  251. //var fetalKey = $"health_monitor/schedule_push/cal_fetal_heart_rate/imei/{imeiDel}";
  252. var fetalKey = key;
  253. var scheduleFetalPush = await _serviceEtcd.GetValAsync(fetalKey).ConfigureAwait(false);
  254. if (string.IsNullOrWhiteSpace(scheduleFetalPush))
  255. {
  256. // var fetalInterval = (int)watchConfig!["interval"]!;
  257. var fetalInterval = 60 * 15;
  258. var fetalNow = DateTime.Now;
  259. var fetalTimeNextRun = fetalNow.Add(TimeSpan.FromSeconds(fetalInterval));
  260. var fetalTTL = fetalInterval;
  261. var data = new
  262. {
  263. imei = imeiDel,
  264. create_time = fetalNow.ToString("yyyy-MM-dd HH:mm:ss"),
  265. fetalTTL,
  266. next_run_time = fetalTimeNextRun.ToString("yyyy-MM-dd HH:mm:ss")
  267. };
  268. var result = JsonConvert.SerializeObject(data);
  269. await _serviceEtcd.PutValAsync(fetalKey, result, fetalTTL, false).ConfigureAwait(false);
  270. }
  271. #endregion
  272. }
  273. }
  274. }
  275. else
  276. {
  277. // 处理血压业务
  278. int systolicInc;
  279. int diastolicInc;
  280. int systolicRefValue;
  281. int diastolicRefValue;
  282. decimal systolicAvg;
  283. decimal diastolicAvg;
  284. int systolicMax = 0;
  285. int diastolicMax = 0;
  286. // 统计时间
  287. //DateTime endTime = DateTime.Now; //测试
  288. DateTime statStartTime = DateTime.Now;
  289. // 最小值
  290. int systolicMin = 0;
  291. int diastolicMin = 0;
  292. // 偏移参数
  293. var avgOffset = 0.25M;
  294. var systolicAvgOffset = avgOffset;
  295. var diastolicAvgOffset = avgOffset;
  296. // 最后一次下发值
  297. int lastPushSystolicInc = 0;
  298. int lastPushDiastolicInc = 0;
  299. var startTime = DateTime.Now;
  300. // 下发增量值
  301. #region 统计定时下发增量值
  302. //var last = await _serviceTDengine.GetLastAsync("stb_hm_bloodpress_stats_inc", $"serialno='{imeiDel}' order by last_update desc");
  303. //var ts = last?[0];
  304. // 最后一条血压数据
  305. var condition = $"serialno='{imeiDel}' order by last_update desc";
  306. var field = "last_row(*)";
  307. var lastHmBpResponse = await _serviceTDengine.ExecuteSelectRestResponseAsync("stb_hm_bloodpress", condition, field);
  308. var lastHmBpParser = JsonConvert.DeserializeObject<ParseTDengineRestResponse<BloodPressureModel>>(lastHmBpResponse!);
  309. var lastHmBp = lastHmBpParser?.Select().FirstOrDefault();
  310. //if (lastHmBpParser?.Select()?.ToList().Count < 2)
  311. //{
  312. // _logger.LogInformation($"{imeiDel} -- {nameof(Worker)} --{nameof(WatchEvents)} -- 血压数据条目不足");
  313. // break;
  314. //}
  315. // 7 天有效数据
  316. if (lastHmBp?.Timestamp.AddDays(7) > DateTime.Now)
  317. {
  318. // 计算增量值
  319. condition = $"serialno='{imeiDel}' order by ts desc";
  320. var lastPushResponse = await _serviceTDengine.ExecuteSelectRestResponseAsync("stb_hm_bp_push_ref_inc_value", condition, field);
  321. if (lastPushResponse == null)
  322. {
  323. _logger.LogInformation($"{imeiDel}--没有下发记录");
  324. break;
  325. }
  326. var lastPushParser = JsonConvert.DeserializeObject<ParseTDengineRestResponse<BloodPressurePushRefIncModel>>(lastPushResponse);
  327. var lastPush = lastPushParser!.Select().FirstOrDefault();
  328. // 有下推记录
  329. if (lastPush != null)
  330. {
  331. systolicRefValue = lastPush!.SystolicRefValue;
  332. diastolicRefValue = lastPush!.DiastolicRefValue;
  333. lastPushSystolicInc = lastPush!.SystolicIncValue;
  334. lastPushDiastolicInc = lastPush!.DiastolicIncValue;
  335. condition = $"ts between '{lastPush?.Timestamp:yyyy-MM-dd HH:mm:ss.fff}' and '{startTime:yyyy-MM-dd HH:mm:ss.fff}' " +
  336. $"and serialno='{imeiDel}' " +
  337. $"and is_display = true";
  338. // 使用最近一次的下推时间作为统计的开始时间
  339. statStartTime = lastPush!.Timestamp;
  340. }
  341. // 没有下推记录(历史遗留数据),没有初始的测量值产生的平均值(测量值=平均值)
  342. else
  343. {
  344. #region 获取个人信息
  345. var person = await _personCacheMgr.GetDeviceGpsPersonCacheBySerialNoAsync(Guid.NewGuid().ToString(), imeiDel).ConfigureAwait(false);
  346. //验证这个信息是否存在
  347. if (person == null || person?.Person.BornDate == null)
  348. {
  349. _logger.LogWarning($"{nameof(Worker)}--{imeiDel} 验证个人信息,找不到个人信息,跳过此消息");
  350. break;
  351. }
  352. // 验证年龄是否在范围 (2 - 120)
  353. var age = SafeType.SafeInt(DateTime.Today.Year - person?.Person.BornDate!.Value.Year!);
  354. if (age < 2 || age > 120)
  355. {
  356. _logger.LogWarning($"{nameof(Worker)}--{imeiDel} 验证年龄,不在范围 (2 - 120)岁,跳过此消息");
  357. break;
  358. }
  359. var gender = person?.Person.Gender == true ? 1 : 2;
  360. var isHypertension = SafeType.SafeBool(person?.Person.Ishypertension!);
  361. var height = SafeType.SafeDouble(person?.Person.Height!);
  362. var weight = SafeType.SafeDouble(person?.Person.Weight!);
  363. #endregion
  364. #region 初始化常规血压标定值标定值
  365. var bpRef = await _bpRefValCacheManager.GetBloodPressReferenceValueAsync(age, gender, isHypertension);
  366. //systolicRefValue = bpRef!.Systolic;//?
  367. //diastolicRefValue = bpRef!.Diastolic;//?
  368. #endregion
  369. systolicRefValue = bpRef!.Systolic;
  370. diastolicRefValue = bpRef!.Diastolic;
  371. lastPushSystolicInc = 0;
  372. lastPushDiastolicInc = 0;
  373. condition = $"ts <= '{startTime:yyyy-MM-dd HH:mm:ss.fff}' " +
  374. $"and serialno='{imeiDel}' " +
  375. $"and is_display = true";
  376. }
  377. var hmBpResponse = await _serviceTDengine.ExecuteSelectRestResponseAsync("stb_hm_bloodpress", condition);
  378. var hmBpParser = JsonConvert.DeserializeObject<ParseTDengineRestResponse<BloodPressureModel>>(hmBpResponse!);
  379. var hmBp = hmBpParser?.Select();
  380. //if (hmBp?.ToList().Count < 2)
  381. // 1. 判断数据样本数量
  382. if (hmBpParser!.Rows < 5)
  383. {
  384. _logger.LogInformation($"{imeiDel} -- {nameof(Worker)} --{nameof(WatchEvents)} -- 统计定时下发,计算增量值的数据条目不足:{hmBpParser!.Rows} < 5");
  385. _logger.LogInformation($"{imeiDel} -- {nameof(Worker)} --{nameof(WatchEvents)} 没有足够的数据样本,不会定时下发");
  386. break;
  387. }
  388. // 没有下推记录重新计算统计时间
  389. if (lastPush == null)
  390. {
  391. var firstHmBp = hmBpParser?.Select(i => i).OrderBy(i => i.Timestamp).FirstOrDefault();
  392. statStartTime = firstHmBp!.Timestamp;
  393. }
  394. // NewMethod(systolicRefValue, hmBpParser);
  395. // 最大值
  396. //systolicMax = (int)hmBpParser?.Select(i => i.SystolicValue).Max()!;
  397. //diastolicMax = (int)hmBpParser?.Select(i => i.DiastolicValue).Max()!;
  398. //// 最小值
  399. //systolicMin = (int)hmBpParser?.Select(i => i.SystolicValue).Min()!;
  400. //diastolicMin = (int)hmBpParser?.Select(i => i.DiastolicValue).Min()!;
  401. //systolicAvg = (int)(hmBpParser?.AverageAfterRemovingOneMinMaxRef(i => i.SystolicValue, SafeType.SafeInt(systolicRefValue!)))!;
  402. //diastolicAvg = (int)(hmBpParser?.AverageAfterRemovingOneMinMaxRef(i => i.DiastolicValue, SafeType.SafeInt(diastolicRefValue!)))!;
  403. var avgs = _serviceTDengine.AverageAfterRemovingOneMinMaxRef(hmBpParser!);
  404. systolicAvg = avgs[0];
  405. diastolicAvg = avgs[1];
  406. // 2. 判断能否计算增量值
  407. if (systolicAvg.Equals(0))
  408. {
  409. _logger.LogInformation($"{imeiDel}--{nameof(Worker)}--计算平均值" +
  410. $"\n currentSystolicAvg:{systolicAvg} -- lastPushSystolicInc:{lastPushSystolicInc}" +
  411. $"\n currentDiastolicInc:{diastolicAvg} -- lastPushDiastolicInc:{lastPushDiastolicInc}");
  412. _logger.LogInformation($"{imeiDel}--{nameof(Worker)} 没有足够的数据样本计算平均值,不会定时下发");
  413. break;
  414. }
  415. // 除最大值和最小值后的平均值与标定值差值少于4后(当天计算出该结果则也不产生增量调整),就不再进行增量值调整了。
  416. if (systolicRefValue - systolicAvg < 4)
  417. {
  418. _logger.LogInformation($"diastolic 舒张压 {imeiDel}除最大值和最小值后的平均值:{diastolicAvg}与标定值:{diastolicRefValue}\n systolic 收缩压 {imeiDel}除最大值和最小值后的平均值:{systolicAvg}与标定值:{systolicRefValue}的差值(标定值-平均值)少于4后,systolic 收缩压 不再进行增量值调整");
  419. break;
  420. }
  421. if (diastolicRefValue - diastolicAvg < 4)
  422. {
  423. _logger.LogInformation($"systolic 收缩压 {imeiDel}除最大值和最小值后的平均值:{systolicAvg}与标定值:{systolicRefValue}\n diastolic 舒张压 {imeiDel}除最大值和最小值后的平均值:{diastolicAvg}与标定值:{diastolicRefValue}的差值(标定值-平均值)少于4后,diastolic 舒张压 不再进行增量值调整");
  424. break;
  425. }
  426. // 增量值=(标定值-平均值)* 0.25
  427. var currentSystolicInc = (int)((systolicRefValue - systolicAvg) * systolicAvgOffset)!;
  428. var currentDiastolicInc = (int)((diastolicRefValue - diastolicAvg) * diastolicAvgOffset)!;
  429. // 累计增量
  430. systolicInc = currentSystolicInc + lastPushSystolicInc;
  431. diastolicInc = currentDiastolicInc + lastPushDiastolicInc;
  432. _logger.LogInformation($"{imeiDel}--{nameof(Worker)}--计算增量值" +
  433. $"\n {imeiDel} -- systolicAvg:{systolicAvg}-- systolicInc:{systolicInc}-- currentSystolicInc:{currentSystolicInc} -- lastPushSystolicInc:{lastPushSystolicInc}" +
  434. $"\n {imeiDel} -- diastolicAvg:{diastolicAvg}-- diastolicInc:{diastolicInc} --currentDiastolicInc:{currentDiastolicInc} -- lastPushDiastolicInc:{lastPushDiastolicInc}");
  435. _logger.LogInformation($"{imeiDel}--{nameof(Worker)}-- 定时校准,发给设备的绝对增量值=(上次绝对增量值+新数据的增量值)");
  436. _logger.LogInformation($"{nameof(Worker)} 开启血压标定值下发: {_configBoodPressResolver.EnableBPRefPush}");
  437. if (_configBoodPressResolver.EnableBPRefPush)
  438. // if (false) // 临时关闭
  439. {
  440. BloodPressCalibrationConfigModel bpIncData = new()
  441. {
  442. Imei = imeiDel,
  443. SystolicRefValue = SafeType.SafeInt(((int)systolicRefValue!)), //收缩压标定值,值为0 表示不生效
  444. DiastolicRefValue = SafeType.SafeInt(((int)diastolicRefValue!)), //舒张压标定值,值为0表示不生效
  445. SystolicIncValue = SafeType.SafeInt(((int)systolicInc!)), //收缩压显示增量,值为0 表示不生效
  446. DiastolicIncValue = SafeType.SafeInt(((int)diastolicInc!)) //舒张压显示增量,值为0 表示不生效
  447. };
  448. //var pushedBP = await _serviceIotApi.SetBloodPressCalibrationConfigAsync(bpIncData).ConfigureAwait(false);
  449. var response = await _serviceIotApi.SetBloodPressCalibrationConfig2Async(bpIncData).ConfigureAwait(false);
  450. var pushedBP = response.Flag;
  451. if (pushedBP)
  452. {
  453. #region 保存下推记录 stb_hm_bp_push_ref_inc_value
  454. var sql = $"INSERT INTO health_monitor.hm_bp_push_ref_inc_value_{imeiDel.Substring(imeiDel.Length - 2)} " +
  455. $"USING health_monitor.stb_hm_bp_push_ref_inc_value " +
  456. $"TAGS ('{imeiDel.Substring(imeiDel.Length - 2)}') " +
  457. $"VALUES(" +
  458. $"'{startTime:yyyy-MM-dd HH:mm:ss.fff}'," +
  459. $"'{imeiDel}'," +
  460. $"{bpIncData.SystolicRefValue}," +
  461. $"{bpIncData.DiastolicRefValue}," +
  462. $"{bpIncData.SystolicIncValue}," +
  463. $"{bpIncData.DiastolicIncValue}," +
  464. $"{false}," +
  465. $"{systolicAvg}," +
  466. $"{diastolicAvg}," +
  467. $"{systolicAvgOffset}," +
  468. $"{diastolicAvgOffset}," +
  469. $"'{statStartTime:yyyy-MM-dd HH:mm:ss.fff}'," +
  470. $"'{startTime:yyyy-MM-dd HH:mm:ss.fff}'" +
  471. $")";
  472. _serviceTDengine.ExecuteInsertSQL(sql);
  473. #endregion
  474. #region 注册定时下发
  475. // 注册下次下推
  476. var endTime = DateTime.Now;
  477. #if DEBUG
  478. //long ttl = (long)((60 * 1000-(endTime-startTime).TotalMilliseconds)/1000);
  479. //await _serviceEtcd.PutValAsync(key, imeiDel,ttl, false).ConfigureAwait(false);
  480. var interval = 0;
  481. // 获取当前时间
  482. DateTime now = DateTime.Now;
  483. // 计算距离下一个$interval天后的8点的时间间隔
  484. DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute + 2, 0).AddDays(interval);
  485. TimeSpan timeUntilNextRun = nextRunTime - now;
  486. // 如果当前时间已经超过了8点,将等待到明天后的8点
  487. if (timeUntilNextRun < TimeSpan.Zero)
  488. {
  489. timeUntilNextRun = timeUntilNextRun.Add(TimeSpan.FromMinutes(1));
  490. nextRunTime += timeUntilNextRun;
  491. }
  492. // var ttl = timeUntilNextRun.TotalMilliseconds;
  493. long ttl = (long)((timeUntilNextRun.TotalMilliseconds - (endTime - startTime).TotalMilliseconds) / 1000);
  494. var data = new
  495. {
  496. imei = imeiDel,
  497. create_time = now.ToString("yyyy-MM-dd HH:mm:ss"),
  498. ttl,
  499. next_run_time = nextRunTime.ToString("yyyy-MM-dd HH:mm:ss")
  500. };
  501. var result = JsonConvert.SerializeObject(data);
  502. await _serviceEtcd.PutValAsync(key, result, ttl, false).ConfigureAwait(false);
  503. #else
  504. // 每$interval天,晚上8点
  505. var interval = 1;
  506. // 获取当前时间
  507. DateTime now = DateTime.Now;
  508. // 计算距离下一个$interval天后的8点的时间间隔
  509. DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, 20, 0, 0).AddDays(interval);
  510. TimeSpan timeUntilNextRun = nextRunTime - now;
  511. // 如果当前时间已经超过了8点,将等待到明天后的8点
  512. if (timeUntilNextRun < TimeSpan.Zero)
  513. {
  514. timeUntilNextRun = timeUntilNextRun.Add(TimeSpan.FromDays(1));
  515. nextRunTime += timeUntilNextRun;
  516. }
  517. // var ttl = timeUntilNextRun.TotalMilliseconds;
  518. long ttl = (long)((timeUntilNextRun.TotalMilliseconds-(endTime-startTime).TotalMilliseconds)/1000);
  519. var data = new
  520. {
  521. imei = imeiDel,
  522. create_time = now.ToString("yyyy-MM-dd HH:mm:ss"),
  523. ttl,
  524. next_run_time = nextRunTime.ToString("yyyy-MM-dd HH:mm:ss")
  525. };
  526. var result = JsonConvert.SerializeObject(data);
  527. await _serviceEtcd.PutValAsync(key, result, ttl, false).ConfigureAwait(false);
  528. #endif
  529. #endregion
  530. }
  531. else
  532. {
  533. _logger.LogInformation($"错误响应,没有下推数据:{response.Message}");
  534. }
  535. }
  536. }
  537. else
  538. {
  539. _logger.LogInformation($"向{imeiDel}统计数据已经失效");
  540. }
  541. #endregion
  542. }
  543. }
  544. break;
  545. }
  546. }
  547. catch (Exception ex)
  548. {
  549. _logger.LogInformation($"{nameof(WatchEvents)},出错: |{ex.Message}|{ex.StackTrace}");
  550. }
  551. });
  552. }
  553. }
  554. }