You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

553 line
23KB

  1. using HealthMonitor.Common;
  2. using HealthMonitor.Core.Dal;
  3. using HealthMonitor.Service.Biz.db;
  4. using HealthMonitor.Service.Cache;
  5. using HealthMonitor.Service.Resolver.Factory;
  6. using HealthMonitor.Service.Resolver.Interface;
  7. using Microsoft.Extensions.Logging;
  8. using Newtonsoft.Json;
  9. using System;
  10. using System.Collections.Generic;
  11. using System.Linq;
  12. using System.Text;
  13. using System.Threading.Tasks;
  14. using TDengineDriver;
  15. using TDengineTMQ;
  16. using TelpoDataService.Util.Entities.GpsLocationHistory;
  17. namespace HealthMonitor.Service.Sub
  18. {
  19. public class TDengineDataSubcribe
  20. {
  21. private readonly ILogger<TDengineDataSubcribe> _logger;
  22. private IConsumer _consumer = default!;
  23. private IntPtr _conn = default!;
  24. private readonly MsgManager _msgManager;
  25. private readonly TDengineService _serviceTDengine;
  26. private readonly PersonCacheManager _personCacheMgr;
  27. private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager;
  28. private readonly IResolverFactory _resolverFactory;
  29. private int cnt = 0;
  30. public TDengineDataSubcribe(
  31. TDengineService serviceDengine,
  32. PersonCacheManager personCacheMgr,
  33. BloodPressReferenceValueCacheManager bpRefValCacheManager,
  34. IResolverFactory resolverFactory,
  35. MsgManager msgManager,
  36. ILogger<TDengineDataSubcribe> logger
  37. )
  38. {
  39. _serviceTDengine = serviceDengine;
  40. _personCacheMgr = personCacheMgr;
  41. _bpRefValCacheManager = bpRefValCacheManager;
  42. _logger = logger;
  43. _resolverFactory = resolverFactory;
  44. _msgManager = msgManager;
  45. _conn = GetConnection();
  46. }
  47. public void BeginListen(CancellationToken stoppingToken)
  48. {
  49. //var cfg = new ConsumerConfig
  50. //{
  51. // GourpId = "group_1",
  52. // TDConnectUser = "root",
  53. // TDConnectPasswd = "taosdata",
  54. // MsgWithTableName = "true",
  55. // TDConnectIp = "47.116.142.20",
  56. //};
  57. //var conn = GetConnection();
  58. //ProcessMsg(consumer);
  59. DoTDengineConnect();
  60. }
  61. public void DoTDengineConnect()
  62. {
  63. string host = "47.116.142.20";
  64. short port = 6030;
  65. string username = "root";
  66. string password = "taosdata";
  67. string dbname = "health_monitor";
  68. var conn = TDengine.Connect(host, username, password, dbname, port);
  69. if (conn == IntPtr.Zero)
  70. {
  71. throw new Exception("Connect to TDengine failed");
  72. }
  73. else
  74. {
  75. Console.WriteLine("Connect to TDengine success");
  76. }
  77. DoReceive(conn);
  78. }
  79. public void DoReceive(IntPtr Connection)
  80. {
  81. var cfg = new ConsumerConfig
  82. {
  83. GourpId = "group_1",
  84. TDConnectUser = "root",
  85. TDConnectPasswd = "taosdata",
  86. MsgWithTableName = "true",
  87. TDConnectIp = "47.116.142.20",
  88. };
  89. string topic = "topic_hm_bp_stats";
  90. //create topic
  91. IntPtr res = TDengine.Query(Connection, $"create topic if not exists {topic} as select * from health_monitor.hm_bloodpress");
  92. if (TDengine.ErrorNo(res) != 0)
  93. {
  94. throw new Exception($"create topic failed, reason:{TDengine.Error(res)}");
  95. }
  96. // create consumer
  97. var consumer = new ConsumerBuilder(cfg)
  98. .Build();
  99. // subscribe
  100. consumer.Subscribe(topic);
  101. while (true)
  102. {
  103. var consumeRes = consumer.Consume(300);
  104. foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumeRes.Message)
  105. {
  106. for (int i = 0; i < kv.Value.Datas.Count; i++)
  107. {
  108. if (((i + 1) % kv.Value.Metas.Count == 0))
  109. {
  110. string bloodpress_id = SafeType.SafeString(kv.Value.Datas[i - 8]);
  111. string message_id = SafeType.SafeString(kv.Value.Datas[i - 7]);
  112. string serialno = SafeType.SafeString(kv.Value.Datas[i - 6]);
  113. int systolic_value = SafeType.SafeInt(kv.Value.Datas[i - 5]);
  114. int diastolic_value = SafeType.SafeInt(kv.Value.Datas[i - 4]);
  115. DateTime create_time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 3]) / 1000000);
  116. DateTime last_update = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 2]) / 1000000);
  117. int method = SafeType.SafeInt(kv.Value.Datas[i - 1]);
  118. bool is_display = SafeType.SafeBool(kv.Value.Datas[i]);
  119. HisGpsBloodPress bp = new()
  120. {
  121. BloodPressId = bloodpress_id,
  122. MessageId = message_id,
  123. Serialno = serialno,
  124. SystolicValue = systolic_value,
  125. DiastolicValue = diastolic_value,
  126. CreateTime = create_time,
  127. LastUpdate = last_update,
  128. Method = method,
  129. IsDisplay = is_display ? 1 : 0,
  130. };
  131. try
  132. {
  133. var db = kv.Key.db;
  134. var table = kv.Key.table;
  135. var kvTopic = kv.Key.topic;
  136. var body = JsonConvert.SerializeObject(bp);
  137. ReceiveMessageModel msg = new(db, table, kvTopic, Guid.NewGuid().ToString("N"), body);
  138. ParsePackage(msg);
  139. }
  140. catch (Exception ex)
  141. {
  142. Console.WriteLine(ex.Message);
  143. }
  144. }
  145. }
  146. }
  147. consumer.Commit(consumeRes);
  148. Console.WriteLine("consumer.Commit");
  149. }
  150. }
  151. public void ParsePackage(ReceiveMessageModel model)
  152. {
  153. var msg = _resolverFactory.ParseAndWrap(model);
  154. Console.WriteLine("msg");
  155. if (msg == null) return;
  156. _msgManager.AddMsg(msg);
  157. }
  158. public void CreateConnection()
  159. {
  160. var cfg = new ConsumerConfig
  161. {
  162. GourpId = "group_1",
  163. TDConnectUser = "root",
  164. TDConnectPasswd = "taosdata",
  165. MsgWithTableName = "true",
  166. TDConnectIp = "47.116.142.20",
  167. };
  168. var conn = GetConnection();
  169. }
  170. // 创建消费者
  171. public void CreateConsumer()
  172. {
  173. var cfg = new ConsumerConfig
  174. {
  175. GourpId = "group_1",
  176. TDConnectUser = "root",
  177. TDConnectPasswd = "taosdata",
  178. MsgWithTableName = "true",
  179. TDConnectIp = "47.116.142.20",
  180. };
  181. //IntPtr conn = GetConnection();
  182. string topic = "topic_hm_bp_stats";
  183. //create topic
  184. IntPtr res = TDengine.Query(_conn, $"create topic if not exists {topic} as select * from health_monitor.hm_bloodpress");
  185. if (TDengine.ErrorNo(res) != 0)
  186. {
  187. throw new Exception($"create topic failed, reason:{TDengine.Error(res)}");
  188. }
  189. // create consumer
  190. var consumer = new ConsumerBuilder(cfg)
  191. .Build();
  192. // subscribe
  193. consumer.Subscribe(topic);
  194. _consumer = consumer;
  195. }
  196. //public IConsumer CreateConsumer()
  197. //{
  198. // var cfg = new ConsumerConfig
  199. // {
  200. // GourpId = "group_1",
  201. // TDConnectUser = "root",
  202. // TDConnectPasswd = "taosdata",
  203. // MsgWithTableName = "true",
  204. // TDConnectIp = "47.116.142.20",
  205. // };
  206. // var conn = GetConnection();
  207. // //IntPtr conn = GetConnection();
  208. // string topic = "topic_name";
  209. // //create topic
  210. // IntPtr res = TDengine.Query(conn, $"create topic if not exists {topic} as select * from ctb1");
  211. // if (TDengine.ErrorNo(res) != 0)
  212. // {
  213. // throw new Exception($"create topic failed, reason:{TDengine.Error(res)}");
  214. // }
  215. // // create consumer
  216. // var consumer = new ConsumerBuilder(cfg)
  217. // .Build();
  218. // // subscribe
  219. // consumer.Subscribe(topic);
  220. // return consumer;
  221. //}
  222. //public void ProcessMsg()
  223. //{
  224. // var consumerRes = _consumer.Consume(300);
  225. // // process ConsumeResult
  226. // foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumerRes.Message)
  227. // {
  228. // Console.WriteLine("topic partitions:\n{0}", kv.Key.ToString());
  229. // kv.Value.Metas.ForEach(meta =>
  230. // {
  231. // Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size);
  232. // });
  233. // Console.WriteLine("");
  234. // kv.Value.Datas.ForEach(data =>
  235. // {
  236. // Console.WriteLine(data.ToString());
  237. // });
  238. // }
  239. // _consumer.Commit(consumerRes);
  240. // // Console.WriteLine("\n================ {0} done ");
  241. //}
  242. /**
  243. public async Task ProcessMsg()
  244. {
  245. var consumerRes = _consumer.Consume(300);
  246. Console.WriteLine(consumerRes.Message.Count);
  247. // process ConsumeResult
  248. foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumerRes.Message)
  249. {
  250. //Console.WriteLine("topic partitions:\n{0}", kv.Key.ToString());
  251. //kv.Value.Metas.ForEach(meta =>
  252. //{
  253. // Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size);
  254. //});
  255. //Console.WriteLine("----------");
  256. //kv.Value.Datas.ForEach(data =>
  257. //{
  258. // Console.WriteLine(data.ToString());
  259. //});
  260. //Console.WriteLine("----------");
  261. for (int i = 0; i < kv.Value.Datas.Count; i++)
  262. {
  263. // Console.Write($"|{kv.Value.Datas[i].ToString()} \t");
  264. //Console.WriteLine("{0},{1},{2}", i, resMeta.Count, (i) % resMeta.Count);
  265. if (((i + 1) % kv.Value.Metas.Count == 0))
  266. {
  267. try
  268. {
  269. cnt++;
  270. string bloodpress_id = SafeType.SafeString(kv.Value.Datas[i - 8]);
  271. string message_id = SafeType.SafeString(kv.Value.Datas[i - 7]);
  272. string serialno = SafeType.SafeString(kv.Value.Datas[i - 6]);
  273. int systolic_value = SafeType.SafeInt(kv.Value.Datas[i - 5]);
  274. int diastolic_value = SafeType.SafeInt(kv.Value.Datas[i - 4]);
  275. DateTime create_time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 3]) / 1000000);
  276. DateTime last_update = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 2]) / 1000000);
  277. int method = SafeType.SafeInt(kv.Value.Datas[i - 1]);
  278. bool is_display = SafeType.SafeBool(kv.Value.Datas[i]);
  279. // Console.WriteLine("----------");
  280. HisGpsBloodPress bp = new()
  281. {
  282. //BloodPressId = (string)kv.Value.Datas[i -8],
  283. //MessageId = (string)kv.Value.Datas[i -7],
  284. //Serialno = (string)kv.Value.Datas[i -6],
  285. //SystolicValue = (int)kv.Value.Datas[i -5],
  286. //DiastolicValue = (int)kv.Value.Datas[i -4],
  287. //CreateTime = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds((long)kv.Value.Datas[i -3]/1000000),
  288. //LastUpdate = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds((long)kv.Value.Datas[i -2]/1000000),
  289. //Method = (int)kv.Value.Datas[i -1],
  290. //IsDisplay = (bool)kv.Value.Datas[i] ? 1 : 0,
  291. BloodPressId = bloodpress_id,
  292. MessageId = message_id,
  293. Serialno = serialno,
  294. SystolicValue = systolic_value,
  295. DiastolicValue = diastolic_value,
  296. CreateTime = create_time,
  297. LastUpdate = last_update,
  298. Method = method,
  299. IsDisplay = is_display ? 1 : 0,
  300. };
  301. #region 获取个人信息
  302. var person = await _personCacheMgr.GetDeviceGpsPersonCacheBySerialNoAsync(bp.MessageId, bp.Serialno).ConfigureAwait(false);
  303. // 验证这个信息是否存在
  304. if (person == null || person?.Person.BornDate == null)
  305. {
  306. Console.WriteLine("验证这个信息是否存在");
  307. }
  308. else
  309. {
  310. // 验证年龄是否在范围 (2 - 120)
  311. var age = SafeType.SafeInt(DateTime.Today.Year - person?.Person.BornDate!.Value.Year!);
  312. #endregion
  313. if (age < 1 || age > 120)
  314. {
  315. Console.WriteLine("验证年龄是否在范围 (2 - 120)");
  316. }
  317. else
  318. {
  319. var gender = person?.Person.Gender == true ? 1 : 2;
  320. var isHypertension = SafeType.SafeBool(person?.Person.Ishypertension!);
  321. var height = SafeType.SafeDouble(person?.Person.Height!);
  322. var weight = SafeType.SafeDouble(person?.Person.Weight!);
  323. #region 插入当次BP数据
  324. // 保存到TDengine
  325. //var bpSql = $"INSERT INTO health_monitor.hm_bloodpress VALUES(" +
  326. // $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  327. // $"'{bp.BloodPressId}'," +
  328. // $"'{bp.MessageId}'," +
  329. // $"'{bp.Serialno}'," +
  330. // $"{bp.SystolicValue}," +
  331. // $"{bp.DiastolicValue}," +
  332. // $"'{bp.CreateTime:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  333. // $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  334. // $"{bp.Method}," +
  335. // $"{bp.IsDisplay == 1})";
  336. //await _serviceTDengine.GernalRestSql(bpSql);
  337. #endregion
  338. #region 计算增量值
  339. var bpRef = await _bpRefValCacheManager.GetBloodPressReferenceValueAsync(age, gender, isHypertension);
  340. var systolicRefValue = bpRef?.Systolic;//?
  341. var diastolicRefValue = bpRef?.Diastolic;//?
  342. int duration = 30;
  343. // 获取历史数据
  344. //DateTime now = DateTime.Now;
  345. DateTime now = (DateTime)bp.LastUpdate; //测试
  346. DateTime startTime = now.AddDays(-duration);
  347. DateTime endTime = now;
  348. //
  349. var systolicAggregate = await _serviceTDengine.GetAggregateValue("systolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}'");
  350. var diastolicAggregate = await _serviceTDengine.GetAggregateValue("diastolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}'");
  351. // 最大值
  352. var systolicMax = systolicAggregate.Max;
  353. var diastolicMax = diastolicAggregate.Max;
  354. // 最小值
  355. var systolicMin = systolicAggregate.Min;
  356. var diastolicMin = diastolicAggregate.Min;
  357. // 计算去除最大值和最小值和异常值的平均值
  358. var systolicAvg = await _serviceTDengine.GetAvgExceptMaxMinValue("systolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}' and systolic_value < {systolicRefValue} ");
  359. var diastolicAvg = await _serviceTDengine.GetAvgExceptMaxMinValue("diastolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}' and diastolic_value < {diastolicRefValue}");
  360. // 偏移参数
  361. var avgOffset = 0.25M;
  362. var systolicAvgOffset = avgOffset;
  363. var diastolicAvgOffset = avgOffset;
  364. // 增量值=(标定值-平均值)* 0.25
  365. var systolicInc = systolicAvg.Equals(0M) ? 0 : (int)((systolicRefValue - systolicAvg) * systolicAvgOffset)!;
  366. var diastolicInc = diastolicAvg.Equals(0M) ? 0 : (int)((diastolicRefValue - diastolicAvg) * diastolicAvgOffset)!;
  367. #endregion
  368. #region 插入BP增量值
  369. var sql = $"INSERT INTO health_monitor.hm_bloodpress_stats_inc VALUES(" +
  370. $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  371. $"'{bp.BloodPressId}'," +
  372. $"'{bp.MessageId}'," +
  373. $"'{bp.Serialno}'," +
  374. $"{bp.SystolicValue}," +
  375. $"{systolicRefValue}," +
  376. $"{systolicAvg}," +
  377. $"{systolicMax}," +
  378. $"{systolicMin}," +
  379. $"{systolicAvgOffset}," +
  380. $"{systolicInc}," +
  381. $"{bp.DiastolicValue}," +
  382. $"{diastolicRefValue}," +
  383. $"{diastolicAvg}," +
  384. $"{diastolicMax}," +
  385. $"{diastolicMin}," +
  386. $"{diastolicAvgOffset}," +
  387. $"{diastolicInc}," +
  388. $"{gender}," +
  389. $"{age}," +
  390. $"{height}," +
  391. $"{weight}," +
  392. $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  393. $"{duration}," +
  394. $"'{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  395. $"'{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
  396. $"'{string.Empty}')";
  397. var res = await _serviceTDengine.GernalRestSql(sql);
  398. #endregion
  399. };
  400. // Console.WriteLine("----------");
  401. }
  402. }
  403. catch (Exception ex)
  404. {
  405. Console.WriteLine(ex.Message);
  406. }
  407. }
  408. Console.WriteLine("++++++++++++++++++++++");
  409. Console.Write($"总共增加{cnt}");
  410. }
  411. // Console.WriteLine("");
  412. }
  413. _consumer.Commit(consumerRes);
  414. // Console.WriteLine("\n================ {0} done ");
  415. }
  416. */
  417. public async Task ProcessMsg()
  418. {
  419. }
  420. public IntPtr GetConnection()
  421. {
  422. string host = "47.116.142.20";
  423. short port = 6030;
  424. string username = "root";
  425. string password = "taosdata";
  426. string dbname = "tmqdb";
  427. var conn = TDengine.Connect(host, username, password, dbname, port);
  428. if (conn == IntPtr.Zero)
  429. {
  430. throw new Exception("Connect to TDengine failed");
  431. }
  432. else
  433. {
  434. Console.WriteLine("Connect to TDengine success");
  435. }
  436. return conn;
  437. }
  438. // 关闭消费者
  439. //public void CloseConsumer(IConsumer consumer, IntPtr conn)
  440. //{
  441. // List<string> topics = consumer.Subscription();
  442. // topics.ForEach(t => Console.WriteLine("topic name:{0}", t));
  443. // // unsubscribe
  444. // consumer.Unsubscribe();
  445. // // close consumer after use.Otherwise will lead memory leak.
  446. // consumer.Close();
  447. // TDengine.Close(conn);
  448. //}
  449. public void CloseConsumer()
  450. {
  451. List<string> topics = _consumer.Subscription();
  452. topics.ForEach(t => Console.WriteLine("topic name:{0}", t));
  453. // unsubscribe
  454. _consumer.Unsubscribe();
  455. // close consumer after use.Otherwise will lead memory leak.
  456. _consumer.Close();
  457. TDengine.Close(_conn);
  458. }
  459. }
  460. }