From a2a673efefff8cd18196aa5659c5fecb3efe9a3d Mon Sep 17 00:00:00 2001 From: H Vs Date: Thu, 13 Jul 2023 17:26:58 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E8=A7=A3=E6=9E=90=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- HealthMonitor.Service/Sub/MsgQueueManager.cs | 32 ++++++++++++++----- .../Sub/TDengineDataSubcribe.cs | 23 ++++++------- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/HealthMonitor.Service/Sub/MsgQueueManager.cs b/HealthMonitor.Service/Sub/MsgQueueManager.cs index e9f3ec4..6939450 100644 --- a/HealthMonitor.Service/Sub/MsgQueueManager.cs +++ b/HealthMonitor.Service/Sub/MsgQueueManager.cs @@ -1,5 +1,6 @@ using HealthMonitor.Service.Resolver; using HealthMonitor.Service.Resolver.Interface; +using HealthMonitor.Service.Sub.Topic.Model; using Microsoft.Extensions.Logging; using System; using System.Collections.Concurrent; @@ -13,11 +14,16 @@ namespace HealthMonitor.Service.Sub public class MsgQueueManager: ConcurrentQueue { private readonly ILogger _logger; - private readonly BloodpressResolver _resolver; - public MsgQueueManager(ILogger logger, BloodpressResolver resolver) + private readonly BloodpressResolver _resolverBloodpress; + + //private const string BP = nameof(TopicHmBloodPress).ToLower(); + + + + public MsgQueueManager(ILogger logger, BloodpressResolver resolverBloodpress) { _logger = logger; - _resolver = resolver; + _resolverBloodpress = resolverBloodpress; } //public IResolver? GetMsgResolver() @@ -34,14 +40,24 @@ namespace HealthMonitor.Service.Sub public IResolver? GetMsgResolver(PackageMsgModel msg) { - //TryDequeue(out var msg); - if (msg == null) + if (msg == null) return null; + //switch (msg.Topic) + //{ + // case BP: + // _resolverBloodpress.SetResolveInfo(msg); + // return _resolverBloodpress; + // default: + // return null; + //} + + if (msg.Topic.Equals(nameof(TopicHmBloodPress).ToLower())) { - return null; + _resolverBloodpress.SetResolveInfo(msg); + return _resolverBloodpress; } - _resolver.SetResolveInfo(msg); + return null; - return _resolver; + } } } diff --git a/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs b/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs index d941244..4c6c3e1 100644 --- a/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs +++ b/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs @@ -37,7 +37,6 @@ namespace HealthMonitor.Service.Sub //private int cnt = 0; public TDengineDataSubcribe( - TDengineService serviceTDengine, PersonCacheManager personCacheMgr, BloodPressReferenceValueCacheManager bpRefValCacheManager, @@ -109,14 +108,7 @@ namespace HealthMonitor.Service.Sub public void DoReceive(IntPtr Connection) { - var cfg = new ConsumerConfig - { - GourpId = "group_1", - TDConnectUser = _configTDengineService.UserName, - TDConnectPasswd = _configTDengineService.Password, - MsgWithTableName = "true", - TDConnectIp = _configTDengineService.Host, - }; + // string topic = "topic_hm_bp_stats"; string topic = nameof(TopicHmBloodPress).ToLower(); TopicHmBloodPress fields = new(); @@ -142,7 +134,14 @@ namespace HealthMonitor.Service.Sub _logger.LogError($"create topic failed, reason:{TDengine.Error(res)}"); throw new Exception($"create topic failed, reason:{TDengine.Error(res)}"); } - + var cfg = new ConsumerConfig + { + GourpId = "group_1", + TDConnectUser = _configTDengineService.UserName, + TDConnectPasswd = _configTDengineService.Password, + MsgWithTableName = "true", + TDConnectIp = _configTDengineService.Host, + }; // create consumer var consumer = new ConsumerBuilder(cfg) .Build(); @@ -155,7 +154,6 @@ namespace HealthMonitor.Service.Sub var consumeRes = consumer.Consume(300); foreach (KeyValuePair kv in consumeRes.Message) { - for (int i = 0; i < kv.Value.Datas.Count; i++) { if (((i + 1) % kv.Value.Metas.Count == 0)) @@ -186,8 +184,7 @@ namespace HealthMonitor.Service.Sub } } - } - + } } consumer.Commit(consumeRes); //_logger.LogInformation("监听中....");