You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

323 lines
16KB

  1. using dotnet_etcd;
  2. using Etcdserverpb;
  3. using Google.Protobuf.WellKnownTypes;
  4. using HealthMonitor.Common;
  5. using HealthMonitor.Common.helper;
  6. using HealthMonitor.Core.Common.Extensions;
  7. using HealthMonitor.Service.Biz.db;
  8. using HealthMonitor.Service.Etcd;
  9. using HealthMonitor.Service.Sub;
  10. using Microsoft.AspNetCore.Mvc.RazorPages;
  11. using Microsoft.EntityFrameworkCore.Metadata.Internal;
  12. using Newtonsoft.Json;
  13. using Newtonsoft.Json.Linq;
  14. using System.Reflection;
  15. using System.Threading.Channels;
  16. using TDengineDriver;
  17. using TDengineTMQ;
  18. namespace HealthMonitor.WebApi
  19. {
  20. public class Worker : BackgroundService
  21. {
  22. private readonly ILogger<Worker> _logger;
  23. private readonly IServiceProvider _services;
  24. private readonly TDengineDataSubcribe _tdEngineDataSubcribe;
  25. private readonly PackageProcess _processor;
  26. private readonly TDengineService _serviceTDengine;
  27. private readonly EtcdService _serviceEtcd;
  28. private readonly HttpHelper _httpHelper = default!;
  29. private CancellationTokenSource _tokenSource=default!;
  30. public Worker(ILogger<Worker> logger, IServiceProvider services, PackageProcess processor,TDengineDataSubcribe tdEngineDataSubcribe, TDengineService serviceDengine, HttpHelper httpHelper, EtcdService serviceEtcd)
  31. {
  32. _logger = logger;
  33. _tdEngineDataSubcribe = tdEngineDataSubcribe;
  34. _services = services;
  35. _processor = processor;
  36. _serviceEtcd = serviceEtcd;
  37. _serviceTDengine = serviceDengine;
  38. _httpHelper = httpHelper;
  39. }
  40. public override Task StartAsync(CancellationToken cancellationToken)
  41. {
  42. _logger.LogInformation("------StartAsync");
  43. _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
  44. // 创建消费者
  45. // _tdEngineDataSubcribe.CreateConsumer();
  46. return base.StartAsync(cancellationToken);
  47. }
  48. public override Task StopAsync(CancellationToken cancellationToken)
  49. {
  50. _logger.LogInformation("------StopAsync");
  51. _tokenSource.Cancel(); //停止工作线程
  52. // 关闭消费者
  53. // _tdEngineDataSubcribe.CloseConsumer();
  54. return base.StopAsync(cancellationToken);
  55. }
  56. protected override Task ExecuteAsync(CancellationToken stoppingToken)
  57. {
  58. // var processor = _services.GetService<PackageProcess>();
  59. TaskFactory factory = new(_tokenSource.Token);
  60. factory.StartNew(async () =>
  61. {
  62. if (_tokenSource.IsCancellationRequested)
  63. _logger.LogWarning("Worker exit");
  64. _logger.LogInformation("------ResolveAsync");
  65. while (!_tokenSource.IsCancellationRequested)
  66. {
  67. //
  68. await _processor.ResolveAsync().ConfigureAwait(false);
  69. // await _tdEngineDataSubcribe.ProcessMsg();
  70. }
  71. }, TaskCreationOptions.LongRunning);
  72. factory.StartNew(() =>
  73. {
  74. _logger.LogInformation("------_tdEngineDataSubcribe");
  75. while (!_tokenSource.IsCancellationRequested)
  76. {
  77. _tdEngineDataSubcribe.BeginListen(_tokenSource.Token);
  78. }
  79. }, TaskCreationOptions.LongRunning);
  80. Task.Run(() =>
  81. _serviceEtcd.WacthKeysWithPrefixResponseAsync($"health_moniter/schedule_push", WatchEvents)
  82. , stoppingToken);
  83. // watch
  84. //factory.StartNew(() =>
  85. //{
  86. // while (!_tokenSource.IsCancellationRequested)
  87. // {
  88. // //_serviceEtcd.WacthKeysWithPrefixAsync($"health_moniter/schedule_push", watchEvents => WatchEvents(watchEvents));
  89. // _serviceEtcd.WacthKeysWithPrefixResponseAsync($"health_moniter/schedule_push", WatchEvents);
  90. // }
  91. //}, TaskCreationOptions.LongRunning);
  92. // _serviceEtcd.WacthKeysWithPrefixResponseAsync($"health_moniter/schedule_push", WatchEvents);
  93. return Task.Delay(1000, _tokenSource.Token);
  94. }
  95. private void WatchEvents(WatchEvent[] response)
  96. {
  97. foreach (WatchEvent e1 in response)
  98. {
  99. // Console.WriteLine($"{nameof(WatchEventsAsync)} --- {e1.Key}:{e1.Value}:{e1.Type}");
  100. switch (e1.Type.ToString())
  101. {
  102. //case "Put":
  103. // // 获取时间点计算TTL
  104. // break;
  105. case "Delete":
  106. // TTL到了重新计算TTL,下发
  107. Console.WriteLine($"--- {e1.Key}:{e1.Value}:{e1.Type}");
  108. break;
  109. }
  110. }
  111. }
  112. private void WatchEvents(WatchResponse response)
  113. {
  114. response.Events.ToList().ForEach(async e =>
  115. {
  116. switch (e.Type.ToString())
  117. {
  118. case "Put":
  119. // 获取时间点计算TTL
  120. Console.BackgroundColor = ConsoleColor.Blue;
  121. Console.WriteLine($"--- {e.Type}--{e.Kv.Key.ToStringUtf8()}--{e.Kv.Value.ToStringUtf8()}---{DateTime.Now}");
  122. Console.BackgroundColor = ConsoleColor.Black;
  123. break;
  124. case "Delete":
  125. // TTL到了重新计算TTL,下发
  126. Console.BackgroundColor = ConsoleColor.Green;
  127. Console.WriteLine($"--- {e.Type}--{e.Kv.Key.ToStringUtf8()}--{e.Kv.Value.ToStringUtf8()}---{DateTime.Now}");
  128. // var key = $"health_moniter/schedule_push/imei/{bp.Serialno}";
  129. var key = e.Kv.Key.ToStringUtf8();
  130. var imeiDel = key.Split('/')[3];
  131. var schedule_push = await _serviceEtcd.GetValAsync(key).ConfigureAwait(false);
  132. if (string.IsNullOrWhiteSpace(schedule_push))
  133. {
  134. var startTime=DateTime.Now;
  135. // 下发增量值
  136. #region 定时下发增量值
  137. var last= await _serviceTDengine.GetLastAsync("stb_hm_bloodpress_stats_inc", $"serialno='{imeiDel}' order by last_update desc");
  138. var ts = last?[0];
  139. if (DateTime.TryParse(ts?.ToString(),out DateTime newTs))
  140. {
  141. // 7 天有效数据
  142. if (newTs.AddDays(7) > DateTime.Now)
  143. {
  144. Console.WriteLine(ts);
  145. var systolic_ref_value = last?[5];
  146. var diastolic_ref_value = last?[12];
  147. var systolic_inc_value = last?[10];
  148. var diastolic_inc_value = last?[17];
  149. var bpData = new
  150. {
  151. imei = imeiDel,
  152. //systolicCalibrationValue = last?[5], //收缩压标定值,值为0 表示不生效
  153. //diastolicCalibrationValue = last?[12], //舒张压标定值,值为0表示不生效
  154. //systolicIncValue = last?[10], //收缩压显示增量,值为0 表示不生效
  155. //diastolicIncValue = last?[17] //舒张压显示增量,值为0 表示不生效
  156. systolicCalibrationValue = systolic_ref_value, //收缩压标定值,值为0 表示不生效
  157. diastolicCalibrationValue = diastolic_ref_value, //舒张压标定值,值为0表示不生效
  158. systolicIncValue = systolic_inc_value, //收缩压显示增量,值为0 表示不生效
  159. diastolicIncValue = diastolic_inc_value //舒张压显示增量,值为0 表示不生效
  160. };
  161. var str = JsonConvert.SerializeObject(bpData);
  162. var url = $"http://id.ssjlai.com/webapi/api/Command/SetBloodPressCalibrationConfig";
  163. List<KeyValuePair<string, string>> headers = new()
  164. {
  165. new KeyValuePair<string, string>("AuthKey", "key1")
  166. };
  167. var res = await _httpHelper.HttpToPostAsync(url, bpData, headers).ConfigureAwait(false);
  168. _logger.LogInformation($"向{imeiDel}下发增量值数据:{str},响应:{res}");
  169. var resJToken = JsonConvert.DeserializeObject(res!) as JToken;
  170. if (resJToken!["message"]!.ToString().Equals("ok"))
  171. {
  172. #region 保存下推记录 stb_hm_bp_push_ref_inc_value
  173. var sql = $"INSERT INTO health_monitor.hm_bp_push_ref_inc_value_{imeiDel.Substring(imeiDel.Length - 2)} " +
  174. $"USING health_monitor.stb_hm_bp_push_ref_inc_value " +
  175. $"TAGS ('{imeiDel.Substring(imeiDel.Length - 2)}') " +
  176. $"VALUES(" +
  177. $"'{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}'," +
  178. $"'{imeiDel}'," +
  179. $"{systolic_ref_value}," +
  180. $"{diastolic_ref_value}," +
  181. $"{systolic_inc_value}," +
  182. $"{diastolic_inc_value}," +
  183. $"{false})";
  184. _serviceTDengine.ExecuteInsertSQL(sql);
  185. #endregion
  186. // 注册下次下推
  187. var endTime = DateTime.Now;
  188. #if DEBUG
  189. //long ttl = (long)((60 * 1000-(endTime-startTime).TotalMilliseconds)/1000);
  190. //await _serviceEtcd.PutValAsync(key, imeiDel,ttl, false).ConfigureAwait(false);
  191. var interval = 0;
  192. // 获取当前时间
  193. DateTime now = DateTime.Now;
  194. // 计算距离下一个$interval天后的8点的时间间隔
  195. DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute + 2, 0).AddDays(interval);
  196. TimeSpan timeUntilNextRun = nextRunTime - now;
  197. // 如果当前时间已经超过了8点,将等待到明天后的8点
  198. if (timeUntilNextRun < TimeSpan.Zero)
  199. {
  200. timeUntilNextRun = timeUntilNextRun.Add(TimeSpan.FromMinutes(1));
  201. nextRunTime += timeUntilNextRun;
  202. }
  203. // var ttl = timeUntilNextRun.TotalMilliseconds;
  204. long ttl = (long)((timeUntilNextRun.TotalMilliseconds - (endTime - startTime).TotalMilliseconds) / 1000);
  205. var data = new
  206. {
  207. imei = imeiDel,
  208. create_time = now.ToString("yyyy-MM-dd HH:mm:ss"),
  209. ttl,
  210. next_run_time = nextRunTime.ToString("yyyy-MM-dd HH:mm:ss")
  211. };
  212. var result = JsonConvert.SerializeObject(data);
  213. await _serviceEtcd.PutValAsync(key, result, ttl, false).ConfigureAwait(false);
  214. #else
  215. // 每$interval天,晚上8点
  216. var interval = 1;
  217. // 获取当前时间
  218. DateTime now = DateTime.Now;
  219. // 计算距离下一个$interval天后的8点的时间间隔
  220. DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, 20, 0, 0).AddDays(interval);
  221. TimeSpan timeUntilNextRun = nextRunTime - now;
  222. // 如果当前时间已经超过了8点,将等待到明天后的8点
  223. if (timeUntilNextRun < TimeSpan.Zero)
  224. {
  225. timeUntilNextRun = timeUntilNextRun.Add(TimeSpan.FromDays(1));
  226. nextRunTime += timeUntilNextRun;
  227. }
  228. // var ttl = timeUntilNextRun.TotalMilliseconds;
  229. long ttl = (long)((timeUntilNextRun.TotalMilliseconds-(endTime-startTime).TotalMilliseconds)/1000);
  230. var data = new
  231. {
  232. imei = imeiDel,
  233. create_time = now.ToString("yyyy-MM-dd HH:mm:ss"),
  234. ttl,
  235. next_run_time = nextRunTime.ToString("yyyy-MM-dd HH:mm:ss")
  236. };
  237. var result = JsonConvert.SerializeObject(data);
  238. await _serviceEtcd.PutValAsync(key, result, ttl, false).ConfigureAwait(false);
  239. #endif
  240. }
  241. else
  242. {
  243. _logger.LogInformation($"错误响应,没有下推数据");
  244. }
  245. Console.WriteLine(str);
  246. }
  247. }
  248. else
  249. {
  250. _logger.LogInformation($"向{imeiDel}准备下推的数据已经失效ts,{ts}");
  251. }
  252. #endregion
  253. //if (imeiDel.Equals("861281060086216"))
  254. //{
  255. // var result = await _httpHelper.HttpToPostAsync(url, data, headers).ConfigureAwait(false);
  256. // _logger.LogInformation($"向{imeiDel}下发增量值数据:{str},响应:{result}");
  257. // Console.WriteLine(str);
  258. //}
  259. Console.BackgroundColor = ConsoleColor.Black;
  260. }
  261. break;
  262. }
  263. });
  264. //if (response.Events.Count == 0)
  265. //{
  266. // Console.WriteLine(response);
  267. //}
  268. //else
  269. //{
  270. // Console.WriteLine($"{response.Events[0].Kv.Key.ToStringUtf8()}:{response.Events.Kv.Value.ToStringUtf8()}");
  271. //}
  272. }
  273. }
  274. }