You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

489 lines
29KB

  1. using dotnet_etcd;
  2. using Etcdserverpb;
  3. using Google.Protobuf.WellKnownTypes;
  4. using HealthMonitor.Common;
  5. using HealthMonitor.Common.helper;
  6. using HealthMonitor.Core.Common.Extensions;
  7. using HealthMonitor.Model.Config;
  8. using HealthMonitor.Model.Service;
  9. using HealthMonitor.Model.Service.Mapper;
  10. using HealthMonitor.Service.Biz;
  11. using HealthMonitor.Service.Biz.db;
  12. using HealthMonitor.Service.Cache;
  13. using HealthMonitor.Service.Etcd;
  14. using HealthMonitor.Service.Sub;
  15. using Microsoft.AspNetCore.Mvc.RazorPages;
  16. using Microsoft.EntityFrameworkCore.Metadata.Internal;
  17. using Microsoft.Extensions.Options;
  18. using Newtonsoft.Json;
  19. using Newtonsoft.Json.Linq;
  20. using System.Reflection;
  21. using System.Threading.Channels;
  22. using TDengineDriver;
  23. using TDengineTMQ;
  24. namespace HealthMonitor.WebApi
  25. {
  26. public class Worker : BackgroundService
  27. {
  28. private readonly ILogger<Worker> _logger;
  29. private readonly IServiceProvider _services;
  30. private readonly TDengineDataSubcribe _tdEngineDataSubcribe;
  31. private readonly PackageProcess _processor;
  32. private readonly TDengineService _serviceTDengine;
  33. private readonly EtcdService _serviceEtcd;
  34. private readonly HttpHelper _httpHelper = default!;
  35. private readonly IotWebApiService _serviceIotWebApi;
  36. private readonly BoodPressResolverConfig _configBoodPressResolver;
  37. private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager;
  38. private readonly PersonCacheManager _personCacheMgr;
  39. private CancellationTokenSource _tokenSource = default!;
  /// <summary>
  /// Creates the worker and stores every injected collaborator in its backing field.
  /// </summary>
  /// <param name="logger">Logger used by all worker activities.</param>
  /// <param name="services">Service provider kept for later resolution.</param>
  /// <param name="personCacheMgr">Cache used to look up the person bound to a device serial number.</param>
  /// <param name="bpRefValCacheManager">Cache of blood-pressure reference values.</param>
  /// <param name="iotWebApiService">Client used to push calibration settings to devices.</param>
  /// <param name="optionBoodPressResolver">Options wrapper; its <c>Value</c> is read once here.</param>
  /// <param name="processor">Package processor driven by the resolve loop.</param>
  /// <param name="tdEngineDataSubcribe">TDengine subscription listener.</param>
  /// <param name="serviceDengine">TDengine query/insert service.</param>
  /// <param name="httpHelper">HTTP helper.</param>
  /// <param name="serviceEtcd">etcd client wrapper.</param>
  public Worker(
      ILogger<Worker> logger,
      IServiceProvider services,
      PersonCacheManager personCacheMgr,
      BloodPressReferenceValueCacheManager bpRefValCacheManager,
      IotWebApiService iotWebApiService,
      IOptions<BoodPressResolverConfig> optionBoodPressResolver,
      PackageProcess processor,
      TDengineDataSubcribe tdEngineDataSubcribe,
      TDengineService serviceDengine,
      HttpHelper httpHelper,
      EtcdService serviceEtcd)
  {
      // Infrastructure.
      _logger = logger;
      _services = services;
      _httpHelper = httpHelper;

      // Data pipeline.
      _processor = processor;
      _tdEngineDataSubcribe = tdEngineDataSubcribe;
      _serviceTDengine = serviceDengine;

      // External services.
      _serviceEtcd = serviceEtcd;
      _serviceIotWebApi = iotWebApiService;

      // Configuration and caches.
      _configBoodPressResolver = optionBoodPressResolver.Value;
      _bpRefValCacheManager = bpRefValCacheManager;
      _personCacheMgr = personCacheMgr;
  }
  /// <summary>
  /// Logs the start, creates the worker's own cancellation source linked to the
  /// host-supplied token, and then delegates to the base implementation.
  /// </summary>
  /// <param name="cancellationToken">Token supplied by the host for the start operation.</param>
  /// <returns>The task returned by <c>base.StartAsync</c>.</returns>
  public override Task StartAsync(CancellationToken cancellationToken)
  {
      _logger.LogInformation("------StartAsync");

      // Cancelling the host token also cancels this linked source.
      CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
      _tokenSource = linkedSource;

      // Consumer creation is currently disabled:
      // _tdEngineDataSubcribe.CreateConsumer();

      Task baseStart = base.StartAsync(cancellationToken);
      return baseStart;
  }
  /// <summary>
  /// Logs the stop, cancels the worker's own token source so the background
  /// loops end, and then delegates to the base implementation.
  /// </summary>
  /// <param name="cancellationToken">Token supplied by the host for the stop operation.</param>
  /// <returns>The task returned by <c>base.StopAsync</c>.</returns>
  public override Task StopAsync(CancellationToken cancellationToken)
  {
      _logger.LogInformation("------StopAsync");

      // _tokenSource is only assigned in StartAsync; guard against a stop
      // request for a worker that was never started.
      _tokenSource?.Cancel();

      // Consumer shutdown is currently disabled:
      // _tdEngineDataSubcribe.CloseConsumer();

      return base.StopAsync(cancellationToken);
  }
  /// <summary>
  /// Starts the worker's three background activities: the package resolve loop,
  /// the TDengine subscription listener loop, and the etcd watch on the
  /// <c>health_moniter/schedule_push</c> key prefix.
  /// </summary>
  /// <param name="stoppingToken">Host stopping token; passed to <c>Task.Run</c> for the etcd watch.</param>
  /// <returns>
  /// A one second delay task bound to the worker's own token. The background
  /// activities are started but not awaited, so each of them logs its own failure.
  /// </returns>
  protected override Task ExecuteAsync(CancellationToken stoppingToken)
  {
      TaskFactory factory = new(_tokenSource.Token);

      // 1) Package resolve loop.
      factory.StartNew(async () =>
      {
          if (_tokenSource.IsCancellationRequested)
          {
              _logger.LogWarning("Worker exit");
          }

          _logger.LogInformation("------ResolveAsync");
          try
          {
              while (!_tokenSource.IsCancellationRequested)
              {
                  await _processor.ResolveAsync().ConfigureAwait(false);
              }
          }
          catch (OperationCanceledException) when (_tokenSource.IsCancellationRequested)
          {
              _logger.LogInformation("{Worker} resolve loop cancelled", nameof(Worker));
          }
          catch (Exception ex)
          {
              // Nobody awaits this task, so a failure would otherwise vanish silently.
              _logger.LogError(ex, "{Worker} resolve loop terminated by an unhandled exception", nameof(Worker));
          }
      }, TaskCreationOptions.LongRunning);

      // 2) TDengine subscription listener loop.
      factory.StartNew(() =>
      {
          _logger.LogInformation("------_tdEngineDataSubcribe");
          try
          {
              while (!_tokenSource.IsCancellationRequested)
              {
                  _tdEngineDataSubcribe.BeginListen(_tokenSource.Token);
              }
          }
          catch (OperationCanceledException) when (_tokenSource.IsCancellationRequested)
          {
              _logger.LogInformation("{Worker} subscription listener cancelled", nameof(Worker));
          }
          catch (Exception ex)
          {
              _logger.LogError(ex, "{Worker} subscription listener terminated by an unhandled exception", nameof(Worker));
          }
      }, TaskCreationOptions.LongRunning);

      // 3) etcd watch on the scheduled-push keys. A continuation logs a faulted
      //    watch because the task itself is not awaited.
      Task.Run(() =>
              _serviceEtcd.WacthKeysWithPrefixResponseAsync($"health_moniter/schedule_push", WatchEvents),
              stoppingToken)
          .ContinueWith(
              faulted => _logger.LogError(faulted.Exception, "{Worker} etcd watch terminated by an unhandled exception", nameof(Worker)),
              CancellationToken.None,
              TaskContinuationOptions.OnlyOnFaulted,
              TaskScheduler.Default);

      return Task.Delay(1000, _tokenSource.Token);
  }
  /// <summary>
  /// Handles a batch of etcd watch events by writing every "Delete" event to the
  /// console. All other event types are ignored.
  /// </summary>
  /// <param name="response">Watch events delivered by the etcd client.</param>
  private void WatchEvents(WatchEvent[] response)
  {
      foreach (WatchEvent watchEvent in response)
      {
          // Only deletions (expired TTL) are of interest here.
          if (watchEvent.Type.ToString() != "Delete")
          {
              continue;
          }

          Console.WriteLine($"--- {watchEvent.Key}:{watchEvent.Value}:{watchEvent.Type}");
      }
  }
  /// <summary>
  /// Handles an etcd watch response for the scheduled-push keys. Every event is
  /// handed to <see cref="HandleWatchEventAsync"/>; handlers are started in order
  /// and not awaited, and each handler catches and logs its own exceptions.
  /// </summary>
  /// <param name="response">Watch response delivered by the etcd client.</param>
  private void WatchEvents(WatchResponse response)
  {
      foreach (Mvccpb.Event watchEvent in response.Events.ToList<Mvccpb.Event>())
      {
          // Fire-and-forget on purpose: this callback is synchronous. A Task-returning
          // handler is used instead of an async-void lambda.
          _ = HandleWatchEventAsync(watchEvent);
      }
  }

  /// <summary>Timestamp layout used in TDengine conditions and inserts.</summary>
  private const string TimestampFormat = "yyyy-MM-dd HH:mm:ss.fff";

  /// <summary>Formats a timestamp for SQL text independently of the current culture.</summary>
  private static string FormatTimestamp(DateTime value) =>
      value.ToString(TimestampFormat, System.Globalization.CultureInfo.InvariantCulture);

  /// <summary>Renders an interpolated string with the invariant culture (numbers use '.').</summary>
  private static string InvariantText(FormattableString text) => FormattableString.Invariant(text);

  /// <summary>Writes one watch event to the console on the given background and restores black afterwards.</summary>
  private static void WriteEventToConsole(Mvccpb.Event e, ConsoleColor background)
  {
      Console.BackgroundColor = background;
      Console.WriteLine($"--- {e.Type}--{e.Kv.Key.ToStringUtf8()}--{e.Kv.Value.ToStringUtf8()}---{DateTime.Now}");
      Console.BackgroundColor = ConsoleColor.Black;
  }

  /// <summary>
  /// Processes a single watch event: "Put" is only echoed to the console, "Delete"
  /// is echoed and then triggers the recalculation of the increment values.
  /// Any exception is logged and swallowed.
  /// </summary>
  private async Task HandleWatchEventAsync(Mvccpb.Event e)
  {
      try
      {
          switch (e.Type.ToString())
          {
              case "Put":
                  WriteEventToConsole(e, ConsoleColor.Blue);
                  break;
              case "Delete":
                  // The key was deleted (TTL expired): recalculate and push again.
                  WriteEventToConsole(e, ConsoleColor.Green);
                  await RecalculateAndPushIncrementAsync(e.Kv.Key.ToStringUtf8()).ConfigureAwait(false);
                  break;
          }
      }
      catch (Exception ex)
      {
          _logger.LogError(ex, $"{nameof(WatchEvents)},出错: |{ex.Message}|{ex.StackTrace}");
      }
  }

  /// <summary>
  /// Recalculates the blood-pressure increment values for the device named in a
  /// deleted schedule key, pushes them to the device when pushing is enabled,
  /// stores the push record and registers the next scheduled push in etcd.
  /// </summary>
  /// <param name="key">Deleted etcd key; the device serial number is its fourth '/'-separated segment.</param>
  private async Task RecalculateAndPushIncrementAsync(string key)
  {
      var keySegments = key.Split('/');
      if (keySegments.Length < 4)
      {
          _logger.LogWarning($"{nameof(Worker)}--{nameof(WatchEvents)} unexpected schedule key: {key}");
          return;
      }

      var imeiDel = keySegments[3];
      // The serial number is embedded in string-built SQL and its last two characters
      // select the sub-table, so reject values that cannot be used safely.
      // NOTE(review): the TDengine calls visible here take raw SQL text; no parameterized form is visible.
      if (imeiDel.Length < 2 || imeiDel.Contains('\''))
      {
          _logger.LogWarning($"{nameof(Worker)}--{nameof(WatchEvents)} invalid serial number in schedule key: {key}");
          return;
      }

      // Only continue when the key is really gone (it may have been written again meanwhile).
      var schedulePush = await _serviceEtcd.GetValAsync(key).ConfigureAwait(false);
      if (!string.IsNullOrWhiteSpace(schedulePush))
      {
          return;
      }

      // Offset factor applied to (reference - average).
      const decimal avgOffset = 0.25M;
      decimal systolicAvgOffset = avgOffset;
      decimal diastolicAvgOffset = avgOffset;

      var startTime = DateTime.Now;
      // Start of the statistics window; replaced below before it is stored.
      DateTime statStartTime = startTime;

      int systolicRefValue;
      int diastolicRefValue;
      // Increment values of the last push.
      int lastPushSystolicInc;
      int lastPushDiastolicInc;

      // Latest blood-pressure row of the device.
      const string lastRowField = "last_row(*)";
      var condition = $"serialno='{imeiDel}' order by last_update desc";
      var lastHmBpResponse = await _serviceTDengine.ExecuteSelectRestResponseAsync("stb_hm_bloodpress", condition, lastRowField);
      var lastHmBpParser = JsonConvert.DeserializeObject<ParseTDengineRestResponse<BloodPressureModel>>(lastHmBpResponse!);
      var lastHmBp = lastHmBpParser?.Select().FirstOrDefault();

      // Data is only considered valid for 7 days.
      if (!(lastHmBp?.Timestamp.AddDays(7) > DateTime.Now))
      {
          _logger.LogInformation($"向{imeiDel}统计数据已经失效");
          return;
      }

      // Latest push record of the device.
      condition = $"serialno='{imeiDel}' order by ts desc";
      var lastPushResponse = await _serviceTDengine.ExecuteSelectRestResponseAsync("stb_hm_bp_push_ref_inc_value", condition, lastRowField);
      if (lastPushResponse == null)
      {
          _logger.LogInformation($"{imeiDel}--没有下发记录");
          return;
      }

      var lastPushParser = JsonConvert.DeserializeObject<ParseTDengineRestResponse<BloodPressurePushRefIncModel>>(lastPushResponse);
      var lastPush = lastPushParser!.Select().FirstOrDefault();

      if (lastPush != null)
      {
          // A previous push exists: continue from its values and use its time as the window start.
          systolicRefValue = lastPush.SystolicRefValue;
          diastolicRefValue = lastPush.DiastolicRefValue;
          lastPushSystolicInc = lastPush.SystolicIncValue;
          lastPushDiastolicInc = lastPush.DiastolicIncValue;
          condition = $"ts between '{FormatTimestamp(lastPush.Timestamp)}' and '{FormatTimestamp(startTime)}' " +
                      $"and serialno='{imeiDel}' " +
                      $"and is_display = true";
          statStartTime = lastPush.Timestamp;
      }
      else
      {
          // No previous push (legacy data): start from the regular reference values of the person.
          var person = await _personCacheMgr.GetDeviceGpsPersonCacheBySerialNoAsync(Guid.NewGuid().ToString(), imeiDel).ConfigureAwait(false);
          if (person == null || person?.Person.BornDate == null)
          {
              _logger.LogWarning($"{nameof(Worker)}--{imeiDel} 验证个人信息,找不到个人信息,跳过此消息");
              return;
          }

          // Age must be within 2 - 120.
          var age = SafeType.SafeInt(DateTime.Today.Year - person?.Person.BornDate!.Value.Year!);
          if (age < 2 || age > 120)
          {
              _logger.LogWarning($"{nameof(Worker)}--{imeiDel} 验证年龄,不在范围 (2 - 120)岁,跳过此消息");
              return;
          }

          var gender = person?.Person.Gender == true ? 1 : 2;
          var isHypertension = SafeType.SafeBool(person?.Person.Ishypertension!);

          var bpRef = await _bpRefValCacheManager.GetBloodPressReferenceValueAsync(age, gender, isHypertension);
          systolicRefValue = bpRef!.Systolic;
          diastolicRefValue = bpRef!.Diastolic;
          lastPushSystolicInc = 0;
          lastPushDiastolicInc = 0;
          condition = $"ts <= '{FormatTimestamp(startTime)}' " +
                      $"and serialno='{imeiDel}' " +
                      $"and is_display = true";
      }

      var hmBpResponse = await _serviceTDengine.ExecuteSelectRestResponseAsync("stb_hm_bloodpress", condition);
      var hmBpParser = JsonConvert.DeserializeObject<ParseTDengineRestResponse<BloodPressureModel>>(hmBpResponse!);

      // 1. At least five samples are required.
      if (hmBpParser!.Rows < 5)
      {
          _logger.LogInformation($"{imeiDel} -- {nameof(Worker)} --{nameof(WatchEvents)} -- 统计定时下发,计算增量值的数据条目不足:{hmBpParser!.Rows} < 5");
          _logger.LogInformation($"{imeiDel} -- {nameof(Worker)} --{nameof(WatchEvents)} 没有足够的数据样本,不会定时下发");
          return;
      }

      // Without a previous push the window starts at the oldest sample.
      if (lastPush == null)
      {
          var firstHmBp = hmBpParser?.Select(i => i).OrderBy(i => i.Timestamp).FirstOrDefault();
          statStartTime = firstHmBp!.Timestamp;
      }

      var avgs = _serviceTDengine.AverageAfterRemovingOneMinMaxRef(SafeType.SafeInt(systolicRefValue), hmBpParser!);
      decimal systolicAvg = avgs[0];
      decimal diastolicAvg = avgs[1];

      // 2. An average of zero means no increment can be derived.
      if (systolicAvg.Equals(0))
      {
          _logger.LogInformation($"{imeiDel}--{nameof(Worker)}--计算平均值" +
              $"\n currentSystolicAvg:{systolicAvg} -- lastPushSystolicInc:{lastPushSystolicInc}" +
              $"\n currentDiastolicInc:{diastolicAvg} -- lastPushDiastolicInc:{lastPushDiastolicInc}");
          _logger.LogInformation($"{imeiDel}--{nameof(Worker)} 没有足够的数据样本计算平均值,不会定时下发");
          return;
      }

      // increment = (reference - average) * offset, accumulated on top of the last pushed increment.
      var currentSystolicInc = (int)((systolicRefValue - systolicAvg) * systolicAvgOffset);
      var currentDiastolicInc = (int)((diastolicRefValue - diastolicAvg) * diastolicAvgOffset);
      int systolicInc = currentSystolicInc + lastPushSystolicInc;
      int diastolicInc = currentDiastolicInc + lastPushDiastolicInc;

      _logger.LogInformation($"{imeiDel}--{nameof(Worker)}--计算增量值" +
          $"\n {imeiDel} -- systolicAvg:{systolicAvg}-- systolicInc:{systolicInc}-- currentSystolicInc:{currentSystolicInc} -- lastPushSystolicInc:{lastPushSystolicInc}" +
          $"\n {imeiDel} -- diastolicAvg:{diastolicAvg}-- diastolicInc:{diastolicInc} --currentDiastolicInc:{currentDiastolicInc} -- lastPushDiastolicInc:{lastPushDiastolicInc}");
      _logger.LogInformation($"{imeiDel}--{nameof(Worker)}-- 定时校准,发给设备的绝对增量值=(上次绝对增量值+新数据的增量值)");
      _logger.LogInformation($"{nameof(Worker)} 开启血压标定值下发: {_configBoodPressResolver.EnableBPRefPush}");

      if (!_configBoodPressResolver.EnableBPRefPush)
      {
          return;
      }

      BloodPressCalibrationConfigModel bpIncData = new()
      {
          Imei = imeiDel,
          SystolicRefValue = SafeType.SafeInt(systolicRefValue),   // systolic reference value, 0 = not effective
          DiastolicRefValue = SafeType.SafeInt(diastolicRefValue), // diastolic reference value, 0 = not effective
          SystolicIncValue = SafeType.SafeInt(systolicInc),        // systolic display increment, 0 = not effective
          DiastolicIncValue = SafeType.SafeInt(diastolicInc)       // diastolic display increment, 0 = not effective
      };

      var pushedBP = await _serviceIotWebApi.SetBloodPressCalibrationConfigAsync(bpIncData).ConfigureAwait(false);
      if (!pushedBP)
      {
          _logger.LogInformation($"错误响应,没有下推数据");
          return;
      }

      // Store the push record, then register the next scheduled push.
      var sql = BuildPushRecordSql(imeiDel, startTime, statStartTime, bpIncData, systolicAvg, diastolicAvg, systolicAvgOffset, diastolicAvgOffset);
      _serviceTDengine.ExecuteInsertSQL(sql);

      await RegisterNextPushAsync(key, imeiDel, startTime).ConfigureAwait(false);
  }

  /// <summary>
  /// Builds the INSERT statement for the push record in
  /// <c>stb_hm_bp_push_ref_inc_value</c>. The sub-table and tag are the last two
  /// characters of the serial number. All values use the invariant culture.
  /// </summary>
  private static string BuildPushRecordSql(
      string imei,
      DateTime pushTime,
      DateTime statStartTime,
      BloodPressCalibrationConfigModel pushed,
      decimal systolicAvg,
      decimal diastolicAvg,
      decimal systolicAvgOffset,
      decimal diastolicAvgOffset)
  {
      var tableSuffix = imei.Substring(imei.Length - 2);
      var pushTs = FormatTimestamp(pushTime);
      var statTs = FormatTimestamp(statStartTime);

      return $"INSERT INTO health_monitor.hm_bp_push_ref_inc_value_{tableSuffix} " +
             "USING health_monitor.stb_hm_bp_push_ref_inc_value " +
             $"TAGS ('{tableSuffix}') " +
             "VALUES(" +
             $"'{pushTs}'," +
             $"'{imei}'," +
             InvariantText($"{pushed.SystolicRefValue},") +
             InvariantText($"{pushed.DiastolicRefValue},") +
             InvariantText($"{pushed.SystolicIncValue},") +
             InvariantText($"{pushed.DiastolicIncValue},") +
             InvariantText($"{false},") +
             InvariantText($"{systolicAvg},") +
             InvariantText($"{diastolicAvg},") +
             InvariantText($"{systolicAvgOffset},") +
             InvariantText($"{diastolicAvgOffset},") +
             $"'{statTs}'," +
             $"'{pushTs}'" +
             ")";
  }

  /// <summary>
  /// Writes the schedule key back to etcd with a TTL that expires at the next run
  /// time. DEBUG builds schedule two minutes after the current minute; other
  /// builds schedule 20:00 of the next day. The time already spent since
  /// <paramref name="startTime"/> is subtracted from the TTL.
  /// </summary>
  private async Task RegisterNextPushAsync(string key, string imei, DateTime startTime)
  {
      var endTime = DateTime.Now;
      DateTime now = DateTime.Now;

#if DEBUG
      // AddMinutes avoids an out-of-range minute argument when the current minute is 58 or 59.
      DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute, 0).AddMinutes(2);
#else
      // 20:00 of the following day.
      DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, 20, 0, 0).AddDays(1);
#endif

      TimeSpan timeUntilNextRun = nextRunTime - now;
      long ttl = (long)((timeUntilNextRun.TotalMilliseconds - (endTime - startTime).TotalMilliseconds) / 1000);

      var data = new
      {
          imei,
          create_time = now.ToString("yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture),
          ttl,
          next_run_time = nextRunTime.ToString("yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture)
      };
      var result = JsonConvert.SerializeObject(data);
      await _serviceEtcd.PutValAsync(key, result, ttl, false).ConfigureAwait(false);
  }
  411. }
  412. }