diff --git a/HealthMonitor.Service/HealthMonitor.Service.csproj b/HealthMonitor.Service/HealthMonitor.Service.csproj
index 1254ad6..26b3e93 100644
--- a/HealthMonitor.Service/HealthMonitor.Service.csproj
+++ b/HealthMonitor.Service/HealthMonitor.Service.csproj
@@ -10,6 +10,7 @@
+
diff --git a/HealthMonitor.Service/Resolver/BloodpressResolver.cs b/HealthMonitor.Service/Resolver/BloodpressResolver.cs
index dfb6b03..dcf1f9d 100644
--- a/HealthMonitor.Service/Resolver/BloodpressResolver.cs
+++ b/HealthMonitor.Service/Resolver/BloodpressResolver.cs
@@ -1,12 +1,21 @@
+using HealthMonitor.Common;
+using HealthMonitor.Service.Biz.db;
+using HealthMonitor.Service.Cache;
using HealthMonitor.Service.Resolver.Interface;
+using HealthMonitor.Service.Sub;
+using HealthMonitor.Service.Sub.Topic.Model;
using Microsoft.Extensions.Logging;
+using Newtonsoft.Json;
using System;
using System.Collections.Generic;
+using System.Data.Common;
using System.Linq;
using System.Text;
+using System.Text.Json.Serialization;
using System.Threading.Tasks;
using TDengineTMQ;
+using TelpoDataService.Util.Entities.GpsLocationHistory;
namespace HealthMonitor.Service.Resolver
{
@@ -14,24 +23,153 @@ namespace HealthMonitor.Service.Resolver
public class BloodpressResolver: IResolver
{
private readonly ILogger _logger;
- public BloodpressResolver
- (
- ILogger logger
- )
+ private readonly PersonCacheManager _personCacheMgr;
+ private readonly TDengineService _serviceTDengine;
+ private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager;
+
+ private AsyncLocal _messageId = new AsyncLocal();
+ private AsyncLocal _msgData = new AsyncLocal();
+
+ public BloodpressResolver(
+ TDengineService serviceDengine,
+ BloodPressReferenceValueCacheManager bpRefValCacheManager,
+ PersonCacheManager personCacheMgr,
+ ILogger logger)
{
+ _serviceTDengine = serviceDengine;
+ _bpRefValCacheManager = bpRefValCacheManager;
_logger = logger;
+ _personCacheMgr = personCacheMgr;
+ }
+
+ public void SetResolveInfo(PackageMsgModel msg)
+ {
+ var topicHmBloodPress = JsonConvert.DeserializeObject(msg.DetailData.ToString()!);
+ _messageId.Value = msg.MessageId;
+ _msgData.Value = new HisGpsBloodPress()
+ {
+ BloodPressId = topicHmBloodPress!.BloodPressId,
+ MessageId = topicHmBloodPress!.MessageId,
+ Serialno= topicHmBloodPress!.Serialno,
+ SystolicValue = topicHmBloodPress!.SystolicValue,
+ DiastolicValue= topicHmBloodPress!.DiastolicValue,
+ LastUpdate= DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(topicHmBloodPress.LastUpdate) / 1000000),
+ CreateTime= DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(topicHmBloodPress.CreateTime) / 1000000),
+ Method= topicHmBloodPress!.Method,
+ IsDisplay=topicHmBloodPress!.IsDisplay ? 1 : 0
+ };
+ Console.WriteLine($"SetResolveInfo" + _messageId.Value);
+
}
- public void SetResolveInfo(object msg)
+ public override string ToString()
{
- throw new NotImplementedException();
+ return $"{nameof(BloodpressResolver)}[{_messageId.Value}]";
}
- public Task ExecuteMessageAsync()
+ public async Task ExecuteMessageAsync()
{
- throw new NotImplementedException();
+ var messageId = _messageId.Value;
+ Console.WriteLine($"ExecuteMessageAsync" + messageId);
+ var bp = _msgData.Value!;
+
+ //#region 获取个人信息
+
+ var person = await _personCacheMgr.GetDeviceGpsPersonCacheBySerialNoAsync(bp.MessageId, bp.Serialno).ConfigureAwait(false);
+ Console.WriteLine(person?.Person.PersonName);
+ // 验证这个信息是否存在
+ //if (person == null || person?.Person.BornDate == null)
+ //{
+ // Console.WriteLine("验证这个信息是否存在");
+ // return;
+ //}
+ //// 验证年龄是否在范围 (2 - 120)
+ //var age = SafeType.SafeInt(DateTime.Today.Year - person?.Person.BornDate!.Value.Year!);
+ //if (age < 1 || age > 120)
+ //{
+ // Console.WriteLine("验证年龄是否在范围 (2 - 120)");
+ //}
+ //#endregion
+
+ //var gender = person?.Person.Gender == true ? 1 : 2;
+ //var isHypertension = SafeType.SafeBool(person?.Person.Ishypertension!);
+ //var height = SafeType.SafeDouble(person?.Person.Height!);
+ //var weight = SafeType.SafeDouble(person?.Person.Weight!);
+
+ //#region 计算增量值
+ //var bpRef = await _bpRefValCacheManager.GetBloodPressReferenceValueAsync(age, gender, isHypertension);
+
+ //var systolicRefValue = bpRef?.Systolic;//?
+ //var diastolicRefValue = bpRef?.Diastolic;//?
+ //int duration = 30;
+ //// 获取历史数据
+ ////DateTime now = DateTime.Now;
+ //DateTime now = (DateTime)bp.LastUpdate!; //测试
+ //DateTime startTime = now.AddDays(-duration);
+ //DateTime endTime = now;
+
+ ////
+ //var systolicAggregate = await _serviceTDengine.GetAggregateValue("systolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}'");
+ //var diastolicAggregate = await _serviceTDengine.GetAggregateValue("diastolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}'");
+
+ //// 最大值
+ //var systolicMax = systolicAggregate.Max;
+ //var diastolicMax = diastolicAggregate.Max;
+ //// 最小值
+ //var systolicMin = systolicAggregate.Min;
+ //var diastolicMin = diastolicAggregate.Min;
+
+
+ //// 计算去除最大值和最小值和异常值的平均值
+ //var systolicAvg = await _serviceTDengine.GetAvgExceptMaxMinValue("systolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}' and systolic_value < {systolicRefValue} ");
+ //var diastolicAvg = await _serviceTDengine.GetAvgExceptMaxMinValue("diastolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}' and diastolic_value < {diastolicRefValue}");
+
+ //// 偏移参数
+ //var avgOffset = 0.25M;
+
+ //var systolicAvgOffset = avgOffset;
+ //var diastolicAvgOffset = avgOffset;
+
+
+ //// 增量值=(标定值-平均值)* 0.25
+ //var systolicInc = systolicAvg.Equals(0M) ? 0 : (int)((systolicRefValue - systolicAvg) * systolicAvgOffset)!;
+ //var diastolicInc = diastolicAvg.Equals(0M) ? 0 : (int)((diastolicRefValue - diastolicAvg) * diastolicAvgOffset)!;
+
+ //#endregion
+
+ //#region 插入BP增量值 hm_bloodpress_stats_inc
+ //var sql = $"INSERT INTO health_monitor.hm_bloodpress_stats_inc VALUES(" +
+ // $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
+ // $"'{bp.BloodPressId}'," +
+ // $"'{bp.MessageId}'," +
+ // $"'{bp.Serialno}'," +
+ // $"{bp.SystolicValue}," +
+ // $"{systolicRefValue}," +
+ // $"{systolicAvg}," +
+ // $"{systolicMax}," +
+ // $"{systolicMin}," +
+ // $"{systolicAvgOffset}," +
+ // $"{systolicInc}," +
+ // $"{bp.DiastolicValue}," +
+ // $"{diastolicRefValue}," +
+ // $"{diastolicAvg}," +
+ // $"{diastolicMax}," +
+ // $"{diastolicMin}," +
+ // $"{diastolicAvgOffset}," +
+ // $"{diastolicInc}," +
+ // $"{gender}," +
+ // $"{age}," +
+ // $"{height}," +
+ // $"{weight}," +
+ // $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
+ // $"{duration}," +
+ // $"'{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
+ // $"'{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
+ // $"'{string.Empty}')";
+ //var res = await _serviceTDengine.GernalRestSql(sql);
+ //#endregion
}
-
+
}
}
diff --git a/HealthMonitor.Service/Resolver/Factory/ResolverFactory.cs b/HealthMonitor.Service/Resolver/Factory/ResolverFactory.cs
index b936a1c..eae133c 100644
--- a/HealthMonitor.Service/Resolver/Factory/ResolverFactory.cs
+++ b/HealthMonitor.Service/Resolver/Factory/ResolverFactory.cs
@@ -1,5 +1,6 @@
using HealthMonitor.Service.Resolver.Interface;
using HealthMonitor.Service.Sub;
+using HealthMonitor.Service.Sub.Topic.Model;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
@@ -20,20 +21,30 @@ namespace HealthMonitor.Service.Resolver.Factory
_logger = logger;
}
- public dynamic ParseAndWrap(ReceiveMessageModel msg)
+ public PackageMsgModel? ParseAndWrap(ReceiveMessageModel msg)
{
- var table=msg.Topic;
- switch (table)
+ var topic=msg.Topic;
+ switch (topic)
{
case "topic_hm_bp_stats":
- HisGpsBloodPress bloodPress = JsonConvert.DeserializeObject(msg.Body)!;
- return bloodPress!;
+ //TopicHmBloodPress bloodPress = JsonConvert.DeserializeObject(msg.Body)!;
+
+ return new PackageMsgModel()
+ {
+ DB= msg.DB,
+ MessageId=msg.MessageId,
+ Topic= topic,
+ DetailData= msg.Body,
+
+ };
+
+ // return bloodPress!;
default:
break;
}
- return false;
+ return null;
}
}
}
diff --git a/HealthMonitor.Service/Resolver/Interface/IResolver.cs b/HealthMonitor.Service/Resolver/Interface/IResolver.cs
index 92cc9a2..4f715e7 100644
--- a/HealthMonitor.Service/Resolver/Interface/IResolver.cs
+++ b/HealthMonitor.Service/Resolver/Interface/IResolver.cs
@@ -1,4 +1,5 @@
-using System;
+using HealthMonitor.Service.Sub;
+using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@@ -9,7 +10,7 @@ namespace HealthMonitor.Service.Resolver.Interface
{
public interface IResolver
{
- void SetResolveInfo(object msg);
+ void SetResolveInfo(PackageMsgModel msg);
Task ExecuteMessageAsync();
}
diff --git a/HealthMonitor.Service/Resolver/Interface/IResolverFactory.cs b/HealthMonitor.Service/Resolver/Interface/IResolverFactory.cs
index 093d72e..6ba5a1c 100644
--- a/HealthMonitor.Service/Resolver/Interface/IResolverFactory.cs
+++ b/HealthMonitor.Service/Resolver/Interface/IResolverFactory.cs
@@ -10,6 +10,6 @@ namespace HealthMonitor.Service.Resolver.Interface
{
public interface IResolverFactory
{
- dynamic ParseAndWrap(ReceiveMessageModel msg);
+ PackageMsgModel? ParseAndWrap(ReceiveMessageModel msg);
}
}
diff --git a/HealthMonitor.Service/Sub/MsgManager.cs b/HealthMonitor.Service/Sub/MsgManager.cs
index 2c6ffc6..54a038b 100644
--- a/HealthMonitor.Service/Sub/MsgManager.cs
+++ b/HealthMonitor.Service/Sub/MsgManager.cs
@@ -2,6 +2,7 @@
using HealthMonitor.Service.Resolver.Interface;
using Microsoft.Extensions.Logging;
using System;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@@ -10,31 +11,47 @@ using System.Threading.Tasks;
namespace HealthMonitor.Service.Sub
{
+
+
public class MsgManager
{
private readonly BloodpressResolver _resolver;
private readonly ILogger _logger;
- private object _obj = default!;
+ private PackageMsgModel _msg = default!;
public MsgManager(ILogger logger, BloodpressResolver resolver)
{
_logger = logger;
_resolver = resolver;
}
- public void AddMsg(object msg)
+ public void EnqueueMsg(PackageMsgModel msg)
{
- _obj = msg;
+ _msg = msg;
+ Console.WriteLine($"GetMsgResolver-{_msg.MessageId}");
}
public IResolver? GetMsgResolver()
{
- if (_obj == null)
+ if (_msg == null)
{
- return null;
+ return null;
}
- _resolver.SetResolveInfo(_obj);
+ Console.WriteLine($"GetMsgResolver-{_msg.MessageId}");
+ _resolver.SetResolveInfo(_msg);
+ _msg = default!;
return _resolver;
}
+ //public IResolver? GetMsgResolver(PackageMsgModel msg)
+ //{
+ // if (_msg == null)
+ // {
+ // return null;
+ // }
+ // Console.WriteLine($"GetMsgResolver-{_msg.MessageId}");
+ // _resolver.SetResolveInfo(msg);
+ // _msg = default!;
+ // return _resolver;
+ //}
}
}
diff --git a/HealthMonitor.Service/Sub/MsgQueueManager.cs b/HealthMonitor.Service/Sub/MsgQueueManager.cs
new file mode 100644
index 0000000..9df1a0e
--- /dev/null
+++ b/HealthMonitor.Service/Sub/MsgQueueManager.cs
@@ -0,0 +1,14 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace HealthMonitor.Service.Sub
+{
+ public class MsgQueueManager: ConcurrentQueue
+ {
+
+ }
+}
diff --git a/HealthMonitor.Service/Sub/PackageMsgModel.cs b/HealthMonitor.Service/Sub/PackageMsgModel.cs
new file mode 100644
index 0000000..01aa570
--- /dev/null
+++ b/HealthMonitor.Service/Sub/PackageMsgModel.cs
@@ -0,0 +1,17 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace HealthMonitor.Service.Sub
+{
+ public class PackageMsgModel
+ {
+ public object DetailData { get; set; } = default!;
+ public string MessageId { get; set; } = default!;
+ public string Topic { get; set; } = default!;
+ public string Table { get; set; } = default!;
+ public string DB { get; set; } = default!;
+ }
+}
diff --git a/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs b/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs
index 231e051..1d5e086 100644
--- a/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs
+++ b/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs
@@ -4,9 +4,11 @@ using HealthMonitor.Service.Biz.db;
using HealthMonitor.Service.Cache;
using HealthMonitor.Service.Resolver.Factory;
using HealthMonitor.Service.Resolver.Interface;
+using HealthMonitor.Service.Sub.Topic.Model;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@@ -120,37 +122,63 @@ namespace HealthMonitor.Service.Sub
{
if (((i + 1) % kv.Value.Metas.Count == 0))
{
- string bloodpress_id = SafeType.SafeString(kv.Value.Datas[i - 8]);
- string message_id = SafeType.SafeString(kv.Value.Datas[i - 7]);
- string serialno = SafeType.SafeString(kv.Value.Datas[i - 6]);
- int systolic_value = SafeType.SafeInt(kv.Value.Datas[i - 5]);
- int diastolic_value = SafeType.SafeInt(kv.Value.Datas[i - 4]);
- DateTime create_time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 3]) / 1000000);
- DateTime last_update = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 2]) / 1000000);
- int method = SafeType.SafeInt(kv.Value.Datas[i - 1]);
- bool is_display = SafeType.SafeBool(kv.Value.Datas[i]);
-
- HisGpsBloodPress bp = new()
- {
- BloodPressId = bloodpress_id,
- MessageId = message_id,
- Serialno = serialno,
- SystolicValue = systolic_value,
- DiastolicValue = diastolic_value,
- CreateTime = create_time,
- LastUpdate = last_update,
- Method = method,
- IsDisplay = is_display ? 1 : 0,
- };
+ //string bloodpress_id = SafeType.SafeString(kv.Value.Datas[i - 8]);
+ //string message_id = SafeType.SafeString(kv.Value.Datas[i - 7]);
+ //string serialno = SafeType.SafeString(kv.Value.Datas[i - 6]);
+ //int systolic_value = SafeType.SafeInt(kv.Value.Datas[i - 5]);
+ //int diastolic_value = SafeType.SafeInt(kv.Value.Datas[i - 4]);
+ //DateTime create_time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 3]) / 1000000);
+ //DateTime last_update = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 2]) / 1000000);
+ //int method = SafeType.SafeInt(kv.Value.Datas[i - 1]);
+ //bool is_display = SafeType.SafeBool(kv.Value.Datas[i]);
+
+ //IDictionary row = new Dictionary();
+ //foreach (var meta in kv.Value.Metas)
+ //{
+ // int index = i-(kv.Value.Metas.Count-kv.Value.Metas.IndexOf(meta)-1);
+ // //var value = kv.Value.Datas[index];
+ // row.Add(meta.name, kv.Value.Datas[index]);
+ //}
+
+ //var body2 = JsonConvert.SerializeObject(row);
+ //kv.Value.Metas.ForEach(meta =>
+ //{
+ // Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size);
+ //});
+
+
+
+ //HisGpsBloodPress bp = new()
+ //{
+ // BloodPressId = bloodpress_id,
+ // MessageId = message_id,
+ // Serialno = serialno,
+ // SystolicValue = systolic_value,
+ // DiastolicValue = diastolic_value,
+ // CreateTime = create_time,
+ // LastUpdate = last_update,
+ // Method = method,
+ // IsDisplay = is_display ? 1 : 0,
+ //};
try
{
+
+ IDictionary row = new Dictionary();
+ foreach (var meta in kv.Value.Metas)
+ {
+ int index = i - (kv.Value.Metas.Count - kv.Value.Metas.IndexOf(meta) - 1);
+ //var value = kv.Value.Datas[index];
+ row.Add(meta.name, kv.Value.Datas[index]);
+ }
+
+
var db = kv.Key.db;
var table = kv.Key.table;
var kvTopic = kv.Key.topic;
- var body = JsonConvert.SerializeObject(bp);
+ var body = JsonConvert.SerializeObject(row);
- ReceiveMessageModel msg = new(db, table, kvTopic, Guid.NewGuid().ToString("N"), body);
+ ReceiveMessageModel msg = new(db, table, kvTopic, row["message_id"].ToString()!, body);
ParsePackage(msg);
}
catch (Exception ex)
@@ -173,8 +201,12 @@ namespace HealthMonitor.Service.Sub
{
var msg = _resolverFactory.ParseAndWrap(model);
Console.WriteLine("msg");
+ cnt++;
+ Console.WriteLine(cnt);
+ Console.WriteLine(msg!.MessageId);
if (msg == null) return;
- _msgManager.AddMsg(msg);
+ // ConcurrentQueue messageQueue = new ConcurrentQueue();
+ _msgManager.EnqueueMsg(msg!);
}
diff --git a/HealthMonitor.Service/Sub/Topic/Model/TopicHmBloodPress.cs b/HealthMonitor.Service/Sub/Topic/Model/TopicHmBloodPress.cs
new file mode 100644
index 0000000..0f1a2c9
--- /dev/null
+++ b/HealthMonitor.Service/Sub/Topic/Model/TopicHmBloodPress.cs
@@ -0,0 +1,34 @@
+using Newtonsoft.Json;
+using System;
+using System.Collections.Generic;
+using System.ComponentModel.DataAnnotations;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace HealthMonitor.Service.Sub.Topic.Model
+{
+ public class TopicHmBloodPress
+ {
+ [JsonProperty("ts")]
+ public long Ts { get; set; } = default!;
+ [JsonProperty("bloodpress_id")]
+ public string BloodPressId { get; set; } = default!;
+ [JsonProperty("message_id")]
+ public string MessageId { get; set; } = default!;
+ [JsonProperty("serialno")]
+ public string Serialno { get; set; } = default!;
+ [JsonProperty("systolic_value")]
+ public int SystolicValue { get; set; }
+ [JsonProperty("diastolic_value")]
+ public int DiastolicValue { get; set; }
+ [JsonProperty("create_time")]
+ public long LastUpdate { get; set; }
+ [JsonProperty("last_update")]
+ public long CreateTime { get; set; }
+ [JsonProperty("method")]
+ public int Method { get; set; }
+ [JsonProperty("is_display")]
+ public bool IsDisplay { get; set; }
+ }
+}
diff --git a/HealthMonitor.WebApi/PackageProcess.cs b/HealthMonitor.WebApi/PackageProcess.cs
index 57b0f7a..af3ee47 100644
--- a/HealthMonitor.WebApi/PackageProcess.cs
+++ b/HealthMonitor.WebApi/PackageProcess.cs
@@ -1,5 +1,7 @@
using HealthMonitor.Service.Resolver.Interface;
using HealthMonitor.Service.Sub;
+using HealthMonitor.Service.Sub.Interface;
+using System.Collections.Concurrent;
namespace HealthMonitor.WebApi
{
@@ -17,11 +19,15 @@ namespace HealthMonitor.WebApi
public async Task ResolveAsync()
{
+
+ // ConcurrentQueue messageQueue = new ConcurrentQueue();
+ var resolver = _msgManager.GetMsgResolver();
try
{
- var resolver = _msgManager.GetMsgResolver();
+
if (resolver != null)
{
+ // resolver.SetResolveInfo();
await resolver.ExecuteMessageAsync().ConfigureAwait(false);
}