You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

176 lines
6.8KB

  1. using HealthMonitor.Common;
  2. using HealthMonitor.Core.Dal;
  3. using HealthMonitor.Model.Config;
  4. using HealthMonitor.Service.Biz.db;
  5. using HealthMonitor.Service.Cache;
  6. using HealthMonitor.Service.Resolver.Factory;
  7. using HealthMonitor.Service.Resolver.Interface;
  8. using HealthMonitor.Service.Sub.Topic.Model;
  9. using Microsoft.Extensions.Logging;
  10. using Microsoft.Extensions.Options;
  11. using Newtonsoft.Json;
  12. using System;
  13. using System.Collections.Concurrent;
  14. using System.Collections.Generic;
  15. using System.Linq;
  16. using System.Reflection;
  17. using System.Text;
  18. using System.Threading.Tasks;
  19. using TDengineDriver;
  20. using TDengineTMQ;
  21. using TelpoDataService.Util.Entities.GpsLocationHistory;
  22. namespace HealthMonitor.Service.Sub
  23. {
  24. public class TDengineDataSubcribe
  25. {
  26. private readonly ILogger<TDengineDataSubcribe> _logger;
  27. private readonly MsgQueueManager _msgQueueManager;
  28. private readonly TDengineService _serviceTDengine;
  29. private readonly PersonCacheManager _personCacheMgr;
  30. private readonly TDengineServiceConfig _configTDengineService;
  31. private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager;
  32. private readonly IResolverFactory _resolverFactory;
  33. private CancellationTokenSource? _tokenSource = null;
  34. //private int cnt = 0;
  35. public TDengineDataSubcribe(
  36. TDengineService serviceTDengine,
  37. PersonCacheManager personCacheMgr,
  38. BloodPressReferenceValueCacheManager bpRefValCacheManager,
  39. IResolverFactory resolverFactory,
  40. IOptions<TDengineServiceConfig> configTDengineService,
  41. MsgQueueManager msgQueueManager,
  42. ILogger<TDengineDataSubcribe> logger
  43. )
  44. {
  45. _serviceTDengine = serviceTDengine;
  46. _personCacheMgr = personCacheMgr;
  47. _bpRefValCacheManager = bpRefValCacheManager;
  48. _logger = logger;
  49. _resolverFactory = resolverFactory;
  50. _msgQueueManager = msgQueueManager;
  51. _configTDengineService = configTDengineService.Value;
  52. }
  53. public void BeginListen(CancellationToken stoppingToken)
  54. {
  55. _tokenSource?.Cancel();
  56. _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
  57. DoTDengineConnect();
  58. }
  59. public void DoTDengineConnect()
  60. {
  61. var conn = _serviceTDengine.Connection();
  62. DoReceive(conn);
  63. }
  64. public void DoReceive(IntPtr Connection)
  65. {
  66. #region topichmbpstats 订阅
  67. // string topic = "topichmbpstats";
  68. string topic = nameof(TopicHmBloodPress).ToLower();
  69. TopicHmBloodPress fields = new();
  70. PropertyInfo[] props = fields.GetType().GetProperties();
  71. // 获取 fields
  72. string attributes = "";
  73. foreach (PropertyInfo prop in props)
  74. {
  75. JsonPropertyAttribute attr = prop.GetCustomAttribute<JsonPropertyAttribute>()!;
  76. if (attr != null)
  77. {
  78. attributes += attr.PropertyName + ",";
  79. }
  80. }
  81. attributes = attributes.TrimEnd(',');
  82. //创建 topichmbpstats
  83. IntPtr res = TDengine.Query(Connection, $"create topic if not exists {topic} as select {attributes} from health_monitor.stb_hm_bloodpress");
  84. //创建 topichmfetalheartrate
  85. //var fetalHeartRateTopic = "topichmfetalheartrate";
  86. //var fetalHeartRateFields = "ts,fetal_heart_rate_id,message_id,person_id,serialno,fetal_heart_rate,create_time,method,last_update,is_display,device_key";
  87. //res = TDengine.Query(Connection, $"create topic if not exists {fetalHeartRateTopic} as select {fetalHeartRateFields} from health_monitor.stb_hm_fetal_heart_rate_test");
  88. #endregion
  89. if (TDengine.ErrorNo(res) != 0)
  90. {
  91. _logger.LogError($"create topic failed, reason:{TDengine.Error(res)}");
  92. throw new Exception($"create topic failed, reason:{TDengine.Error(res)}");
  93. }
  94. var cfg = new ConsumerConfig
  95. {
  96. GourpId = "group_1",
  97. TDConnectUser = _configTDengineService.UserName,
  98. TDConnectPasswd = _configTDengineService.Password,
  99. MsgWithTableName = "true",
  100. TDConnectIp = _configTDengineService.Host,
  101. };
  102. // create consumer
  103. var consumer = new ConsumerBuilder(cfg)
  104. .Build();
  105. // subscribe
  106. consumer.Subscribe(topic);
  107. while (!_tokenSource!.IsCancellationRequested)
  108. {
  109. var consumeRes = consumer.Consume(300);
  110. foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumeRes.Message)
  111. {
  112. for (int i = 0; i < kv.Value.Datas.Count; i++)
  113. {
  114. if (((i + 1) % kv.Value.Metas.Count == 0))
  115. {
  116. try
  117. {
  118. IDictionary<string, object> row = new Dictionary<string, object>();
  119. foreach (var meta in kv.Value.Metas)
  120. {
  121. int index = i - (kv.Value.Metas.Count - kv.Value.Metas.IndexOf(meta) - 1);
  122. row.Add(meta.name, kv.Value.Datas[index]);
  123. }
  124. var db = kv.Key.db;
  125. var table = kv.Key.table;
  126. var kvTopic = kv.Key.topic;
  127. var body = JsonConvert.SerializeObject(row);
  128. ReceiveMessageModel msg = new(db, table, kvTopic, row["message_id"].ToString()!, body);
  129. ParsePackage(msg);
  130. }
  131. catch (Exception ex)
  132. {
  133. _logger.LogError("解析包发生异常:{ex.Message}, {ex.StackTrace}", ex.Message, ex.StackTrace);
  134. }
  135. }
  136. }
  137. }
  138. consumer.Commit(consumeRes);
  139. //_logger.LogInformation("监听中....");
  140. // Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff},监听中....");
  141. }
  142. // close consumer after use.Otherwise will lead memory leak.
  143. consumer.Close();
  144. TDengine.Close(Connection);
  145. }
  146. public void ParsePackage(ReceiveMessageModel model)
  147. {
  148. var msg = _resolverFactory.ParseAndWrap(model);
  149. if (msg == null) return;
  150. _msgQueueManager.Enqueue(msg);
  151. }
  152. }
  153. }