You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

TDengineDataSubcribe.cs 7.7KB

1 anno fa
1 anno fa
1 anno fa
1 anno fa
1 anno fa
1 anno fa
1 anno fa
1 anno fa
1 anno fa
1 anno fa
1 anno fa
1 anno fa
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. using HealthMonitor.Common;
  2. using HealthMonitor.Core.Dal;
  3. using HealthMonitor.Model.Config;
  4. using HealthMonitor.Service.Biz.db;
  5. using HealthMonitor.Service.Cache;
  6. using HealthMonitor.Service.Resolver.Factory;
  7. using HealthMonitor.Service.Resolver.Interface;
  8. using HealthMonitor.Service.Sub.Topic.Model;
  9. using Microsoft.Extensions.Logging;
  10. using Microsoft.Extensions.Options;
  11. using Newtonsoft.Json;
  12. using System;
  13. using System.Collections.Concurrent;
  14. using System.Collections.Generic;
  15. using System.Linq;
  16. using System.Reflection;
  17. using System.Text;
  18. using System.Threading.Tasks;
  19. using TDengineDriver;
  20. using TDengineTMQ;
  21. using TelpoDataService.Util.Entities.GpsLocationHistory;
  22. namespace HealthMonitor.Service.Sub
  23. {
  /// <summary>
  /// Subscribes to the TDengine topic derived from <see cref="TopicHmBloodPress"/> and
  /// forwards every received row, wrapped by the resolver factory, to the message queue.
  /// </summary>
  public class TDengineDataSubcribe
  {
      /// <summary>Consumer group used for the subscription.</summary>
      private const string ConsumerGroupId = "group_1";

      /// <summary>
      /// TDengine native port. Hard-coded here; whether <see cref="TDengineServiceConfig"/>
      /// exposes a port is not visible from this file.
      /// </summary>
      private const short TDenginePort = 6030;

      /// <summary>Argument passed to <c>Consume</c> (presumably a timeout in milliseconds — confirm against the connector).</summary>
      private const int ConsumeTimeout = 300;

      /// <summary>Column whose value is used as the message id of a row.</summary>
      private const string MessageIdColumn = "message_id";

      private readonly ILogger<TDengineDataSubcribe> _logger;
      private readonly MsgQueueManager _msgQueueManager;
      private readonly TDengineService _serviceTDengine;
      private readonly PersonCacheManager _personCacheMgr;
      private readonly TDengineServiceConfig _configTDengineService;
      private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager;
      private readonly IResolverFactory _resolverFactory;

      // Cancellation source of the most recent BeginListen call; replaced on every call.
      private CancellationTokenSource? _tokenSource = null;

      /// <summary>Stores the injected services and unwraps the TDengine configuration.</summary>
      public TDengineDataSubcribe(
          TDengineService serviceDengine,
          PersonCacheManager personCacheMgr,
          BloodPressReferenceValueCacheManager bpRefValCacheManager,
          IResolverFactory resolverFactory,
          IOptions<TDengineServiceConfig> configTDengineService,
          MsgQueueManager msgQueueManager,
          ILogger<TDengineDataSubcribe> logger
      )
      {
          _serviceTDengine = serviceDengine;
          _personCacheMgr = personCacheMgr;
          _bpRefValCacheManager = bpRefValCacheManager;
          _logger = logger;
          _resolverFactory = resolverFactory;
          _msgQueueManager = msgQueueManager;
          _configTDengineService = configTDengineService.Value;
      }

      /// <summary>
      /// Starts listening on the calling thread and blocks until cancellation is requested.
      /// Any listener started by an earlier call is cancelled first so that two receive
      /// loops do not keep running side by side.
      /// </summary>
      /// <param name="stoppingToken">Token that ends the listening loop when cancelled.</param>
      /// <exception cref="InvalidOperationException">Connecting or creating the topic failed.</exception>
      public void BeginListen(CancellationToken stoppingToken)
      {
          var current = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);

          // Swap first, then retire the previous source: the field always holds a live
          // source, and each previous source is cancelled and disposed exactly once.
          var previous = Interlocked.Exchange(ref _tokenSource, current);
          if (previous is not null)
          {
              previous.Cancel();
              // A linked source stays registered with its parent token until disposed.
              previous.Dispose();
          }

          // Hand this call's own token to the loop. Reading the field inside the loop
          // would make an older loop observe a newer, non-cancelled source.
          ConnectAndReceive(current.Token);
      }

      /// <summary>
      /// Connects to TDengine with the configured host, credentials and database, then
      /// runs the receive loop. Requires a prior <see cref="BeginListen"/> call, which
      /// provides the cancellation source.
      /// </summary>
      /// <exception cref="InvalidOperationException">
      /// No listener has been started, the connection failed, or the topic could not be created.
      /// </exception>
      public void DoTDengineConnect()
      {
          ConnectAndReceive(GetActiveToken());
      }

      /// <summary>
      /// Creates the topic if needed, subscribes to it and dispatches received rows until
      /// cancellation is requested. The consumer and <paramref name="Connection"/> are
      /// closed when the method exits, whether it returns or throws.
      /// </summary>
      /// <param name="Connection">Open TDengine connection handle; closed by this method.</param>
      /// <exception cref="ArgumentException"><paramref name="Connection"/> is a null handle.</exception>
      /// <exception cref="InvalidOperationException">
      /// No listener has been started, or the topic could not be created.
      /// </exception>
      public void DoReceive(IntPtr Connection)
      {
          if (Connection == IntPtr.Zero)
          {
              throw new ArgumentException("A valid TDengine connection handle is required.", nameof(Connection));
          }

          CancellationToken token;
          try
          {
              token = GetActiveToken();
          }
          catch
          {
              // This method owns the handle, so release it on this failure path as well.
              CloseConnection(Connection);
              throw;
          }

          Receive(Connection, token);
      }

      /// <summary>
      /// Resolves the message into its wrapped form and enqueues it; messages the
      /// resolver factory returns <c>null</c> for are dropped.
      /// </summary>
      /// <param name="model">Raw message built from one received row.</param>
      public void ParsePackage(ReceiveMessageModel model)
      {
          var msg = _resolverFactory.ParseAndWrap(model);
          if (msg == null)
          {
              return;
          }

          _msgQueueManager.Enqueue(msg);
      }

      /// <summary>Returns the token of the current listener, or throws when none was started.</summary>
      private CancellationToken GetActiveToken()
      {
          var source = _tokenSource
              ?? throw new InvalidOperationException("BeginListen must be called before receiving from TDengine.");
          return source.Token;
      }

      /// <summary>Opens the connection and runs the receive loop bound to <paramref name="token"/>.</summary>
      private void ConnectAndReceive(CancellationToken token)
      {
          string host = _configTDengineService.Host;
          string username = _configTDengineService.UserName;
          string password = _configTDengineService.Password;
          string dbname = _configTDengineService.DB;

          var conn = TDengine.Connect(host, username, password, dbname, TDenginePort);
          if (conn == IntPtr.Zero)
          {
              string reason = TDengine.Error(conn);
              _logger.LogError("reason:{Reason}", reason);
              // Never hand a null handle to the native driver; fail here instead.
              throw new InvalidOperationException($"Connect to TDengine failed, reason:{reason}");
          }

          _logger.LogInformation("连接 TDengine 成功....");
          Receive(conn, token);
      }

      /// <summary>
      /// Receive loop shared by all entry points. Closes the consumer first and the
      /// connection second, on every exit path.
      /// </summary>
      private void Receive(IntPtr connection, CancellationToken token)
      {
          try
          {
              string topic = nameof(TopicHmBloodPress).ToLowerInvariant();
              CreateTopicIfMissing(connection, topic);

              var cfg = new ConsumerConfig
              {
                  GourpId = ConsumerGroupId,
                  TDConnectUser = _configTDengineService.UserName,
                  TDConnectPasswd = _configTDengineService.Password,
                  MsgWithTableName = "true",
                  TDConnectIp = _configTDengineService.Host,
              };

              var consumer = new ConsumerBuilder(cfg).Build();
              try
              {
                  consumer.Subscribe(topic);
                  while (!token.IsCancellationRequested)
                  {
                      var consumeRes = consumer.Consume(ConsumeTimeout);
                      foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumeRes.Message)
                      {
                          DispatchRows(kv.Key, kv.Value);
                      }

                      consumer.Commit(consumeRes);
                      _logger.LogInformation("监听中....");
                  }
              }
              finally
              {
                  // Close the consumer after use, otherwise it leaks memory.
                  consumer.Close();
              }
          }
          finally
          {
              CloseConnection(connection);
          }
      }

      /// <summary>
      /// Issues <c>create topic if not exists</c> selecting the columns named by the
      /// <see cref="JsonPropertyAttribute"/> of each <see cref="TopicHmBloodPress"/> property.
      /// </summary>
      /// <exception cref="InvalidOperationException">TDengine reported an error for the statement.</exception>
      private static void CreateTopicIfMissing(IntPtr connection, string topic)
      {
          // Properties without a JSON name are skipped; an empty name would leave a stray comma in the SQL.
          string columns = string.Join(
              ",",
              typeof(TopicHmBloodPress)
                  .GetProperties()
                  .Select(prop => prop.GetCustomAttribute<JsonPropertyAttribute>()?.PropertyName)
                  .Where(name => !string.IsNullOrEmpty(name)));

          IntPtr res = TDengine.Query(connection, $"create topic if not exists {topic} as select {columns} from health_monitor.hm_bloodpress");
          try
          {
              if (TDengine.ErrorNo(res) != 0)
              {
                  throw new InvalidOperationException($"create topic failed, reason:{TDengine.Error(res)}");
              }
          }
          finally
          {
              // The result handle is native memory and must be released explicitly.
              if (res != IntPtr.Zero)
              {
                  TDengine.FreeResult(res);
              }
          }
      }

      /// <summary>
      /// Splits the flat value list of one consume result into rows of <c>Metas.Count</c>
      /// values each and forwards every complete row. A failure in one row is logged
      /// and does not stop the remaining rows.
      /// </summary>
      private void DispatchRows(TopicPartition partition, TaosResult result)
      {
          int columnCount = result.Metas.Count;
          if (columnCount == 0)
          {
              // Without column metadata no row can be assembled.
              return;
          }

          int valueCount = result.Datas.Count;
          // A trailing partial row (fewer than columnCount values) is ignored.
          for (int rowStart = 0; rowStart + columnCount <= valueCount; rowStart += columnCount)
          {
              try
              {
                  var row = new Dictionary<string, object>(columnCount);
                  for (int column = 0; column < columnCount; column++)
                  {
                      row.Add(result.Metas[column].name, result.Datas[rowStart + column]);
                  }

                  var body = JsonConvert.SerializeObject(row);
                  ReceiveMessageModel msg = new(
                      partition.db,
                      partition.table,
                      partition.topic,
                      row[MessageIdColumn].ToString()!,
                      body);
                  ParsePackage(msg);
              }
              catch (Exception ex)
              {
                  _logger.LogError(ex, "解析包发生异常:{Message}, {StackTrace}", ex.Message, ex.StackTrace);
              }
          }
      }

      /// <summary>Closes the connection handle unless it is a null handle.</summary>
      private static void CloseConnection(IntPtr connection)
      {
          if (connection != IntPtr.Zero)
          {
              TDengine.Close(connection);
          }
      }
  }
  180. }