From 65df0977c0ba8a603a2afdc8c2aeeb493ba59dfe Mon Sep 17 00:00:00 2001 From: H Vs Date: Tue, 27 Jun 2023 11:24:14 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=9A=E5=8A=A1=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../HealthMonitor.Service.csproj | 1 + .../Resolver/BloodpressResolver.cs | 156 +++++++++++++++++- .../Resolver/Factory/ResolverFactory.cs | 23 ++- .../Resolver/Interface/IResolver.cs | 5 +- .../Resolver/Interface/IResolverFactory.cs | 2 +- HealthMonitor.Service/Sub/MsgManager.cs | 29 +++- HealthMonitor.Service/Sub/MsgQueueManager.cs | 14 ++ HealthMonitor.Service/Sub/PackageMsgModel.cs | 17 ++ .../Sub/TDengineDataSubcribe.cs | 82 ++++++--- .../Sub/Topic/Model/TopicHmBloodPress.cs | 34 ++++ HealthMonitor.WebApi/PackageProcess.cs | 8 +- 11 files changed, 321 insertions(+), 50 deletions(-) create mode 100644 HealthMonitor.Service/Sub/MsgQueueManager.cs create mode 100644 HealthMonitor.Service/Sub/PackageMsgModel.cs create mode 100644 HealthMonitor.Service/Sub/Topic/Model/TopicHmBloodPress.cs diff --git a/HealthMonitor.Service/HealthMonitor.Service.csproj b/HealthMonitor.Service/HealthMonitor.Service.csproj index 1254ad6..26b3e93 100644 --- a/HealthMonitor.Service/HealthMonitor.Service.csproj +++ b/HealthMonitor.Service/HealthMonitor.Service.csproj @@ -10,6 +10,7 @@ + diff --git a/HealthMonitor.Service/Resolver/BloodpressResolver.cs b/HealthMonitor.Service/Resolver/BloodpressResolver.cs index dfb6b03..dcf1f9d 100644 --- a/HealthMonitor.Service/Resolver/BloodpressResolver.cs +++ b/HealthMonitor.Service/Resolver/BloodpressResolver.cs @@ -1,12 +1,21 @@  +using HealthMonitor.Common; +using HealthMonitor.Service.Biz.db; +using HealthMonitor.Service.Cache; using HealthMonitor.Service.Resolver.Interface; +using HealthMonitor.Service.Sub; +using HealthMonitor.Service.Sub.Topic.Model; using Microsoft.Extensions.Logging; +using Newtonsoft.Json; using System; using System.Collections.Generic; +using System.Data.Common; using System.Linq; using System.Text; +using System.Text.Json.Serialization; using System.Threading.Tasks; using TDengineTMQ; +using TelpoDataService.Util.Entities.GpsLocationHistory; namespace HealthMonitor.Service.Resolver { @@ -14,24 +23,153 @@ namespace HealthMonitor.Service.Resolver public class BloodpressResolver: IResolver { private readonly ILogger _logger; - public BloodpressResolver - ( - ILogger logger - ) + private readonly PersonCacheManager _personCacheMgr; + private readonly TDengineService _serviceTDengine; + private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager; + + private AsyncLocal _messageId = new AsyncLocal(); + private AsyncLocal _msgData = new AsyncLocal(); + + public BloodpressResolver( + TDengineService serviceDengine, + BloodPressReferenceValueCacheManager bpRefValCacheManager, + PersonCacheManager personCacheMgr, + ILogger logger) { + _serviceTDengine = serviceDengine; + _bpRefValCacheManager = bpRefValCacheManager; _logger = logger; + _personCacheMgr = personCacheMgr; + } + + public void SetResolveInfo(PackageMsgModel msg) + { + var topicHmBloodPress = JsonConvert.DeserializeObject(msg.DetailData.ToString()!); + _messageId.Value = msg.MessageId; + _msgData.Value = new HisGpsBloodPress() + { + BloodPressId = topicHmBloodPress!.BloodPressId, + MessageId = topicHmBloodPress!.MessageId, + Serialno= topicHmBloodPress!.Serialno, + SystolicValue = topicHmBloodPress!.SystolicValue, + DiastolicValue= topicHmBloodPress!.DiastolicValue, + LastUpdate= DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(topicHmBloodPress.LastUpdate) / 1000000), + CreateTime= DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(topicHmBloodPress.CreateTime) / 1000000), + Method= topicHmBloodPress!.Method, + IsDisplay=topicHmBloodPress!.IsDisplay ? 1 : 0 + }; + Console.WriteLine($"SetResolveInfo" + _messageId.Value); + } - public void SetResolveInfo(object msg) + public override string ToString() { - throw new NotImplementedException(); + return $"{nameof(BloodpressResolver)}[{_messageId.Value}]"; } - public Task ExecuteMessageAsync() + public async Task ExecuteMessageAsync() { - throw new NotImplementedException(); + var messageId = _messageId.Value; + Console.WriteLine($"ExecuteMessageAsync" + messageId); + var bp = _msgData.Value!; + + //#region 获取个人信息 + + var person = await _personCacheMgr.GetDeviceGpsPersonCacheBySerialNoAsync(bp.MessageId, bp.Serialno).ConfigureAwait(false); + Console.WriteLine(person?.Person.PersonName); + // 验证这个信息是否存在 + //if (person == null || person?.Person.BornDate == null) + //{ + // Console.WriteLine("验证这个信息是否存在"); + // return; + //} + //// 验证年龄是否在范围 (2 - 120) + //var age = SafeType.SafeInt(DateTime.Today.Year - person?.Person.BornDate!.Value.Year!); + //if (age < 1 || age > 120) + //{ + // Console.WriteLine("验证年龄是否在范围 (2 - 120)"); + //} + //#endregion + + //var gender = person?.Person.Gender == true ? 1 : 2; + //var isHypertension = SafeType.SafeBool(person?.Person.Ishypertension!); + //var height = SafeType.SafeDouble(person?.Person.Height!); + //var weight = SafeType.SafeDouble(person?.Person.Weight!); + + //#region 计算增量值 + //var bpRef = await _bpRefValCacheManager.GetBloodPressReferenceValueAsync(age, gender, isHypertension); + + //var systolicRefValue = bpRef?.Systolic;//? + //var diastolicRefValue = bpRef?.Diastolic;//? + //int duration = 30; + //// 获取历史数据 + ////DateTime now = DateTime.Now; + //DateTime now = (DateTime)bp.LastUpdate!; //测试 + //DateTime startTime = now.AddDays(-duration); + //DateTime endTime = now; + + //// + //var systolicAggregate = await _serviceTDengine.GetAggregateValue("systolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}'"); + //var diastolicAggregate = await _serviceTDengine.GetAggregateValue("diastolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}'"); + + //// 最大值 + //var systolicMax = systolicAggregate.Max; + //var diastolicMax = diastolicAggregate.Max; + //// 最小值 + //var systolicMin = systolicAggregate.Min; + //var diastolicMin = diastolicAggregate.Min; + + + //// 计算去除最大值和最小值和异常值的平均值 + //var systolicAvg = await _serviceTDengine.GetAvgExceptMaxMinValue("systolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}' and systolic_value < {systolicRefValue} "); + //var diastolicAvg = await _serviceTDengine.GetAvgExceptMaxMinValue("diastolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}' and diastolic_value < {diastolicRefValue}"); + + //// 偏移参数 + //var avgOffset = 0.25M; + + //var systolicAvgOffset = avgOffset; + //var diastolicAvgOffset = avgOffset; + + + //// 增量值=(标定值-平均值)* 0.25 + //var systolicInc = systolicAvg.Equals(0M) ? 0 : (int)((systolicRefValue - systolicAvg) * systolicAvgOffset)!; + //var diastolicInc = diastolicAvg.Equals(0M) ? 0 : (int)((diastolicRefValue - diastolicAvg) * diastolicAvgOffset)!; + + //#endregion + + //#region 插入BP增量值 hm_bloodpress_stats_inc + //var sql = $"INSERT INTO health_monitor.hm_bloodpress_stats_inc VALUES(" + + // $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," + + // $"'{bp.BloodPressId}'," + + // $"'{bp.MessageId}'," + + // $"'{bp.Serialno}'," + + // $"{bp.SystolicValue}," + + // $"{systolicRefValue}," + + // $"{systolicAvg}," + + // $"{systolicMax}," + + // $"{systolicMin}," + + // $"{systolicAvgOffset}," + + // $"{systolicInc}," + + // $"{bp.DiastolicValue}," + + // $"{diastolicRefValue}," + + // $"{diastolicAvg}," + + // $"{diastolicMax}," + + // $"{diastolicMin}," + + // $"{diastolicAvgOffset}," + + // $"{diastolicInc}," + + // $"{gender}," + + // $"{age}," + + // $"{height}," + + // $"{weight}," + + // $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," + + // $"{duration}," + + // $"'{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}'," + + // $"'{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}'," + + // $"'{string.Empty}')"; + //var res = await _serviceTDengine.GernalRestSql(sql); + //#endregion } - + } } diff --git a/HealthMonitor.Service/Resolver/Factory/ResolverFactory.cs b/HealthMonitor.Service/Resolver/Factory/ResolverFactory.cs index b936a1c..eae133c 100644 --- a/HealthMonitor.Service/Resolver/Factory/ResolverFactory.cs +++ b/HealthMonitor.Service/Resolver/Factory/ResolverFactory.cs @@ -1,5 +1,6 @@ using HealthMonitor.Service.Resolver.Interface; using HealthMonitor.Service.Sub; +using HealthMonitor.Service.Sub.Topic.Model; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using System; @@ -20,20 +21,30 @@ namespace HealthMonitor.Service.Resolver.Factory _logger = logger; } - public dynamic ParseAndWrap(ReceiveMessageModel msg) + public PackageMsgModel? ParseAndWrap(ReceiveMessageModel msg) { - var table=msg.Topic; - switch (table) + var topic=msg.Topic; + switch (topic) { case "topic_hm_bp_stats": - HisGpsBloodPress bloodPress = JsonConvert.DeserializeObject(msg.Body)!; - return bloodPress!; + //TopicHmBloodPress bloodPress = JsonConvert.DeserializeObject(msg.Body)!; + + return new PackageMsgModel() + { + DB= msg.DB, + MessageId=msg.MessageId, + Topic= topic, + DetailData= msg.Body, + + }; + + // return bloodPress!; default: break; } - return false; + return null; } } } diff --git a/HealthMonitor.Service/Resolver/Interface/IResolver.cs b/HealthMonitor.Service/Resolver/Interface/IResolver.cs index 92cc9a2..4f715e7 100644 --- a/HealthMonitor.Service/Resolver/Interface/IResolver.cs +++ b/HealthMonitor.Service/Resolver/Interface/IResolver.cs @@ -1,4 +1,5 @@ -using System; +using HealthMonitor.Service.Sub; +using System; using System.Collections.Generic; using System.Linq; using System.Text; @@ -9,7 +10,7 @@ namespace HealthMonitor.Service.Resolver.Interface { public interface IResolver { - void SetResolveInfo(object msg); + void SetResolveInfo(PackageMsgModel msg); Task ExecuteMessageAsync(); } diff --git a/HealthMonitor.Service/Resolver/Interface/IResolverFactory.cs b/HealthMonitor.Service/Resolver/Interface/IResolverFactory.cs index 093d72e..6ba5a1c 100644 --- a/HealthMonitor.Service/Resolver/Interface/IResolverFactory.cs +++ b/HealthMonitor.Service/Resolver/Interface/IResolverFactory.cs @@ -10,6 +10,6 @@ namespace HealthMonitor.Service.Resolver.Interface { public interface IResolverFactory { - dynamic ParseAndWrap(ReceiveMessageModel msg); + PackageMsgModel? ParseAndWrap(ReceiveMessageModel msg); } } diff --git a/HealthMonitor.Service/Sub/MsgManager.cs b/HealthMonitor.Service/Sub/MsgManager.cs index 2c6ffc6..54a038b 100644 --- a/HealthMonitor.Service/Sub/MsgManager.cs +++ b/HealthMonitor.Service/Sub/MsgManager.cs @@ -2,6 +2,7 @@ using HealthMonitor.Service.Resolver.Interface; using Microsoft.Extensions.Logging; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; @@ -10,31 +11,47 @@ using System.Threading.Tasks; namespace HealthMonitor.Service.Sub { + + public class MsgManager { private readonly BloodpressResolver _resolver; private readonly ILogger _logger; - private object _obj = default!; + private PackageMsgModel _msg = default!; public MsgManager(ILogger logger, BloodpressResolver resolver) { _logger = logger; _resolver = resolver; } - public void AddMsg(object msg) + public void EnqueueMsg(PackageMsgModel msg) { - _obj = msg; + _msg = msg; + Console.WriteLine($"GetMsgResolver-{_msg.MessageId}"); } public IResolver? GetMsgResolver() { - if (_obj == null) + if (_msg == null) { - return null; + return null; } - _resolver.SetResolveInfo(_obj); + Console.WriteLine($"GetMsgResolver-{_msg.MessageId}"); + _resolver.SetResolveInfo(_msg); + _msg = default!; return _resolver; } + //public IResolver? GetMsgResolver(PackageMsgModel msg) + //{ + // if (_msg == null) + // { + // return null; + // } + // Console.WriteLine($"GetMsgResolver-{_msg.MessageId}"); + // _resolver.SetResolveInfo(msg); + // _msg = default!; + // return _resolver; + //} } } diff --git a/HealthMonitor.Service/Sub/MsgQueueManager.cs b/HealthMonitor.Service/Sub/MsgQueueManager.cs new file mode 100644 index 0000000..9df1a0e --- /dev/null +++ b/HealthMonitor.Service/Sub/MsgQueueManager.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace HealthMonitor.Service.Sub +{ + public class MsgQueueManager: ConcurrentQueue + { + + } +} diff --git a/HealthMonitor.Service/Sub/PackageMsgModel.cs b/HealthMonitor.Service/Sub/PackageMsgModel.cs new file mode 100644 index 0000000..01aa570 --- /dev/null +++ b/HealthMonitor.Service/Sub/PackageMsgModel.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace HealthMonitor.Service.Sub +{ + public class PackageMsgModel + { + public object DetailData { get; set; } = default!; + public string MessageId { get; set; } = default!; + public string Topic { get; set; } = default!; + public string Table { get; set; } = default!; + public string DB { get; set; } = default!; + } +} diff --git a/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs b/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs index 231e051..1d5e086 100644 --- a/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs +++ b/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs @@ -4,9 +4,11 @@ using HealthMonitor.Service.Biz.db; using HealthMonitor.Service.Cache; using HealthMonitor.Service.Resolver.Factory; using HealthMonitor.Service.Resolver.Interface; +using HealthMonitor.Service.Sub.Topic.Model; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; @@ -120,37 +122,63 @@ namespace HealthMonitor.Service.Sub { if (((i + 1) % kv.Value.Metas.Count == 0)) { - string bloodpress_id = SafeType.SafeString(kv.Value.Datas[i - 8]); - string message_id = SafeType.SafeString(kv.Value.Datas[i - 7]); - string serialno = SafeType.SafeString(kv.Value.Datas[i - 6]); - int systolic_value = SafeType.SafeInt(kv.Value.Datas[i - 5]); - int diastolic_value = SafeType.SafeInt(kv.Value.Datas[i - 4]); - DateTime create_time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 3]) / 1000000); - DateTime last_update = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 2]) / 1000000); - int method = SafeType.SafeInt(kv.Value.Datas[i - 1]); - bool is_display = SafeType.SafeBool(kv.Value.Datas[i]); - - HisGpsBloodPress bp = new() - { - BloodPressId = bloodpress_id, - MessageId = message_id, - Serialno = serialno, - SystolicValue = systolic_value, - DiastolicValue = diastolic_value, - CreateTime = create_time, - LastUpdate = last_update, - Method = method, - IsDisplay = is_display ? 1 : 0, - }; + //string bloodpress_id = SafeType.SafeString(kv.Value.Datas[i - 8]); + //string message_id = SafeType.SafeString(kv.Value.Datas[i - 7]); + //string serialno = SafeType.SafeString(kv.Value.Datas[i - 6]); + //int systolic_value = SafeType.SafeInt(kv.Value.Datas[i - 5]); + //int diastolic_value = SafeType.SafeInt(kv.Value.Datas[i - 4]); + //DateTime create_time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 3]) / 1000000); + //DateTime last_update = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 2]) / 1000000); + //int method = SafeType.SafeInt(kv.Value.Datas[i - 1]); + //bool is_display = SafeType.SafeBool(kv.Value.Datas[i]); + + //IDictionary row = new Dictionary(); + //foreach (var meta in kv.Value.Metas) + //{ + // int index = i-(kv.Value.Metas.Count-kv.Value.Metas.IndexOf(meta)-1); + // //var value = kv.Value.Datas[index]; + // row.Add(meta.name, kv.Value.Datas[index]); + //} + + //var body2 = JsonConvert.SerializeObject(row); + //kv.Value.Metas.ForEach(meta => + //{ + // Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size); + //}); + + + + //HisGpsBloodPress bp = new() + //{ + // BloodPressId = bloodpress_id, + // MessageId = message_id, + // Serialno = serialno, + // SystolicValue = systolic_value, + // DiastolicValue = diastolic_value, + // CreateTime = create_time, + // LastUpdate = last_update, + // Method = method, + // IsDisplay = is_display ? 1 : 0, + //}; try { + + IDictionary row = new Dictionary(); + foreach (var meta in kv.Value.Metas) + { + int index = i - (kv.Value.Metas.Count - kv.Value.Metas.IndexOf(meta) - 1); + //var value = kv.Value.Datas[index]; + row.Add(meta.name, kv.Value.Datas[index]); + } + + var db = kv.Key.db; var table = kv.Key.table; var kvTopic = kv.Key.topic; - var body = JsonConvert.SerializeObject(bp); + var body = JsonConvert.SerializeObject(row); - ReceiveMessageModel msg = new(db, table, kvTopic, Guid.NewGuid().ToString("N"), body); + ReceiveMessageModel msg = new(db, table, kvTopic, row["message_id"].ToString()!, body); ParsePackage(msg); } catch (Exception ex) @@ -173,8 +201,12 @@ namespace HealthMonitor.Service.Sub { var msg = _resolverFactory.ParseAndWrap(model); Console.WriteLine("msg"); + cnt++; + Console.WriteLine(cnt); + Console.WriteLine(msg!.MessageId); if (msg == null) return; - _msgManager.AddMsg(msg); + // ConcurrentQueue messageQueue = new ConcurrentQueue(); + _msgManager.EnqueueMsg(msg!); } diff --git a/HealthMonitor.Service/Sub/Topic/Model/TopicHmBloodPress.cs b/HealthMonitor.Service/Sub/Topic/Model/TopicHmBloodPress.cs new file mode 100644 index 0000000..0f1a2c9 --- /dev/null +++ b/HealthMonitor.Service/Sub/Topic/Model/TopicHmBloodPress.cs @@ -0,0 +1,34 @@ +using Newtonsoft.Json; +using System; +using System.Collections.Generic; +using System.ComponentModel.DataAnnotations; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace HealthMonitor.Service.Sub.Topic.Model +{ + public class TopicHmBloodPress + { + [JsonProperty("ts")] + public long Ts { get; set; } = default!; + [JsonProperty("bloodpress_id")] + public string BloodPressId { get; set; } = default!; + [JsonProperty("message_id")] + public string MessageId { get; set; } = default!; + [JsonProperty("serialno")] + public string Serialno { get; set; } = default!; + [JsonProperty("systolic_value")] + public int SystolicValue { get; set; } + [JsonProperty("diastolic_value")] + public int DiastolicValue { get; set; } + [JsonProperty("create_time")] + public long LastUpdate { get; set; } + [JsonProperty("last_update")] + public long CreateTime { get; set; } + [JsonProperty("method")] + public int Method { get; set; } + [JsonProperty("is_display")] + public bool IsDisplay { get; set; } + } +} diff --git a/HealthMonitor.WebApi/PackageProcess.cs b/HealthMonitor.WebApi/PackageProcess.cs index 57b0f7a..af3ee47 100644 --- a/HealthMonitor.WebApi/PackageProcess.cs +++ b/HealthMonitor.WebApi/PackageProcess.cs @@ -1,5 +1,7 @@ using HealthMonitor.Service.Resolver.Interface; using HealthMonitor.Service.Sub; +using HealthMonitor.Service.Sub.Interface; +using System.Collections.Concurrent; namespace HealthMonitor.WebApi { @@ -17,11 +19,15 @@ namespace HealthMonitor.WebApi public async Task ResolveAsync() { + + // ConcurrentQueue messageQueue = new ConcurrentQueue(); + var resolver = _msgManager.GetMsgResolver(); try { - var resolver = _msgManager.GetMsgResolver(); + if (resolver != null) { + // resolver.SetResolveInfo(); await resolver.ExecuteMessageAsync().ConfigureAwait(false); }