You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

210 line
8.0KB

  1. extern alias CustomTypes;
  2. using HealthMonitor.Common;
  3. using HealthMonitor.Core.Dal;
  4. using HealthMonitor.Model.Config;
  5. using HealthMonitor.Service.Biz.db;
  6. using HealthMonitor.Service.Cache;
  7. using HealthMonitor.Service.Resolver.Factory;
  8. using HealthMonitor.Service.Resolver.Interface;
  9. using HealthMonitor.Service.Sub.Topic.Model;
  10. using Microsoft.Extensions.Logging;
  11. using Microsoft.Extensions.Options;
  12. using Newtonsoft.Json;
  13. using System;
  14. using System.Collections.Concurrent;
  15. using System.Collections.Generic;
  16. using System.Linq;
  17. using System.Reflection;
  18. using System.Text;
  19. using System.Threading.Tasks;
  20. using CustomTypes.TDengineDriver;
  21. using CustomTypes.TDengineTMQ;
  22. using TelpoDataService.Util.Entities.GpsLocationHistory;
  23. namespace HealthMonitor.Service.Sub
  24. {
  /// <summary>
  /// Subscribes to the TDengine topic derived from <see cref="TopicHmBloodPress"/>, rebuilds every
  /// consumed row as a <see cref="ReceiveMessageModel"/> and forwards it to the resolver factory
  /// and the message queue.
  /// </summary>
  public class TDengineDataSubcribe
  {
      // Value handed to Consume() on every poll; presumably milliseconds — confirm against the TMQ driver.
      private const int ConsumeTimeout = 300;

      private readonly ILogger<TDengineDataSubcribe> _logger;
      private readonly MsgQueueManager _msgQueueManager;
      private readonly TDengineService _serviceTDengine;
      private readonly PersonCacheManager _personCacheMgr;
      private readonly TDengineServiceConfig _configTDengineService;
      private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager;
      private readonly IResolverFactory _resolverFactory;

      // Cancellation source of the listener started by the most recent BeginListen call.
      private CancellationTokenSource? _tokenSource = null;

      /// <summary>
      /// Stores the injected collaborators and unwraps the TDengine service options.
      /// </summary>
      public TDengineDataSubcribe(
          TDengineService serviceTDengine,
          PersonCacheManager personCacheMgr,
          BloodPressReferenceValueCacheManager bpRefValCacheManager,
          IResolverFactory resolverFactory,
          IOptions<TDengineServiceConfig> configTDengineService,
          MsgQueueManager msgQueueManager,
          ILogger<TDengineDataSubcribe> logger
      )
      {
          _serviceTDengine = serviceTDengine;
          _personCacheMgr = personCacheMgr;
          _bpRefValCacheManager = bpRefValCacheManager;
          _logger = logger;
          _resolverFactory = resolverFactory;
          _msgQueueManager = msgQueueManager;
          _configTDengineService = configTDengineService.Value;
      }

      /// <summary>
      /// Starts listening on the calling thread. Any listener started by an earlier call is
      /// cancelled first so that two polling loops do not keep running side by side.
      /// </summary>
      /// <param name="stoppingToken">Token that ends the polling loop when it is cancelled.</param>
      public void BeginListen(CancellationToken stoppingToken)
      {
          // Cancel the previous listener and release its link to the parent token;
          // a linked source that is never disposed stays registered on that parent.
          CancellationTokenSource? previous = _tokenSource;
          if (previous is not null)
          {
              previous.Cancel();
              previous.Dispose();
          }

          _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
          DoTDengineConnect();
      }

      /// <summary>
      /// Obtains a connection from the TDengine service and runs the receive loop on it.
      /// </summary>
      public void DoTDengineConnect()
      {
          var conn = _serviceTDengine.Connection();
          DoReceive(conn);
      }

      /// <summary>
      /// Creates the topic if necessary, subscribes a consumer to it and polls until the token
      /// captured from the current listener is cancelled. The consumer and the connection are
      /// closed when the method exits, whether it returns normally or throws.
      /// </summary>
      /// <param name="Connection">Native TDengine connection handle; it is closed by this method.</param>
      /// <exception cref="ArgumentException"><paramref name="Connection"/> is <see cref="IntPtr.Zero"/>.</exception>
      /// <exception cref="InvalidOperationException">The topic could not be created.</exception>
      public void DoReceive(IntPtr Connection)
      {
          if (Connection == IntPtr.Zero)
          {
              throw new ArgumentException("TDengine connection handle must not be zero.", nameof(Connection));
          }

          // Capture the token once: the field is replaced by a later BeginListen call, and a loop
          // that re-read the field would then observe the new, un-cancelled source and never stop.
          // Without a prior BeginListen there is no source, so the loop runs without cancellation.
          CancellationToken token = _tokenSource?.Token ?? CancellationToken.None;

          try
          {
              string topic = nameof(TopicHmBloodPress).ToLowerInvariant();
              EnsureTopicExists(Connection, topic, BuildSelectColumns());

              var cfg = new ConsumerConfig
              {
                  GourpId = "group_1",
                  TDConnectUser = _configTDengineService.UserName,
                  TDConnectPasswd = _configTDengineService.Password,
                  MsgWithTableName = "true",
                  TDConnectIp = _configTDengineService.Host,
              };

              var consumer = new ConsumerBuilder(cfg)
                  .Build();
              try
              {
                  consumer.Subscribe(topic);

                  while (!token.IsCancellationRequested)
                  {
                      var consumeRes = consumer.Consume(ConsumeTimeout);
                      foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumeRes.Message)
                      {
                          ForwardRows(kv.Key, kv.Value);
                      }

                      consumer.Commit(consumeRes);
                      Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff},监听中....");
                  }
              }
              finally
              {
                  // Close the consumer on every exit path, not only after a clean loop exit.
                  consumer.Close();
              }
          }
          finally
          {
              TDengine.Close(Connection);
          }
      }

      /// <summary>
      /// Passes the message to the resolver factory and enqueues the wrapped result.
      /// Nothing is enqueued when the factory returns <c>null</c>.
      /// </summary>
      /// <param name="model">Message rebuilt from one consumed row.</param>
      public void ParsePackage(ReceiveMessageModel model)
      {
          var msg = _resolverFactory.ParseAndWrap(model);
          if (msg == null)
          {
              return;
          }

          _msgQueueManager.Enqueue(msg);
      }

      // Builds the comma-separated select list from the JsonProperty names declared on TopicHmBloodPress.
      private static string BuildSelectColumns()
      {
          var columnNames = typeof(TopicHmBloodPress)
              .GetProperties()
              .Select(prop => prop.GetCustomAttribute<JsonPropertyAttribute>()?.PropertyName)
              .Where(name => !string.IsNullOrEmpty(name));

          return string.Join(",", columnNames);
      }

      // Issues "create topic if not exists" and releases the native result handle afterwards.
      private void EnsureTopicExists(IntPtr connection, string topic, string columns)
      {
          IntPtr res = TDengine.Query(connection, $"create topic if not exists {topic} as select {columns} from health_monitor.stb_hm_bloodpress");
          try
          {
              if (TDengine.ErrorNo(res) != 0)
              {
                  string reason = TDengine.Error(res);
                  // Pass the reason as an argument so braces in it are not parsed as template holes.
                  _logger.LogError("create topic failed, reason:{Reason}", reason);
                  throw new InvalidOperationException($"create topic failed, reason:{reason}");
              }
          }
          finally
          {
              if (res != IntPtr.Zero)
              {
                  TDengine.FreeResult(res);
              }
          }
      }

      // Splits the flat Datas list into rows of Metas.Count values and forwards each row.
      // A failure in one row is logged and does not stop the remaining rows.
      private void ForwardRows(TopicPartition partition, TaosResult result)
      {
          int columnCount = result.Metas.Count;
          if (columnCount == 0)
          {
              // Without column metadata the row width is unknown, so there is nothing to rebuild.
              return;
          }

          // Trailing values that do not fill a complete row are ignored.
          for (int rowStart = 0; rowStart + columnCount <= result.Datas.Count; rowStart += columnCount)
          {
              try
              {
                  var row = new Dictionary<string, object>(columnCount);
                  for (int column = 0; column < columnCount; column++)
                  {
                      row.Add(result.Metas[column].name, result.Datas[rowStart + column]);
                  }

                  string body = JsonConvert.SerializeObject(row);
                  ReceiveMessageModel msg = new(partition.db, partition.table, partition.topic, row["message_id"].ToString()!, body);
                  ParsePackage(msg);
              }
              catch (Exception ex)
              {
                  _logger.LogError("解析包发生异常:{ex.Message}, {ex.StackTrace}", ex.Message, ex.StackTrace);
              }
          }
      }
  }
  179. }