您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

387 行
20KB

  1. using dotnet_etcd;
  2. using Etcdserverpb;
  3. using Google.Protobuf.WellKnownTypes;
  4. using HealthMonitor.Common;
  5. using HealthMonitor.Common.helper;
  6. using HealthMonitor.Core.Common.Extensions;
  7. using HealthMonitor.Service.Biz.db;
  8. using HealthMonitor.Service.Etcd;
  9. using HealthMonitor.Service.Sub;
  10. using Microsoft.AspNetCore.Mvc.RazorPages;
  11. using Microsoft.EntityFrameworkCore.Metadata.Internal;
  12. using Newtonsoft.Json;
  13. using Newtonsoft.Json.Linq;
  14. using System.Reflection;
  15. using System.Threading.Channels;
  16. using TDengineDriver;
  17. using TDengineTMQ;
  18. namespace HealthMonitor.WebApi
  19. {
  20. public class Worker : BackgroundService
  21. {
  22. private readonly ILogger<Worker> _logger;
  23. private readonly IServiceProvider _services;
  24. private readonly TDengineDataSubcribe _tdEngineDataSubcribe;
  25. private readonly PackageProcess _processor;
  26. private readonly TDengineService _serviceTDengine;
  27. private readonly EtcdService _serviceEtcd;
  28. private readonly HttpHelper _httpHelper = default!;
  29. private CancellationTokenSource _tokenSource=default!;
  30. public Worker(ILogger<Worker> logger, IServiceProvider services, PackageProcess processor,TDengineDataSubcribe tdEngineDataSubcribe, TDengineService serviceDengine, HttpHelper httpHelper, EtcdService serviceEtcd)
  31. {
  32. _logger = logger;
  33. _tdEngineDataSubcribe = tdEngineDataSubcribe;
  34. _services = services;
  35. _processor = processor;
  36. _serviceEtcd = serviceEtcd;
  37. _serviceTDengine = serviceDengine;
  38. _httpHelper = httpHelper;
  39. }
  40. public override Task StartAsync(CancellationToken cancellationToken)
  41. {
  42. _logger.LogInformation("------StartAsync");
  43. _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
  44. // 创建消费者
  45. // _tdEngineDataSubcribe.CreateConsumer();
  46. return base.StartAsync(cancellationToken);
  47. }
  48. public override Task StopAsync(CancellationToken cancellationToken)
  49. {
  50. _logger.LogInformation("------StopAsync");
  51. _tokenSource.Cancel(); //停止工作线程
  52. // 关闭消费者
  53. // _tdEngineDataSubcribe.CloseConsumer();
  54. return base.StopAsync(cancellationToken);
  55. }
  56. protected override Task ExecuteAsync(CancellationToken stoppingToken)
  57. {
  58. // var processor = _services.GetService<PackageProcess>();
  59. TaskFactory factory = new(_tokenSource.Token);
  60. factory.StartNew(async () =>
  61. {
  62. if (_tokenSource.IsCancellationRequested)
  63. _logger.LogWarning("Worker exit");
  64. _logger.LogInformation("------ResolveAsync");
  65. while (!_tokenSource.IsCancellationRequested)
  66. {
  67. //
  68. await _processor.ResolveAsync().ConfigureAwait(false);
  69. // await _tdEngineDataSubcribe.ProcessMsg();
  70. }
  71. }, TaskCreationOptions.LongRunning);
  72. factory.StartNew(() =>
  73. {
  74. _logger.LogInformation("------_tdEngineDataSubcribe");
  75. while (!_tokenSource.IsCancellationRequested)
  76. {
  77. _tdEngineDataSubcribe.BeginListen(_tokenSource.Token);
  78. }
  79. }, TaskCreationOptions.LongRunning);
  80. Task.Run(() =>
  81. _serviceEtcd.WacthKeysWithPrefixResponseAsync($"health_moniter/schedule_push", WatchEvents)
  82. , stoppingToken);
  83. // watch
  84. //factory.StartNew(() =>
  85. //{
  86. // while (!_tokenSource.IsCancellationRequested)
  87. // {
  88. // //_serviceEtcd.WacthKeysWithPrefixAsync($"health_moniter/schedule_push", watchEvents => WatchEvents(watchEvents));
  89. // _serviceEtcd.WacthKeysWithPrefixResponseAsync($"health_moniter/schedule_push", WatchEvents);
  90. // }
  91. //}, TaskCreationOptions.LongRunning);
  92. // _serviceEtcd.WacthKeysWithPrefixResponseAsync($"health_moniter/schedule_push", WatchEvents);
  93. return Task.Delay(1000, _tokenSource.Token);
  94. }
  95. private void WatchEvents(WatchEvent[] response)
  96. {
  97. foreach (WatchEvent e1 in response)
  98. {
  99. // Console.WriteLine($"{nameof(WatchEventsAsync)} --- {e1.Key}:{e1.Value}:{e1.Type}");
  100. switch (e1.Type.ToString())
  101. {
  102. //case "Put":
  103. // // 获取时间点计算TTL
  104. // break;
  105. case "Delete":
  106. // TTL到了重新计算TTL,下发
  107. Console.WriteLine($"--- {e1.Key}:{e1.Value}:{e1.Type}");
  108. break;
  109. }
  110. }
  111. }
  112. private void WatchEvents(WatchResponse response)
  113. {
  114. response.Events.ToList().ForEach(async e =>
  115. {
  116. switch (e.Type.ToString())
  117. {
  118. case "Put":
  119. // 获取时间点计算TTL
  120. Console.BackgroundColor = ConsoleColor.Blue;
  121. Console.WriteLine($"--- {e.Type}--{e.Kv.Key.ToStringUtf8()}--{e.Kv.Value.ToStringUtf8()}---{DateTime.Now}");
  122. Console.BackgroundColor = ConsoleColor.Black;
  123. break;
  124. case "Delete":
  125. // TTL到了重新计算TTL,下发
  126. Console.BackgroundColor = ConsoleColor.Green;
  127. Console.WriteLine($"--- {e.Type}--{e.Kv.Key.ToStringUtf8()}--{e.Kv.Value.ToStringUtf8()}---{DateTime.Now}");
  128. // var key = $"health_moniter/schedule_push/imei/{bp.Serialno}";
  129. var key = e.Kv.Key.ToStringUtf8();
  130. var imeiDel = key.Split('/')[3];
  131. var schedule_push = await _serviceEtcd.GetValAsync(key).ConfigureAwait(false);
  132. if (string.IsNullOrWhiteSpace(schedule_push))
  133. {
  134. var startTime =DateTime.Now;
  135. // 下发增量值
  136. #region 定时下发增量值
  137. var last= await _serviceTDengine.GetLastAsync("stb_hm_bloodpress_stats_inc", $"serialno='{imeiDel}' order by last_update desc");
  138. var ts = last?[0];
  139. if (DateTime.TryParse(ts?.ToString(),out DateTime newTs))
  140. {
  141. // 7 天有效数据
  142. if (newTs.AddDays(7) > DateTime.Now)
  143. {
  144. Console.WriteLine(ts);
  145. var systolic_ref_value = last?[5];
  146. var diastolic_ref_value = last?[12];
  147. var systolic_inc_value = last?[10];
  148. var diastolic_inc_value = last?[17];
  149. #region 判断是否初始化remark
  150. var imei = imeiDel;
  151. // 判断是否初始化remark
  152. var last_push = await _serviceTDengine.GetLastAsync("stb_hm_bp_push_ref_inc_value", $"serialno='{imei}' order by ts desc");
  153. if (last_push?.Count == 0)
  154. {
  155. var dataServiceBaseUrl = $"https://id.ssjlai.com/data";
  156. var DataServicePersionGet = $"{dataServiceBaseUrl}/api/GpsCard/GpsPerson/GetFirst";
  157. List<KeyValuePair<string, string>> Dataheaders = new()
  158. {
  159. new KeyValuePair<string, string>("requestId", $"{imei}")
  160. };
  161. var dataPersion = new
  162. {
  163. filters = new List<object>() { new { key = "serialno", value = $"{imei}", valueType = "string", @operator = "Equal" } },
  164. orderBys = new List<object>() { new { key = "serialno", isDesc = true } }
  165. };
  166. var resRef = await _httpHelper.HttpToPostAsync(DataServicePersionGet, dataPersion, Dataheaders).ConfigureAwait(false);
  167. var resObj = JsonConvert.DeserializeObject(resRef!) as JToken;
  168. var remark = resObj?["remarks"]?.ToString();
  169. // 修改数据库
  170. if (string.IsNullOrWhiteSpace(remark))
  171. {
  172. var newRemarkData = new
  173. {
  174. imei,
  175. time = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"),
  176. commandValue = new
  177. {
  178. systolicCalibrationValue = systolic_ref_value, //收缩压标定值,值为0 表示不生效
  179. diastolicCalibrationValue = diastolic_ref_value, //舒张压标定值,值为0表示不生效
  180. systolicIncValue = 0, //收缩压显示增量,值为0 表示不生效
  181. diastolicIncValue = 0 //舒张压显示增量,值为0 表示不生效
  182. }
  183. };
  184. resObj!["remarks"] = $"is_blood_press:{JsonConvert.SerializeObject(newRemarkData)}|";
  185. var DataServicePersionUpdate = $"{dataServiceBaseUrl}/api/GpsCard/GpsPerson/Update";
  186. var resUpdate = await _httpHelper.HttpToPostAsync(DataServicePersionUpdate, resObj, Dataheaders).ConfigureAwait(false);
  187. _logger.LogInformation($"更新Person数据库|{resUpdate}");
  188. //更新缓存
  189. List<KeyValuePair<string, string>> headersCache = new()
  190. {
  191. new KeyValuePair<string, string>("AuthKey", "key1")
  192. };
  193. var cacheUrl = $"http://id.ssjlai.com/webapi/api/Device/UpdatePersonInfoCache?imei={imei}";
  194. var updateCache = await _httpHelper.HttpToGetAsync(cacheUrl, headersCache);
  195. _logger.LogInformation($"更新Person缓存|{updateCache}");
  196. }
  197. }
  198. #endregion
  199. var bpData = new
  200. {
  201. imei = imeiDel,
  202. //systolicCalibrationValue = last?[5], //收缩压标定值,值为0 表示不生效
  203. //diastolicCalibrationValue = last?[12], //舒张压标定值,值为0表示不生效
  204. //systolicIncValue = last?[10], //收缩压显示增量,值为0 表示不生效
  205. //diastolicIncValue = last?[17] //舒张压显示增量,值为0 表示不生效
  206. systolicCalibrationValue = systolic_ref_value, //收缩压标定值,值为0 表示不生效
  207. diastolicCalibrationValue = diastolic_ref_value, //舒张压标定值,值为0表示不生效
  208. systolicIncValue = systolic_inc_value, //收缩压显示增量,值为0 表示不生效
  209. diastolicIncValue = diastolic_inc_value //舒张压显示增量,值为0 表示不生效
  210. };
  211. var str = JsonConvert.SerializeObject(bpData);
  212. var url = $"http://id.ssjlai.com/webapi/api/Command/SetBloodPressCalibrationConfig";
  213. List<KeyValuePair<string, string>> headers = new()
  214. {
  215. new KeyValuePair<string, string>("AuthKey", "key1")
  216. };
  217. var res = await _httpHelper.HttpToPostAsync(url, bpData, headers).ConfigureAwait(false);
  218. _logger.LogInformation($"向{imeiDel}下发增量值数据:{str},响应:{res}");
  219. var resJToken = JsonConvert.DeserializeObject(res!) as JToken;
  220. if (resJToken!["message"]!.ToString().Equals("ok"))
  221. {
  222. #region 保存下推记录 stb_hm_bp_push_ref_inc_value
  223. var sql = $"INSERT INTO health_monitor.hm_bp_push_ref_inc_value_{imeiDel.Substring(imeiDel.Length - 2)} " +
  224. $"USING health_monitor.stb_hm_bp_push_ref_inc_value " +
  225. $"TAGS ('{imeiDel.Substring(imeiDel.Length - 2)}') " +
  226. $"VALUES(" +
  227. $"'{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}'," +
  228. $"'{imeiDel}'," +
  229. $"{systolic_ref_value}," +
  230. $"{diastolic_ref_value}," +
  231. $"{systolic_inc_value}," +
  232. $"{diastolic_inc_value}," +
  233. $"{false})";
  234. _serviceTDengine.ExecuteInsertSQL(sql);
  235. #endregion
  236. // 注册下次下推
  237. var endTime = DateTime.Now;
  238. #if DEBUG
  239. //long ttl = (long)((60 * 1000-(endTime-startTime).TotalMilliseconds)/1000);
  240. //await _serviceEtcd.PutValAsync(key, imeiDel,ttl, false).ConfigureAwait(false);
  241. var interval = 0;
  242. // 获取当前时间
  243. DateTime now = DateTime.Now;
  244. // 计算距离下一个$interval天后的8点的时间间隔
  245. DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute + 2, 0).AddDays(interval);
  246. TimeSpan timeUntilNextRun = nextRunTime - now;
  247. // 如果当前时间已经超过了8点,将等待到明天后的8点
  248. if (timeUntilNextRun < TimeSpan.Zero)
  249. {
  250. timeUntilNextRun = timeUntilNextRun.Add(TimeSpan.FromMinutes(1));
  251. nextRunTime += timeUntilNextRun;
  252. }
  253. // var ttl = timeUntilNextRun.TotalMilliseconds;
  254. long ttl = (long)((timeUntilNextRun.TotalMilliseconds - (endTime - startTime).TotalMilliseconds) / 1000);
  255. var data = new
  256. {
  257. imei = imeiDel,
  258. create_time = now.ToString("yyyy-MM-dd HH:mm:ss"),
  259. ttl,
  260. next_run_time = nextRunTime.ToString("yyyy-MM-dd HH:mm:ss")
  261. };
  262. var result = JsonConvert.SerializeObject(data);
  263. await _serviceEtcd.PutValAsync(key, result, ttl, false).ConfigureAwait(false);
  264. #else
  265. // 每$interval天,晚上8点
  266. var interval = 1;
  267. // 获取当前时间
  268. DateTime now = DateTime.Now;
  269. // 计算距离下一个$interval天后的8点的时间间隔
  270. DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, 20, 0, 0).AddDays(interval);
  271. TimeSpan timeUntilNextRun = nextRunTime - now;
  272. // 如果当前时间已经超过了8点,将等待到明天后的8点
  273. if (timeUntilNextRun < TimeSpan.Zero)
  274. {
  275. timeUntilNextRun = timeUntilNextRun.Add(TimeSpan.FromDays(1));
  276. nextRunTime += timeUntilNextRun;
  277. }
  278. // var ttl = timeUntilNextRun.TotalMilliseconds;
  279. long ttl = (long)((timeUntilNextRun.TotalMilliseconds-(endTime-startTime).TotalMilliseconds)/1000);
  280. var data = new
  281. {
  282. imei = imeiDel,
  283. create_time = now.ToString("yyyy-MM-dd HH:mm:ss"),
  284. ttl,
  285. next_run_time = nextRunTime.ToString("yyyy-MM-dd HH:mm:ss")
  286. };
  287. var result = JsonConvert.SerializeObject(data);
  288. await _serviceEtcd.PutValAsync(key, result, ttl, false).ConfigureAwait(false);
  289. #endif
  290. }
  291. else
  292. {
  293. _logger.LogInformation($"错误响应,没有下推数据");
  294. }
  295. Console.WriteLine(str);
  296. }
  297. }
  298. else
  299. {
  300. _logger.LogInformation($"向{imeiDel}准备下推的数据已经失效ts,{ts}");
  301. }
  302. #endregion
  303. //if (imeiDel.Equals("861281060086216"))
  304. //{
  305. // var result = await _httpHelper.HttpToPostAsync(url, data, headers).ConfigureAwait(false);
  306. // _logger.LogInformation($"向{imeiDel}下发增量值数据:{str},响应:{result}");
  307. // Console.WriteLine(str);
  308. //}
  309. Console.BackgroundColor = ConsoleColor.Black;
  310. }
  311. break;
  312. }
  313. });
  314. //if (response.Events.Count == 0)
  315. //{
  316. // Console.WriteLine(response);
  317. //}
  318. //else
  319. //{
  320. // Console.WriteLine($"{response.Events[0].Kv.Key.ToStringUtf8()}:{response.Events.Kv.Value.ToStringUtf8()}");
  321. //}
  322. }
  323. }
  324. }