From 868dee9585b872d1918e117776e42b7c48036231 Mon Sep 17 00:00:00 2001 From: H Vs Date: Mon, 26 Jun 2023 17:34:23 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=B6=88=E6=81=AF=E5=B7=A5?= =?UTF-8?q?=E5=8E=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../HealthMonitor.Service.csproj | 2 +- .../Resolver/BloodpressResolver.cs | 2 +- .../Resolver/Factory/ResolverFactory.cs | 21 ++- .../Resolver/Interface/IResolver.cs | 2 +- .../Resolver/Interface/IResolverFactory.cs | 5 +- HealthMonitor.Service/Sub/Interface/IMsg.cs | 14 ++ HealthMonitor.Service/Sub/MsgManager.cs | 40 +++++ .../Sub/ReceiveMessageModel.cs | 27 ++++ .../Sub/TDengineDataSubcribe.cs | 140 +++++++++++++++++- HealthMonitor.WebApi/PackageProcess.cs | 45 ++++++ HealthMonitor.WebApi/Program.cs | 12 +- HealthMonitor.WebApi/Worker.cs | 26 ++-- 12 files changed, 316 insertions(+), 20 deletions(-) create mode 100644 HealthMonitor.Service/Sub/Interface/IMsg.cs create mode 100644 HealthMonitor.Service/Sub/MsgManager.cs create mode 100644 HealthMonitor.Service/Sub/ReceiveMessageModel.cs create mode 100644 HealthMonitor.WebApi/PackageProcess.cs diff --git a/HealthMonitor.Service/HealthMonitor.Service.csproj b/HealthMonitor.Service/HealthMonitor.Service.csproj index 62eea06..1254ad6 100644 --- a/HealthMonitor.Service/HealthMonitor.Service.csproj +++ b/HealthMonitor.Service/HealthMonitor.Service.csproj @@ -1,4 +1,4 @@ - + net6.0 diff --git a/HealthMonitor.Service/Resolver/BloodpressResolver.cs b/HealthMonitor.Service/Resolver/BloodpressResolver.cs index 3df1dbe..dfb6b03 100644 --- a/HealthMonitor.Service/Resolver/BloodpressResolver.cs +++ b/HealthMonitor.Service/Resolver/BloodpressResolver.cs @@ -22,7 +22,7 @@ namespace HealthMonitor.Service.Resolver _logger = logger; } - public void SetResolveInfo(IConsumer msg) + public void SetResolveInfo(object msg) { throw new NotImplementedException(); } diff --git a/HealthMonitor.Service/Resolver/Factory/ResolverFactory.cs b/HealthMonitor.Service/Resolver/Factory/ResolverFactory.cs index 2461b94..b936a1c 100644 --- a/HealthMonitor.Service/Resolver/Factory/ResolverFactory.cs +++ b/HealthMonitor.Service/Resolver/Factory/ResolverFactory.cs @@ -1,11 +1,14 @@ using HealthMonitor.Service.Resolver.Interface; +using HealthMonitor.Service.Sub; using Microsoft.Extensions.Logging; +using Newtonsoft.Json; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using TDengineTMQ; +using TelpoDataService.Util.Entities.GpsLocationHistory; namespace HealthMonitor.Service.Resolver.Factory { @@ -15,10 +18,22 @@ namespace HealthMonitor.Service.Resolver.Factory public ResolverFactory(ILogger logger) { _logger = logger; - } - public void ParseAndWrap(IConsumer msg) + } + + public dynamic ParseAndWrap(ReceiveMessageModel msg) { - throw new NotImplementedException(); + var table=msg.Topic; + switch (table) + { + case "topic_hm_bp_stats": + + HisGpsBloodPress bloodPress = JsonConvert.DeserializeObject(msg.Body)!; + return bloodPress!; + + default: + break; + } + return false; } } } diff --git a/HealthMonitor.Service/Resolver/Interface/IResolver.cs b/HealthMonitor.Service/Resolver/Interface/IResolver.cs index a494120..92cc9a2 100644 --- a/HealthMonitor.Service/Resolver/Interface/IResolver.cs +++ b/HealthMonitor.Service/Resolver/Interface/IResolver.cs @@ -9,7 +9,7 @@ namespace HealthMonitor.Service.Resolver.Interface { public interface IResolver { - void SetResolveInfo(IConsumer msg); + void SetResolveInfo(object msg); Task ExecuteMessageAsync(); } diff --git a/HealthMonitor.Service/Resolver/Interface/IResolverFactory.cs b/HealthMonitor.Service/Resolver/Interface/IResolverFactory.cs index 5a19a24..093d72e 100644 --- a/HealthMonitor.Service/Resolver/Interface/IResolverFactory.cs +++ b/HealthMonitor.Service/Resolver/Interface/IResolverFactory.cs @@ -1,4 +1,5 @@ -using System; +using HealthMonitor.Service.Sub; +using System; using System.Collections.Generic; using System.Linq; using System.Text; @@ -9,6 +10,6 @@ namespace HealthMonitor.Service.Resolver.Interface { public interface IResolverFactory { - void ParseAndWrap(IConsumer msg); + dynamic ParseAndWrap(ReceiveMessageModel msg); } } diff --git a/HealthMonitor.Service/Sub/Interface/IMsg.cs b/HealthMonitor.Service/Sub/Interface/IMsg.cs new file mode 100644 index 0000000..a1327b1 --- /dev/null +++ b/HealthMonitor.Service/Sub/Interface/IMsg.cs @@ -0,0 +1,14 @@ +using HealthMonitor.Service.Resolver.Interface; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace HealthMonitor.Service.Sub.Interface +{ + public interface IMsg + { + IResolver GetMsgResolver(); + } +} diff --git a/HealthMonitor.Service/Sub/MsgManager.cs b/HealthMonitor.Service/Sub/MsgManager.cs new file mode 100644 index 0000000..2c6ffc6 --- /dev/null +++ b/HealthMonitor.Service/Sub/MsgManager.cs @@ -0,0 +1,40 @@ +using HealthMonitor.Service.Resolver; +using HealthMonitor.Service.Resolver.Interface; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace HealthMonitor.Service.Sub +{ + + public class MsgManager + { + private readonly BloodpressResolver _resolver; + private readonly ILogger _logger; + private object _obj = default!; + public MsgManager(ILogger logger, BloodpressResolver resolver) + { + _logger = logger; + _resolver = resolver; + } + + public void AddMsg(object msg) + { + _obj = msg; + } + + + public IResolver? GetMsgResolver() + { + if (_obj == null) + { + return null; + } + _resolver.SetResolveInfo(_obj); + return _resolver; + } + } +} diff --git a/HealthMonitor.Service/Sub/ReceiveMessageModel.cs b/HealthMonitor.Service/Sub/ReceiveMessageModel.cs new file mode 100644 index 0000000..a099f45 --- /dev/null +++ b/HealthMonitor.Service/Sub/ReceiveMessageModel.cs @@ -0,0 +1,27 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace HealthMonitor.Service.Sub +{ + public class ReceiveMessageModel + { + public ReceiveMessageModel(string db,string table, string topic, string messageId,string body) + { + DB = db; + Table = table; + MessageId = messageId; + Topic = topic; + Body = body; + } + + public string MessageId { get; set; } + public string Topic { get; set; } + public string Body { get; set; } + public string Table { get; set; } + public string DB { get; set; } + + } +} diff --git a/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs b/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs index 3a1e21f..231e051 100644 --- a/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs +++ b/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs @@ -2,7 +2,10 @@ using HealthMonitor.Core.Dal; using HealthMonitor.Service.Biz.db; using HealthMonitor.Service.Cache; +using HealthMonitor.Service.Resolver.Factory; +using HealthMonitor.Service.Resolver.Interface; using Microsoft.Extensions.Logging; +using Newtonsoft.Json; using System; using System.Collections.Generic; using System.Linq; @@ -19,16 +22,21 @@ namespace HealthMonitor.Service.Sub private readonly ILogger _logger; private IConsumer _consumer = default!; private IntPtr _conn = default!; + private readonly MsgManager _msgManager; private readonly TDengineService _serviceTDengine; private readonly PersonCacheManager _personCacheMgr; private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager; + private readonly IResolverFactory _resolverFactory; private int cnt = 0; public TDengineDataSubcribe( + TDengineService serviceDengine, PersonCacheManager personCacheMgr, BloodPressReferenceValueCacheManager bpRefValCacheManager, + IResolverFactory resolverFactory, + MsgManager msgManager, ILogger logger ) { @@ -36,6 +44,8 @@ namespace HealthMonitor.Service.Sub _personCacheMgr = personCacheMgr; _bpRefValCacheManager = bpRefValCacheManager; _logger = logger; + _resolverFactory = resolverFactory; + _msgManager = msgManager; _conn = GetConnection(); } public void BeginListen(CancellationToken stoppingToken) @@ -49,9 +59,124 @@ namespace HealthMonitor.Service.Sub // TDConnectIp = "47.116.142.20", //}; //var conn = GetConnection(); - //var consumer = CreateConsumer(cfg, conn); + //ProcessMsg(consumer); + DoTDengineConnect(); + } + + public void DoTDengineConnect() + { + string host = "47.116.142.20"; + short port = 6030; + string username = "root"; + string password = "taosdata"; + string dbname = "health_monitor"; + var conn = TDengine.Connect(host, username, password, dbname, port); + if (conn == IntPtr.Zero) + { + throw new Exception("Connect to TDengine failed"); + } + else + { + Console.WriteLine("Connect to TDengine success"); + } + DoReceive(conn); + } + + public void DoReceive(IntPtr Connection) + { + var cfg = new ConsumerConfig + { + GourpId = "group_1", + TDConnectUser = "root", + TDConnectPasswd = "taosdata", + MsgWithTableName = "true", + TDConnectIp = "47.116.142.20", + }; + string topic = "topic_hm_bp_stats"; + //create topic + IntPtr res = TDengine.Query(Connection, $"create topic if not exists {topic} as select * from health_monitor.hm_bloodpress"); + + if (TDengine.ErrorNo(res) != 0) + { + throw new Exception($"create topic failed, reason:{TDengine.Error(res)}"); + } + + // create consumer + var consumer = new ConsumerBuilder(cfg) + .Build(); + + // subscribe + consumer.Subscribe(topic); + + while (true) + { + var consumeRes = consumer.Consume(300); + foreach (KeyValuePair kv in consumeRes.Message) + { + + for (int i = 0; i < kv.Value.Datas.Count; i++) + { + if (((i + 1) % kv.Value.Metas.Count == 0)) + { + string bloodpress_id = SafeType.SafeString(kv.Value.Datas[i - 8]); + string message_id = SafeType.SafeString(kv.Value.Datas[i - 7]); + string serialno = SafeType.SafeString(kv.Value.Datas[i - 6]); + int systolic_value = SafeType.SafeInt(kv.Value.Datas[i - 5]); + int diastolic_value = SafeType.SafeInt(kv.Value.Datas[i - 4]); + DateTime create_time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 3]) / 1000000); + DateTime last_update = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 2]) / 1000000); + int method = SafeType.SafeInt(kv.Value.Datas[i - 1]); + bool is_display = SafeType.SafeBool(kv.Value.Datas[i]); + + HisGpsBloodPress bp = new() + { + BloodPressId = bloodpress_id, + MessageId = message_id, + Serialno = serialno, + SystolicValue = systolic_value, + DiastolicValue = diastolic_value, + CreateTime = create_time, + LastUpdate = last_update, + Method = method, + IsDisplay = is_display ? 1 : 0, + }; + + try + { + var db = kv.Key.db; + var table = kv.Key.table; + var kvTopic = kv.Key.topic; + var body = JsonConvert.SerializeObject(bp); + + ReceiveMessageModel msg = new(db, table, kvTopic, Guid.NewGuid().ToString("N"), body); + ParsePackage(msg); + } + catch (Exception ex) + { + Console.WriteLine(ex.Message); + } + } + } + + } + consumer.Commit(consumeRes); + Console.WriteLine("consumer.Commit"); + } + + } + + + + public void ParsePackage(ReceiveMessageModel model) + { + var msg = _resolverFactory.ParseAndWrap(model); + Console.WriteLine("msg"); + if (msg == null) return; + _msgManager.AddMsg(msg); + + } public void CreateConnection() @@ -65,6 +190,8 @@ namespace HealthMonitor.Service.Sub TDConnectIp = "47.116.142.20", }; var conn = GetConnection(); + + } @@ -153,7 +280,7 @@ namespace HealthMonitor.Service.Sub // // Console.WriteLine("\n================ {0} done "); //} - + /** public async Task ProcessMsg() { var consumerRes = _consumer.Consume(300); @@ -367,6 +494,15 @@ namespace HealthMonitor.Service.Sub _consumer.Commit(consumerRes); // Console.WriteLine("\n================ {0} done "); } + */ + + + + public async Task ProcessMsg() + { + + } + public IntPtr GetConnection() { diff --git a/HealthMonitor.WebApi/PackageProcess.cs b/HealthMonitor.WebApi/PackageProcess.cs new file mode 100644 index 0000000..57b0f7a --- /dev/null +++ b/HealthMonitor.WebApi/PackageProcess.cs @@ -0,0 +1,45 @@ +using HealthMonitor.Service.Resolver.Interface; +using HealthMonitor.Service.Sub; + +namespace HealthMonitor.WebApi +{ + public class PackageProcess : IDisposable + { + private readonly ILogger _logger; + + private readonly MsgManager _msgManager; + + public PackageProcess(ILogger logger, MsgManager msgManager) + { + _logger = logger; + _msgManager = msgManager; + } + + public async Task ResolveAsync() + { + try + { + var resolver = _msgManager.GetMsgResolver(); + if (resolver != null) + { + await resolver.ExecuteMessageAsync().ConfigureAwait(false); + } + + } + catch (Exception ex) + { + + Console.WriteLine(ex.Message); + //_logger.LogError($"[{msg.MessageId}] 未处理异常 message: {ex.Message}\n {ex.StackTrace}"); + } + + return true; + + } + + public void Dispose() + { + throw new NotImplementedException(); + } + } +} diff --git a/HealthMonitor.WebApi/Program.cs b/HealthMonitor.WebApi/Program.cs index fff8641..2f46e24 100644 --- a/HealthMonitor.WebApi/Program.cs +++ b/HealthMonitor.WebApi/Program.cs @@ -23,6 +23,9 @@ using HealthMonitor.WebApi.Swagger; using HealthMonitor.Service.Cache; using TelpoDataService.Util.Clients; using HealthMonitor.Service.Sub; +using HealthMonitor.Service.Resolver; +using HealthMonitor.Service.Resolver.Factory; +using HealthMonitor.Service.Resolver.Interface; namespace HealthMonitor.WebApi { @@ -163,12 +166,19 @@ namespace HealthMonitor.WebApi #endregion #region Worker - + builder.Services.AddSingleton(); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(); + builder.Services .AddSingleton() + .AddSingleton() .AddHostedService(); #endregion + + + // Register the Swagger generator, defining 1 or more Swagger documents builder.Services.AddSwaggerGen(c => { diff --git a/HealthMonitor.WebApi/Worker.cs b/HealthMonitor.WebApi/Worker.cs index eed4c97..716b46e 100644 --- a/HealthMonitor.WebApi/Worker.cs +++ b/HealthMonitor.WebApi/Worker.cs @@ -9,14 +9,18 @@ namespace HealthMonitor.WebApi public class Worker : BackgroundService { private readonly ILogger _logger; + private readonly IServiceProvider _services; private readonly TDengineDataSubcribe _tdEngineDataSubcribe; + private readonly PackageProcess _processor; private CancellationTokenSource _tokenSource=default!; - public Worker(ILogger logger,TDengineDataSubcribe tdEngineDataSubcribe) + public Worker(ILogger logger, IServiceProvider services, PackageProcess processor,TDengineDataSubcribe tdEngineDataSubcribe) { _logger = logger; _tdEngineDataSubcribe = tdEngineDataSubcribe; + _services = services; + _processor = processor; } public override Task StartAsync(CancellationToken cancellationToken) @@ -24,9 +28,7 @@ namespace HealthMonitor.WebApi _logger.LogInformation("------StartAsync"); _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); // 创建消费者 - _tdEngineDataSubcribe.CreateConsumer(); - - + // _tdEngineDataSubcribe.CreateConsumer(); return base.StartAsync(cancellationToken); } @@ -35,27 +37,33 @@ namespace HealthMonitor.WebApi _logger.LogInformation("------StopAsync"); _tokenSource.Cancel(); //停止工作线程 // 关闭消费者 - _tdEngineDataSubcribe.CloseConsumer(); + // _tdEngineDataSubcribe.CloseConsumer(); return base.StopAsync(cancellationToken); } protected override Task ExecuteAsync(CancellationToken stoppingToken) { - + // var processor = _services.GetService(); TaskFactory factory = new(_tokenSource.Token); - factory.StartNew(() => + factory.StartNew(async () => { if (_tokenSource.IsCancellationRequested) _logger.LogWarning("Worker exit"); while (!_tokenSource.IsCancellationRequested) { - - _tdEngineDataSubcribe.ProcessMsg(); + await _processor.ResolveAsync().ConfigureAwait(false); + // await _tdEngineDataSubcribe.ProcessMsg(); } }, TaskCreationOptions.LongRunning); + + while (!_tokenSource.IsCancellationRequested) + { + _tdEngineDataSubcribe.BeginListen(_tokenSource.Token); + } return Task.Delay(1000, _tokenSource.Token); + } }