Browse Source

增加消费者worker

td_orm
H Vs 1 year ago
parent
commit
e1e0df3790
3 changed files with 258 additions and 3 deletions
  1. +185
    -0
      HealthMonitor.Service/Sub/TDengineDataSubcribe.cs
  2. +11
    -3
      HealthMonitor.WebApi/Program.cs
  3. +62
    -0
      HealthMonitor.WebApi/Worker.cs

+ 185
- 0
HealthMonitor.Service/Sub/TDengineDataSubcribe.cs View File

@@ -0,0 +1,185 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using TDengineDriver;
using TDengineTMQ;

namespace HealthMonitor.Service.Sub
{
public class TDengineDataSubcribe
{
private readonly ILogger<TDengineDataSubcribe> _logger;
private IConsumer _consumer = default!;
private IntPtr _conn = default!;

public TDengineDataSubcribe(
ILogger<TDengineDataSubcribe> logger
)
{
_logger = logger;
_conn = GetConnection();
}
public void BeginListen(CancellationToken stoppingToken)
{
//var cfg = new ConsumerConfig
//{
// GourpId = "group_1",
// TDConnectUser = "root",
// TDConnectPasswd = "taosdata",
// MsgWithTableName = "true",
// TDConnectIp = "47.116.142.20",
//};
//var conn = GetConnection();
//var consumer = CreateConsumer(cfg, conn);

//ProcessMsg(consumer);
}

public void CreateConnection()
{
var cfg = new ConsumerConfig
{
GourpId = "group_1",
TDConnectUser = "root",
TDConnectPasswd = "taosdata",
MsgWithTableName = "true",
TDConnectIp = "47.116.142.20",
};
var conn = GetConnection();
}


// 创建消费者
public void CreateConsumer()
{
var cfg = new ConsumerConfig
{
GourpId = "group_1",
TDConnectUser = "root",
TDConnectPasswd = "taosdata",
MsgWithTableName = "true",
TDConnectIp = "47.116.142.20",
};
//IntPtr conn = GetConnection();
string topic = "topic_name";
//create topic
IntPtr res = TDengine.Query(_conn, $"create topic if not exists {topic} as select * from ctb1");

if (TDengine.ErrorNo(res) != 0)
{
throw new Exception($"create topic failed, reason:{TDengine.Error(res)}");
}

// create consumer
var consumer = new ConsumerBuilder(cfg)
.Build();

// subscribe
consumer.Subscribe(topic);

_consumer = consumer;
}

//public IConsumer CreateConsumer()
//{
// var cfg = new ConsumerConfig
// {
// GourpId = "group_1",
// TDConnectUser = "root",
// TDConnectPasswd = "taosdata",
// MsgWithTableName = "true",
// TDConnectIp = "47.116.142.20",
// };
// var conn = GetConnection();
// //IntPtr conn = GetConnection();
// string topic = "topic_name";
// //create topic
// IntPtr res = TDengine.Query(conn, $"create topic if not exists {topic} as select * from ctb1");

// if (TDengine.ErrorNo(res) != 0)
// {
// throw new Exception($"create topic failed, reason:{TDengine.Error(res)}");
// }

// // create consumer
// var consumer = new ConsumerBuilder(cfg)
// .Build();

// // subscribe
// consumer.Subscribe(topic);

// return consumer;
//}

public void ProcessMsg()
{
var consumerRes = _consumer.Consume(300);
// process ConsumeResult
foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumerRes.Message)
{
Console.WriteLine("topic partitions:\n{0}", kv.Key.ToString());

kv.Value.Metas.ForEach(meta =>
{
Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size);
});
Console.WriteLine("");
kv.Value.Datas.ForEach(data =>
{
Console.WriteLine(data.ToString());
});
}

_consumer.Commit(consumerRes);
Console.WriteLine("\n================ {0} done ");
}

public IntPtr GetConnection()
{
string host = "47.116.142.20";
short port = 6030;
string username = "root";
string password = "taosdata";
string dbname = "tmqdb";
var conn = TDengine.Connect(host, username, password, dbname, port);
if (conn == IntPtr.Zero)
{
throw new Exception("Connect to TDengine failed");
}
else
{
Console.WriteLine("Connect to TDengine success");
}
return conn;
}
// 关闭消费者
//public void CloseConsumer(IConsumer consumer, IntPtr conn)
//{
// List<string> topics = consumer.Subscription();
// topics.ForEach(t => Console.WriteLine("topic name:{0}", t));

// // unsubscribe
// consumer.Unsubscribe();

// // close consumer after use.Otherwise will lead memory leak.
// consumer.Close();
// TDengine.Close(conn);
//}

public void CloseConsumer()
{
List<string> topics = _consumer.Subscription();
topics.ForEach(t => Console.WriteLine("topic name:{0}", t));

// unsubscribe
_consumer.Unsubscribe();

// close consumer after use.Otherwise will lead memory leak.
_consumer.Close();
TDengine.Close(_conn);
}
}
}

+ 11
- 3
HealthMonitor.WebApi/Program.cs View File

@@ -22,6 +22,7 @@ using Microsoft.OpenApi.Models;
using HealthMonitor.WebApi.Swagger;
using HealthMonitor.Service.Cache;
using TelpoDataService.Util.Clients;
using HealthMonitor.Service.Sub;

namespace HealthMonitor.WebApi
{
@@ -77,7 +78,7 @@ namespace HealthMonitor.WebApi
{
var context = sp.GetRequiredService<HealthMonitorContext>();
return new EfCoreDataAccessor(context);
}); ;
});


//builder.Services.AddDbContextPool<HealthMonitorContext>((sp, options) =>
@@ -96,7 +97,7 @@ namespace HealthMonitor.WebApi

#endregion

#region
#region AOP
//builder.Services.Configure<RedisConfig>(builder.Configuration.GetSection("Redis"));

builder.Services
@@ -147,7 +148,7 @@ namespace HealthMonitor.WebApi

#endregion

#region
#region TelpoDataServices

builder.Services.AddTelpoDataServices(opt =>
{
@@ -156,6 +157,13 @@ namespace HealthMonitor.WebApi

#endregion

#region Worker
builder.Services
.AddSingleton<TDengineDataSubcribe>()
.AddHostedService<Worker>();
#endregion

// Register the Swagger generator, defining 1 or more Swagger documents
builder.Services.AddSwaggerGen(c =>
{


+ 62
- 0
HealthMonitor.WebApi/Worker.cs View File

@@ -0,0 +1,62 @@
using HealthMonitor.Common;
using HealthMonitor.Core.Common.Extensions;
using HealthMonitor.Service.Sub;
using TDengineDriver;
using TDengineTMQ;

namespace HealthMonitor.WebApi
{
public class Worker : BackgroundService
{
private readonly ILogger<Worker> _logger;
private readonly TDengineDataSubcribe _tdEngineDataSubcribe;

private CancellationTokenSource _tokenSource=default!;

public Worker(ILogger<Worker> logger,TDengineDataSubcribe tdEngineDataSubcribe)
{
_logger = logger;
_tdEngineDataSubcribe = tdEngineDataSubcribe;
}

public override Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("------StartAsync");
_tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
// 创建消费者
_tdEngineDataSubcribe.CreateConsumer();


return base.StartAsync(cancellationToken);
}

public override Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("------StopAsync");
_tokenSource.Cancel(); //停止工作线程
// 关闭消费者
_tdEngineDataSubcribe.CloseConsumer();
return base.StopAsync(cancellationToken);
}

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{

TaskFactory factory = new(_tokenSource.Token);
factory.StartNew(() =>
{
if (_tokenSource.IsCancellationRequested)
_logger.LogWarning("Worker exit");

while (!_tokenSource.IsCancellationRequested)
{

_tdEngineDataSubcribe.ProcessMsg();
}

}, TaskCreationOptions.LongRunning);
return Task.Delay(1000, _tokenSource.Token);
}

}
}

Loading…
Cancel
Save