Nelze vybrat více než 25 témat. Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

585 lines
25KB

  1. using HealthMonitor.Common;
  2. using HealthMonitor.Core.Dal;
  3. using HealthMonitor.Service.Biz.db;
  4. using HealthMonitor.Service.Cache;
  5. using HealthMonitor.Service.Resolver.Factory;
  6. using HealthMonitor.Service.Resolver.Interface;
  7. using HealthMonitor.Service.Sub.Topic.Model;
  8. using Microsoft.Extensions.Logging;
  9. using Newtonsoft.Json;
  10. using System;
  11. using System.Collections.Concurrent;
  12. using System.Collections.Generic;
  13. using System.Linq;
  14. using System.Text;
  15. using System.Threading.Tasks;
  16. using TDengineDriver;
  17. using TDengineTMQ;
  18. using TelpoDataService.Util.Entities.GpsLocationHistory;
  19. namespace HealthMonitor.Service.Sub
  20. {
  21. public class TDengineDataSubcribe
  22. {
  // Logger for this subscriber.
  private readonly ILogger<TDengineDataSubcribe> _logger;
  // TMQ consumer; stays unset until CreateConsumer() assigns it.
  private IConsumer _consumer = default!;
  // Native TDengine connection handle; opened in the constructor via GetConnection().
  private IntPtr _conn = default!;
  // Queue manager that receives parsed messages (see ParsePackage).
  private readonly MsgManager _msgManager;
  private readonly TDengineService _serviceTDengine;
  private readonly PersonCacheManager _personCacheMgr;
  private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager;
  // Turns raw received rows into wrapped messages (see ParsePackage).
  private readonly IResolverFactory _resolverFactory;
  // Number of packages passed through ParsePackage so far.
  private int cnt = 0;

  /// <summary>
  /// Stores the injected collaborators and opens the native TDengine connection.
  /// </summary>
  /// <exception cref="ArgumentNullException">Any dependency is <c>null</c>.</exception>
  /// <exception cref="Exception">The TDengine connection could not be opened (thrown by <c>GetConnection</c>).</exception>
  public TDengineDataSubcribe(
      TDengineService serviceDengine,
      PersonCacheManager personCacheMgr,
      BloodPressReferenceValueCacheManager bpRefValCacheManager,
      IResolverFactory resolverFactory,
      MsgManager msgManager,
      ILogger<TDengineDataSubcribe> logger
  )
  {
      // Fail fast on a broken DI registration instead of hitting a
      // NullReferenceException later inside the consume loop.
      _serviceTDengine = serviceDengine ?? throw new ArgumentNullException(nameof(serviceDengine));
      _personCacheMgr = personCacheMgr ?? throw new ArgumentNullException(nameof(personCacheMgr));
      _bpRefValCacheManager = bpRefValCacheManager ?? throw new ArgumentNullException(nameof(bpRefValCacheManager));
      _logger = logger ?? throw new ArgumentNullException(nameof(logger));
      _resolverFactory = resolverFactory ?? throw new ArgumentNullException(nameof(resolverFactory));
      _msgManager = msgManager ?? throw new ArgumentNullException(nameof(msgManager));

      // NOTE(review): this performs network I/O during construction; kept to
      // preserve existing behavior.
      _conn = GetConnection();
  }
  /// <summary>
  /// Connects to TDengine and consumes the blood-pressure topic until
  /// <paramref name="stoppingToken"/> is cancelled.
  /// </summary>
  /// <param name="stoppingToken">Signals that the consume loop should stop.</param>
  public void BeginListen(CancellationToken stoppingToken)
  {
      DoTDengineConnect(stoppingToken);
  }

  /// <summary>
  /// Connects and consumes without a cancellation signal; the loop only ends
  /// when an exception escapes it.
  /// </summary>
  public void DoTDengineConnect()
  {
      DoTDengineConnect(CancellationToken.None);
  }

  /// <summary>
  /// Opens a connection to the <c>health_monitor</c> database, runs the consume
  /// loop on it and closes the connection when the loop ends or fails.
  /// </summary>
  /// <param name="stoppingToken">Signals that the consume loop should stop.</param>
  /// <exception cref="Exception">The connection could not be established.</exception>
  public void DoTDengineConnect(CancellationToken stoppingToken)
  {
      // NOTE(review): endpoint and credentials are hard-coded here; they should
      // come from configuration / a secret store.
      string host = "47.116.142.20";
      short port = 6030;
      string username = "root";
      string password = "taosdata";
      string dbname = "health_monitor";

      var conn = TDengine.Connect(host, username, password, dbname, port);
      if (conn == IntPtr.Zero)
      {
          throw new Exception("Connect to TDengine failed");
      }
      Console.WriteLine("Connect to TDengine success");

      try
      {
          DoReceive(conn, stoppingToken);
      }
      finally
      {
          // This method opened the connection, so it is responsible for releasing it.
          TDengine.Close(conn);
      }
  }

  /// <summary>
  /// Runs the consume loop on <paramref name="Connection"/> without a
  /// cancellation signal.
  /// </summary>
  /// <param name="Connection">Open TDengine connection used to create the topic.</param>
  public void DoReceive(IntPtr Connection)
  {
      DoReceive(Connection, CancellationToken.None);
  }

  /// <summary>
  /// Creates the topic if needed, subscribes a consumer to it and forwards every
  /// received row to <c>ParsePackage</c> until cancellation is requested.
  /// </summary>
  /// <param name="Connection">Open TDengine connection used to create the topic; not closed here.</param>
  /// <param name="stoppingToken">Checked before each poll; the loop exits once it is cancelled.</param>
  /// <exception cref="Exception">The topic could not be created.</exception>
  public void DoReceive(IntPtr Connection, CancellationToken stoppingToken)
  {
      // NOTE(review): hard-coded credentials, see DoTDengineConnect.
      var cfg = new ConsumerConfig
      {
          GourpId = "group_1",
          TDConnectUser = "root",
          TDConnectPasswd = "taosdata",
          MsgWithTableName = "true",
          TDConnectIp = "47.116.142.20",
      };
      string topic = "topic_hm_bp_stats";

      // Create the topic; the result handle is released once its status was read.
      IntPtr res = TDengine.Query(Connection, $"create topic if not exists {topic} as select * from health_monitor.hm_bloodpress");
      try
      {
          if (TDengine.ErrorNo(res) != 0)
          {
              throw new Exception($"create topic failed, reason:{TDengine.Error(res)}");
          }
      }
      finally
      {
          TDengine.FreeResult(res);
      }

      var consumer = new ConsumerBuilder(cfg)
          .Build();
      try
      {
          consumer.Subscribe(topic);

          while (!stoppingToken.IsCancellationRequested)
          {
              var consumeRes = consumer.Consume(300);
              foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumeRes.Message)
              {
                  // Datas is a flat list of cell values; every Metas.Count
                  // consecutive entries form one row. A trailing partial row is ignored.
                  int columnCount = kv.Value.Metas.Count;
                  if (columnCount == 0)
                  {
                      continue;
                  }

                  for (int rowStart = 0; rowStart + columnCount <= kv.Value.Datas.Count; rowStart += columnCount)
                  {
                      try
                      {
                          IDictionary<string, object> row = new Dictionary<string, object>();
                          for (int column = 0; column < columnCount; column++)
                          {
                              row.Add(kv.Value.Metas[column].name, kv.Value.Datas[rowStart + column]);
                          }

                          var body = JsonConvert.SerializeObject(row);
                          ReceiveMessageModel msg = new(kv.Key.db, kv.Key.table, kv.Key.topic, row["message_id"].ToString()!, body);
                          ParsePackage(msg);
                      }
                      catch (Exception ex)
                      {
                          // One bad row must not stop the subscription; keep the
                          // full exception (not just the message) for diagnosis.
                          _logger.LogError(ex, "Failed to process a row received from topic {Topic}", topic);
                      }
                  }
              }
              consumer.Commit(consumeRes);
              Console.WriteLine("consumer.Commit");
          }

          consumer.Unsubscribe();
      }
      finally
      {
          // Always close the consumer so it is released even when the loop fails.
          consumer.Close();
      }
  }
  /// <summary>
  /// Resolves a raw received message through the resolver factory and, when it
  /// yields a result, enqueues it on the message manager.
  /// </summary>
  /// <param name="model">Raw message built from one consumed row.</param>
  public void ParsePackage(ReceiveMessageModel model)
  {
      var msg = _resolverFactory.ParseAndWrap(model);
      Console.WriteLine("msg");
      cnt++;
      Console.WriteLine(cnt);

      // The null check must precede any dereference of msg; previously
      // msg.MessageId was read first, so an unresolved message threw a
      // NullReferenceException and this guard could never take effect.
      if (msg == null)
      {
          return;
      }

      Console.WriteLine(msg.MessageId);
      _msgManager.EnqueueMsg(msg);
  }
  /// <summary>
  /// Opens a fresh TDengine connection and makes it the connection used by this
  /// instance, closing the one it replaces.
  /// </summary>
  /// <exception cref="Exception">The new connection could not be opened; the existing one is left untouched.</exception>
  public void CreateConnection()
  {
      // Previously the new handle was stored in a local and dropped, leaking a
      // native connection on every call (and an unused ConsumerConfig was built).
      IntPtr previous = _conn;
      _conn = GetConnection();

      if (previous != IntPtr.Zero)
      {
          TDengine.Close(previous);
      }
  }
  195. // 创建消费者
  /// <summary>
  /// Creates the <c>topic_hm_bp_stats</c> topic if it does not exist, builds a
  /// consumer subscribed to it and stores it in <c>_consumer</c>.
  /// </summary>
  /// <exception cref="Exception">The topic could not be created.</exception>
  public void CreateConsumer()
  {
      // NOTE(review): credentials and endpoint are hard-coded; they should come
      // from configuration / a secret store.
      var cfg = new ConsumerConfig
      {
          GourpId = "group_1",
          TDConnectUser = "root",
          TDConnectPasswd = "taosdata",
          MsgWithTableName = "true",
          TDConnectIp = "47.116.142.20",
      };
      string topic = "topic_hm_bp_stats";

      // Create the topic; release the native result handle once its status was read.
      IntPtr res = TDengine.Query(_conn, $"create topic if not exists {topic} as select * from health_monitor.hm_bloodpress");
      try
      {
          if (TDengine.ErrorNo(res) != 0)
          {
              throw new Exception($"create topic failed, reason:{TDengine.Error(res)}");
          }
      }
      finally
      {
          TDengine.FreeResult(res);
      }

      var consumer = new ConsumerBuilder(cfg)
          .Build();
      consumer.Subscribe(topic);

      // Do not orphan a consumer created by an earlier call.
      var previous = _consumer;
      _consumer = consumer;
      if (previous is not null)
      {
          previous.Close();
      }
  }
  221. //public IConsumer CreateConsumer()
  222. //{
  223. // var cfg = new ConsumerConfig
  224. // {
  225. // GourpId = "group_1",
  226. // TDConnectUser = "root",
  227. // TDConnectPasswd = "taosdata",
  228. // MsgWithTableName = "true",
  229. // TDConnectIp = "47.116.142.20",
  230. // };
  231. // var conn = GetConnection();
  232. // //IntPtr conn = GetConnection();
  233. // string topic = "topic_name";
  234. // //create topic
  235. // IntPtr res = TDengine.Query(conn, $"create topic if not exists {topic} as select * from ctb1");
  236. // if (TDengine.ErrorNo(res) != 0)
  237. // {
  238. // throw new Exception($"create topic failed, reason:{TDengine.Error(res)}");
  239. // }
  240. // // create consumer
  241. // var consumer = new ConsumerBuilder(cfg)
  242. // .Build();
  243. // // subscribe
  244. // consumer.Subscribe(topic);
  245. // return consumer;
  246. //}
  247. //public void ProcessMsg()
  248. //{
  249. // var consumerRes = _consumer.Consume(300);
  250. // // process ConsumeResult
  251. // foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumerRes.Message)
  252. // {
  253. // Console.WriteLine("topic partitions:\n{0}", kv.Key.ToString());
  254. // kv.Value.Metas.ForEach(meta =>
  255. // {
  256. // Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size);
  257. // });
  258. // Console.WriteLine("");
  259. // kv.Value.Datas.ForEach(data =>
  260. // {
  261. // Console.WriteLine(data.ToString());
  262. // });
  263. // }
  264. // _consumer.Commit(consumerRes);
  265. // // Console.WriteLine("\n================ {0} done ");
  266. //}
  267. /**
  268. public async Task ProcessMsg()
  269. {
  270. var consumerRes = _consumer.Consume(300);
  271. Console.WriteLine(consumerRes.Message.Count);
  272. // process ConsumeResult
  273. foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumerRes.Message)
  274. {
  275. //Console.WriteLine("topic partitions:\n{0}", kv.Key.ToString());
  276. //kv.Value.Metas.ForEach(meta =>
  277. //{
  278. // Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size);
  279. //});
  280. //Console.WriteLine("----------");
  281. //kv.Value.Datas.ForEach(data =>
  282. //{
  283. // Console.WriteLine(data.ToString());
  284. //});
  285. //Console.WriteLine("----------");
  286. for (int i = 0; i < kv.Value.Datas.Count; i++)
  287. {
  288. // Console.Write($"|{kv.Value.Datas[i].ToString()} \t");
  289. //Console.WriteLine("{0},{1},{2}", i, resMeta.Count, (i) % resMeta.Count);
  290. if (((i + 1) % kv.Value.Metas.Count == 0))
  291. {
  292. try
  293. {
  294. cnt++;
  295. string bloodpress_id = SafeType.SafeString(kv.Value.Datas[i - 8]);
  296. string message_id = SafeType.SafeString(kv.Value.Datas[i - 7]);
  297. string serialno = SafeType.SafeString(kv.Value.Datas[i - 6]);
  298. int systolic_value = SafeType.SafeInt(kv.Value.Datas[i - 5]);
  299. int diastolic_value = SafeType.SafeInt(kv.Value.Datas[i - 4]);
  300. DateTime create_time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 3]) / 1000000);
  301. DateTime last_update = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 2]) / 1000000);
  302. int method = SafeType.SafeInt(kv.Value.Datas[i - 1]);
  303. bool is_display = SafeType.SafeBool(kv.Value.Datas[i]);
  304. // Console.WriteLine("----------");
  305. HisGpsBloodPress bp = new()
  306. {
  307. //BloodPressId = (string)kv.Value.Datas[i -8],
  308. //MessageId = (string)kv.Value.Datas[i -7],
  309. //Serialno = (string)kv.Value.Datas[i -6],
  310. //SystolicValue = (int)kv.Value.Datas[i -5],
  311. //DiastolicValue = (int)kv.Value.Datas[i -4],
  312. //CreateTime = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds((long)kv.Value.Datas[i -3]/1000000),
  313. //LastUpdate = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds((long)kv.Value.Datas[i -2]/1000000),
  314. //Method = (int)kv.Value.Datas[i -1],
  315. //IsDisplay = (bool)kv.Value.Datas[i] ? 1 : 0,
  316. BloodPressId = bloodpress_id,
  317. MessageId = message_id,
  318. Serialno = serialno,
  319. SystolicValue = systolic_value,
  320. DiastolicValue = diastolic_value,
  321. CreateTime = create_time,
  322. LastUpdate = last_update,
  323. Method = method,
  324. IsDisplay = is_display ? 1 : 0,
  325. };
  326. #region 获取个人信息
  327. var person = await _personCacheMgr.GetDeviceGpsPersonCacheBySerialNoAsync(bp.MessageId, bp.Serialno).ConfigureAwait(false);
  328. // 验证这个信息是否存在
  329. if (person == null || person?.Person.BornDate == null)
  330. {
  331. Console.WriteLine("验证这个信息是否存在");
  332. }
  333. else
  334. {
  335. // 验证年龄是否在范围 (2 - 120)
  336. var age = SafeType.SafeInt(DateTime.Today.Year - person?.Person.BornDate!.Value.Year!);
  337. #endregion
  338. if (age < 1 || age > 120)
  339. {
  340. Console.WriteLine("验证年龄是否在范围 (2 - 120)");
  341. }
  342. else
  343. {
  344. var gender = person?.Person.Gender == true ? 1 : 2;
  345. var isHypertension = SafeType.SafeBool(person?.Person.Ishypertension!);
  346. var height = SafeType.SafeDouble(person?.Person.Height!);
  347. var weight = SafeType.SafeDouble(person?.Person.Weight!);
  348. #region 插入当次BP数据
  349. // 保存到TDengine
  350. //var bpSql = $"INSERT INTO health_monitor.hm_bloodpress VALUES(" +
  351. // $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  352. // $"'{bp.BloodPressId}'," +
  353. // $"'{bp.MessageId}'," +
  354. // $"'{bp.Serialno}'," +
  355. // $"{bp.SystolicValue}," +
  356. // $"{bp.DiastolicValue}," +
  357. // $"'{bp.CreateTime:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  358. // $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  359. // $"{bp.Method}," +
  360. // $"{bp.IsDisplay == 1})";
  361. //await _serviceTDengine.GernalRestSql(bpSql);
  362. #endregion
  363. #region 计算增量值
  364. var bpRef = await _bpRefValCacheManager.GetBloodPressReferenceValueAsync(age, gender, isHypertension);
  365. var systolicRefValue = bpRef?.Systolic;//?
  366. var diastolicRefValue = bpRef?.Diastolic;//?
  367. int duration = 30;
  368. // 获取历史数据
  369. //DateTime now = DateTime.Now;
  370. DateTime now = (DateTime)bp.LastUpdate; //测试
  371. DateTime startTime = now.AddDays(-duration);
  372. DateTime endTime = now;
  373. //
  374. var systolicAggregate = await _serviceTDengine.GetAggregateValue("systolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}'");
  375. var diastolicAggregate = await _serviceTDengine.GetAggregateValue("diastolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}'");
  376. // 最大值
  377. var systolicMax = systolicAggregate.Max;
  378. var diastolicMax = diastolicAggregate.Max;
  379. // 最小值
  380. var systolicMin = systolicAggregate.Min;
  381. var diastolicMin = diastolicAggregate.Min;
  382. // 计算去除最大值和最小值和异常值的平均值
  383. var systolicAvg = await _serviceTDengine.GetAvgExceptMaxMinValue("systolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}' and systolic_value < {systolicRefValue} ");
  384. var diastolicAvg = await _serviceTDengine.GetAvgExceptMaxMinValue("diastolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}' and diastolic_value < {diastolicRefValue}");
  385. // 偏移参数
  386. var avgOffset = 0.25M;
  387. var systolicAvgOffset = avgOffset;
  388. var diastolicAvgOffset = avgOffset;
  389. // 增量值=(标定值-平均值)* 0.25
  390. var systolicInc = systolicAvg.Equals(0M) ? 0 : (int)((systolicRefValue - systolicAvg) * systolicAvgOffset)!;
  391. var diastolicInc = diastolicAvg.Equals(0M) ? 0 : (int)((diastolicRefValue - diastolicAvg) * diastolicAvgOffset)!;
  392. #endregion
  393. #region 插入BP增量值
  394. var sql = $"INSERT INTO health_monitor.hm_bloodpress_stats_inc VALUES(" +
  395. $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  396. $"'{bp.BloodPressId}'," +
  397. $"'{bp.MessageId}'," +
  398. $"'{bp.Serialno}'," +
  399. $"{bp.SystolicValue}," +
  400. $"{systolicRefValue}," +
  401. $"{systolicAvg}," +
  402. $"{systolicMax}," +
  403. $"{systolicMin}," +
  404. $"{systolicAvgOffset}," +
  405. $"{systolicInc}," +
  406. $"{bp.DiastolicValue}," +
  407. $"{diastolicRefValue}," +
  408. $"{diastolicAvg}," +
  409. $"{diastolicMax}," +
  410. $"{diastolicMin}," +
  411. $"{diastolicAvgOffset}," +
  412. $"{diastolicInc}," +
  413. $"{gender}," +
  414. $"{age}," +
  415. $"{height}," +
  416. $"{weight}," +
  417. $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  418. $"{duration}," +
  419. $"'{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  420. $"'{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  421. $"'{string.Empty}')";
  422. var res = await _serviceTDengine.GernalRestSql(sql);
  423. #endregion
  424. };
  425. // Console.WriteLine("----------");
  426. }
  427. }
  428. catch (Exception ex)
  429. {
  430. Console.WriteLine(ex.Message);
  431. }
  432. }
  433. Console.WriteLine("++++++++++++++++++++++");
  434. Console.Write($"总共增加{cnt}");
  435. }
  436. // Console.WriteLine("");
  437. }
  438. _consumer.Commit(consumerRes);
  439. // Console.WriteLine("\n================ {0} done ");
  440. }
  441. */
  /// <summary>
  /// Placeholder kept for API compatibility; currently performs no work.
  /// </summary>
  /// <returns>An already-completed task.</returns>
  public Task ProcessMsg()
  {
      // An 'async' method without any 'await' triggers CS1998 and allocates a
      // state machine for nothing; returning a completed task is equivalent.
      return Task.CompletedTask;
  }
  /// <summary>
  /// Opens a native connection to the <c>tmqdb</c> database on the TDengine server.
  /// </summary>
  /// <returns>The non-zero connection handle.</returns>
  /// <exception cref="Exception">The driver returned a zero handle.</exception>
  public IntPtr GetConnection()
  {
      const string Host = "47.116.142.20";
      const short Port = 6030;
      const string User = "root";
      const string Secret = "taosdata";
      const string Database = "tmqdb";

      IntPtr handle = TDengine.Connect(Host, User, Secret, Database, Port);
      if (handle != IntPtr.Zero)
      {
          Console.WriteLine("Connect to TDengine success");
          return handle;
      }

      throw new Exception("Connect to TDengine failed");
  }
  463. // 关闭消费者
  464. //public void CloseConsumer(IConsumer consumer, IntPtr conn)
  465. //{
  466. // List<string> topics = consumer.Subscription();
  467. // topics.ForEach(t => Console.WriteLine("topic name:{0}", t));
  468. // // unsubscribe
  469. // consumer.Unsubscribe();
  470. // // close consumer after use.Otherwise will lead memory leak.
  471. // consumer.Close();
  472. // TDengine.Close(conn);
  473. //}
  /// <summary>
  /// Unsubscribes and closes the consumer (if one was created) and closes the
  /// native TDengine connection held by this instance.
  /// </summary>
  public void CloseConsumer()
  {
      var consumer = _consumer;
      try
      {
          // _consumer is only assigned by CreateConsumer(); without this guard
          // the call threw a NullReferenceException and skipped closing the connection.
          if (consumer is not null)
          {
              List<string> topics = consumer.Subscription();
              topics.ForEach(t => Console.WriteLine("topic name:{0}", t));
              consumer.Unsubscribe();
              // Close the consumer after use, otherwise it leaks memory.
              consumer.Close();
              _consumer = default!;
          }
      }
      finally
      {
          // Close the connection even if consumer shutdown failed, and clear the
          // handle so a second call cannot close it twice.
          if (_conn != IntPtr.Zero)
          {
              TDengine.Close(_conn);
              _conn = IntPtr.Zero;
          }
      }
  }
  484. }
  485. }