using HealthMonitor.Common; using HealthMonitor.Core.Dal; using HealthMonitor.Model.Config; using HealthMonitor.Service.Biz.db; using HealthMonitor.Service.Cache; using HealthMonitor.Service.Resolver.Factory; using HealthMonitor.Service.Resolver.Interface; using HealthMonitor.Service.Sub.Topic.Model; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Newtonsoft.Json; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Reflection; using System.Text; using System.Threading.Tasks; using TDengineDriver; using TDengineTMQ; using TelpoDataService.Util.Entities.GpsLocationHistory; namespace HealthMonitor.Service.Sub { public class TDengineDataSubcribe { private readonly ILogger _logger; private readonly MsgQueueManager _msgQueueManager; private readonly TDengineService _serviceTDengine; private readonly PersonCacheManager _personCacheMgr; private readonly TDengineServiceConfig _configTDengineService; private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager; private readonly IResolverFactory _resolverFactory; private CancellationTokenSource? _tokenSource = null; //private int cnt = 0; public TDengineDataSubcribe( TDengineService serviceTDengine, PersonCacheManager personCacheMgr, BloodPressReferenceValueCacheManager bpRefValCacheManager, IResolverFactory resolverFactory, IOptions configTDengineService, MsgQueueManager msgQueueManager, ILogger logger ) { _serviceTDengine = serviceTDengine; _personCacheMgr = personCacheMgr; _bpRefValCacheManager = bpRefValCacheManager; _logger = logger; _resolverFactory = resolverFactory; _msgQueueManager = msgQueueManager; _configTDengineService = configTDengineService.Value; } public void BeginListen(CancellationToken stoppingToken) { //var cfg = new ConsumerConfig //{ // GourpId = "group_1", // TDConnectUser = "root", // TDConnectPasswd = "taosdata", // MsgWithTableName = "true", // TDConnectIp = "47.116.142.20", //}; //var conn = GetConnection(); //ProcessMsg(consumer); //防止造成多线程运行 _tokenSource?.Cancel(); _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken); DoTDengineConnect(); } public void DoTDengineConnect() { // string host = _configTDengineService.Host; // short port = 6030; // string username = _configTDengineService.UserName; // string password = _configTDengineService.Password; // string dbname = _configTDengineService.DB; ////#if DEBUG //// //string configDir = "C:/TDengine/cfg"; //// //TDengine.Options((int)TDengineInitOption.TSDB_OPTION_CONFIGDIR, configDir); //// TDengine.Options((int)TDengineInitOption.TSDB_OPTION_TIMEZONE, "Asia/Beijing"); ////#endif // var conn = TDengine.Connect(host, username, password, dbname, port); // if (conn == IntPtr.Zero) // { // _logger.LogError("reason:{TDengine.Error(conn)}", TDengine.Error(conn)); // } // else // { // _logger.LogInformation($"连接 TDengine 成功...."); // } var conn = _serviceTDengine.Connection(); DoReceive(conn); } public void DoReceive(IntPtr Connection) { // string topic = "topic_hm_bp_stats"; string topic = nameof(TopicHmBloodPress).ToLower(); TopicHmBloodPress fields = new(); PropertyInfo[] props = fields.GetType().GetProperties(); // 获取 fields string attributes = ""; foreach (PropertyInfo prop in props) { JsonPropertyAttribute attr = prop.GetCustomAttribute()!; if (attr != null) { attributes += attr.PropertyName + ","; } } attributes = attributes.TrimEnd(','); //create topic IntPtr res = TDengine.Query(Connection, $"create topic if not exists {topic} as select {attributes} from health_monitor.stb_hm_bloodpress"); if (TDengine.ErrorNo(res) != 0) { _logger.LogError($"create topic failed, reason:{TDengine.Error(res)}"); throw new Exception($"create topic failed, reason:{TDengine.Error(res)}"); } var cfg = new ConsumerConfig { GourpId = "group_1", TDConnectUser = _configTDengineService.UserName, TDConnectPasswd = _configTDengineService.Password, MsgWithTableName = "true", TDConnectIp = _configTDengineService.Host, }; // create consumer var consumer = new ConsumerBuilder(cfg) .Build(); // subscribe consumer.Subscribe(topic); while (!_tokenSource!.IsCancellationRequested) { var consumeRes = consumer.Consume(300); foreach (KeyValuePair kv in consumeRes.Message) { for (int i = 0; i < kv.Value.Datas.Count; i++) { if (((i + 1) % kv.Value.Metas.Count == 0)) { try { IDictionary row = new Dictionary(); foreach (var meta in kv.Value.Metas) { int index = i - (kv.Value.Metas.Count - kv.Value.Metas.IndexOf(meta) - 1); //var value = kv.Value.Datas[index]; row.Add(meta.name, kv.Value.Datas[index]); } var db = kv.Key.db; var table = kv.Key.table; var kvTopic = kv.Key.topic; var body = JsonConvert.SerializeObject(row); ReceiveMessageModel msg = new(db, table, kvTopic, row["message_id"].ToString()!, body); ParsePackage(msg); } catch (Exception ex) { _logger.LogError("解析包发生异常:{ex.Message}, {ex.StackTrace}", ex.Message, ex.StackTrace); } } } } consumer.Commit(consumeRes); //_logger.LogInformation("监听中...."); // Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff},监听中...."); } // close consumer after use.Otherwise will lead memory leak. consumer.Close(); TDengine.Close(Connection); } public void ParsePackage(ReceiveMessageModel model) { var msg = _resolverFactory.ParseAndWrap(model); if (msg == null) return; _msgQueueManager.Enqueue(msg); } } }