You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

557 lines
23KB

  1. using HealthMonitor.Common;
  2. using HealthMonitor.Core.Dal;
  3. using HealthMonitor.Service.Biz.db;
  4. using HealthMonitor.Service.Cache;
  5. using HealthMonitor.Service.Resolver.Factory;
  6. using HealthMonitor.Service.Resolver.Interface;
  7. using HealthMonitor.Service.Sub.Topic.Model;
  8. using Microsoft.Extensions.Logging;
  9. using Newtonsoft.Json;
  10. using System;
  11. using System.Collections.Concurrent;
  12. using System.Collections.Generic;
  13. using System.Linq;
  14. using System.Text;
  15. using System.Threading.Tasks;
  16. using TDengineDriver;
  17. using TDengineTMQ;
  18. using TelpoDataService.Util.Entities.GpsLocationHistory;
  19. namespace HealthMonitor.Service.Sub
  20. {
  21. public class TDengineDataSubcribe
  22. {
  // Logger supplied by dependency injection.
  private readonly ILogger<TDengineDataSubcribe> _logger;

  // TMQ consumer; assigned by CreateConsumer().
  private IConsumer _consumer = default!;

  // Native TDengine connection handle; opened in the constructor through GetConnection().
  private IntPtr _conn = IntPtr.Zero;

  // Queue that receives every successfully resolved message (see ParsePackage).
  private readonly MsgQueueManager _msgQueueManager;

  // The next three collaborators are stored here but are only referenced from
  // commented-out code in this class.
  private readonly TDengineService _serviceTDengine;
  private readonly PersonCacheManager _personCacheMgr;
  private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager;

  // Turns a raw ReceiveMessageModel into the object that gets enqueued.
  private readonly IResolverFactory _resolverFactory;

  // Linked cancellation source created by BeginListen(); null until then.
  private CancellationTokenSource _tokenSource = null;

  // Counter that is only referenced from commented-out code.
  private int cnt = 0;

  /// <summary>
  /// Stores the injected collaborators and then opens the shared TDengine
  /// connection by calling <see cref="GetConnection"/>.
  /// </summary>
  /// <exception cref="Exception">Propagated from <see cref="GetConnection"/> when the connection cannot be opened.</exception>
  public TDengineDataSubcribe(
      TDengineService serviceDengine,
      PersonCacheManager personCacheMgr,
      BloodPressReferenceValueCacheManager bpRefValCacheManager,
      IResolverFactory resolverFactory,
      MsgQueueManager msgQueueManager,
      ILogger<TDengineDataSubcribe> logger)
  {
      this._logger = logger;
      this._msgQueueManager = msgQueueManager;
      this._resolverFactory = resolverFactory;
      this._bpRefValCacheManager = bpRefValCacheManager;
      this._personCacheMgr = personCacheMgr;
      this._serviceTDengine = serviceDengine;

      // Must stay last: every injected field is populated before the connection is opened.
      this._conn = GetConnection();
  }
  /// <summary>
  /// Starts listening for TDengine subscription messages.
  /// </summary>
  /// <remarks>
  /// This call blocks: it invokes <see cref="DoTDengineConnect"/>, which in turn
  /// runs the receive loop in <see cref="DoReceive"/> until cancellation is requested.
  /// </remarks>
  /// <param name="stoppingToken">Token whose cancellation ends the receive loop.</param>
  public void BeginListen(CancellationToken stoppingToken)
  {
      // Signal any earlier listener to stop so that two receive loops do not run side by side.
      CancellationTokenSource previous = _tokenSource;
      previous?.Cancel();

      _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);

      // A linked source keeps a registration on its parent token until it is disposed;
      // release the old one now that it has been replaced.
      previous?.Dispose();

      DoTDengineConnect();
  }
  /// <summary>
  /// Opens a native connection to the <c>health_monitor</c> database and hands it
  /// to <see cref="DoReceive"/>, which runs the receive loop.
  /// </summary>
  /// <exception cref="InvalidOperationException">The connection could not be established.</exception>
  public void DoTDengineConnect()
  {
      // NOTE(review): endpoint and credentials are hard-coded in source; they should be
      // moved to configuration / a secret store.
      const string host = "47.116.142.20";
      const short port = 6030;
      const string username = "root";
      const string password = "taosdata";
      const string dbname = "health_monitor";

      IntPtr conn = TDengine.Connect(host, username, password, dbname, port);
      if (conn == IntPtr.Zero)
      {
          // InvalidOperationException derives from Exception, so existing catch blocks still match.
          throw new InvalidOperationException("Connect to TDengine failed");
      }

      _logger.LogInformation("Connect to TDengine success");
      DoReceive(conn);
  }
  /// <summary>
  /// Ensures the blood-pressure topic exists, subscribes a new consumer to it and
  /// polls until cancellation is requested. Each complete row is converted to a
  /// <see cref="ReceiveMessageModel"/> and passed to <see cref="ParsePackage"/>.
  /// </summary>
  /// <remarks>
  /// The consumer created here and the supplied <paramref name="Connection"/> are
  /// always released when the method exits, including on failure. Earlier code
  /// closed the <c>_consumer</c> / <c>_conn</c> fields instead, which this path never
  /// assigns, so shutdown failed and the real resources leaked.
  /// </remarks>
  /// <param name="Connection">Open TDengine connection handle; closed by this method before it returns.</param>
  /// <exception cref="InvalidOperationException">The topic could not be created.</exception>
  public void DoReceive(IntPtr Connection)
  {
      const string topic = "topic_hm_bp_stats";

      // Snapshot the token once: BeginListen() replaces the _tokenSource field, and a loop
      // that re-read the field would never observe the cancellation of its own source.
      // When no source exists (direct call without BeginListen) the loop is not cancellable.
      CancellationToken token = _tokenSource?.Token ?? CancellationToken.None;

      try
      {
          IntPtr res = TDengine.Query(Connection, $"create topic if not exists {topic} as select * from health_monitor.hm_bloodpress");
          try
          {
              if (TDengine.ErrorNo(res) != 0)
              {
                  throw new InvalidOperationException($"create topic failed, reason:{TDengine.Error(res)}");
              }
          }
          finally
          {
              // The result handle is native memory and must be freed on both paths.
              TDengine.FreeResult(res);
          }

          // NOTE(review): endpoint and credentials are hard-coded in source; they should be
          // moved to configuration / a secret store.
          var cfg = new ConsumerConfig
          {
              GourpId = "group_1",
              TDConnectUser = "root",
              TDConnectPasswd = "taosdata",
              MsgWithTableName = "true",
              TDConnectIp = "47.116.142.20",
          };

          var consumer = new ConsumerBuilder(cfg)
              .Build();
          try
          {
              consumer.Subscribe(topic);

              while (!token.IsCancellationRequested)
              {
                  var consumeRes = consumer.Consume(300);
                  foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumeRes.Message)
                  {
                      DispatchRows(kv.Key, kv.Value);
                  }

                  consumer.Commit(consumeRes);
                  _logger.LogDebug("监听中....");
              }
          }
          finally
          {
              // Close the consumer after use; otherwise it leaks native memory.
              consumer.Close();
          }
      }
      finally
      {
          TDengine.Close(Connection);
      }
  }

  // Splits the flat Datas list of one consume result into rows of Metas.Count cells and
  // forwards every complete row to ParsePackage. A failure in one row is logged and does
  // not stop the remaining rows.
  private void DispatchRows(TopicPartition partition, TaosResult result)
  {
      int columnCount = result.Metas.Count;
      if (columnCount == 0)
      {
          // Without column metadata the rows cannot be reconstructed.
          return;
      }

      // Datas is column-major within a row: cell c of a row lives at rowStart + c.
      // A trailing partial row (fewer than columnCount cells) is ignored.
      for (int rowStart = 0; rowStart + columnCount <= result.Datas.Count; rowStart += columnCount)
      {
          try
          {
              IDictionary<string, object> row = new Dictionary<string, object>(columnCount);
              for (int c = 0; c < columnCount; c++)
              {
                  row.Add(result.Metas[c].name, result.Datas[rowStart + c]);
              }

              var body = JsonConvert.SerializeObject(row);
              ReceiveMessageModel msg = new(partition.db, partition.table, partition.topic, row["message_id"].ToString()!, body);
              ParsePackage(msg);
          }
          catch (Exception ex)
          {
              // Log the whole exception (not just Message) so the stack trace is kept.
              _logger.LogError(ex, "Failed to process a row from topic {Topic}, table {Table}", partition.topic, partition.table);
          }
      }
  }
  /// <summary>
  /// Runs <paramref name="model"/> through the resolver factory and, when the factory
  /// yields a result, places that result on the message queue.
  /// </summary>
  /// <param name="model">Raw message received from the subscription.</param>
  public void ParsePackage(ReceiveMessageModel model)
  {
      var wrapped = _resolverFactory.ParseAndWrap(model);
      if (wrapped == null)
      {
          // The factory produced nothing for this message, so there is nothing to enqueue.
          return;
      }

      _msgQueueManager.Enqueue(wrapped);
  }
  /// <summary>
  /// Opens a TDengine connection through <see cref="GetConnection"/> and releases it
  /// again straight away.
  /// </summary>
  /// <remarks>
  /// The handle obtained here was never stored or returned, so every call used to leak
  /// one native connection. It is now closed before returning; the method keeps its
  /// observable contract (success is reported by <see cref="GetConnection"/>, failure throws).
  /// </remarks>
  /// <exception cref="Exception">Propagated from <see cref="GetConnection"/> when the connection cannot be opened.</exception>
  public void CreateConnection()
  {
      IntPtr conn = GetConnection();

      // Nothing else can reach this handle once the method returns, so release it here.
      TDengine.Close(conn);
  }
  /// <summary>
  /// Creates the consumer: ensures the blood-pressure topic exists (using the shared
  /// <c>_conn</c> connection), builds a consumer, subscribes it to the topic and stores
  /// it in <c>_consumer</c>.
  /// </summary>
  /// <exception cref="InvalidOperationException">The topic could not be created.</exception>
  public void CreateConsumer()
  {
      const string topic = "topic_hm_bp_stats";

      IntPtr res = TDengine.Query(_conn, $"create topic if not exists {topic} as select * from health_monitor.hm_bloodpress");
      try
      {
          if (TDengine.ErrorNo(res) != 0)
          {
              // InvalidOperationException derives from Exception, so existing catch blocks still match.
              throw new InvalidOperationException($"create topic failed, reason:{TDengine.Error(res)}");
          }
      }
      finally
      {
          // The result handle is native memory and must be freed on both paths.
          TDengine.FreeResult(res);
      }

      // NOTE(review): endpoint and credentials are hard-coded in source; they should be
      // moved to configuration / a secret store.
      var cfg = new ConsumerConfig
      {
          GourpId = "group_1",
          TDConnectUser = "root",
          TDConnectPasswd = "taosdata",
          MsgWithTableName = "true",
          TDConnectIp = "47.116.142.20",
      };

      var consumer = new ConsumerBuilder(cfg)
          .Build();
      try
      {
          consumer.Subscribe(topic);
      }
      catch
      {
          // Do not leak the freshly built consumer when subscribing fails.
          consumer.Close();
          throw;
      }

      // NOTE(review): a consumer already held in _consumer is overwritten without being
      // closed; callers are expected to call CloseConsumer() first.
      _consumer = consumer;
  }
  197. //public IConsumer CreateConsumer()
  198. //{
  199. // var cfg = new ConsumerConfig
  200. // {
  201. // GourpId = "group_1",
  202. // TDConnectUser = "root",
  203. // TDConnectPasswd = "taosdata",
  204. // MsgWithTableName = "true",
  205. // TDConnectIp = "47.116.142.20",
  206. // };
  207. // var conn = GetConnection();
  208. // //IntPtr conn = GetConnection();
  209. // string topic = "topic_name";
  210. // //create topic
  211. // IntPtr res = TDengine.Query(conn, $"create topic if not exists {topic} as select * from ctb1");
  212. // if (TDengine.ErrorNo(res) != 0)
  213. // {
  214. // throw new Exception($"create topic failed, reason:{TDengine.Error(res)}");
  215. // }
  216. // // create consumer
  217. // var consumer = new ConsumerBuilder(cfg)
  218. // .Build();
  219. // // subscribe
  220. // consumer.Subscribe(topic);
  221. // return consumer;
  222. //}
  223. //public void ProcessMsg()
  224. //{
  225. // var consumerRes = _consumer.Consume(300);
  226. // // process ConsumeResult
  227. // foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumerRes.Message)
  228. // {
  229. // Console.WriteLine("topic partitions:\n{0}", kv.Key.ToString());
  230. // kv.Value.Metas.ForEach(meta =>
  231. // {
  232. // Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size);
  233. // });
  234. // Console.WriteLine("");
  235. // kv.Value.Datas.ForEach(data =>
  236. // {
  237. // Console.WriteLine(data.ToString());
  238. // });
  239. // }
  240. // _consumer.Commit(consumerRes);
  241. // // Console.WriteLine("\n================ {0} done ");
  242. //}
  243. /**
  244. public async Task ProcessMsg()
  245. {
  246. var consumerRes = _consumer.Consume(300);
  247. Console.WriteLine(consumerRes.Message.Count);
  248. // process ConsumeResult
  249. foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumerRes.Message)
  250. {
  251. //Console.WriteLine("topic partitions:\n{0}", kv.Key.ToString());
  252. //kv.Value.Metas.ForEach(meta =>
  253. //{
  254. // Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size);
  255. //});
  256. //Console.WriteLine("----------");
  257. //kv.Value.Datas.ForEach(data =>
  258. //{
  259. // Console.WriteLine(data.ToString());
  260. //});
  261. //Console.WriteLine("----------");
  262. for (int i = 0; i < kv.Value.Datas.Count; i++)
  263. {
  264. // Console.Write($"|{kv.Value.Datas[i].ToString()} \t");
  265. //Console.WriteLine("{0},{1},{2}", i, resMeta.Count, (i) % resMeta.Count);
  266. if (((i + 1) % kv.Value.Metas.Count == 0))
  267. {
  268. try
  269. {
  270. cnt++;
  271. string bloodpress_id = SafeType.SafeString(kv.Value.Datas[i - 8]);
  272. string message_id = SafeType.SafeString(kv.Value.Datas[i - 7]);
  273. string serialno = SafeType.SafeString(kv.Value.Datas[i - 6]);
  274. int systolic_value = SafeType.SafeInt(kv.Value.Datas[i - 5]);
  275. int diastolic_value = SafeType.SafeInt(kv.Value.Datas[i - 4]);
  276. DateTime create_time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 3]) / 1000000);
  277. DateTime last_update = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 2]) / 1000000);
  278. int method = SafeType.SafeInt(kv.Value.Datas[i - 1]);
  279. bool is_display = SafeType.SafeBool(kv.Value.Datas[i]);
  280. // Console.WriteLine("----------");
  281. HisGpsBloodPress bp = new()
  282. {
  283. //BloodPressId = (string)kv.Value.Datas[i -8],
  284. //MessageId = (string)kv.Value.Datas[i -7],
  285. //Serialno = (string)kv.Value.Datas[i -6],
  286. //SystolicValue = (int)kv.Value.Datas[i -5],
  287. //DiastolicValue = (int)kv.Value.Datas[i -4],
  288. //CreateTime = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds((long)kv.Value.Datas[i -3]/1000000),
  289. //LastUpdate = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds((long)kv.Value.Datas[i -2]/1000000),
  290. //Method = (int)kv.Value.Datas[i -1],
  291. //IsDisplay = (bool)kv.Value.Datas[i] ? 1 : 0,
  292. BloodPressId = bloodpress_id,
  293. MessageId = message_id,
  294. Serialno = serialno,
  295. SystolicValue = systolic_value,
  296. DiastolicValue = diastolic_value,
  297. CreateTime = create_time,
  298. LastUpdate = last_update,
  299. Method = method,
  300. IsDisplay = is_display ? 1 : 0,
  301. };
  302. #region 获取个人信息
  303. var person = await _personCacheMgr.GetDeviceGpsPersonCacheBySerialNoAsync(bp.MessageId, bp.Serialno).ConfigureAwait(false);
  304. // 验证这个信息是否存在
  305. if (person == null || person?.Person.BornDate == null)
  306. {
  307. Console.WriteLine("验证这个信息是否存在");
  308. }
  309. else
  310. {
  311. // 验证年龄是否在范围 (2 - 120)
  312. var age = SafeType.SafeInt(DateTime.Today.Year - person?.Person.BornDate!.Value.Year!);
  313. #endregion
  314. if (age < 1 || age > 120)
  315. {
  316. Console.WriteLine("验证年龄是否在范围 (2 - 120)");
  317. }
  318. else
  319. {
  320. var gender = person?.Person.Gender == true ? 1 : 2;
  321. var isHypertension = SafeType.SafeBool(person?.Person.Ishypertension!);
  322. var height = SafeType.SafeDouble(person?.Person.Height!);
  323. var weight = SafeType.SafeDouble(person?.Person.Weight!);
  324. #region 插入当次BP数据
  325. // 保存到TDengine
  326. //var bpSql = $"INSERT INTO health_monitor.hm_bloodpress VALUES(" +
  327. // $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  328. // $"'{bp.BloodPressId}'," +
  329. // $"'{bp.MessageId}'," +
  330. // $"'{bp.Serialno}'," +
  331. // $"{bp.SystolicValue}," +
  332. // $"{bp.DiastolicValue}," +
  333. // $"'{bp.CreateTime:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  334. // $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  335. // $"{bp.Method}," +
  336. // $"{bp.IsDisplay == 1})";
  337. //await _serviceTDengine.GernalRestSql(bpSql);
  338. #endregion
  339. #region 计算增量值
  340. var bpRef = await _bpRefValCacheManager.GetBloodPressReferenceValueAsync(age, gender, isHypertension);
  341. var systolicRefValue = bpRef?.Systolic;//?
  342. var diastolicRefValue = bpRef?.Diastolic;//?
  343. int duration = 30;
  344. // 获取历史数据
  345. //DateTime now = DateTime.Now;
  346. DateTime now = (DateTime)bp.LastUpdate; //测试
  347. DateTime startTime = now.AddDays(-duration);
  348. DateTime endTime = now;
  349. //
  350. var systolicAggregate = await _serviceTDengine.GetAggregateValue("systolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}'");
  351. var diastolicAggregate = await _serviceTDengine.GetAggregateValue("diastolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}'");
  352. // 最大值
  353. var systolicMax = systolicAggregate.Max;
  354. var diastolicMax = diastolicAggregate.Max;
  355. // 最小值
  356. var systolicMin = systolicAggregate.Min;
  357. var diastolicMin = diastolicAggregate.Min;
  358. // 计算去除最大值和最小值和异常值的平均值
  359. var systolicAvg = await _serviceTDengine.GetAvgExceptMaxMinValue("systolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}' and systolic_value < {systolicRefValue} ");
  360. var diastolicAvg = await _serviceTDengine.GetAvgExceptMaxMinValue("diastolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}' and diastolic_value < {diastolicRefValue}");
  361. // 偏移参数
  362. var avgOffset = 0.25M;
  363. var systolicAvgOffset = avgOffset;
  364. var diastolicAvgOffset = avgOffset;
  365. // 增量值=(标定值-平均值)* 0.25
  366. var systolicInc = systolicAvg.Equals(0M) ? 0 : (int)((systolicRefValue - systolicAvg) * systolicAvgOffset)!;
  367. var diastolicInc = diastolicAvg.Equals(0M) ? 0 : (int)((diastolicRefValue - diastolicAvg) * diastolicAvgOffset)!;
  368. #endregion
  369. #region 插入BP增量值
  370. var sql = $"INSERT INTO health_monitor.hm_bloodpress_stats_inc VALUES(" +
  371. $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  372. $"'{bp.BloodPressId}'," +
  373. $"'{bp.MessageId}'," +
  374. $"'{bp.Serialno}'," +
  375. $"{bp.SystolicValue}," +
  376. $"{systolicRefValue}," +
  377. $"{systolicAvg}," +
  378. $"{systolicMax}," +
  379. $"{systolicMin}," +
  380. $"{systolicAvgOffset}," +
  381. $"{systolicInc}," +
  382. $"{bp.DiastolicValue}," +
  383. $"{diastolicRefValue}," +
  384. $"{diastolicAvg}," +
  385. $"{diastolicMax}," +
  386. $"{diastolicMin}," +
  387. $"{diastolicAvgOffset}," +
  388. $"{diastolicInc}," +
  389. $"{gender}," +
  390. $"{age}," +
  391. $"{height}," +
  392. $"{weight}," +
  393. $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  394. $"{duration}," +
  395. $"'{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  396. $"'{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  397. $"'{string.Empty}')";
  398. var res = await _serviceTDengine.GernalRestSql(sql);
  399. #endregion
  400. };
  401. // Console.WriteLine("----------");
  402. }
  403. }
  404. catch (Exception ex)
  405. {
  406. Console.WriteLine(ex.Message);
  407. }
  408. }
  409. Console.WriteLine("++++++++++++++++++++++");
  410. Console.Write($"总共增加{cnt}");
  411. }
  412. // Console.WriteLine("");
  413. }
  414. _consumer.Commit(consumerRes);
  415. // Console.WriteLine("\n================ {0} done ");
  416. }
  417. */
  // Placeholder kept for API compatibility: does no work and returns an already
  // completed task. Declared without 'async' because there is nothing to await
  // (an async method with no await only adds a state machine and warning CS1998).
  public Task ProcessMsg()
  {
      return Task.CompletedTask;
  }
  /// <summary>
  /// Opens a native connection to the <c>tmqdb</c> database.
  /// </summary>
  /// <returns>A non-zero connection handle. The caller is responsible for closing it.</returns>
  /// <exception cref="InvalidOperationException">The connection could not be established.</exception>
  public IntPtr GetConnection()
  {
      // NOTE(review): endpoint and credentials are hard-coded in source; they should be
      // moved to configuration / a secret store.
      const string host = "47.116.142.20";
      const short port = 6030;
      const string username = "root";
      const string password = "taosdata";
      const string dbname = "tmqdb";

      IntPtr conn = TDengine.Connect(host, username, password, dbname, port);
      if (conn == IntPtr.Zero)
      {
          // InvalidOperationException derives from Exception, so existing catch blocks still match.
          throw new InvalidOperationException("Connect to TDengine failed");
      }

      _logger.LogInformation("Connect to TDengine success");
      return conn;
  }
  439. // 关闭消费者
  440. //public void CloseConsumer(IConsumer consumer, IntPtr conn)
  441. //{
  442. // List<string> topics = consumer.Subscription();
  443. // topics.ForEach(t => Console.WriteLine("topic name:{0}", t));
  444. // // unsubscribe
  445. // consumer.Unsubscribe();
  446. // // close consumer after use.Otherwise will lead memory leak.
  447. // consumer.Close();
  448. // TDengine.Close(conn);
  449. //}
  /// <summary>
  /// Unsubscribes and closes the consumer held in <c>_consumer</c> (if any) and closes
  /// the shared <c>_conn</c> connection (if open).
  /// </summary>
  /// <remarks>
  /// Both fields are reset after release, so calling this method again does not touch
  /// handles it has already closed. It is also a no-op for the consumer part when
  /// <see cref="CreateConsumer"/> was never called.
  /// </remarks>
  public void CloseConsumer()
  {
      try
      {
          IConsumer consumer = _consumer;
          if (consumer != null)
          {
              try
              {
                  List<string> topics = consumer.Subscription();
                  foreach (string t in topics)
                  {
                      _logger.LogInformation("topic name:{Topic}", t);
                  }

                  consumer.Unsubscribe();
              }
              finally
              {
                  // Close the consumer after use; otherwise it leaks native memory.
                  // Runs even when listing or unsubscribing fails.
                  consumer.Close();
                  _consumer = null!;
              }
          }
      }
      finally
      {
          // Release the connection even when consumer shutdown failed.
          if (_conn != IntPtr.Zero)
          {
              TDengine.Close(_conn);
              _conn = IntPtr.Zero;
          }
      }
  }
  460. }
  461. }