You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

417 lines
18KB

  1. using HealthMonitor.Common;
  2. using HealthMonitor.Core.Dal;
  3. using HealthMonitor.Service.Biz.db;
  4. using HealthMonitor.Service.Cache;
  5. using Microsoft.Extensions.Logging;
  6. using System;
  7. using System.Collections.Generic;
  8. using System.Linq;
  9. using System.Text;
  10. using System.Threading.Tasks;
  11. using TDengineDriver;
  12. using TDengineTMQ;
  13. using TelpoDataService.Util.Entities.GpsLocationHistory;
  14. namespace HealthMonitor.Service.Sub
  15. {
  /// <summary>
  /// Subscribes to the TDengine topic built over <c>health_monitor.hm_bloodpress</c>
  /// and, for every consumed blood-pressure row, computes an increment value from
  /// the person's reference values and the last 30 days of readings, then writes it
  /// to <c>health_monitor.hm_bloodpress_stats_inc</c>.
  /// </summary>
  public class TDengineDataSubcribe
  {
      // NOTE(review): the endpoint and credentials below are hard-coded in source.
      // They should come from configuration / a secret store; they are kept here
      // only because moving them would change how this class is constructed.
      private const string Host = "47.116.142.20";
      private const short Port = 6030;
      private const string UserName = "root";
      private const string Password = "taosdata";
      private const string DatabaseName = "tmqdb";
      private const string ConsumerGroupId = "group_1";
      private const string TopicName = "topic_hm_bp_stats";

      // Number of trailing columns of each consumed row that ParseBloodPress reads.
      private const int BloodPressColumnCount = 9;

      // Length, in days, of the history window used for the statistics.
      private const int HistoryDurationDays = 30;

      private readonly ILogger<TDengineDataSubcribe> _logger;
      private readonly TDengineService _serviceTDengine;
      private readonly PersonCacheManager _personCacheMgr;
      private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager;

      // Null until CreateConsumer() succeeds, and again after CloseConsumer().
      private IConsumer? _consumer;

      // Native connection handle; IntPtr.Zero means "not connected".
      private IntPtr _conn = IntPtr.Zero;

      // Running total of rows seen by ProcessMsg (including rows that failed).
      private int _processedCount;

      /// <summary>
      /// Stores the collaborators and opens the native TDengine connection.
      /// </summary>
      /// <exception cref="ArgumentNullException">A dependency is null.</exception>
      /// <exception cref="InvalidOperationException">The connection could not be opened.</exception>
      public TDengineDataSubcribe(
          TDengineService serviceDengine,
          PersonCacheManager personCacheMgr,
          BloodPressReferenceValueCacheManager bpRefValCacheManager,
          ILogger<TDengineDataSubcribe> logger
      )
      {
          _serviceTDengine = serviceDengine ?? throw new ArgumentNullException(nameof(serviceDengine));
          _personCacheMgr = personCacheMgr ?? throw new ArgumentNullException(nameof(personCacheMgr));
          _bpRefValCacheManager = bpRefValCacheManager ?? throw new ArgumentNullException(nameof(bpRefValCacheManager));
          _logger = logger ?? throw new ArgumentNullException(nameof(logger));
          _conn = GetConnection();
      }

      /// <summary>
      /// Placeholder entry point; it currently performs no work.
      /// </summary>
      /// <param name="stoppingToken">Cancellation token (currently unused).</param>
      public void BeginListen(CancellationToken stoppingToken)
      {
          // Intentionally empty: callers drive CreateConsumer()/ProcessMsg() themselves.
      }

      /// <summary>
      /// Ensures the native connection held by this instance is open.
      /// </summary>
      /// <remarks>
      /// Previously this opened a new connection on every call and discarded the
      /// handle, leaking it. It now reuses the existing handle and only connects
      /// when none is open (for example after <see cref="CloseConsumer"/>).
      /// </remarks>
      public void CreateConnection()
      {
          if (_conn == IntPtr.Zero)
          {
              _conn = GetConnection();
          }
      }

      /// <summary>
      /// Creates the topic if it does not exist, builds a consumer and subscribes it
      /// to the topic. The consumer is kept for <see cref="ProcessMsg"/>.
      /// </summary>
      /// <exception cref="InvalidOperationException">Topic creation failed.</exception>
      public void CreateConsumer()
      {
          CreateConnection();

          IntPtr res = TDengine.Query(_conn, $"create topic if not exists {TopicName} as select * from health_monitor.hm_bloodpress");
          try
          {
              if (TDengine.ErrorNo(res) != 0)
              {
                  throw new InvalidOperationException($"create topic failed, reason:{TDengine.Error(res)}");
              }
          }
          finally
          {
              // The query result is a native handle and must be released explicitly.
              if (res != IntPtr.Zero)
              {
                  TDengine.FreeResult(res);
              }
          }

          var consumer = new ConsumerBuilder(BuildConsumerConfig()).Build();
          consumer.Subscribe(TopicName);
          _consumer = consumer;
      }

      /// <summary>
      /// Consumes one batch (300 is passed to <c>Consume</c> as its argument),
      /// processes every complete row in it and then commits the batch.
      /// </summary>
      /// <remarks>
      /// A failure while processing a single row is logged and does not stop the
      /// remaining rows; the batch is committed regardless, as before.
      /// </remarks>
      /// <exception cref="InvalidOperationException">
      /// <see cref="CreateConsumer"/> has not been called.
      /// </exception>
      public async Task ProcessMsg()
      {
          var consumer = _consumer
              ?? throw new InvalidOperationException("Consumer has not been created; call CreateConsumer() first.");

          var consumerRes = consumer.Consume(300);
          _logger.LogInformation("Consumed {PartitionCount} topic partition result(s)", consumerRes.Message.Count);

          foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumerRes.Message)
          {
              TaosResult result = kv.Value;
              int columnCount = result.Metas.Count;

              // Datas is a flat list of cells; a row is columnCount consecutive cells.
              // Guard against empty metadata (division by zero in the old modulo test)
              // and against rows too short to hold the columns we read.
              if (columnCount < BloodPressColumnCount)
              {
                  _logger.LogWarning(
                      "Skipping result for {TopicPartition}: {ColumnCount} column(s), expected at least {Expected}",
                      kv.Key, columnCount, BloodPressColumnCount);
                  continue;
              }

              // rowEnd is the index of the last cell of each complete row.
              for (int rowEnd = columnCount - 1; rowEnd < result.Datas.Count; rowEnd += columnCount)
              {
                  _processedCount++;
                  try
                  {
                      HisGpsBloodPress bp = ParseBloodPress(result, rowEnd);
                      await ProcessBloodPressAsync(bp).ConfigureAwait(false);
                  }
                  catch (Exception ex)
                  {
                      // Keep going with the next row, but retain the full exception.
                      _logger.LogError(ex, "Failed to process blood-pressure row ending at cell index {RowEnd}", rowEnd);
                  }

                  _logger.LogInformation("++++++++++++++++++++++");
                  _logger.LogInformation("总共增加{cnt}", _processedCount);
              }
          }

          consumer.Commit(consumerRes);
      }

      /// <summary>
      /// Opens a new native TDengine connection and returns its handle.
      /// The caller owns the returned handle.
      /// </summary>
      /// <exception cref="InvalidOperationException">The connection could not be opened.</exception>
      public IntPtr GetConnection()
      {
          var conn = TDengine.Connect(Host, UserName, Password, DatabaseName, Port);
          if (conn == IntPtr.Zero)
          {
              throw new InvalidOperationException("Connect to TDengine failed");
          }

          _logger.LogInformation("Connect to TDengine success");
          return conn;
      }

      /// <summary>
      /// Unsubscribes and closes the consumer (if one was created) and closes the
      /// native connection. Safe to call more than once.
      /// </summary>
      public void CloseConsumer()
      {
          try
          {
              if (_consumer is not null)
              {
                  List<string> topics = _consumer.Subscription();
                  topics.ForEach(t => _logger.LogInformation("topic name:{0}", t));

                  _consumer.Unsubscribe();
                  // Close the consumer after use; otherwise it leaks memory.
                  _consumer.Close();
                  _consumer = null;
              }
          }
          finally
          {
              // Always release the connection, even if closing the consumer failed,
              // and reset the handle so a second call does not close it twice.
              if (_conn != IntPtr.Zero)
              {
                  TDengine.Close(_conn);
                  _conn = IntPtr.Zero;
              }
          }
      }

      // Builds the consumer configuration used to subscribe to the topic.
      private static ConsumerConfig BuildConsumerConfig()
      {
          return new ConsumerConfig
          {
              GourpId = ConsumerGroupId,
              TDConnectUser = UserName,
              TDConnectPasswd = Password,
              MsgWithTableName = "true",
              TDConnectIp = Host,
          };
      }

      // Maps the last nine cells of the row ending at rowEnd onto a HisGpsBloodPress.
      private static HisGpsBloodPress ParseBloodPress(TaosResult result, int rowEnd)
      {
          var cells = result.Datas;
          return new HisGpsBloodPress
          {
              BloodPressId = SafeType.SafeString(cells[rowEnd - 8]),
              MessageId = SafeType.SafeString(cells[rowEnd - 7]),
              Serialno = SafeType.SafeString(cells[rowEnd - 6]),
              SystolicValue = SafeType.SafeInt(cells[rowEnd - 5]),
              DiastolicValue = SafeType.SafeInt(cells[rowEnd - 4]),
              // Raw values are divided by 1,000,000 before being treated as Unix
              // milliseconds (presumably nanosecond precision in the source; confirm).
              CreateTime = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(cells[rowEnd - 3]) / 1000000),
              LastUpdate = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(cells[rowEnd - 2]) / 1000000),
              Method = SafeType.SafeInt(cells[rowEnd - 1]),
              IsDisplay = SafeType.SafeBool(cells[rowEnd]) ? 1 : 0,
          };
      }

      // Validates the person behind a reading, computes the increment values and
      // inserts one row into hm_bloodpress_stats_inc.
      private async Task ProcessBloodPressAsync(HisGpsBloodPress bp)
      {
          // Look up the person bound to this device.
          var person = await _personCacheMgr.GetDeviceGpsPersonCacheBySerialNoAsync(bp.MessageId, bp.Serialno).ConfigureAwait(false);
          if (person?.Person?.BornDate is null)
          {
              _logger.LogWarning("验证这个信息是否存在");
              return;
          }

          // Age is the plain difference of calendar years (birthday is not considered).
          // NOTE(review): the check accepts 1..120 while the message text says 2 - 120;
          // confirm the intended lower bound.
          var age = SafeType.SafeInt(DateTime.Today.Year - person.Person.BornDate.Value.Year);
          if (age < 1 || age > 120)
          {
              _logger.LogWarning("验证年龄是否在范围 (2 - 120)");
              return;
          }

          var gender = person.Person.Gender == true ? 1 : 2;
          var isHypertension = SafeType.SafeBool(person.Person.Ishypertension!);
          var height = SafeType.SafeDouble(person.Person.Height!);
          var weight = SafeType.SafeDouble(person.Person.Weight!);

          // Reference values for this age / gender / hypertension combination.
          var bpRef = await _bpRefValCacheManager.GetBloodPressReferenceValueAsync(age, gender, isHypertension).ConfigureAwait(false);
          var systolicRefValue = bpRef?.Systolic;
          var diastolicRefValue = bpRef?.Diastolic;
          if (systolicRefValue is null || diastolicRefValue is null)
          {
              // Without reference values the filters and the INSERT below would be
              // malformed SQL, so skip this reading instead.
              _logger.LogWarning(
                  "No blood-pressure reference value for age {Age}, gender {Gender}, hypertension {IsHypertension}; skipping {BloodPressId}",
                  age, gender, isHypertension, bp.BloodPressId);
              return;
          }

          // The window ends at the reading's own timestamp (marked as a test setting
          // in the original code, which had DateTime.Now commented out).
          DateTime now = (DateTime)bp.LastUpdate;
          DateTime startTime = now.AddDays(-HistoryDurationDays);
          DateTime endTime = now;

          string lastUpdateText = FormatTimestamp(bp.LastUpdate);
          string startText = FormatTimestamp(startTime);
          string endText = FormatTimestamp(endTime);

          // NOTE(review): serialno is interpolated into SQL text because the service
          // helpers only accept a raw filter string; parameterize or escape it if the
          // value can ever contain quotes.
          string windowFilter = $"ts>='{startText}' and ts <='{endText}' and serialno='{bp.Serialno}'";

          var systolicAggregate = await _serviceTDengine.GetAggregateValue("systolic_value", "hm_bloodpress", windowFilter).ConfigureAwait(false);
          var diastolicAggregate = await _serviceTDengine.GetAggregateValue("diastolic_value", "hm_bloodpress", windowFilter).ConfigureAwait(false);

          var systolicMax = systolicAggregate.Max;
          var diastolicMax = diastolicAggregate.Max;
          var systolicMin = systolicAggregate.Min;
          var diastolicMin = diastolicAggregate.Min;

          // Average excluding max, min and readings at or above the reference value.
          var systolicAvg = await _serviceTDengine.GetAvgExceptMaxMinValue(
              "systolic_value", "hm_bloodpress",
              windowFilter + Inv($" and systolic_value < {systolicRefValue}")).ConfigureAwait(false);
          var diastolicAvg = await _serviceTDengine.GetAvgExceptMaxMinValue(
              "diastolic_value", "hm_bloodpress",
              windowFilter + Inv($" and diastolic_value < {diastolicRefValue}")).ConfigureAwait(false);

          // Offset factor applied to the distance between reference and average.
          var avgOffset = 0.25M;
          var systolicAvgOffset = avgOffset;
          var diastolicAvgOffset = avgOffset;

          // increment = (reference - average) * 0.25; zero when there is no average.
          var systolicInc = systolicAvg.Equals(0M) ? 0 : (int)((systolicRefValue - systolicAvg) * systolicAvgOffset)!;
          var diastolicInc = diastolicAvg.Equals(0M) ? 0 : (int)((diastolicRefValue - diastolicAvg) * diastolicAvgOffset)!;

          // Every value is formatted with the invariant culture so that decimal
          // separators and time separators are independent of the host locale.
          string[] values =
          {
              $"'{lastUpdateText}'",
              $"'{bp.BloodPressId}'",
              $"'{bp.MessageId}'",
              $"'{bp.Serialno}'",
              Inv($"{bp.SystolicValue}"),
              Inv($"{systolicRefValue}"),
              Inv($"{systolicAvg}"),
              Inv($"{systolicMax}"),
              Inv($"{systolicMin}"),
              Inv($"{systolicAvgOffset}"),
              Inv($"{systolicInc}"),
              Inv($"{bp.DiastolicValue}"),
              Inv($"{diastolicRefValue}"),
              Inv($"{diastolicAvg}"),
              Inv($"{diastolicMax}"),
              Inv($"{diastolicMin}"),
              Inv($"{diastolicAvgOffset}"),
              Inv($"{diastolicInc}"),
              Inv($"{gender}"),
              Inv($"{age}"),
              Inv($"{height}"),
              Inv($"{weight}"),
              $"'{lastUpdateText}'",
              Inv($"{HistoryDurationDays}"),
              $"'{startText}'",
              $"'{endText}'",
              $"'{string.Empty}'",
          };
          var sql = $"INSERT INTO health_monitor.hm_bloodpress_stats_inc VALUES({string.Join(",", values)})";

          await _serviceTDengine.GernalRestSql(sql).ConfigureAwait(false);
      }

      // Formats an interpolated string with the invariant culture.
      private static string Inv(FormattableString text) => FormattableString.Invariant(text);

      // Renders a timestamp in the literal format used by the SQL statements.
      // Takes object so both DateTime and DateTime? values can be passed.
      private static string FormatTimestamp(object? value) =>
          FormattableString.Invariant($"{value:yyyy-MM-ddTHH:mm:ss.fffZ}");
  }
  349. }