|
- using HealthMonitor.Common;
- using HealthMonitor.Core.Dal;
- using HealthMonitor.Model.Config;
- using HealthMonitor.Service.Biz.db;
- using HealthMonitor.Service.Cache;
- using HealthMonitor.Service.Resolver.Factory;
- using HealthMonitor.Service.Resolver.Interface;
- using HealthMonitor.Service.Sub.Topic.Model;
- using Microsoft.Extensions.Logging;
- using Microsoft.Extensions.Options;
-
- using Newtonsoft.Json;
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Linq;
- using System.Reflection;
- using System.Text;
- using System.Threading.Tasks;
- using TDengineDriver;
- using TDengineTMQ;
- using TelpoDataService.Util.Entities.GpsLocationHistory;
-
- namespace HealthMonitor.Service.Sub
- {
- public class TDengineDataSubcribe
- {
- // Logger used by DoTDengineConnect and DoReceive.
- private readonly ILogger<TDengineDataSubcribe> _logger;
- // Queue that ParsePackage pushes successfully parsed messages onto.
- private readonly MsgQueueManager _msgQueueManager;
- // Assigned in the constructor; not read anywhere in this class as shown.
- private readonly TDengineService _serviceTDengine;
- // Assigned in the constructor; not read anywhere in this class as shown.
- private readonly PersonCacheManager _personCacheMgr;
- // Connection settings (Host, UserName, Password, DB) read when connecting and when configuring the consumer.
- private readonly TDengineServiceConfig _configTDengineService;
- // Assigned in the constructor; not read anywhere in this class as shown.
- private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager;
- // Turns a raw ReceiveMessageModel into a queueable message (see ParsePackage).
- private readonly IResolverFactory _resolverFactory;
- // Cancellation source created by BeginListen (linked to its stopping token); DoReceive polls it to end its loop.
- private CancellationTokenSource? _tokenSource = null;
-
- // Disabled counter kept from an earlier revision.
- //private int cnt = 0;
-
- /// <summary>
- /// Creates the subscriber and stores every injected collaborator in its backing field.
- /// </summary>
- /// <param name="serviceDengine">TDengine service instance.</param>
- /// <param name="personCacheMgr">Person cache manager.</param>
- /// <param name="bpRefValCacheManager">Blood-pressure reference value cache manager.</param>
- /// <param name="resolverFactory">Factory used by <see cref="ParsePackage"/> to parse received messages.</param>
- /// <param name="configTDengineService">Options wrapper; its <c>Value</c> is unwrapped once here.</param>
- /// <param name="msgQueueManager">Queue that receives parsed messages.</param>
- /// <param name="logger">Logger for this class.</param>
- public TDengineDataSubcribe(
-     TDengineService serviceDengine,
-     PersonCacheManager personCacheMgr,
-     BloodPressReferenceValueCacheManager bpRefValCacheManager,
-     IResolverFactory resolverFactory,
-     IOptions<TDengineServiceConfig> configTDengineService,
-     MsgQueueManager msgQueueManager,
-     ILogger<TDengineDataSubcribe> logger)
- {
-     // Infrastructure first, then the domain collaborators.
-     _logger = logger;
-     _configTDengineService = configTDengineService.Value;
-     _msgQueueManager = msgQueueManager;
-     _resolverFactory = resolverFactory;
-     _serviceTDengine = serviceDengine;
-     _personCacheMgr = personCacheMgr;
-     _bpRefValCacheManager = bpRefValCacheManager;
- }
- /// <summary>
- /// Starts the TDengine subscription. Any cancellation source left over from an earlier call is
- /// cancelled and disposed, a new source linked to <paramref name="stoppingToken"/> is created,
- /// and <see cref="DoTDengineConnect"/> is invoked on the calling thread.
- /// </summary>
- /// <param name="stoppingToken">Token whose cancellation should end the receive loop.</param>
- public void BeginListen(CancellationToken stoppingToken)
- {
-     // Prevent several receive loops from running at once: signal the source of any earlier call.
-     // A linked source stays registered on its parent token until it is disposed, so dispose it
-     // as well instead of just dropping the reference.
-     CancellationTokenSource? previous = _tokenSource;
-     if (previous != null)
-     {
-         previous.Cancel();
-         previous.Dispose();
-     }
-
-     _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
-     DoTDengineConnect();
- }
-
- /// <summary>
- /// Opens a native TDengine connection using the configured host, user, password and database,
- /// then hands the connection to <see cref="DoReceive"/>.
- /// </summary>
- /// <exception cref="InvalidOperationException">
- /// The connection could not be established. Previously the null handle was still passed on to
- /// <see cref="DoReceive"/>, which then ran its queries against an invalid connection.
- /// </exception>
- public void DoTDengineConnect()
- {
-     // The port is fixed here rather than read from configuration.
-     const short port = 6030;
-
-     string host = _configTDengineService.Host;
-     string username = _configTDengineService.UserName;
-     string password = _configTDengineService.Password;
-     string dbname = _configTDengineService.DB;
-
-     var conn = TDengine.Connect(host, username, password, dbname, port);
-     if (conn == IntPtr.Zero)
-     {
-         string reason = TDengine.Error(conn);
-         _logger.LogError("reason:{Reason}", reason);
-         // Do not continue with a null handle; surface the failure to the caller.
-         throw new InvalidOperationException($"connect to TDengine failed, reason:{reason}");
-     }
-
-     _logger.LogInformation("连接 TDengine 成功....");
-     DoReceive(conn);
- }
-
- /// <summary>
- /// Creates the blood-pressure topic if needed, subscribes a TMQ consumer to it and polls for
- /// messages until the cancellation source created by <see cref="BeginListen"/> is signalled.
- /// Each complete row is converted to a <see cref="ReceiveMessageModel"/> and passed to
- /// <see cref="ParsePackage"/>.
- /// </summary>
- /// <param name="Connection">
- /// Open TDengine connection handle. It is closed when this method exits, whether it returns
- /// normally or throws.
- /// </param>
- /// <exception cref="InvalidOperationException">
- /// No cancellation source exists yet (BeginListen was not called), or the topic could not be created.
- /// </exception>
- public void DoReceive(IntPtr Connection)
- {
-     try
-     {
-         // Capture the source this loop belongs to. Re-reading the field on every iteration would
-         // let a loop started by an earlier BeginListen observe the replacement source and never stop.
-         CancellationTokenSource tokenSource = _tokenSource
-             ?? throw new InvalidOperationException("DoReceive requires the cancellation source created by BeginListen.");
-
-         var cfg = new ConsumerConfig
-         {
-             GourpId = "group_1",
-             TDConnectUser = _configTDengineService.UserName,
-             TDConnectPasswd = _configTDengineService.Password,
-             MsgWithTableName = "true",
-             TDConnectIp = _configTDengineService.Host,
-         };
-
-         // Topic name is the lower-cased model type name; invariant casing keeps it culture independent.
-         string topic = nameof(TopicHmBloodPress).ToLowerInvariant();
-         CreateTopic(Connection, topic);
-
-         var consumer = new ConsumerBuilder(cfg)
-             .Build();
-         try
-         {
-             consumer.Subscribe(topic);
-
-             while (!tokenSource.IsCancellationRequested)
-             {
-                 var consumeRes = consumer.Consume(300);
-                 foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumeRes.Message)
-                 {
-                     DispatchRows(kv.Key, kv.Value);
-                 }
-                 consumer.Commit(consumeRes);
-                 _logger.LogInformation("监听中....");
-             }
-         }
-         finally
-         {
-             // Close the consumer after use, otherwise it leaks memory.
-             consumer.Close();
-         }
-     }
-     finally
-     {
-         if (Connection != IntPtr.Zero)
-         {
-             TDengine.Close(Connection);
-         }
-     }
- }
-
- /// <summary>
- /// Issues "create topic if not exists" for <paramref name="topic"/>, selecting the columns named by
- /// the <see cref="JsonPropertyAttribute"/>s on <see cref="TopicHmBloodPress"/>.
- /// </summary>
- private static void CreateTopic(IntPtr connection, string topic)
- {
-     // Column list = JSON property names declared on the model's properties.
-     string columns = string.Join(",", typeof(TopicHmBloodPress).GetProperties()
-         .Select(prop => prop.GetCustomAttribute<JsonPropertyAttribute>()?.PropertyName)
-         .Where(name => !string.IsNullOrEmpty(name)));
-
-     IntPtr res = TDengine.Query(connection, $"create topic if not exists {topic} as select {columns} from health_monitor.stb_hm_bloodpress");
-     try
-     {
-         if (TDengine.ErrorNo(res) != 0)
-         {
-             throw new InvalidOperationException($"create topic failed, reason:{TDengine.Error(res)}");
-         }
-     }
-     finally
-     {
-         // The result handle is native memory and must be released explicitly.
-         if (res != IntPtr.Zero)
-         {
-             TDengine.FreeResult(res);
-         }
-     }
- }
-
- /// <summary>
- /// Splits the flat value list of one consume result into rows of Metas.Count values and forwards
- /// each complete row to <see cref="ParsePackage"/>. A failure in one row is logged and does not
- /// stop the remaining rows.
- /// </summary>
- private void DispatchRows(TopicPartition partition, TaosResult result)
- {
-     int columnCount = result.Metas.Count;
-     if (columnCount == 0)
-     {
-         // Without column metadata there is nothing to map (and nothing to divide by).
-         return;
-     }
-
-     // Only complete rows are processed; a trailing partial row is ignored.
-     for (int rowStart = 0; rowStart + columnCount <= result.Datas.Count; rowStart += columnCount)
-     {
-         try
-         {
-             IDictionary<string, object> row = new Dictionary<string, object>();
-             for (int col = 0; col < columnCount; col++)
-             {
-                 row.Add(result.Metas[col].name, result.Datas[rowStart + col]);
-             }
-
-             var body = JsonConvert.SerializeObject(row);
-             ReceiveMessageModel msg = new(partition.db, partition.table, partition.topic, row["message_id"].ToString()!, body);
-             ParsePackage(msg);
-         }
-         catch (Exception ex)
-         {
-             _logger.LogError("解析包发生异常:{ex.Message}, {ex.StackTrace}", ex.Message, ex.StackTrace);
-         }
-     }
- }
-
- /// <summary>
- /// Runs <paramref name="model"/> through the resolver factory and, when that yields a message,
- /// places it on the message queue. A null result is dropped silently.
- /// </summary>
- /// <param name="model">Raw message received from the subscription.</param>
- public void ParsePackage(ReceiveMessageModel model)
- {
-     var wrapped = _resolverFactory.ParseAndWrap(model);
-     if (wrapped != null)
-     {
-         _msgQueueManager.Enqueue(wrapped);
-     }
- }
-
- }
- }
|