You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

Worker.cs 30KB

1 년 전
1 년 전
1 년 전
1 년 전
1 년 전
1 년 전
1 년 전
8 달 전

  1. using dotnet_etcd;
  2. using Etcdserverpb;
  3. using Google.Protobuf.WellKnownTypes;
  4. using HealthMonitor.Common;
  5. using HealthMonitor.Common.helper;
  6. using HealthMonitor.Core.Common.Extensions;
  7. using HealthMonitor.Model.Config;
  8. using HealthMonitor.Model.Service;
  9. using HealthMonitor.Model.Service.Mapper;
  10. using HealthMonitor.Service.Biz;
  11. using HealthMonitor.Service.Biz.db;
  12. using HealthMonitor.Service.Cache;
  13. using HealthMonitor.Service.Etcd;
  14. using HealthMonitor.Service.Sub;
  15. using Microsoft.AspNetCore.Mvc.RazorPages;
  16. using Microsoft.EntityFrameworkCore.Metadata.Internal;
  17. using Microsoft.Extensions.Options;
  18. using Newtonsoft.Json;
  19. using Newtonsoft.Json.Linq;
  20. using System.Reflection;
  21. using System.Threading.Channels;
  22. using TDengineDriver;
  23. using TDengineTMQ;
  24. namespace HealthMonitor.WebApi
  25. {
  26. public class Worker : BackgroundService
  27. {
  // Logging and service resolution.
  private readonly ILogger<Worker> _logger;
  private readonly IServiceProvider _services;

  // TDengine subscription, package processing and query access.
  private readonly TDengineDataSubcribe _tdEngineDataSubcribe;
  private readonly PackageProcess _processor;
  private readonly TDengineService _serviceTDengine;

  // etcd access (schedule-push keys) and outbound HTTP / IoT calls.
  private readonly EtcdService _serviceEtcd;
  private readonly HttpHelper _httpHelper = default!;
  private readonly IotWebApiService _serviceIotWebApi;

  // Blood-pressure resolver configuration and caches.
  private readonly BoodPressResolverConfig _configBoodPressResolver;
  private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager;
  private readonly PersonCacheManager _personCacheMgr;

  // Created in StartAsync; cancelled in StopAsync to stop the background loops.
  private CancellationTokenSource _tokenSource = default!;

  /// <summary>
  /// Creates the worker and stores the injected dependencies in their backing fields.
  /// </summary>
  public Worker(
      ILogger<Worker> logger,
      IServiceProvider services,
      PersonCacheManager personCacheMgr,
      BloodPressReferenceValueCacheManager bpRefValCacheManager,
      IotWebApiService iotWebApiService,
      IOptions<BoodPressResolverConfig> optionBoodPressResolver,
      PackageProcess processor,
      TDengineDataSubcribe tdEngineDataSubcribe,
      TDengineService serviceDengine,
      HttpHelper httpHelper,
      EtcdService serviceEtcd)
  {
      // Infrastructure
      (_logger, _services) = (logger, services);

      // Data pipeline
      (_tdEngineDataSubcribe, _processor, _serviceTDengine) = (tdEngineDataSubcribe, processor, serviceDengine);

      // External services
      (_serviceEtcd, _httpHelper, _serviceIotWebApi) = (serviceEtcd, httpHelper, iotWebApiService);

      // Configuration and caches
      _configBoodPressResolver = optionBoodPressResolver.Value;
      (_bpRefValCacheManager, _personCacheMgr) = (bpRefValCacheManager, personCacheMgr);
  }
  /// <summary>
  /// Logs the start, creates the worker's cancellation source linked to
  /// <paramref name="cancellationToken"/>, then delegates to the base implementation.
  /// </summary>
  /// <param name="cancellationToken">Token passed in by the host for the start operation.</param>
  /// <returns>The task returned by <c>base.StartAsync</c>.</returns>
  public override Task StartAsync(CancellationToken cancellationToken)
  {
      _logger.LogInformation("------StartAsync");

      // Must be assigned before base.StartAsync runs.
      var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
      _tokenSource = linkedSource;

      // Consumer creation is currently disabled:
      // _tdEngineDataSubcribe.CreateConsumer();
      return base.StartAsync(cancellationToken);
  }
  /// <summary>
  /// Logs the stop, cancels the worker's cancellation source so the background loops end,
  /// then delegates to the base implementation.
  /// </summary>
  /// <param name="cancellationToken">Token passed in by the host for the stop operation.</param>
  /// <returns>The task returned by <c>base.StopAsync</c>.</returns>
  public override Task StopAsync(CancellationToken cancellationToken)
  {
      _logger.LogInformation("------StopAsync");

      // _tokenSource is only assigned in StartAsync. If StopAsync is invoked on a service that was
      // never started, skip the cancel instead of throwing NullReferenceException.
      _tokenSource?.Cancel(); // stop the worker loops

      // Consumer shutdown is currently disabled:
      // _tdEngineDataSubcribe.CloseConsumer();
      return base.StopAsync(cancellationToken);
  }
  /// <summary>
  /// Starts the three background activities of the worker: the package-resolve loop, the TDengine
  /// subscription listen loop, and the etcd watch on the <c>health_moniter/schedule_push</c> prefix.
  /// </summary>
  /// <param name="stoppingToken">Token supplied by <c>BackgroundService</c>; used when queuing the etcd watch.</param>
  /// <returns>
  /// A one-second delay task bound to the worker's own token. The background loops are deliberately not
  /// part of the returned task: <c>ResolveAsync</c> is called without a cancellation token, so awaiting it
  /// here could hold up host shutdown.
  /// </returns>
  protected override Task ExecuteAsync(CancellationToken stoppingToken)
  {
      CancellationToken token = _tokenSource.Token;

      // Async loop: Task.Run is used instead of a LongRunning task, because a dedicated thread would be
      // released at the first await anyway.
      _ = Task.Run(() => RunResolveLoopAsync(token), token);

      // Synchronous listen loop: keep it on its own LongRunning task.
      _ = Task.Factory.StartNew(
          () => RunListenLoop(token),
          token,
          TaskCreationOptions.LongRunning,
          TaskScheduler.Default);

      // etcd watch: fire-and-forget, but log a fault instead of losing it.
      Task watchTask = Task.Run(() =>
          _serviceEtcd.WacthKeysWithPrefixResponseAsync($"health_moniter/schedule_push", WatchEvents)
          , stoppingToken);
      _ = watchTask.ContinueWith(
          t => _logger.LogError(t.Exception, $"{nameof(Worker)} etcd watch on health_moniter/schedule_push failed"),
          CancellationToken.None,
          TaskContinuationOptions.OnlyOnFaulted,
          TaskScheduler.Default);

      return Task.Delay(1000, token);
  }

  /// <summary>
  /// Repeatedly calls <c>_processor.ResolveAsync()</c> until <paramref name="token"/> is cancelled.
  /// A failing iteration is logged and retried after a short pause rather than ending the loop.
  /// </summary>
  private async Task RunResolveLoopAsync(CancellationToken token)
  {
      _logger.LogInformation("------ResolveAsync");
      while (!token.IsCancellationRequested)
      {
          try
          {
              await _processor.ResolveAsync().ConfigureAwait(false);
          }
          catch (OperationCanceledException) when (token.IsCancellationRequested)
          {
              break;
          }
          catch (Exception ex)
          {
              _logger.LogError(ex, $"{nameof(Worker)} ResolveAsync failed; retrying");
              try
              {
                  // Pause so a persistent failure does not turn into a hot spin.
                  await Task.Delay(1000, token).ConfigureAwait(false);
              }
              catch (OperationCanceledException)
              {
                  break;
              }
          }
      }
      _logger.LogWarning("Worker exit");
  }

  /// <summary>
  /// Repeatedly calls <c>_tdEngineDataSubcribe.BeginListen</c> until <paramref name="token"/> is cancelled.
  /// A failing call is logged and retried after a short pause rather than ending the loop.
  /// </summary>
  private void RunListenLoop(CancellationToken token)
  {
      _logger.LogInformation("------_tdEngineDataSubcribe");
      while (!token.IsCancellationRequested)
      {
          try
          {
              _tdEngineDataSubcribe.BeginListen(token);
          }
          catch (OperationCanceledException) when (token.IsCancellationRequested)
          {
              break;
          }
          catch (Exception ex)
          {
              _logger.LogError(ex, $"{nameof(Worker)} BeginListen failed; retrying");
              // Returns early if cancellation is signalled during the pause.
              token.WaitHandle.WaitOne(1000);
          }
      }
  }
  /// <summary>
  /// Watch callback for the <c>WatchEvent[]</c> form: writes every "Delete" event to the console
  /// and ignores all other event types.
  /// </summary>
  /// <param name="response">Events delivered by the watch.</param>
  private void WatchEvents(WatchEvent[] response)
  {
      foreach (WatchEvent watchEvent in response)
      {
          // Only deletions are of interest here; per the original note they mean the key's TTL ran out.
          if (watchEvent.Type.ToString() != "Delete")
          {
              continue;
          }

          Console.WriteLine($"--- {watchEvent.Key}:{watchEvent.Value}:{watchEvent.Type}");
      }
  }
  // Timestamp layout used for TDengine literals in queries and inserts.
  private const string TdTimestampFormat = "yyyy-MM-dd HH:mm:ss.fff";

  // Factor applied to (reference - average) when deriving a new increment.
  private const decimal AvgOffset = 0.25M;

  /// <summary>
  /// Watch callback for the <c>WatchResponse</c> form. Starts one asynchronous handler per event.
  /// </summary>
  /// <param name="response">Watch response whose events are processed.</param>
  private void WatchEvents(WatchResponse response)
  {
      // The callback is synchronous, so handlers are fire-and-forget. Each handler catches and logs
      // its own exceptions, so no fault goes unobserved.
      foreach (Mvccpb.Event e in response.Events.ToList<Mvccpb.Event>())
      {
          _ = HandleWatchEventAsync(e);
      }
  }

  /// <summary>
  /// Handles one etcd event: "Put" is only echoed to the console; "Delete" is echoed and then triggers
  /// the scheduled blood-pressure increment recalculation for the key.
  /// </summary>
  private async Task HandleWatchEventAsync(Mvccpb.Event e)
  {
      try
      {
          switch (e.Type.ToString())
          {
              case "Put":
                  WriteEventToConsole(e, ConsoleColor.Blue);
                  break;
              case "Delete":
                  // Per the original note: the TTL ran out, so recalculate and push again.
                  WriteEventToConsole(e, ConsoleColor.Green);
                  await RecalculateAndPushAsync(e.Kv.Key.ToStringUtf8()).ConfigureAwait(false);
                  break;
          }
      }
      catch (Exception ex)
      {
          _logger.LogError(ex, $"{nameof(WatchEvents)},出错: |{ex.Message}|{ex.StackTrace}");
      }
  }

  /// <summary>Writes the event to the console on the given background colour, then resets it to black.</summary>
  private static void WriteEventToConsole(Mvccpb.Event e, ConsoleColor background)
  {
      Console.BackgroundColor = background;
      Console.WriteLine($"--- {e.Type}--{e.Kv.Key.ToStringUtf8()}--{e.Kv.Value.ToStringUtf8()}---{DateTime.Now}");
      Console.BackgroundColor = ConsoleColor.Black;
  }

  /// <summary>Formats a timestamp for TDengine using the invariant culture.</summary>
  private static string FormatTimestamp(DateTime value) =>
      value.ToString(TdTimestampFormat, System.Globalization.CultureInfo.InvariantCulture);

  /// <summary>
  /// Recomputes the blood-pressure display increment for the device named in <paramref name="key"/>,
  /// pushes it to the device when enabled, records the push, and registers the next scheduled run.
  /// </summary>
  /// <param name="key">etcd key; the fourth '/'-separated segment is used as the device serial number.</param>
  /// <remarks>
  /// NOTE(review): every early return below leaves the key unregistered, so no further scheduled run is
  /// created from here. Confirm that another component re-registers the key.
  /// </remarks>
  private async Task RecalculateAndPushAsync(string key)
  {
      // Key layout per the original note: health_moniter/schedule_push/imei/{serialno}
      var segments = key.Split('/');
      if (segments.Length < 4)
      {
          _logger.LogWarning($"{nameof(Worker)}--unexpected schedule key format: {key}");
          return;
      }

      var imei = segments[3];
      // The serial number is embedded in SQL text below and its last two characters form a table name.
      // NOTE(review): the visible TDengine API takes raw SQL only, so values cannot be parameterized here.
      if (imei.Length < 2 || imei.Contains('\''))
      {
          _logger.LogWarning($"{nameof(Worker)}--invalid serial number in schedule key: {key}");
          return;
      }

      // Only act when the key is really gone.
      var schedulePush = await _serviceEtcd.GetValAsync(key).ConfigureAwait(false);
      if (!string.IsNullOrWhiteSpace(schedulePush))
      {
          return;
      }

      var startTime = DateTime.Now;
      DateTime statStartTime = startTime;
      const string lastRowField = "last_row(*)";

      // Latest blood-pressure sample of the device.
      var lastBpResponse = await _serviceTDengine.ExecuteSelectRestResponseAsync(
          "stb_hm_bloodpress", $"serialno='{imei}' order by last_update desc", lastRowField);
      if (lastBpResponse == null)
      {
          _logger.LogInformation($"{imei}--{nameof(Worker)} no response for the latest blood pressure query");
          return;
      }

      var lastBp = JsonConvert.DeserializeObject<ParseTDengineRestResponse<BloodPressureModel>>(lastBpResponse)
          ?.Select().FirstOrDefault();

      // Data is only considered valid for 7 days after the latest sample.
      if (!(lastBp?.Timestamp.AddDays(7) > DateTime.Now))
      {
          _logger.LogInformation($"向{imei}统计数据已经失效");
          return;
      }

      // Latest push record for the device.
      var lastPushResponse = await _serviceTDengine.ExecuteSelectRestResponseAsync(
          "stb_hm_bp_push_ref_inc_value", $"serialno='{imei}' order by ts desc", lastRowField);
      if (lastPushResponse == null)
      {
          _logger.LogInformation($"{imei}--没有下发记录");
          return;
      }

      var lastPushParser = JsonConvert.DeserializeObject<ParseTDengineRestResponse<BloodPressurePushRefIncModel>>(lastPushResponse);
      if (lastPushParser == null)
      {
          _logger.LogInformation($"{imei}--{nameof(Worker)} push record response could not be parsed");
          return;
      }

      var lastPush = lastPushParser.Select().FirstOrDefault();

      int systolicRefValue;
      int diastolicRefValue;
      int lastPushSystolicInc = 0;
      int lastPushDiastolicInc = 0;
      string condition;

      if (lastPush != null)
      {
          // A push record exists: continue from its reference values and increments, and use the
          // samples taken since that push.
          systolicRefValue = lastPush.SystolicRefValue;
          diastolicRefValue = lastPush.DiastolicRefValue;
          lastPushSystolicInc = lastPush.SystolicIncValue;
          lastPushDiastolicInc = lastPush.DiastolicIncValue;
          condition = $"ts between '{FormatTimestamp(lastPush.Timestamp)}' and '{FormatTimestamp(startTime)}' " +
                      $"and serialno='{imei}' " +
                      $"and is_display = true";
          // The last push time is the start of the statistics window.
          statStartTime = lastPush.Timestamp;
      }
      else
      {
          // No push record (legacy data): derive the reference values from the person's profile.
          var person = await _personCacheMgr.GetDeviceGpsPersonCacheBySerialNoAsync(Guid.NewGuid().ToString(), imei).ConfigureAwait(false);
          if (person == null || person?.Person.BornDate == null)
          {
              _logger.LogWarning($"{nameof(Worker)}--{imei} 验证个人信息,找不到个人信息,跳过此消息");
              return;
          }

          // Age must be within 2 - 120.
          var age = SafeType.SafeInt(DateTime.Today.Year - person?.Person.BornDate!.Value.Year!);
          if (age < 2 || age > 120)
          {
              _logger.LogWarning($"{nameof(Worker)}--{imei} 验证年龄,不在范围 (2 - 120)岁,跳过此消息");
              return;
          }

          var gender = person?.Person.Gender == true ? 1 : 2;
          var isHypertension = SafeType.SafeBool(person?.Person.Ishypertension!);

          var bpRef = await _bpRefValCacheManager.GetBloodPressReferenceValueAsync(age, gender, isHypertension);
          systolicRefValue = bpRef!.Systolic;
          diastolicRefValue = bpRef!.Diastolic;
          condition = $"ts <= '{FormatTimestamp(startTime)}' " +
                      $"and serialno='{imei}' " +
                      $"and is_display = true";
      }

      // Samples that feed the average.
      var bpResponse = await _serviceTDengine.ExecuteSelectRestResponseAsync("stb_hm_bloodpress", condition);
      if (bpResponse == null)
      {
          _logger.LogInformation($"{imei}--{nameof(Worker)} no response for the blood pressure sample query");
          return;
      }

      var bpParser = JsonConvert.DeserializeObject<ParseTDengineRestResponse<BloodPressureModel>>(bpResponse);
      if (bpParser == null)
      {
          _logger.LogInformation($"{imei}--{nameof(Worker)} blood pressure sample response could not be parsed");
          return;
      }

      // 1. Require at least 5 samples.
      if (bpParser.Rows < 5)
      {
          _logger.LogInformation($"{imei} -- {nameof(Worker)} --{nameof(WatchEvents)} -- 统计定时下发,计算增量值的数据条目不足:{bpParser.Rows} < 5");
          _logger.LogInformation($"{imei} -- {nameof(Worker)} --{nameof(WatchEvents)} 没有足够的数据样本,不会定时下发");
          return;
      }

      // Without a push record, the statistics window starts at the earliest sample.
      if (lastPush == null)
      {
          var firstBp = bpParser.Select(i => i).OrderBy(i => i.Timestamp).FirstOrDefault();
          statStartTime = firstBp!.Timestamp;
      }

      var avgs = _serviceTDengine.AverageAfterRemovingOneMinMaxRef(bpParser);
      decimal systolicAvg = avgs[0];
      decimal diastolicAvg = avgs[1];

      // 2. An average of zero means no increment can be computed.
      if (systolicAvg == 0)
      {
          _logger.LogInformation($"{imei}--{nameof(Worker)}--计算平均值" +
              $"\n currentSystolicAvg:{systolicAvg} -- lastPushSystolicInc:{lastPushSystolicInc}" +
              $"\n currentDiastolicInc:{diastolicAvg} -- lastPushDiastolicInc:{lastPushDiastolicInc}");
          _logger.LogInformation($"{imei}--{nameof(Worker)} 没有足够的数据样本计算平均值,不会定时下发");
          return;
      }

      // Stop adjusting once the average is within 4 of the reference value.
      // NOTE(review): the comparison is signed, so any average below the reference also stops here and
      // only negative increments are ever produced. Math.Abs may have been intended - confirm before
      // changing; behaviour is kept as-is.
      if (systolicAvg - systolicRefValue < 4)
      {
          _logger.LogInformation($"除最大值和最小值后的平均值与标定值差值少于4后,不再进行增量值调整");
          return;
      }

      // increment = (reference - average) * 0.25, accumulated onto the last pushed increment
      var currentSystolicInc = (int)((systolicRefValue - systolicAvg) * AvgOffset);
      var currentDiastolicInc = (int)((diastolicRefValue - diastolicAvg) * AvgOffset);
      int systolicInc = currentSystolicInc + lastPushSystolicInc;
      int diastolicInc = currentDiastolicInc + lastPushDiastolicInc;

      _logger.LogInformation($"{imei}--{nameof(Worker)}--计算增量值" +
          $"\n {imei} -- systolicAvg:{systolicAvg}-- systolicInc:{systolicInc}-- currentSystolicInc:{currentSystolicInc} -- lastPushSystolicInc:{lastPushSystolicInc}" +
          $"\n {imei} -- diastolicAvg:{diastolicAvg}-- diastolicInc:{diastolicInc} --currentDiastolicInc:{currentDiastolicInc} -- lastPushDiastolicInc:{lastPushDiastolicInc}");
      _logger.LogInformation($"{imei}--{nameof(Worker)}-- 定时校准,发给设备的绝对增量值=(上次绝对增量值+新数据的增量值)");
      _logger.LogInformation($"{nameof(Worker)} 开启血压标定值下发: {_configBoodPressResolver.EnableBPRefPush}");

      if (!_configBoodPressResolver.EnableBPRefPush)
      {
          return;
      }

      // Per the original notes, a value of 0 in any of these fields means "not in effect".
      BloodPressCalibrationConfigModel bpIncData = new()
      {
          Imei = imei,
          SystolicRefValue = SafeType.SafeInt(systolicRefValue),
          DiastolicRefValue = SafeType.SafeInt(diastolicRefValue),
          SystolicIncValue = SafeType.SafeInt(systolicInc),
          DiastolicIncValue = SafeType.SafeInt(diastolicInc)
      };

      var response = await _serviceIotWebApi.SetBloodPressCalibrationConfig2Async(bpIncData).ConfigureAwait(false);
      if (!response.Flag)
      {
          _logger.LogInformation($"错误响应,没有下推数据:{response.Message}");
          return;
      }

      // Record the push, then register the next scheduled run.
      var sql = BuildPushRecordInsertSql(imei, bpIncData, systolicAvg, diastolicAvg, statStartTime, startTime);
      _serviceTDengine.ExecuteInsertSQL(sql);

      await ScheduleNextPushAsync(key, imei, startTime).ConfigureAwait(false);
  }

  /// <summary>
  /// Builds the INSERT statement that records a push in <c>stb_hm_bp_push_ref_inc_value</c>.
  /// All values are formatted with the invariant culture so the SQL text does not depend on the
  /// process culture.
  /// </summary>
  private static string BuildPushRecordInsertSql(
      string imei,
      BloodPressCalibrationConfigModel pushed,
      decimal systolicAvg,
      decimal diastolicAvg,
      DateTime statStartTime,
      DateTime startTime)
  {
      // The sub-table name and tag are the last two characters of the serial number.
      var tableSuffix = imei.Substring(imei.Length - 2);
      var start = FormatTimestamp(startTime);

      return $"INSERT INTO health_monitor.hm_bp_push_ref_inc_value_{tableSuffix} " +
             $"USING health_monitor.stb_hm_bp_push_ref_inc_value " +
             $"TAGS ('{tableSuffix}') " +
             $"VALUES(" +
             $"'{start}'," +
             $"'{imei}'," +
             FormattableString.Invariant($"{pushed.SystolicRefValue},{pushed.DiastolicRefValue},") +
             FormattableString.Invariant($"{pushed.SystolicIncValue},{pushed.DiastolicIncValue},") +
             FormattableString.Invariant($"{false},") +
             FormattableString.Invariant($"{systolicAvg},{diastolicAvg},") +
             FormattableString.Invariant($"{AvgOffset},{AvgOffset},") +
             $"'{FormatTimestamp(statStartTime)}'," +
             $"'{start}'" +
             $")";
  }

  /// <summary>
  /// Re-registers the schedule key in etcd with a TTL that ends at the next run time.
  /// DEBUG builds run again two minutes later; other builds run at 20:00 the following day.
  /// </summary>
  /// <param name="key">etcd key to register.</param>
  /// <param name="imei">Device serial number stored in the value.</param>
  /// <param name="startTime">When this calculation started; the elapsed time is subtracted from the TTL.</param>
  private async Task ScheduleNextPushAsync(string key, string imei, DateTime startTime)
  {
      DateTime now = DateTime.Now;
#if DEBUG
      // AddMinutes rolls over the hour/day; passing Minute + 2 to the constructor throws at minute 58-59.
      DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute, 0).AddMinutes(2);
#else
      DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, 20, 0, 0).AddDays(1);
#endif
      TimeSpan timeUntilNextRun = nextRunTime - now;

      long ttl = (long)((timeUntilNextRun.TotalMilliseconds - (now - startTime).TotalMilliseconds) / 1000);
      // Never register a zero or negative TTL, which could happen if processing took longer than the window.
      ttl = Math.Max(1, ttl);

      var invariant = System.Globalization.CultureInfo.InvariantCulture;
      var payload = JsonConvert.SerializeObject(new
      {
          imei,
          create_time = now.ToString("yyyy-MM-dd HH:mm:ss", invariant),
          ttl,
          next_run_time = nextRunTime.ToString("yyyy-MM-dd HH:mm:ss", invariant)
      });

      await _serviceEtcd.PutValAsync(key, payload, ttl, false).ConfigureAwait(false);
  }
  420. }
  421. }