Du kannst nicht mehr als 25 Themen auswählen Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

215 Zeilen
8.8KB

  1. using HealthMonitor.Common;
  2. using HealthMonitor.Core.Dal;
  3. using HealthMonitor.Model.Config;
  4. using HealthMonitor.Service.Biz.db;
  5. using HealthMonitor.Service.Cache;
  6. using HealthMonitor.Service.Resolver.Factory;
  7. using HealthMonitor.Service.Resolver.Interface;
  8. using HealthMonitor.Service.Sub.Topic.Model;
  9. using Microsoft.Extensions.Logging;
  10. using Microsoft.Extensions.Options;
  11. using Newtonsoft.Json;
  12. using System;
  13. using System.Collections.Concurrent;
  14. using System.Collections.Generic;
  15. using System.Linq;
  16. using System.Reflection;
  17. using System.Text;
  18. using System.Threading.Tasks;
  19. using TDengineDriver;
  20. using TDengineTMQ;
  21. using TelpoDataService.Util.Entities.GpsLocationHistory;
  22. using static Microsoft.EntityFrameworkCore.DbLoggerCategory.Database;
  23. namespace HealthMonitor.Service.Sub
  24. {
  25. public class TDengineDataSubcribe
  26. {
  27. private readonly ILogger<TDengineDataSubcribe> _logger;
  28. private readonly MsgQueueManager _msgQueueManager;
  29. private readonly TDengineService _serviceTDengine;
  30. private readonly PersonCacheManager _personCacheMgr;
  31. private readonly TDengineServiceConfig _configTDengineService;
  32. private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager;
  33. private readonly IResolverFactory _resolverFactory;
  34. private CancellationTokenSource? _tokenSource = null;
  35. //private int cnt = 0;
  36. public TDengineDataSubcribe(
  37. TDengineService serviceTDengine,
  38. PersonCacheManager personCacheMgr,
  39. BloodPressReferenceValueCacheManager bpRefValCacheManager,
  40. IResolverFactory resolverFactory,
  41. IOptions<TDengineServiceConfig> configTDengineService,
  42. MsgQueueManager msgQueueManager,
  43. ILogger<TDengineDataSubcribe> logger
  44. )
  45. {
  46. _serviceTDengine = serviceTDengine;
  47. _personCacheMgr = personCacheMgr;
  48. _bpRefValCacheManager = bpRefValCacheManager;
  49. _logger = logger;
  50. _resolverFactory = resolverFactory;
  51. _msgQueueManager = msgQueueManager;
  52. _configTDengineService = configTDengineService.Value;
  53. }
  54. public void BeginListen(CancellationToken stoppingToken)
  55. {
  56. _tokenSource?.Cancel();
  57. _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
  58. DoTDengineConnect();
  59. }
  60. public void DoTDengineConnect()
  61. {
  62. var conn = _serviceTDengine.Connection();
  63. DoReceive(conn);
  64. }
  65. public void DoReceive(IntPtr connection)
  66. {
  67. #region topic 订阅
  68. // string topic = "topichmbpstats";
  69. //string bloodPressTopic = nameof(TopicHmBloodPress).ToLower();
  70. //TopicHmBloodPress fields = new();
  71. //PropertyInfo[] props = fields.GetType().GetProperties();
  72. //// 获取 fields
  73. //string attributes = "";
  74. //foreach (PropertyInfo prop in props)
  75. //{
  76. // JsonPropertyAttribute attr = prop.GetCustomAttribute<JsonPropertyAttribute>()!;
  77. // if (attr != null)
  78. // {
  79. // attributes += attr.PropertyName + ",";
  80. // }
  81. //}
  82. //attributes = attributes.TrimEnd(',');
  83. ////创建 topichmbpstats
  84. //IntPtr res = TDengine.Query(Connection, $"create topic if not exists {bloodPressTopic} as select {attributes} from health_monitor.stb_hm_bloodpress");
  85. ////创建 topichmpregnancyheartrate
  86. //var pregnancyHeartRateTopic = nameof(TopicHmPregnancyHeartRate).ToLower();
  87. //var pregnancyHeartateAttributes = typeof(TopicHmPregnancyHeartRate).GetType().GetProperties().Select(prop => prop.GetCustomAttribute<JsonPropertyAttribute>());
  88. //var pregnancyHeartateAttributesStr = string.Join(", ", pregnancyHeartateAttributes);
  89. //res = TDengine.Query(Connection, $"create topic if not exists {pregnancyHeartRateTopic} as select {pregnancyHeartateAttributesStr} from {_configTDengineService.DB}.stb_hm_pregnancy_heart_rate");
  90. //if (TDengine.ErrorNo(res) != 0)
  91. //{
  92. // _logger.LogError($"create topic failed, reason:{TDengine.Error(res)}");
  93. // throw new Exception($"create topic failed, reason:{TDengine.Error(res)}");
  94. //}
  95. // 获取字段属性
  96. string GetAttributes(Type type)
  97. {
  98. var props = type.GetProperties();
  99. var attributeNames = props
  100. .Select(prop => prop.GetCustomAttribute<JsonPropertyAttribute>()?.PropertyName)
  101. .Where(attr => !string.IsNullOrEmpty(attr));
  102. return string.Join(",", attributeNames);
  103. }
  104. // 创建 topic
  105. void CreateTopic(IntPtr conn, string topicName, string attributes, string tableName)
  106. {
  107. string query = $"create topic if not exists {topicName} as select {attributes} from {tableName}";
  108. IntPtr res = TDengine.Query(conn, query);
  109. if (TDengine.ErrorNo(res) != 0)
  110. {
  111. string error = TDengine.Error(res);
  112. _logger.LogError($"Create topic {topicName} failed, reason: {error}");
  113. throw new Exception($"Create topic {topicName} failed, reason: {error}");
  114. }
  115. }
  116. // 血压 topic
  117. string bloodPressTopic = nameof(TopicHmBloodPress).ToLower();
  118. string bloodPressAttributes = GetAttributes(typeof(TopicHmBloodPress));
  119. CreateTopic(connection, bloodPressTopic, bloodPressAttributes, $"{_configTDengineService.DB}.stb_hm_bloodpress");
  120. // 孕期心率 topic
  121. string pregnancyHeartRateTopic = nameof(TopicHmPregnancyHeartRate).ToLower();
  122. string pregnancyHeartRateAttributes = GetAttributes(typeof(TopicHmPregnancyHeartRate));
  123. CreateTopic(connection, pregnancyHeartRateTopic, pregnancyHeartRateAttributes, $"{_configTDengineService.DB}.stb_hm_pregnancy_heart_rate");
  124. #endregion
  125. var cfg = new ConsumerConfig
  126. {
  127. GourpId = "group_1",
  128. TDConnectUser = _configTDengineService.UserName,
  129. TDConnectPasswd = _configTDengineService.Password,
  130. MsgWithTableName = "true",
  131. TDConnectIp = _configTDengineService.Host,
  132. };
  133. // create consumer
  134. var consumer = new ConsumerBuilder(cfg).Build();
  135. var topics = new string[] { bloodPressTopic, pregnancyHeartRateTopic };
  136. // subscribe
  137. consumer.Subscribe(topics);
  138. while (!_tokenSource!.IsCancellationRequested)
  139. {
  140. var consumeRes = consumer.Consume(300);
  141. foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumeRes.Message)
  142. {
  143. for (int i = 0; i < kv.Value.Datas.Count; i++)
  144. {
  145. if (((i + 1) % kv.Value.Metas.Count == 0))
  146. {
  147. try
  148. {
  149. IDictionary<string, object> row = new Dictionary<string, object>();
  150. foreach (var meta in kv.Value.Metas)
  151. {
  152. int index = i - (kv.Value.Metas.Count - kv.Value.Metas.IndexOf(meta) - 1);
  153. row.Add(meta.name, kv.Value.Datas[index]);
  154. }
  155. var db = kv.Key.db;
  156. var table = kv.Key.table;
  157. var kvTopic = kv.Key.topic;
  158. var body = JsonConvert.SerializeObject(row);
  159. ReceiveMessageModel msg = new(db, table, kvTopic, row["message_id"].ToString()!, body);
  160. ParsePackage(msg);
  161. }
  162. catch (Exception ex)
  163. {
  164. _logger.LogError("解析包发生异常:{ex.Message}, {ex.StackTrace}", ex.Message, ex.StackTrace);
  165. }
  166. }
  167. }
  168. }
  169. consumer.Commit(consumeRes);
  170. }
  171. // close consumer after use.Otherwise will lead memory leak.
  172. consumer.Close();
  173. TDengine.Close(connection);
  174. }
  175. public void ParsePackage(ReceiveMessageModel model)
  176. {
  177. var msg = _resolverFactory.ParseAndWrap(model);
  178. if (msg == null) return;
  179. _msgQueueManager.Enqueue(msg);
  180. }
  181. }
  182. }