You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

588 lines
25KB

  1. using HealthMonitor.Common;
  2. using HealthMonitor.Core.Dal;
  3. using HealthMonitor.Service.Biz.db;
  4. using HealthMonitor.Service.Cache;
  5. using HealthMonitor.Service.Resolver.Factory;
  6. using HealthMonitor.Service.Resolver.Interface;
  7. using HealthMonitor.Service.Sub.Topic.Model;
  8. using Microsoft.Extensions.Logging;
  9. using Newtonsoft.Json;
  10. using System;
  11. using System.Collections.Concurrent;
  12. using System.Collections.Generic;
  13. using System.Linq;
  14. using System.Text;
  15. using System.Threading.Tasks;
  16. using TDengineDriver;
  17. using TDengineTMQ;
  18. using TelpoDataService.Util.Entities.GpsLocationHistory;
  19. namespace HealthMonitor.Service.Sub
  20. {
  /// <summary>
  /// Subscribes to the TDengine TMQ topic <c>topic_hm_bp_stats</c> (defined as
  /// <c>select * from health_monitor.hm_bloodpress</c>), turns every received row into a
  /// <see cref="ReceiveMessageModel"/>, resolves it through the <see cref="IResolverFactory"/>
  /// and enqueues the result on the <see cref="MsgQueueManager"/>.
  /// </summary>
  public class TDengineDataSubcribe
  {
      // NOTE(review): endpoint and credentials are hard-coded literals. They are kept here
      // only to preserve current behavior; they should come from configuration / secret storage.
      private const string Host = "47.116.142.20";
      private const short Port = 6030;
      private const string UserName = "root";
      private const string Password = "taosdata";
      private const string ConsumerGroupId = "group_1";

      // Database used by the listening connection opened in DoTDengineConnect.
      private const string SubscribeDbName = "health_monitor";

      // Database used by GetConnection (and therefore by the constructor's connection).
      private const string DefaultDbName = "tmqdb";

      private const string TopicName = "topic_hm_bp_stats";
      private const string MessageIdColumn = "message_id";

      // Value handed to IConsumer.Consume; presumably a timeout in milliseconds - TODO confirm.
      private const int ConsumeTimeout = 300;

      private readonly ILogger<TDengineDataSubcribe> _logger;
      private IConsumer? _consumer;
      private IntPtr _conn;

      // The following collaborators are injected and stored but not referenced by the
      // active code paths of this class.
      private readonly MsgManager _msgManager;
      private readonly TDengineService _serviceTDengine;
      private readonly PersonCacheManager _personCacheMgr;
      private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager;

      private readonly MsgQueueManager _msgQueueManager;
      private readonly IResolverFactory _resolverFactory;

      // Number of messages that were successfully resolved and enqueued.
      private int _receivedCount;

      /// <summary>
      /// Stores the injected collaborators and opens the connection used by
      /// <see cref="CreateConsumer"/> and released by <see cref="CloseConsumer"/>.
      /// </summary>
      /// <exception cref="InvalidOperationException">The TDengine connection could not be opened.</exception>
      public TDengineDataSubcribe(
          TDengineService serviceDengine,
          PersonCacheManager personCacheMgr,
          BloodPressReferenceValueCacheManager bpRefValCacheManager,
          IResolverFactory resolverFactory,
          MsgManager msgManager,
          MsgQueueManager msgQueueManager,
          ILogger<TDengineDataSubcribe> logger
      )
      {
          _serviceTDengine = serviceDengine;
          _personCacheMgr = personCacheMgr;
          _bpRefValCacheManager = bpRefValCacheManager;
          _logger = logger;
          _resolverFactory = resolverFactory;
          _msgManager = msgManager;
          _msgQueueManager = msgQueueManager;

          // Must run after _logger is assigned because the connect helper logs.
          _conn = GetConnection();
      }

      /// <summary>
      /// Connects to TDengine and consumes the topic on the calling thread until
      /// <paramref name="stoppingToken"/> is cancelled or an error is thrown.
      /// </summary>
      /// <param name="stoppingToken">Token that ends the consume loop.</param>
      public void BeginListen(CancellationToken stoppingToken)
      {
          DoTDengineConnect(stoppingToken);
      }

      /// <summary>
      /// Connects to the <c>health_monitor</c> database and consumes the topic without a
      /// cancellation signal (the loop only ends when an exception is thrown).
      /// </summary>
      public void DoTDengineConnect()
      {
          DoTDengineConnect(CancellationToken.None);
      }

      /// <summary>
      /// Connects to the <c>health_monitor</c> database, runs the consume loop and closes the
      /// connection again once the loop ends, whether normally or by exception.
      /// </summary>
      /// <param name="stoppingToken">Token that ends the consume loop.</param>
      /// <exception cref="InvalidOperationException">Connecting or creating the topic failed.</exception>
      public void DoTDengineConnect(CancellationToken stoppingToken)
      {
          var conn = Connect(SubscribeDbName);
          try
          {
              DoReceive(conn, stoppingToken);
          }
          finally
          {
              TDengine.Close(conn);
          }
      }

      /// <summary>
      /// Runs the consume loop on <paramref name="Connection"/> without a cancellation signal.
      /// </summary>
      /// <param name="Connection">Open TDengine connection used to create the topic.</param>
      public void DoReceive(IntPtr Connection)
      {
          DoReceive(Connection, CancellationToken.None);
      }

      /// <summary>
      /// Ensures the topic exists, subscribes a new consumer to it and dispatches every
      /// consumed row until <paramref name="stoppingToken"/> is cancelled. The consumer is
      /// unsubscribed and closed when the loop ends.
      /// </summary>
      /// <param name="Connection">Open TDengine connection used to create the topic.</param>
      /// <param name="stoppingToken">Checked before every consume call.</param>
      /// <exception cref="InvalidOperationException">Creating the topic failed.</exception>
      public void DoReceive(IntPtr Connection, CancellationToken stoppingToken)
      {
          EnsureTopic(Connection);

          var consumer = new ConsumerBuilder(CreateConsumerConfig())
              .Build();
          try
          {
              consumer.Subscribe(TopicName);

              while (!stoppingToken.IsCancellationRequested)
              {
                  var consumeRes = consumer.Consume(ConsumeTimeout);
                  foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumeRes.Message)
                  {
                      DispatchRows(kv.Key, kv.Value);
                  }

                  consumer.Commit(consumeRes);
                  _logger.LogDebug("consumer.Commit");
              }
          }
          finally
          {
              // Release the consumer even when the loop is left through an exception.
              try
              {
                  consumer.Unsubscribe();
              }
              finally
              {
                  consumer.Close();
              }
          }
      }

      /// <summary>
      /// Resolves <paramref name="model"/> and enqueues the result. Models the resolver
      /// cannot wrap (it returns <c>null</c>) are skipped.
      /// </summary>
      /// <param name="model">Raw message built from one consumed row.</param>
      public void ParsePackage(ReceiveMessageModel model)
      {
          var msg = _resolverFactory.ParseAndWrap(model);
          if (msg == null)
          {
              // Check before touching msg: the resolver may return null.
              _logger.LogWarning("Resolver returned no message; package skipped");
              return;
          }

          _receivedCount++;
          _logger.LogDebug("msg {Count} {MessageId}", _receivedCount, msg.MessageId);
          _msgQueueManager.Enqueue(msg);
      }

      /// <summary>
      /// Opens a fresh connection and keeps it as the connection used by
      /// <see cref="CreateConsumer"/>; the previously held connection is closed.
      /// </summary>
      /// <exception cref="InvalidOperationException">The TDengine connection could not be opened.</exception>
      public void CreateConnection()
      {
          var previous = _conn;
          _conn = GetConnection();

          // Close the old handle only after the new one is open so _conn is never dangling.
          if (previous != IntPtr.Zero)
          {
              TDengine.Close(previous);
          }
      }

      /// <summary>
      /// Create the consumer: ensures the topic exists, builds a consumer, subscribes it and
      /// stores it for <see cref="CloseConsumer"/>.
      /// </summary>
      /// <exception cref="InvalidOperationException">Creating the topic failed.</exception>
      public void CreateConsumer()
      {
          EnsureTopic(_conn);

          var consumer = new ConsumerBuilder(CreateConsumerConfig())
              .Build();
          consumer.Subscribe(TopicName);
          _consumer = consumer;
      }

      /// <summary>
      /// Placeholder kept for callers; performs no work and returns a completed task.
      /// </summary>
      public Task ProcessMsg()
      {
          return Task.CompletedTask;
      }

      /// <summary>
      /// Opens a new connection to the <c>tmqdb</c> database.
      /// </summary>
      /// <returns>The native connection handle; the caller owns it.</returns>
      /// <exception cref="InvalidOperationException">The TDengine connection could not be opened.</exception>
      public IntPtr GetConnection()
      {
          return Connect(DefaultDbName);
      }

      /// <summary>
      /// Close the consumer: unsubscribes and closes the consumer created by
      /// <see cref="CreateConsumer"/> (if any) and closes the held connection.
      /// Safe to call more than once.
      /// </summary>
      public void CloseConsumer()
      {
          try
          {
              var consumer = _consumer;
              if (consumer != null)
              {
                  List<string> topics = consumer.Subscription();
                  topics.ForEach(t => _logger.LogInformation("topic name:{Topic}", t));

                  consumer.Unsubscribe();
                  // Close the consumer after use, otherwise it leaks.
                  consumer.Close();
                  _consumer = null;
              }
          }
          finally
          {
              if (_conn != IntPtr.Zero)
              {
                  TDengine.Close(_conn);
                  _conn = IntPtr.Zero;
              }
          }
      }

      /// <summary>Builds the consumer configuration shared by every consumer of this class.</summary>
      private static ConsumerConfig CreateConsumerConfig()
      {
          return new ConsumerConfig
          {
              GourpId = ConsumerGroupId,
              TDConnectUser = UserName,
              TDConnectPasswd = Password,
              MsgWithTableName = "true",
              TDConnectIp = Host,
          };
      }

      /// <summary>Opens a connection to <paramref name="dbName"/> and fails loudly when it cannot.</summary>
      private IntPtr Connect(string dbName)
      {
          var conn = TDengine.Connect(Host, UserName, Password, dbName, Port);
          if (conn == IntPtr.Zero)
          {
              throw new InvalidOperationException("Connect to TDengine failed");
          }

          _logger.LogInformation("Connect to TDengine success");
          return conn;
      }

      /// <summary>Creates the subscription topic when it does not exist yet.</summary>
      private static void EnsureTopic(IntPtr connection)
      {
          IntPtr res = TDengine.Query(connection, $"create topic if not exists {TopicName} as select * from health_monitor.hm_bloodpress");
          try
          {
              if (TDengine.ErrorNo(res) != 0)
              {
                  throw new InvalidOperationException($"create topic failed, reason:{TDengine.Error(res)}");
              }
          }
          finally
          {
              // The error text is read above, so the result handle can be released now.
              TDengine.FreeResult(res);
          }
      }

      /// <summary>
      /// Splits the flat value list of <paramref name="result"/> into rows of one value per
      /// column and forwards each row to <see cref="ParsePackage"/>. A failing row is logged
      /// and does not stop the remaining rows.
      /// </summary>
      private void DispatchRows(TopicPartition partition, TaosResult result)
      {
          var metas = result.Metas;
          var datas = result.Datas;
          int columnCount = metas.Count;
          if (columnCount == 0)
          {
              // Without column metadata the values cannot be grouped into rows.
              return;
          }

          // Values are laid out row after row; an incomplete trailing row is ignored.
          for (int rowStart = 0; rowStart + columnCount <= datas.Count; rowStart += columnCount)
          {
              try
              {
                  IDictionary<string, object> row = new Dictionary<string, object>(columnCount);
                  for (int column = 0; column < columnCount; column++)
                  {
                      row.Add(metas[column].name, datas[rowStart + column]);
                  }

                  if (!row.TryGetValue(MessageIdColumn, out var messageId) || messageId == null)
                  {
                      _logger.LogWarning("Row without {Column} skipped", MessageIdColumn);
                      continue;
                  }

                  var body = JsonConvert.SerializeObject(row);
                  ReceiveMessageModel msg = new(partition.db, partition.table, partition.topic, messageId.ToString()!, body);
                  ParsePackage(msg);
              }
              catch (Exception ex)
              {
                  _logger.LogError(ex, "Failed to process consumed row: {Message}", ex.Message);
              }
          }
      }
  }
  489. }