diff --git a/HealthMonitor.Service/Resolver/BloodpressResolver.cs b/HealthMonitor.Service/Resolver/BloodpressResolver.cs index 6a58305..13babfd 100644 --- a/HealthMonitor.Service/Resolver/BloodpressResolver.cs +++ b/HealthMonitor.Service/Resolver/BloodpressResolver.cs @@ -27,8 +27,8 @@ namespace HealthMonitor.Service.Resolver private readonly TDengineService _serviceTDengine; private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager; - private AsyncLocal _messageId = new AsyncLocal(); - private AsyncLocal _msgData = new AsyncLocal(); + private AsyncLocal _messageId = new(); + private AsyncLocal _msgData = new(); public BloodpressResolver( TDengineService serviceDengine, @@ -58,8 +58,7 @@ namespace HealthMonitor.Service.Resolver Method= topicHmBloodPress!.Method, IsDisplay=topicHmBloodPress!.IsDisplay ? 1 : 0 }; - Console.WriteLine($"SetResolveInfo" + _messageId.Value); - + // Console.WriteLine($"SetResolveInfo" + _messageId.Value); } public override string ToString() @@ -70,7 +69,6 @@ namespace HealthMonitor.Service.Resolver public async Task ExecuteMessageAsync() { var messageId = _messageId.Value; - Console.WriteLine($"ExecuteMessageAsync" + messageId); var bp = _msgData.Value!; #region 获取个人信息 @@ -88,6 +86,7 @@ namespace HealthMonitor.Service.Resolver if (age < 1 || age > 120) { Console.WriteLine("验证年龄是否在范围 (2 - 120)"); + return; } #endregion diff --git a/HealthMonitor.Service/Sub/MsgManager.cs b/HealthMonitor.Service/Sub/MsgManager.cs deleted file mode 100644 index c41c5ee..0000000 --- a/HealthMonitor.Service/Sub/MsgManager.cs +++ /dev/null @@ -1,80 +0,0 @@ -using HealthMonitor.Service.Resolver; -using HealthMonitor.Service.Resolver.Interface; -using Microsoft.Extensions.Logging; -using System; -using System.Collections; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace HealthMonitor.Service.Sub -{ - - - - public class MsgManager - { - private readonly MsgQueueManager _msgQueueManager; - private readonly BloodpressResolver _resolver; - private readonly ILogger _logger; - private PackageMsgModel _msg = default!; - public MsgManager(ILogger logger, BloodpressResolver resolver, MsgQueueManager msgQueueManager) - { - _logger = logger; - _resolver = resolver; - _msgQueueManager = msgQueueManager; - } - - public void EnqueueMsg(PackageMsgModel msg) - { - _msg = msg; - Console.WriteLine($"GetMsgResolver-{_msg.MessageId}"); - } - - public IResolver? GetMsgResolver() - { - - //if (_msg == null) - //{ - // return null; - //} - //Console.WriteLine($"GetMsgResolver-{_msg.MessageId}"); - //_msgQueueManager.TryDequeue(out object obj); - _msgQueueManager.TryDequeue(out var msg); - if (msg == null) - { - return null; - } - Console.WriteLine($"GetMsgResolver-{msg.MessageId}"); - _resolver.SetResolveInfo(msg); - - return _resolver; - } - //public IReso - - //public IResolver? GetMsgResolver() - //{ - // if (_msg == null) - // { - // return null; - // } - // Console.WriteLine($"GetMsgResolver-{_msg.MessageId}"); - // _resolver.SetResolveInfo(_msg); - // _msg = default!; - // return _resolver; - //} - //public IResolver? GetMsgResolver(PackageMsgModel msg) - //{ - // if (_msg == null) - // { - // return null; - // } - // Console.WriteLine($"GetMsgResolver-{_msg.MessageId}"); - // _resolver.SetResolveInfo(msg); - // _msg = default!; - // return _resolver; - //} - } -} diff --git a/HealthMonitor.Service/Sub/MsgQueueManager.cs b/HealthMonitor.Service/Sub/MsgQueueManager.cs index 689d440..d1de6d9 100644 --- a/HealthMonitor.Service/Sub/MsgQueueManager.cs +++ b/HealthMonitor.Service/Sub/MsgQueueManager.cs @@ -1,4 +1,5 @@ using HealthMonitor.Service.Resolver; +using HealthMonitor.Service.Resolver.Interface; using Microsoft.Extensions.Logging; using System; using System.Collections.Concurrent; @@ -12,9 +13,23 @@ namespace HealthMonitor.Service.Sub public class MsgQueueManager: ConcurrentQueue { private readonly ILogger _logger; - public MsgQueueManager(ILogger logger) + private readonly BloodpressResolver _resolver; + public MsgQueueManager(ILogger logger, BloodpressResolver resolver) { _logger = logger; + _resolver = resolver; + } + + public IResolver? GetMsgResolver() + { + TryDequeue(out var msg); + if (msg == null) + { + return null; + } + _resolver.SetResolveInfo(msg); + + return _resolver; } } } diff --git a/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs b/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs index 9e09281..96118e3 100644 --- a/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs +++ b/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs @@ -24,12 +24,12 @@ namespace HealthMonitor.Service.Sub private readonly ILogger _logger; private IConsumer _consumer = default!; private IntPtr _conn = default!; - private readonly MsgManager _msgManager; private readonly MsgQueueManager _msgQueueManager; private readonly TDengineService _serviceTDengine; private readonly PersonCacheManager _personCacheMgr; private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager; private readonly IResolverFactory _resolverFactory; + private CancellationTokenSource _tokenSource = null; private int cnt = 0; @@ -39,7 +39,7 @@ namespace HealthMonitor.Service.Sub PersonCacheManager personCacheMgr, BloodPressReferenceValueCacheManager bpRefValCacheManager, IResolverFactory resolverFactory, - MsgManager msgManager, + MsgQueueManager msgQueueManager, ILogger logger ) @@ -49,7 +49,6 @@ namespace HealthMonitor.Service.Sub _bpRefValCacheManager = bpRefValCacheManager; _logger = logger; _resolverFactory = resolverFactory; - _msgManager = msgManager; _msgQueueManager = msgQueueManager; _conn = GetConnection(); } @@ -67,6 +66,11 @@ namespace HealthMonitor.Service.Sub //ProcessMsg(consumer); + + //防止造成多线程运行 + _tokenSource?.Cancel(); + + _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken); DoTDengineConnect(); } @@ -100,6 +104,7 @@ namespace HealthMonitor.Service.Sub TDConnectIp = "47.116.142.20", }; string topic = "topic_hm_bp_stats"; + //nameof(TopicHmBloodPress) //create topic IntPtr res = TDengine.Query(Connection, $"create topic if not exists {topic} as select * from health_monitor.hm_bloodpress"); @@ -115,7 +120,7 @@ namespace HealthMonitor.Service.Sub // subscribe consumer.Subscribe(topic); - while (true) + while (!_tokenSource.IsCancellationRequested) { var consumeRes = consumer.Consume(300); foreach (KeyValuePair kv in consumeRes.Message) @@ -125,45 +130,6 @@ namespace HealthMonitor.Service.Sub { if (((i + 1) % kv.Value.Metas.Count == 0)) { - //string bloodpress_id = SafeType.SafeString(kv.Value.Datas[i - 8]); - //string message_id = SafeType.SafeString(kv.Value.Datas[i - 7]); - //string serialno = SafeType.SafeString(kv.Value.Datas[i - 6]); - //int systolic_value = SafeType.SafeInt(kv.Value.Datas[i - 5]); - //int diastolic_value = SafeType.SafeInt(kv.Value.Datas[i - 4]); - //DateTime create_time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 3]) / 1000000); - //DateTime last_update = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 2]) / 1000000); - //int method = SafeType.SafeInt(kv.Value.Datas[i - 1]); - //bool is_display = SafeType.SafeBool(kv.Value.Datas[i]); - - //IDictionary row = new Dictionary(); - //foreach (var meta in kv.Value.Metas) - //{ - // int index = i-(kv.Value.Metas.Count-kv.Value.Metas.IndexOf(meta)-1); - // //var value = kv.Value.Datas[index]; - // row.Add(meta.name, kv.Value.Datas[index]); - //} - - //var body2 = JsonConvert.SerializeObject(row); - //kv.Value.Metas.ForEach(meta => - //{ - // Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size); - //}); - - - - //HisGpsBloodPress bp = new() - //{ - // BloodPressId = bloodpress_id, - // MessageId = message_id, - // Serialno = serialno, - // SystolicValue = systolic_value, - // DiastolicValue = diastolic_value, - // CreateTime = create_time, - // LastUpdate = last_update, - // Method = method, - // IsDisplay = is_display ? 1 : 0, - //}; - try { @@ -193,9 +159,12 @@ namespace HealthMonitor.Service.Sub } consumer.Commit(consumeRes); - Console.WriteLine("consumer.Commit"); + Console.WriteLine("监听中...."); } + // close consumer after use.Otherwise will lead memory leak. + _consumer.Close(); + TDengine.Close(_conn); } @@ -203,10 +172,10 @@ namespace HealthMonitor.Service.Sub public void ParsePackage(ReceiveMessageModel model) { var msg = _resolverFactory.ParseAndWrap(model); - Console.WriteLine("msg"); - cnt++; - Console.WriteLine(cnt); - Console.WriteLine(msg!.MessageId); + //Console.WriteLine("msg"); + //cnt++; + //Console.WriteLine(cnt); + //Console.WriteLine(msg!.MessageId); if (msg == null) return; // ConcurrentQueue messageQueue = new ConcurrentQueue(); //_msgManager.EnqueueMsg(msg!); diff --git a/HealthMonitor.WebApi/PackageProcess.cs b/HealthMonitor.WebApi/PackageProcess.cs index af3ee47..1c62f6f 100644 --- a/HealthMonitor.WebApi/PackageProcess.cs +++ b/HealthMonitor.WebApi/PackageProcess.cs @@ -8,26 +8,24 @@ namespace HealthMonitor.WebApi public class PackageProcess : IDisposable { private readonly ILogger _logger; + + private readonly MsgQueueManager _msgQueueManager; - private readonly MsgManager _msgManager; - - public PackageProcess(ILogger logger, MsgManager msgManager) + public PackageProcess(ILogger logger, MsgQueueManager msgQueueManager) { _logger = logger; - _msgManager = msgManager; + + _msgQueueManager = msgQueueManager; } public async Task ResolveAsync() { - - // ConcurrentQueue messageQueue = new ConcurrentQueue(); - var resolver = _msgManager.GetMsgResolver(); + var resolver = _msgQueueManager.GetMsgResolver(); try { if (resolver != null) { - // resolver.SetResolveInfo(); await resolver.ExecuteMessageAsync().ConfigureAwait(false); } diff --git a/HealthMonitor.WebApi/Program.cs b/HealthMonitor.WebApi/Program.cs index 3e7dc06..1c0fd97 100644 --- a/HealthMonitor.WebApi/Program.cs +++ b/HealthMonitor.WebApi/Program.cs @@ -173,7 +173,6 @@ namespace HealthMonitor.WebApi builder.Services .AddSingleton() - .AddSingleton() .AddHostedService(); #endregion