Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

407 lines
22KB

  1. using dotnet_etcd;
  2. using Etcdserverpb;
  3. using Google.Protobuf.WellKnownTypes;
  4. using HealthMonitor.Common;
  5. using HealthMonitor.Common.helper;
  6. using HealthMonitor.Core.Common.Extensions;
  7. using HealthMonitor.Model.Service;
  8. using HealthMonitor.Service.Biz;
  9. using HealthMonitor.Service.Biz.db;
  10. using HealthMonitor.Service.Etcd;
  11. using HealthMonitor.Service.Sub;
  12. using Microsoft.AspNetCore.Mvc.RazorPages;
  13. using Microsoft.EntityFrameworkCore.Metadata.Internal;
  14. using Newtonsoft.Json;
  15. using Newtonsoft.Json.Linq;
  16. using System.Reflection;
  17. using System.Threading.Channels;
  18. using TDengineDriver;
  19. using TDengineTMQ;
  namespace HealthMonitor.WebApi
  {
      /// <summary>
      /// Hosted background service that runs three workers: the package resolve loop,
      /// the TDengine subscription listener, and an etcd watch on the schedule-push key
      /// prefix that pushes blood-pressure calibration values when a scheduled key is deleted.
      /// </summary>
      public class Worker : BackgroundService
      {
          /// <summary>etcd key prefix watched for scheduled push entries.</summary>
          private const string SchedulePushPrefix = "health_moniter/schedule_push";

          private readonly ILogger<Worker> _logger;
          private readonly IServiceProvider _services;
          private readonly TDengineDataSubcribe _tdEngineDataSubcribe;
          private readonly PackageProcess _processor;
          private readonly TDengineService _serviceTDengine;
          private readonly EtcdService _serviceEtcd;
          private readonly HttpHelper _httpHelper = default!;
          private readonly IotWebApiService _serviceIotWebApi;

          // Created in StartAsync; it is null until the service has been started.
          private CancellationTokenSource _tokenSource = default!;

          /// <summary>
          /// Stores the injected collaborators; no work is started here.
          /// </summary>
          public Worker(ILogger<Worker> logger, IServiceProvider services, IotWebApiService iotWebApiService, PackageProcess processor, TDengineDataSubcribe tdEngineDataSubcribe, TDengineService serviceDengine, HttpHelper httpHelper, EtcdService serviceEtcd)
          {
              _logger = logger;
              _services = services;
              _processor = processor;
              _httpHelper = httpHelper;
              _serviceEtcd = serviceEtcd;
              _serviceTDengine = serviceDengine;
              _serviceIotWebApi = iotWebApiService;
              _tdEngineDataSubcribe = tdEngineDataSubcribe;
          }

          /// <summary>
          /// Creates the worker cancellation source (linked to <paramref name="cancellationToken"/>)
          /// and then lets the base class start <see cref="ExecuteAsync"/>.
          /// </summary>
          public override Task StartAsync(CancellationToken cancellationToken)
          {
              _logger.LogInformation("------StartAsync");
              _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

              // Consumer creation is currently disabled:
              // _tdEngineDataSubcribe.CreateConsumer();
              return base.StartAsync(cancellationToken);
          }

          /// <summary>
          /// Signals the worker loops to stop and then delegates to the base implementation.
          /// </summary>
          public override Task StopAsync(CancellationToken cancellationToken)
          {
              _logger.LogInformation("------StopAsync");

              // Null-conditional: the source only exists once StartAsync has run.
              _tokenSource?.Cancel();

              // Consumer shutdown is currently disabled:
              // _tdEngineDataSubcribe.CloseConsumer();
              return base.StopAsync(cancellationToken);
          }

          /// <summary>
          /// Starts the three workers and stays pending until shutdown is requested or all
          /// workers have ended. Worker failures are logged instead of being silently dropped.
          /// </summary>
          /// <param name="stoppingToken">Token signalled by the host when the service is stopping.</param>
          protected override async Task ExecuteAsync(CancellationToken stoppingToken)
          {
              // Capture the token once so the loops never touch the source itself.
              CancellationToken workerToken = _tokenSource.Token;

              Task resolveWorker = ObserveWorkerAsync(
                  Task.Run(() => RunResolveLoopAsync(workerToken), workerToken),
                  nameof(RunResolveLoopAsync));

              // The subscription loop is synchronous, so it keeps its own long-running task.
              Task listenWorker = ObserveWorkerAsync(
                  Task.Factory.StartNew(
                      () => RunSubscriptionLoop(workerToken),
                      workerToken,
                      TaskCreationOptions.LongRunning,
                      TaskScheduler.Default),
                  nameof(RunSubscriptionLoop));

              Task watchWorker = ObserveWorkerAsync(
                  Task.Run(() => _serviceEtcd.WacthKeysWithPrefixResponseAsync(SchedulePushPrefix, WatchEvents), stoppingToken),
                  "etcd schedule watch");

              // Complete as soon as shutdown is requested, even if a worker is slow to exit,
              // so that stopping the host is not held up by a blocked worker.
              using CancellationTokenSource shutdown = CancellationTokenSource.CreateLinkedTokenSource(workerToken, stoppingToken);
              Task allWorkers = Task.WhenAll(resolveWorker, listenWorker, watchWorker);
              await Task.WhenAny(allWorkers, Task.Delay(Timeout.Infinite, shutdown.Token)).ConfigureAwait(false);
          }

          /// <summary>Repeatedly awaits the package processor until cancellation is requested.</summary>
          private async Task RunResolveLoopAsync(CancellationToken token)
          {
              _logger.LogInformation("------ResolveAsync");
              while (!token.IsCancellationRequested)
              {
                  await _processor.ResolveAsync().ConfigureAwait(false);
              }

              _logger.LogWarning("Worker exit");
          }

          /// <summary>Repeatedly re-enters the TDengine subscription listener until cancellation is requested.</summary>
          private void RunSubscriptionLoop(CancellationToken token)
          {
              _logger.LogInformation("------_tdEngineDataSubcribe");
              while (!token.IsCancellationRequested)
              {
                  _tdEngineDataSubcribe.BeginListen(token);
              }
          }

          /// <summary>Awaits a worker task and logs how it ended, so that failures are never unobserved.</summary>
          private async Task ObserveWorkerAsync(Task worker, string workerName)
          {
              try
              {
                  await worker.ConfigureAwait(false);
              }
              catch (OperationCanceledException)
              {
                  _logger.LogInformation("Background worker {WorkerName} was cancelled", workerName);
              }
              catch (Exception ex)
              {
                  _logger.LogError(ex, "Background worker {WorkerName} stopped unexpectedly", workerName);
              }
          }

          /// <summary>
          /// Watch callback for the array-based watch API: writes every Delete event to the console.
          /// </summary>
          private void WatchEvents(WatchEvent[] response)
          {
              foreach (WatchEvent watchEvent in response)
              {
                  // Only Delete events are reported here; recomputing the TTL and pushing
                  // again for them is not implemented in this overload.
                  if (watchEvent.Type.ToString() == "Delete")
                  {
                      Console.WriteLine($"--- {watchEvent.Key}:{watchEvent.Value}:{watchEvent.Type}");
                  }
              }
          }

          /// <summary>
          /// Watch callback for the response-based watch API. Every event is handled
          /// asynchronously; failures are logged by the per-event handler.
          /// </summary>
          private void WatchEvents(WatchResponse response)
          {
              foreach (var watchEvent in response.Events)
              {
                  string eventType = watchEvent.Type.ToString();
                  string key = watchEvent.Kv.Key.ToStringUtf8();
                  string value = watchEvent.Kv.Value.ToStringUtf8();

                  // Fire-and-forget is intentional (the callback is synchronous); the handler
                  // returns a Task and catches its own exceptions, unlike an async void lambda.
                  _ = HandleWatchEventAsync(eventType, key, value);
              }
          }

          /// <summary>
          /// Handles one watch event: Put events are only printed; Delete events are printed
          /// and then trigger the calibration push for the device named in the key.
          /// </summary>
          private async Task HandleWatchEventAsync(string eventType, string key, string value)
          {
              try
              {
                  switch (eventType)
                  {
                      case "Put":
                          WriteEventToConsole(ConsoleColor.Blue, eventType, key, value);
                          break;
                      case "Delete":
                          // The key is gone (per the original notes: its TTL expired), so push and re-register.
                          WriteEventToConsole(ConsoleColor.Green, eventType, key, value);
                          await PushIncrementAndRescheduleAsync(key).ConfigureAwait(false);
                          break;
                  }
              }
              catch (Exception ex)
              {
                  _logger.LogError(ex, "Failed to handle etcd watch event {EventType} for key {Key}", eventType, key);
              }
          }

          /// <summary>Prints one event line with the given background colour, then restores black.</summary>
          private static void WriteEventToConsole(ConsoleColor background, string eventType, string key, string value)
          {
              Console.BackgroundColor = background;
              Console.WriteLine($"--- {eventType}--{key}--{value}---{DateTime.Now}");
              Console.BackgroundColor = ConsoleColor.Black;
          }

          /// <summary>
          /// Pushes the latest blood-pressure reference/increment values to the device whose
          /// schedule key was deleted, records the push, and re-registers the key with a new TTL.
          /// </summary>
          /// <param name="key">Deleted etcd key; the IMEI is read from its fourth '/'-separated segment.</param>
          private async Task PushIncrementAndRescheduleAsync(string key)
          {
              string[] segments = key.Split('/');
              if (segments.Length < 4 || !IsUsableImei(segments[3]))
              {
                  // The IMEI ends up inside SQL text and a table name, so anything that is
                  // not plain letters/digits is rejected rather than interpolated.
                  _logger.LogWarning("Ignoring schedule key with an unexpected format: {Key}", key);
                  return;
              }

              string imeiDel = segments[3];

              // Skip when the key already exists again.
              var schedule_push = await _serviceEtcd.GetValAsync(key).ConfigureAwait(false);
              if (!string.IsNullOrWhiteSpace(schedule_push))
              {
                  return;
              }

              var startTime = DateTime.Now;

              // NOTE(review): column positions (0, 5, 10, 12, 17) are taken as-is from the
              // original code; an empty or short result throws and is logged by the caller.
              var last = await _serviceTDengine.GetLastAsync("stb_hm_bloodpress_stats_inc", $"serialno='{imeiDel}' order by last_update desc");
              var ts = last?[0];

              // Only data from the last 7 days is pushed; unparsable and stale timestamps are both reported.
              if (!DateTime.TryParse(ts?.ToString(), out DateTime newTs) || newTs.AddDays(7) <= DateTime.Now)
              {
                  _logger.LogInformation($"向{imeiDel}准备下推的数据已经失效ts,{ts}");
                  return;
              }

              Console.WriteLine(ts);
              var systolic_ref_value = last?[5];
              var diastolic_ref_value = last?[12];
              var systolic_inc_value = last?[10];
              var diastolic_inc_value = last?[17];

              // A disabled step that initialised the person "remarks" field through the data
              // service, and a direct HTTP push of these values, were removed from this method;
              // the push now goes through IotWebApiService.
              BloodPressCalibrationConfigModel bpIncData = new()
              {
                  Imei = imeiDel,
                  SystolicRefValue = SafeType.SafeInt(((int)systolic_ref_value!)),   // systolic calibration value, 0 = not effective
                  DiastolicRefValue = SafeType.SafeInt(((int)diastolic_ref_value!)), // diastolic calibration value, 0 = not effective
                  SystolicIncValue = SafeType.SafeInt(((int)systolic_inc_value!)),   // systolic display increment, 0 = not effective
                  DiastolicIncValue = SafeType.SafeInt(((int)diastolic_inc_value!))  // diastolic display increment, 0 = not effective
              };

              var pushedBP = await _serviceIotWebApi.SetBloodPressCalibrationConfigAsync(bpIncData).ConfigureAwait(false);
              if (!pushedBP)
              {
                  _logger.LogInformation($"错误响应,没有下推数据");
                  return;
              }

              // Record the push in stb_hm_bp_push_ref_inc_value; the sub-table is chosen by the last two IMEI characters.
              string tableSuffix = imeiDel.Substring(imeiDel.Length - 2);
              var sql = $"INSERT INTO health_monitor.hm_bp_push_ref_inc_value_{tableSuffix} " +
                        $"USING health_monitor.stb_hm_bp_push_ref_inc_value " +
                        $"TAGS ('{tableSuffix}') " +
                        $"VALUES(" +
                        $"'{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}'," +
                        $"'{imeiDel}'," +
                        $"{systolic_ref_value}," +
                        $"{diastolic_ref_value}," +
                        $"{systolic_inc_value}," +
                        $"{diastolic_inc_value}," +
                        $"{false})";
              _serviceTDengine.ExecuteInsertSQL(sql);

              // Register the next push.
              var endTime = DateTime.Now;
              DateTime now = DateTime.Now;
              DateTime nextRunTime = GetNextRunTime(now);
              TimeSpan timeUntilNextRun = nextRunTime - now;

              // The processing time is subtracted as in the original; clamp so the TTL stays positive.
              long ttl = (long)((timeUntilNextRun.TotalMilliseconds - (endTime - startTime).TotalMilliseconds) / 1000);
              ttl = Math.Max(1, ttl);

              var data = new
              {
                  imei = imeiDel,
                  create_time = now.ToString("yyyy-MM-dd HH:mm:ss"),
                  ttl,
                  next_run_time = nextRunTime.ToString("yyyy-MM-dd HH:mm:ss")
              };
              var result = JsonConvert.SerializeObject(data);
              await _serviceEtcd.PutValAsync(key, result, ttl, false).ConfigureAwait(false);
          }

          /// <summary>
          /// Returns the next scheduled push time: two minutes ahead (truncated to the minute)
          /// in DEBUG builds, otherwise 20:00 on the following day.
          /// </summary>
          private static DateTime GetNextRunTime(DateTime now)
          {
  #if DEBUG
              // AddMinutes avoids the out-of-range minute the constructor would reject at hh:58 and hh:59.
              return new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute, 0).AddMinutes(2);
  #else
              return new DateTime(now.Year, now.Month, now.Day, 20, 0, 0).AddDays(1);
  #endif
          }

          /// <summary>True when the IMEI is at least two characters long and contains only letters or digits.</summary>
          private static bool IsUsableImei(string imei) =>
              imei.Length >= 2 && imei.All(char.IsLetterOrDigit);
      }
  }