From 2c01f55acc867046e09ca77562c887249e3a3412 Mon Sep 17 00:00:00 2001 From: H Vs Date: Tue, 27 Jun 2023 17:17:35 +0800 Subject: [PATCH] =?UTF-8?q?=E6=A0=B9=E6=8D=AE=E6=A8=A1=E5=9E=8B=E5=88=9B?= =?UTF-8?q?=E5=BB=BAtopic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Resolver/Factory/ResolverFactory.cs | 48 ++- .../Sub/TDengineDataSubcribe.cs | 407 ++---------------- 2 files changed, 55 insertions(+), 400 deletions(-) diff --git a/HealthMonitor.Service/Resolver/Factory/ResolverFactory.cs b/HealthMonitor.Service/Resolver/Factory/ResolverFactory.cs index eae133c..50290aa 100644 --- a/HealthMonitor.Service/Resolver/Factory/ResolverFactory.cs +++ b/HealthMonitor.Service/Resolver/Factory/ResolverFactory.cs @@ -23,28 +23,40 @@ namespace HealthMonitor.Service.Resolver.Factory public PackageMsgModel? ParseAndWrap(ReceiveMessageModel msg) { - var topic=msg.Topic; - switch (topic) - { - case "topic_hm_bp_stats": + //var topic=msg.Topic; + + + //switch (topic) + //{ + // // case "topic_hm_bp_stats": + // case "topichmbloodpress": + + // //TopicHmBloodPress bloodPress = JsonConvert.DeserializeObject(msg.Body)!; - //TopicHmBloodPress bloodPress = JsonConvert.DeserializeObject(msg.Body)!; + // return new PackageMsgModel() + // { + // DB= msg.DB, + // MessageId=msg.MessageId, + // Topic= topic, + // DetailData= msg.Body, - return new PackageMsgModel() - { - DB= msg.DB, - MessageId=msg.MessageId, - Topic= topic, - DetailData= msg.Body, - - }; + // }; - // return bloodPress!; + // // return bloodPress!; + + // default: + // break; + //} + //return null; + + return new PackageMsgModel() + { + DB = msg.DB, + MessageId = msg.MessageId, + Topic = msg.Topic, + DetailData = msg.Body, - default: - break; - } - return null; + }; } } } diff --git a/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs b/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs index 96118e3..225bd71 100644 --- a/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs +++ b/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs @@ -11,6 +11,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Reflection; using System.Text; using System.Threading.Tasks; using TDengineDriver; @@ -22,16 +23,14 @@ namespace HealthMonitor.Service.Sub public class TDengineDataSubcribe { private readonly ILogger _logger; - private IConsumer _consumer = default!; - private IntPtr _conn = default!; private readonly MsgQueueManager _msgQueueManager; private readonly TDengineService _serviceTDengine; private readonly PersonCacheManager _personCacheMgr; private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager; private readonly IResolverFactory _resolverFactory; - private CancellationTokenSource _tokenSource = null; + private CancellationTokenSource? _tokenSource = null; - private int cnt = 0; + //private int cnt = 0; public TDengineDataSubcribe( @@ -50,7 +49,7 @@ namespace HealthMonitor.Service.Sub _logger = logger; _resolverFactory = resolverFactory; _msgQueueManager = msgQueueManager; - _conn = GetConnection(); + } public void BeginListen(CancellationToken stoppingToken) { @@ -103,10 +102,25 @@ namespace HealthMonitor.Service.Sub MsgWithTableName = "true", TDConnectIp = "47.116.142.20", }; - string topic = "topic_hm_bp_stats"; - //nameof(TopicHmBloodPress) + // string topic = "topic_hm_bp_stats"; + string topic = nameof(TopicHmBloodPress).ToLower(); + TopicHmBloodPress fields = new(); + PropertyInfo[] props = fields.GetType().GetProperties(); + + // 获取 fields + string attributes = ""; + foreach (PropertyInfo prop in props) + { + JsonPropertyAttribute attr = prop.GetCustomAttribute(); + if (attr != null) + { + attributes += attr.PropertyName + ","; + } + } + attributes = attributes.TrimEnd(','); + //create topic - IntPtr res = TDengine.Query(Connection, $"create topic if not exists {topic} as select * from health_monitor.hm_bloodpress"); + IntPtr res = TDengine.Query(Connection, $"create topic if not exists {topic} as select {attributes} from health_monitor.hm_bloodpress"); if (TDengine.ErrorNo(res) != 0) { @@ -120,7 +134,7 @@ namespace HealthMonitor.Service.Sub // subscribe consumer.Subscribe(topic); - while (!_tokenSource.IsCancellationRequested) + while (!_tokenSource!.IsCancellationRequested) { var consumeRes = consumer.Consume(300); foreach (KeyValuePair kv in consumeRes.Message) @@ -163,12 +177,10 @@ namespace HealthMonitor.Service.Sub } // close consumer after use.Otherwise will lead memory leak. - _consumer.Close(); - TDengine.Close(_conn); + consumer.Close(); + TDengine.Close(Connection); } - - public void ParsePackage(ReceiveMessageModel model) { var msg = _resolverFactory.ParseAndWrap(model); @@ -183,374 +195,5 @@ namespace HealthMonitor.Service.Sub } - public void CreateConnection() - { - var cfg = new ConsumerConfig - { - GourpId = "group_1", - TDConnectUser = "root", - TDConnectPasswd = "taosdata", - MsgWithTableName = "true", - TDConnectIp = "47.116.142.20", - }; - var conn = GetConnection(); - - - } - - - // 创建消费者 - public void CreateConsumer() - { - var cfg = new ConsumerConfig - { - GourpId = "group_1", - TDConnectUser = "root", - TDConnectPasswd = "taosdata", - MsgWithTableName = "true", - TDConnectIp = "47.116.142.20", - }; - //IntPtr conn = GetConnection(); - string topic = "topic_hm_bp_stats"; - //create topic - IntPtr res = TDengine.Query(_conn, $"create topic if not exists {topic} as select * from health_monitor.hm_bloodpress"); - - if (TDengine.ErrorNo(res) != 0) - { - throw new Exception($"create topic failed, reason:{TDengine.Error(res)}"); - } - - // create consumer - var consumer = new ConsumerBuilder(cfg) - .Build(); - - // subscribe - consumer.Subscribe(topic); - - _consumer = consumer; - } - - //public IConsumer CreateConsumer() - //{ - // var cfg = new ConsumerConfig - // { - // GourpId = "group_1", - // TDConnectUser = "root", - // TDConnectPasswd = "taosdata", - // MsgWithTableName = "true", - // TDConnectIp = "47.116.142.20", - // }; - // var conn = GetConnection(); - // //IntPtr conn = GetConnection(); - // string topic = "topic_name"; - // //create topic - // IntPtr res = TDengine.Query(conn, $"create topic if not exists {topic} as select * from ctb1"); - - // if (TDengine.ErrorNo(res) != 0) - // { - // throw new Exception($"create topic failed, reason:{TDengine.Error(res)}"); - // } - - // // create consumer - // var consumer = new ConsumerBuilder(cfg) - // .Build(); - - // // subscribe - // consumer.Subscribe(topic); - - // return consumer; - //} - - //public void ProcessMsg() - //{ - // var consumerRes = _consumer.Consume(300); - // // process ConsumeResult - // foreach (KeyValuePair kv in consumerRes.Message) - // { - // Console.WriteLine("topic partitions:\n{0}", kv.Key.ToString()); - - // kv.Value.Metas.ForEach(meta => - // { - // Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size); - // }); - // Console.WriteLine(""); - // kv.Value.Datas.ForEach(data => - // { - // Console.WriteLine(data.ToString()); - // }); - // } - - // _consumer.Commit(consumerRes); - // // Console.WriteLine("\n================ {0} done "); - //} - - /** - public async Task ProcessMsg() - { - var consumerRes = _consumer.Consume(300); - - Console.WriteLine(consumerRes.Message.Count); - // process ConsumeResult - foreach (KeyValuePair kv in consumerRes.Message) - { - //Console.WriteLine("topic partitions:\n{0}", kv.Key.ToString()); - - //kv.Value.Metas.ForEach(meta => - //{ - // Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size); - //}); - //Console.WriteLine("----------"); - //kv.Value.Datas.ForEach(data => - //{ - // Console.WriteLine(data.ToString()); - //}); - //Console.WriteLine("----------"); - for (int i = 0; i < kv.Value.Datas.Count; i++) - { - // Console.Write($"|{kv.Value.Datas[i].ToString()} \t"); - //Console.WriteLine("{0},{1},{2}", i, resMeta.Count, (i) % resMeta.Count); - if (((i + 1) % kv.Value.Metas.Count == 0)) - { - - - try - { - cnt++; - string bloodpress_id = SafeType.SafeString(kv.Value.Datas[i - 8]); - string message_id = SafeType.SafeString(kv.Value.Datas[i - 7]); - string serialno = SafeType.SafeString(kv.Value.Datas[i - 6]); - int systolic_value = SafeType.SafeInt(kv.Value.Datas[i - 5]); - int diastolic_value = SafeType.SafeInt(kv.Value.Datas[i - 4]); - DateTime create_time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 3]) / 1000000); - DateTime last_update = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 2]) / 1000000); - int method = SafeType.SafeInt(kv.Value.Datas[i - 1]); - bool is_display = SafeType.SafeBool(kv.Value.Datas[i]); - - - // Console.WriteLine("----------"); - HisGpsBloodPress bp = new() - { - //BloodPressId = (string)kv.Value.Datas[i -8], - //MessageId = (string)kv.Value.Datas[i -7], - //Serialno = (string)kv.Value.Datas[i -6], - //SystolicValue = (int)kv.Value.Datas[i -5], - //DiastolicValue = (int)kv.Value.Datas[i -4], - //CreateTime = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds((long)kv.Value.Datas[i -3]/1000000), - //LastUpdate = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds((long)kv.Value.Datas[i -2]/1000000), - //Method = (int)kv.Value.Datas[i -1], - //IsDisplay = (bool)kv.Value.Datas[i] ? 1 : 0, - - BloodPressId = bloodpress_id, - MessageId = message_id, - Serialno = serialno, - SystolicValue = systolic_value, - DiastolicValue = diastolic_value, - CreateTime = create_time, - LastUpdate = last_update, - Method = method, - IsDisplay = is_display ? 1 : 0, - }; - - - #region 获取个人信息 - - var person = await _personCacheMgr.GetDeviceGpsPersonCacheBySerialNoAsync(bp.MessageId, bp.Serialno).ConfigureAwait(false); - // 验证这个信息是否存在 - if (person == null || person?.Person.BornDate == null) - { - Console.WriteLine("验证这个信息是否存在"); - } - else - { - // 验证年龄是否在范围 (2 - 120) - - - var age = SafeType.SafeInt(DateTime.Today.Year - person?.Person.BornDate!.Value.Year!); - #endregion - if (age < 1 || age > 120) - { - Console.WriteLine("验证年龄是否在范围 (2 - 120)"); - } - else - { - var gender = person?.Person.Gender == true ? 1 : 2; - var isHypertension = SafeType.SafeBool(person?.Person.Ishypertension!); - var height = SafeType.SafeDouble(person?.Person.Height!); - var weight = SafeType.SafeDouble(person?.Person.Weight!); - - - #region 插入当次BP数据 - // 保存到TDengine - - //var bpSql = $"INSERT INTO health_monitor.hm_bloodpress VALUES(" + - // $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," + - // $"'{bp.BloodPressId}'," + - // $"'{bp.MessageId}'," + - // $"'{bp.Serialno}'," + - // $"{bp.SystolicValue}," + - // $"{bp.DiastolicValue}," + - // $"'{bp.CreateTime:yyyy-MM-ddTHH:mm:ss.fffZ}'," + - // $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," + - // $"{bp.Method}," + - // $"{bp.IsDisplay == 1})"; - - //await _serviceTDengine.GernalRestSql(bpSql); - - #endregion - - #region 计算增量值 - var bpRef = await _bpRefValCacheManager.GetBloodPressReferenceValueAsync(age, gender, isHypertension); - - var systolicRefValue = bpRef?.Systolic;//? - var diastolicRefValue = bpRef?.Diastolic;//? - int duration = 30; - // 获取历史数据 - //DateTime now = DateTime.Now; - DateTime now = (DateTime)bp.LastUpdate; //测试 - DateTime startTime = now.AddDays(-duration); - DateTime endTime = now; - - // - var systolicAggregate = await _serviceTDengine.GetAggregateValue("systolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}'"); - var diastolicAggregate = await _serviceTDengine.GetAggregateValue("diastolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}'"); - - // 最大值 - var systolicMax = systolicAggregate.Max; - var diastolicMax = diastolicAggregate.Max; - // 最小值 - var systolicMin = systolicAggregate.Min; - var diastolicMin = diastolicAggregate.Min; - - - // 计算去除最大值和最小值和异常值的平均值 - var systolicAvg = await _serviceTDengine.GetAvgExceptMaxMinValue("systolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}' and systolic_value < {systolicRefValue} "); - var diastolicAvg = await _serviceTDengine.GetAvgExceptMaxMinValue("diastolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}' and diastolic_value < {diastolicRefValue}"); - - // 偏移参数 - var avgOffset = 0.25M; - - var systolicAvgOffset = avgOffset; - var diastolicAvgOffset = avgOffset; - - - // 增量值=(标定值-平均值)* 0.25 - var systolicInc = systolicAvg.Equals(0M) ? 0 : (int)((systolicRefValue - systolicAvg) * systolicAvgOffset)!; - var diastolicInc = diastolicAvg.Equals(0M) ? 0 : (int)((diastolicRefValue - diastolicAvg) * diastolicAvgOffset)!; - - #endregion - - #region 插入BP增量值 - var sql = $"INSERT INTO health_monitor.hm_bloodpress_stats_inc VALUES(" + - $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," + - $"'{bp.BloodPressId}'," + - $"'{bp.MessageId}'," + - $"'{bp.Serialno}'," + - $"{bp.SystolicValue}," + - $"{systolicRefValue}," + - $"{systolicAvg}," + - $"{systolicMax}," + - $"{systolicMin}," + - $"{systolicAvgOffset}," + - $"{systolicInc}," + - $"{bp.DiastolicValue}," + - $"{diastolicRefValue}," + - $"{diastolicAvg}," + - $"{diastolicMax}," + - $"{diastolicMin}," + - $"{diastolicAvgOffset}," + - $"{diastolicInc}," + - $"{gender}," + - $"{age}," + - $"{height}," + - $"{weight}," + - $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," + - $"{duration}," + - $"'{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}'," + - $"'{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}'," + - $"'{string.Empty}')"; - var res = await _serviceTDengine.GernalRestSql(sql); - #endregion - - }; - // Console.WriteLine("----------"); - } - - - } - catch (Exception ex) - { - Console.WriteLine(ex.Message); - } - - } - Console.WriteLine("++++++++++++++++++++++"); - - Console.Write($"总共增加{cnt}"); - } - - // Console.WriteLine(""); - - - - - } - - _consumer.Commit(consumerRes); - // Console.WriteLine("\n================ {0} done "); - } - */ - - - - public async Task ProcessMsg() - { - - } - - - public IntPtr GetConnection() - { - string host = "47.116.142.20"; - short port = 6030; - string username = "root"; - string password = "taosdata"; - string dbname = "tmqdb"; - var conn = TDengine.Connect(host, username, password, dbname, port); - if (conn == IntPtr.Zero) - { - throw new Exception("Connect to TDengine failed"); - } - else - { - Console.WriteLine("Connect to TDengine success"); - } - return conn; - } - // 关闭消费者 - //public void CloseConsumer(IConsumer consumer, IntPtr conn) - //{ - // List topics = consumer.Subscription(); - // topics.ForEach(t => Console.WriteLine("topic name:{0}", t)); - - // // unsubscribe - // consumer.Unsubscribe(); - - // // close consumer after use.Otherwise will lead memory leak. - // consumer.Close(); - // TDengine.Close(conn); - //} - - public void CloseConsumer() - { - List topics = _consumer.Subscription(); - topics.ForEach(t => Console.WriteLine("topic name:{0}", t)); - - // unsubscribe - _consumer.Unsubscribe(); - - // close consumer after use.Otherwise will lead memory leak. - _consumer.Close(); - TDengine.Close(_conn); - } } }