diff --git a/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs b/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs new file mode 100644 index 0000000..8ca2903 --- /dev/null +++ b/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs @@ -0,0 +1,185 @@ +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using TDengineDriver; +using TDengineTMQ; + +namespace HealthMonitor.Service.Sub +{ + public class TDengineDataSubcribe + { + private readonly ILogger _logger; + private IConsumer _consumer = default!; + private IntPtr _conn = default!; + + public TDengineDataSubcribe( + ILogger logger + ) + { + _logger = logger; + _conn = GetConnection(); + } + public void BeginListen(CancellationToken stoppingToken) + { + //var cfg = new ConsumerConfig + //{ + // GourpId = "group_1", + // TDConnectUser = "root", + // TDConnectPasswd = "taosdata", + // MsgWithTableName = "true", + // TDConnectIp = "47.116.142.20", + //}; + //var conn = GetConnection(); + //var consumer = CreateConsumer(cfg, conn); + + //ProcessMsg(consumer); + } + + public void CreateConnection() + { + var cfg = new ConsumerConfig + { + GourpId = "group_1", + TDConnectUser = "root", + TDConnectPasswd = "taosdata", + MsgWithTableName = "true", + TDConnectIp = "47.116.142.20", + }; + var conn = GetConnection(); + } + + + // 创建消费者 + public void CreateConsumer() + { + var cfg = new ConsumerConfig + { + GourpId = "group_1", + TDConnectUser = "root", + TDConnectPasswd = "taosdata", + MsgWithTableName = "true", + TDConnectIp = "47.116.142.20", + }; + //IntPtr conn = GetConnection(); + string topic = "topic_name"; + //create topic + IntPtr res = TDengine.Query(_conn, $"create topic if not exists {topic} as select * from ctb1"); + + if (TDengine.ErrorNo(res) != 0) + { + throw new Exception($"create topic failed, reason:{TDengine.Error(res)}"); + } + + // create consumer + var consumer = new ConsumerBuilder(cfg) + .Build(); + + // subscribe + consumer.Subscribe(topic); + + _consumer = consumer; + } + + //public IConsumer CreateConsumer() + //{ + // var cfg = new ConsumerConfig + // { + // GourpId = "group_1", + // TDConnectUser = "root", + // TDConnectPasswd = "taosdata", + // MsgWithTableName = "true", + // TDConnectIp = "47.116.142.20", + // }; + // var conn = GetConnection(); + // //IntPtr conn = GetConnection(); + // string topic = "topic_name"; + // //create topic + // IntPtr res = TDengine.Query(conn, $"create topic if not exists {topic} as select * from ctb1"); + + // if (TDengine.ErrorNo(res) != 0) + // { + // throw new Exception($"create topic failed, reason:{TDengine.Error(res)}"); + // } + + // // create consumer + // var consumer = new ConsumerBuilder(cfg) + // .Build(); + + // // subscribe + // consumer.Subscribe(topic); + + // return consumer; + //} + + public void ProcessMsg() + { + var consumerRes = _consumer.Consume(300); + // process ConsumeResult + foreach (KeyValuePair kv in consumerRes.Message) + { + Console.WriteLine("topic partitions:\n{0}", kv.Key.ToString()); + + kv.Value.Metas.ForEach(meta => + { + Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size); + }); + Console.WriteLine(""); + kv.Value.Datas.ForEach(data => + { + Console.WriteLine(data.ToString()); + }); + } + + _consumer.Commit(consumerRes); + Console.WriteLine("\n================ {0} done "); + } + + public IntPtr GetConnection() + { + string host = "47.116.142.20"; + short port = 6030; + string username = "root"; + string password = "taosdata"; + string dbname = "tmqdb"; + var conn = TDengine.Connect(host, username, password, dbname, port); + if (conn == IntPtr.Zero) + { + throw new Exception("Connect to TDengine failed"); + } + else + { + Console.WriteLine("Connect to TDengine success"); + } + return conn; + } + // 关闭消费者 + //public void CloseConsumer(IConsumer consumer, IntPtr conn) + //{ + // List topics = consumer.Subscription(); + // topics.ForEach(t => Console.WriteLine("topic name:{0}", t)); + + // // unsubscribe + // consumer.Unsubscribe(); + + // // close consumer after use.Otherwise will lead memory leak. + // consumer.Close(); + // TDengine.Close(conn); + //} + + public void CloseConsumer() + { + List topics = _consumer.Subscription(); + topics.ForEach(t => Console.WriteLine("topic name:{0}", t)); + + // unsubscribe + _consumer.Unsubscribe(); + + // close consumer after use.Otherwise will lead memory leak. + _consumer.Close(); + TDengine.Close(_conn); + } + } +} diff --git a/HealthMonitor.WebApi/Program.cs b/HealthMonitor.WebApi/Program.cs index ff49b69..9460f30 100644 --- a/HealthMonitor.WebApi/Program.cs +++ b/HealthMonitor.WebApi/Program.cs @@ -22,6 +22,7 @@ using Microsoft.OpenApi.Models; using HealthMonitor.WebApi.Swagger; using HealthMonitor.Service.Cache; using TelpoDataService.Util.Clients; +using HealthMonitor.Service.Sub; namespace HealthMonitor.WebApi { @@ -77,7 +78,7 @@ namespace HealthMonitor.WebApi { var context = sp.GetRequiredService(); return new EfCoreDataAccessor(context); - }); ; + }); //builder.Services.AddDbContextPool((sp, options) => @@ -96,7 +97,7 @@ namespace HealthMonitor.WebApi #endregion - #region + #region AOP //builder.Services.Configure(builder.Configuration.GetSection("Redis")); builder.Services @@ -147,7 +148,7 @@ namespace HealthMonitor.WebApi #endregion - #region + #region TelpoDataServices builder.Services.AddTelpoDataServices(opt => { @@ -156,6 +157,13 @@ namespace HealthMonitor.WebApi #endregion + #region Worker + + builder.Services + .AddSingleton() + .AddHostedService(); + #endregion + // Register the Swagger generator, defining 1 or more Swagger documents builder.Services.AddSwaggerGen(c => { diff --git a/HealthMonitor.WebApi/Worker.cs b/HealthMonitor.WebApi/Worker.cs new file mode 100644 index 0000000..eed4c97 --- /dev/null +++ b/HealthMonitor.WebApi/Worker.cs @@ -0,0 +1,62 @@ +using HealthMonitor.Common; +using HealthMonitor.Core.Common.Extensions; +using HealthMonitor.Service.Sub; +using TDengineDriver; +using TDengineTMQ; + +namespace HealthMonitor.WebApi +{ + public class Worker : BackgroundService + { + private readonly ILogger _logger; + private readonly TDengineDataSubcribe _tdEngineDataSubcribe; + + private CancellationTokenSource _tokenSource=default!; + + public Worker(ILogger logger,TDengineDataSubcribe tdEngineDataSubcribe) + { + _logger = logger; + _tdEngineDataSubcribe = tdEngineDataSubcribe; + } + + public override Task StartAsync(CancellationToken cancellationToken) + { + _logger.LogInformation("------StartAsync"); + _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + // 创建消费者 + _tdEngineDataSubcribe.CreateConsumer(); + + + return base.StartAsync(cancellationToken); + } + + public override Task StopAsync(CancellationToken cancellationToken) + { + _logger.LogInformation("------StopAsync"); + _tokenSource.Cancel(); //停止工作线程 + // 关闭消费者 + _tdEngineDataSubcribe.CloseConsumer(); + return base.StopAsync(cancellationToken); + } + + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + + TaskFactory factory = new(_tokenSource.Token); + factory.StartNew(() => + { + if (_tokenSource.IsCancellationRequested) + _logger.LogWarning("Worker exit"); + + while (!_tokenSource.IsCancellationRequested) + { + + _tdEngineDataSubcribe.ProcessMsg(); + } + + }, TaskCreationOptions.LongRunning); + return Task.Delay(1000, _tokenSource.Token); + } + + } +}