Browse Source

调整解析器

td_orm
H Vs 1 year ago
parent
commit
a2a673efef
2 changed files with 34 additions and 21 deletions
  1. +24
    -8
      HealthMonitor.Service/Sub/MsgQueueManager.cs
  2. +10
    -13
      HealthMonitor.Service/Sub/TDengineDataSubcribe.cs

+ 24
- 8
HealthMonitor.Service/Sub/MsgQueueManager.cs View File

@@ -1,5 +1,6 @@
using HealthMonitor.Service.Resolver; using HealthMonitor.Service.Resolver;
using HealthMonitor.Service.Resolver.Interface; using HealthMonitor.Service.Resolver.Interface;
using HealthMonitor.Service.Sub.Topic.Model;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
@@ -13,11 +14,16 @@ namespace HealthMonitor.Service.Sub
public class MsgQueueManager: ConcurrentQueue<PackageMsgModel> public class MsgQueueManager: ConcurrentQueue<PackageMsgModel>
{ {
private readonly ILogger<MsgQueueManager> _logger; private readonly ILogger<MsgQueueManager> _logger;
private readonly BloodpressResolver _resolver;
public MsgQueueManager(ILogger<MsgQueueManager> logger, BloodpressResolver resolver)
private readonly BloodpressResolver _resolverBloodpress;

//private const string BP = nameof(TopicHmBloodPress).ToLower();


public MsgQueueManager(ILogger<MsgQueueManager> logger, BloodpressResolver resolverBloodpress)
{ {
_logger = logger; _logger = logger;
_resolver = resolver;
_resolverBloodpress = resolverBloodpress;
} }


//public IResolver? GetMsgResolver() //public IResolver? GetMsgResolver()
@@ -34,14 +40,24 @@ namespace HealthMonitor.Service.Sub


public IResolver? GetMsgResolver(PackageMsgModel msg) public IResolver? GetMsgResolver(PackageMsgModel msg)
{ {
//TryDequeue(out var msg);
if (msg == null)
if (msg == null) return null;
//switch (msg.Topic)
//{
// case BP:
// _resolverBloodpress.SetResolveInfo(msg);
// return _resolverBloodpress;
// default:
// return null;
//}

if (msg.Topic.Equals(nameof(TopicHmBloodPress).ToLower()))
{ {
return null;
_resolverBloodpress.SetResolveInfo(msg);
return _resolverBloodpress;
} }
_resolver.SetResolveInfo(msg);
return null;


return _resolver;
} }
} }
} }

+ 10
- 13
HealthMonitor.Service/Sub/TDengineDataSubcribe.cs View File

@@ -37,7 +37,6 @@ namespace HealthMonitor.Service.Sub
//private int cnt = 0; //private int cnt = 0;


public TDengineDataSubcribe( public TDengineDataSubcribe(

TDengineService serviceTDengine, TDengineService serviceTDengine,
PersonCacheManager personCacheMgr, PersonCacheManager personCacheMgr,
BloodPressReferenceValueCacheManager bpRefValCacheManager, BloodPressReferenceValueCacheManager bpRefValCacheManager,
@@ -109,14 +108,7 @@ namespace HealthMonitor.Service.Sub


public void DoReceive(IntPtr Connection) public void DoReceive(IntPtr Connection)
{ {
var cfg = new ConsumerConfig
{
GourpId = "group_1",
TDConnectUser = _configTDengineService.UserName,
TDConnectPasswd = _configTDengineService.Password,
MsgWithTableName = "true",
TDConnectIp = _configTDengineService.Host,
};
// string topic = "topic_hm_bp_stats"; // string topic = "topic_hm_bp_stats";
string topic = nameof(TopicHmBloodPress).ToLower(); string topic = nameof(TopicHmBloodPress).ToLower();
TopicHmBloodPress fields = new(); TopicHmBloodPress fields = new();
@@ -142,7 +134,14 @@ namespace HealthMonitor.Service.Sub
_logger.LogError($"create topic failed, reason:{TDengine.Error(res)}"); _logger.LogError($"create topic failed, reason:{TDengine.Error(res)}");
throw new Exception($"create topic failed, reason:{TDengine.Error(res)}"); throw new Exception($"create topic failed, reason:{TDengine.Error(res)}");
} }

var cfg = new ConsumerConfig
{
GourpId = "group_1",
TDConnectUser = _configTDengineService.UserName,
TDConnectPasswd = _configTDengineService.Password,
MsgWithTableName = "true",
TDConnectIp = _configTDengineService.Host,
};
// create consumer // create consumer
var consumer = new ConsumerBuilder(cfg) var consumer = new ConsumerBuilder(cfg)
.Build(); .Build();
@@ -155,7 +154,6 @@ namespace HealthMonitor.Service.Sub
var consumeRes = consumer.Consume(300); var consumeRes = consumer.Consume(300);
foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumeRes.Message) foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumeRes.Message)
{ {

for (int i = 0; i < kv.Value.Datas.Count; i++) for (int i = 0; i < kv.Value.Datas.Count; i++)
{ {
if (((i + 1) % kv.Value.Metas.Count == 0)) if (((i + 1) % kv.Value.Metas.Count == 0))
@@ -186,8 +184,7 @@ namespace HealthMonitor.Service.Sub
} }
} }
}
}
} }
consumer.Commit(consumeRes); consumer.Commit(consumeRes);
//_logger.LogInformation("监听中...."); //_logger.LogInformation("监听中....");


Loading…
Cancel
Save