From 6a2e2596358f772f92abda0336945097681d4120 Mon Sep 17 00:00:00 2001 From: H Vs Date: Mon, 15 Jul 2024 17:14:44 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8tdengine=20orm?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../HealthMonitor.Model.csproj | 2 + .../Service/Mapper/FetalHeartRateModel.cs | 65 ++ .../Service/Mapper/PregnancyHeartRateModel.cs | 64 ++ .../Biz/db/TDengineService.cs | 212 ++++++ .../HealthMonitor.Service.csproj | 2 + .../Sub/TDengineDataSubcribe.cs | 55 +- .../HealthMonitor/HmBloodPressController.cs | 28 + HealthMonitor.WebApi/Worker.cs | 700 +++++++++--------- 8 files changed, 741 insertions(+), 387 deletions(-) create mode 100644 HealthMonitor.Model/Service/Mapper/FetalHeartRateModel.cs create mode 100644 HealthMonitor.Model/Service/Mapper/PregnancyHeartRateModel.cs diff --git a/HealthMonitor.Model/HealthMonitor.Model.csproj b/HealthMonitor.Model/HealthMonitor.Model.csproj index f958b97..f4f1bf1 100644 --- a/HealthMonitor.Model/HealthMonitor.Model.csproj +++ b/HealthMonitor.Model/HealthMonitor.Model.csproj @@ -8,6 +8,8 @@ + + diff --git a/HealthMonitor.Model/Service/Mapper/FetalHeartRateModel.cs b/HealthMonitor.Model/Service/Mapper/FetalHeartRateModel.cs new file mode 100644 index 0000000..2bd2745 --- /dev/null +++ b/HealthMonitor.Model/Service/Mapper/FetalHeartRateModel.cs @@ -0,0 +1,65 @@ +using Newtonsoft.Json; +using SqlSugar; +using SqlSugar.TDengine; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace HealthMonitor.Model.Service.Mapper +{ + [STableAttribute(STableName = "stb_hm_fetal_heart_rate_test")] + + public class FetalHeartRateModel + { + [JsonProperty("ts")] + [SqlSugar.SugarColumn(IsPrimaryKey = true, ColumnName = "ts", SqlParameterDbType = typeof(DateTime19))] + public DateTime Timestamp { get; set; } + + [JsonProperty("fetal_heart_rate_id")] + [SqlSugar.SugarColumn(ColumnName= "fetal_heart_rate_id")] + public string FetalHeartRateId { get; set; } = default!; + + [JsonProperty("message_id")] + [SqlSugar.SugarColumn(ColumnName = "message_id")] + public string MessageId { get; set; } = default!; + + + [JsonProperty("person_id")] + [SqlSugar.SugarColumn(ColumnName = "person_id")] + public string PersonId { get; set; } = default!; + + [JsonProperty("serialno")] + [SqlSugar.SugarColumn(ColumnName = "serialno")] + public string SerialNumber { get; set; } = default!; + + [JsonProperty("fetal_heart_rate")] + [SqlSugar.SugarColumn(ColumnName = "fetal_heart_rate")] + public int FetalHeartRate { get; set; } + + [JsonProperty("create_time")] + [SqlSugar.SugarColumn(ColumnName = "create_time")] + public DateTime CreateTime { get; set; } + + [JsonProperty("last_update")] + [SqlSugar.SugarColumn(ColumnName = "last_update")] + public DateTime LastUpdate { get; set; } + + [JsonProperty("method")] + [SqlSugar.SugarColumn(ColumnName = "method")] + public byte Method { get; set; } + + [JsonProperty("is_display")] + [SqlSugar.SugarColumn(ColumnName = "is_display")] + public bool IsDisplay { get; set; } + + [JsonProperty("device_key")] + [SqlSugar.SugarColumn(ColumnName = "device_key")] + public string DeviceKey { get; set; } = default!; + + [JsonProperty("serial_tail_no")] + [SqlSugar.SugarColumn(IsIgnore = true, ColumnName = "serial_tail_no")] + public string SerialTailNumber { get; set; } = default!; + } +} diff --git a/HealthMonitor.Model/Service/Mapper/PregnancyHeartRateModel.cs b/HealthMonitor.Model/Service/Mapper/PregnancyHeartRateModel.cs new file mode 100644 index 0000000..3fc749e --- /dev/null +++ b/HealthMonitor.Model/Service/Mapper/PregnancyHeartRateModel.cs @@ -0,0 +1,64 @@ +using Newtonsoft.Json; +using SqlSugar; +using SqlSugar.TDengine; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace HealthMonitor.Model.Service.Mapper +{ + [STableAttribute(STableName = "stb_hm_pregnancy_heart_rate")] + public class PregnancyHeartRateModel + { + [JsonProperty("ts")] + [SqlSugar.SugarColumn(IsPrimaryKey = true, ColumnName = "ts", SqlParameterDbType = typeof(DateTime19))] + public DateTime Timestamp { get; set; } + + [JsonProperty("pregnancy_heart_rate_id")] + [SqlSugar.SugarColumn(ColumnName = "pregnancy_heart_rate_id")] + public string PregnancyHeartRateId { get; set; } = default!; + + [JsonProperty("message_id")] + [SqlSugar.SugarColumn(ColumnName = "message_id")] + public string MessageId { get; set; } = default!; + + + [JsonProperty("person_id")] + [SqlSugar.SugarColumn(ColumnName = "person_id")] + public string PersonId { get; set; } = default!; + + [JsonProperty("serialno")] + [SqlSugar.SugarColumn(ColumnName = "serialno")] + public string SerialNumber { get; set; } = default!; + + [JsonProperty("pregnancy_heart_rate")] + [SqlSugar.SugarColumn(ColumnName = "pregnancy_heart_rate")] + public int PregnancyHeartRate { get; set; } + + [JsonProperty("create_time")] + [SqlSugar.SugarColumn(ColumnName = "create_time")] + public DateTime CreateTime { get; set; } + + [JsonProperty("last_update")] + [SqlSugar.SugarColumn(ColumnName = "last_update")] + public DateTime LastUpdate { get; set; } + + [JsonProperty("method")] + [SqlSugar.SugarColumn(ColumnName = "method")] + public byte Method { get; set; } + + [JsonProperty("is_display")] + [SqlSugar.SugarColumn(ColumnName = "is_display")] + public bool IsDisplay { get; set; } + + [JsonProperty("device_key")] + [SqlSugar.SugarColumn(ColumnName = "device_key")] + public string DeviceKey { get; set; } = default!; + + [JsonProperty("serial_tail_no")] + [SqlSugar.SugarColumn(IsIgnore = true, ColumnName = "serial_tail_no")] + public string SerialTailNumber { get; set; } = default!; + } +} diff --git a/HealthMonitor.Service/Biz/db/TDengineService.cs b/HealthMonitor.Service/Biz/db/TDengineService.cs index bfc7062..e39bee1 100644 --- a/HealthMonitor.Service/Biz/db/TDengineService.cs +++ b/HealthMonitor.Service/Biz/db/TDengineService.cs @@ -9,14 +9,21 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Newtonsoft.Json; using Newtonsoft.Json.Linq; +using SqlSugar; +using SqlSugar.DbConvert; +using SqlSugar.TDengine; using System; using System.Collections.Generic; +using System.Data; using System.Linq; +using System.Linq.Expressions; +using System.Reflection; using System.Text; using System.Threading.Tasks; using System.Xml.Linq; using TDengineDriver; using TDengineDriver.Impl; +using TDengineTMQ; namespace HealthMonitor.Service.Biz.db { @@ -26,14 +33,46 @@ namespace HealthMonitor.Service.Biz.db private readonly ILogger _logger; private readonly HttpHelper _httpHelper=default!; private readonly TDengineServiceConfig _configTDengineService; + private readonly SqlSugarClient _clientSqlSugar; public TDengineService(ILogger logger, IOptions configTDengineService, HttpHelper httpHelper + ) { _logger = logger; _configTDengineService = configTDengineService.Value; _httpHelper = httpHelper; + _clientSqlSugar = new SqlSugarClient(new ConnectionConfig() + { + DbType = SqlSugar.DbType.TDengine, + ConnectionString = $"Host={_configTDengineService.Host};Port={_configTDengineService.Port};Username={_configTDengineService.UserName};Password={_configTDengineService.Password};Database={_configTDengineService.DB};TsType=config_ns", + IsAutoCloseConnection = true, + AopEvents = new AopEvents + { + OnLogExecuting = (sql, p) => + { + Console.WriteLine(SqlSugar.UtilMethods.GetNativeSql(sql, p)); + } + }, + ConfigureExternalServices = new ConfigureExternalServices() + { + EntityService = (property, column) => + { + if (column.SqlParameterDbType == null) + { + column.SqlParameterDbType = typeof(CommonPropertyConvert); + } + } + }, + MoreSettings=new ConnMoreSettings() + { + PgSqlIsAutoToLower = false, + PgSqlIsAutoToLowerCodeFirst = false, + } + + }); + } public IntPtr Connection() { @@ -596,5 +635,178 @@ namespace HealthMonitor.Service.Biz.db return new decimal[] { (decimal)systolicAvg!, (decimal)diastolicAvg! }; } + + #region SqlSugarClient + public async Task InsertFetalHeartRateAsync() + { + var tableName = typeof(FetalHeartRateModel) + .GetCustomAttribute()? + .STableName; + + if (tableName == null) + { + throw new InvalidOperationException("STableAttribute not found on FetalHeartRateModel class."); + } + + await _clientSqlSugar.Ado.ExecuteCommandAsync($"create table IF NOT EXISTS hm_fhr_00 using {tableName} tags('00')"); + + _clientSqlSugar.Insertable(new FetalHeartRateModel() + { + Timestamp = DateTime.Now, + CreateTime = DateTime.Now, + FetalHeartRate = 90, + FetalHeartRateId = Guid.NewGuid().ToString("D"), + IsDisplay = false, + Method = 1, + PersonId = Guid.NewGuid().ToString("D"), + MessageId = Guid.NewGuid().ToString("D"), + SerialNumber = Guid.NewGuid().ToString("D"), + DeviceKey = Guid.NewGuid().ToString("D"), + SerialTailNumber = "00", + LastUpdate = DateTime.Now, + }).AS("hm_fhr_00").ExecuteCommand(); + + } + + public Task GetFirst() + { + var tableName = typeof(FetalHeartRateModel) + .GetCustomAttribute()? + .STableName; + + var first = _clientSqlSugar + .Queryable() + .AS(tableName) + .OrderByDescending(x => x.Timestamp).FirstAsync(); + return first; + } + public void InsertFetalHeartRate2() + { + //var insrtSql = ""; + //_clientSqlSugar.Ado.ExecuteCommand(insrtSql); + // 通过反射获取STableAttribute的STableName值 + var tableName = typeof(FetalHeartRateModel) + .GetCustomAttribute()? + .STableName; + + if (tableName == null) + { + throw new InvalidOperationException("STableAttribute not found on FetalHeartRateModel class."); + } + + _clientSqlSugar.Ado.ExecuteCommand($"create table IF NOT EXISTS hm_fhr_00 using {tableName} tags('00')"); + + //_clientSqlSugar.Ado.ExecuteCommand($"create table IF NOT EXISTS hm_fhr_00 using `stb_hm_fetal_heart_rate_test` tags('00')"); + _clientSqlSugar.Insertable(new FetalHeartRateModel() + { + Timestamp = DateTime.Now, + CreateTime = DateTime.Now, + FetalHeartRate = 90, + FetalHeartRateId = Guid.NewGuid().ToString("D"), + IsDisplay = false, + Method = 1, + PersonId = Guid.NewGuid().ToString("D"), + MessageId = Guid.NewGuid().ToString("D"), + SerialNumber = Guid.NewGuid().ToString("D"), + DeviceKey = Guid.NewGuid().ToString("D"), + SerialTailNumber = "00", + LastUpdate = DateTime.Now, + }).AS("hm_fhr_00").ExecuteCommand(); + + + //List rows = new(); + + //rows.Add(new FetalHeartRateModel() { + // Timestamp = new DateTime(2024, 1, 1), + // CreateTime = DateTime.Now, + // FetalHeartRate = 90, + // FetalHeartRateId = Guid.NewGuid().ToString("D"), + // IsDisplay = false, + // Method = 1, + // PersonId = Guid.NewGuid().ToString("D"), + // MessageId = Guid.NewGuid().ToString("D"), + // SerialNumber = Guid.NewGuid().ToString("D"), + // DeviceKey = Guid.NewGuid().ToString("D"), + // SerialTailNumber = "00", + // LastUpdate = DateTime.Now, + + //}); + + //_clientSqlSugar.Insertable(rows).AS("hm_fhr_00").ExecuteCommand(); + } + + /// + /// 插入记录 + /// + /// + /// + /// 子表名称 + /// + /// + public async Task InsertAsync(string tbName, T model) + { + var stbName = typeof(T) + .GetCustomAttribute()? + .STableName; + + if (stbName == null) + { + throw new InvalidOperationException($"STableAttribute not found on {nameof(T)} class."); + } + var tailNo = typeof(T).GetProperty("SerialTailNumber")?.GetValue(model)?.ToString(); + + if (string.IsNullOrEmpty(tailNo)) + { + throw new InvalidOperationException($"SerialNumberAttribute not found on {nameof(T)} class."); + } + var tbFullName = $"{tbName}_{tailNo}"; + await _clientSqlSugar.Ado.ExecuteCommandAsync($"create table IF NOT EXISTS {tbFullName} using {stbName} tags('{tailNo}')"); + + //_clientSqlSugar.InsertableByDynamic(model).AS(tbFullName).ExecuteCommand(); + _clientSqlSugar.InsertableByObject(model).AS(tbFullName).ExecuteCommand(); + } + + public Task GetFirstAsync() where T : class + { + var tableName = typeof(T) + .GetCustomAttribute()? + .STableName; + // 创建一个表示 Timestamp 属性的表达式 + var parameter = Expression.Parameter(typeof(T), "x"); + var property = Expression.Property(parameter, "Timestamp"); + var lambda = Expression.Lambda>(Expression.Convert(property, typeof(object)), parameter); + + + var first = _clientSqlSugar + .Queryable() + .AS(tableName) + .OrderByDescending(lambda).FirstAsync(); + return first; + } + + public async Task> GetBySerialNoAsync(string serialNo) where T : class + { + var tableName = typeof(T) + .GetCustomAttribute()? + .STableName; + // 创建表示 Timestamp 属性的表达式 + var parameter = Expression.Parameter(typeof(T), "x"); + var timestampProperty = Expression.Property(parameter, "Timestamp"); + var timestampLambda = Expression.Lambda>(Expression.Convert(timestampProperty, typeof(object)), parameter); + + // 创建表示 SerialNo 属性的表达式 + var serialNoProperty = Expression.Property(parameter, "SerialNumber"); + var serialNoConstant = Expression.Constant(serialNo); + var equalExpression = Expression.Equal(serialNoProperty, serialNoConstant); + var serialNoLambda = Expression.Lambda>(equalExpression, parameter); + + var res = await _clientSqlSugar + .Queryable() + .AS(tableName) + .Where(serialNoLambda) + .OrderByDescending(timestampLambda).ToListAsync(); + return res; + } + #endregion } } diff --git a/HealthMonitor.Service/HealthMonitor.Service.csproj b/HealthMonitor.Service/HealthMonitor.Service.csproj index fa312cb..af17312 100644 --- a/HealthMonitor.Service/HealthMonitor.Service.csproj +++ b/HealthMonitor.Service/HealthMonitor.Service.csproj @@ -11,6 +11,8 @@ + + diff --git a/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs b/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs index 17b0ede..c5b3db4 100644 --- a/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs +++ b/HealthMonitor.Service/Sub/TDengineDataSubcribe.cs @@ -57,63 +57,25 @@ namespace HealthMonitor.Service.Sub } public void BeginListen(CancellationToken stoppingToken) { - //var cfg = new ConsumerConfig - //{ - // GourpId = "group_1", - // TDConnectUser = "root", - // TDConnectPasswd = "taosdata", - // MsgWithTableName = "true", - // TDConnectIp = "47.116.142.20", - //}; - //var conn = GetConnection(); - - - //ProcessMsg(consumer); - - //防止造成多线程运行 + _tokenSource?.Cancel(); - _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken); DoTDengineConnect(); } public void DoTDengineConnect() { - // string host = _configTDengineService.Host; - // short port = 6030; - // string username = _configTDengineService.UserName; - // string password = _configTDengineService.Password; - // string dbname = _configTDengineService.DB; - ////#if DEBUG - //// //string configDir = "C:/TDengine/cfg"; - //// //TDengine.Options((int)TDengineInitOption.TSDB_OPTION_CONFIGDIR, configDir); - //// TDengine.Options((int)TDengineInitOption.TSDB_OPTION_TIMEZONE, "Asia/Beijing"); - ////#endif - // var conn = TDengine.Connect(host, username, password, dbname, port); - - // if (conn == IntPtr.Zero) - // { - // _logger.LogError("reason:{TDengine.Error(conn)}", TDengine.Error(conn)); - - // } - // else - // { - // _logger.LogInformation($"连接 TDengine 成功...."); - - // } - var conn = _serviceTDengine.Connection(); DoReceive(conn); } public void DoReceive(IntPtr Connection) { - - // string topic = "topic_hm_bp_stats"; + #region topichmbpstats 订阅 + // string topic = "topichmbpstats"; string topic = nameof(TopicHmBloodPress).ToLower(); TopicHmBloodPress fields = new(); PropertyInfo[] props = fields.GetType().GetProperties(); - // 获取 fields string attributes = ""; foreach (PropertyInfo prop in props) @@ -126,9 +88,16 @@ namespace HealthMonitor.Service.Sub } attributes = attributes.TrimEnd(','); - //create topic + //创建 topichmbpstats IntPtr res = TDengine.Query(Connection, $"create topic if not exists {topic} as select {attributes} from health_monitor.stb_hm_bloodpress"); + //创建 topichmfetalheartrate + //var fetalHeartRateTopic = "topichmfetalheartrate"; + //var fetalHeartRateFields = "ts,fetal_heart_rate_id,message_id,person_id,serialno,fetal_heart_rate,create_time,method,last_update,is_display,device_key"; + //res = TDengine.Query(Connection, $"create topic if not exists {fetalHeartRateTopic} as select {fetalHeartRateFields} from health_monitor.stb_hm_fetal_heart_rate_test"); + #endregion + + if (TDengine.ErrorNo(res) != 0) { _logger.LogError($"create topic failed, reason:{TDengine.Error(res)}"); @@ -160,12 +129,10 @@ namespace HealthMonitor.Service.Sub { try { - IDictionary row = new Dictionary(); foreach (var meta in kv.Value.Metas) { int index = i - (kv.Value.Metas.Count - kv.Value.Metas.IndexOf(meta) - 1); - //var value = kv.Value.Datas[index]; row.Add(meta.name, kv.Value.Datas[index]); } diff --git a/HealthMonitor.WebApi/Controllers/HealthMonitor/HmBloodPressController.cs b/HealthMonitor.WebApi/Controllers/HealthMonitor/HmBloodPressController.cs index 3d0f2f6..de805ee 100644 --- a/HealthMonitor.WebApi/Controllers/HealthMonitor/HmBloodPressController.cs +++ b/HealthMonitor.WebApi/Controllers/HealthMonitor/HmBloodPressController.cs @@ -1,4 +1,5 @@ using HealthMonitor.Model.Config; +using HealthMonitor.Model.Service.Mapper; using HealthMonitor.Service.Biz.db; using HealthMonitor.WebApi.Configs; using HealthMonitor.WebApi.Controllers.Api; @@ -72,5 +73,32 @@ namespace HealthMonitor.WebApi.Controllers.HealthMonitor return ApiResponse.Fail(500, $"{reqModel.Serialno} -- {reqModel.MessageId} -- 血压数据插入失败"); } + [HttpGet] + public async Task Test() + { + // _serviceTDengine.InsertFetalHeartRate2(); + //await _serviceTDengine.InsertFetalHeartRateAsync(); + + //var test = new FetalHeartRateModel() + //{ + // Timestamp = DateTime.Now, + // CreateTime = DateTime.Now, + // FetalHeartRate = 122, + // FetalHeartRateId = Guid.NewGuid().ToString("D"), + // IsDisplay = false, + // Method = 1, + // PersonId = Guid.NewGuid().ToString("D"), + // MessageId = Guid.NewGuid().ToString("D"), + // SerialNumber = "864144050568123", + // DeviceKey = Guid.NewGuid().ToString("D"), + // SerialTailNumber = "23", + // LastUpdate = DateTime.Now, + //}; + //await _serviceTDengine.InsertAsync("hm_fhr", test); + //var first = _serviceTDengine.GetFirst(); + + var first = await _serviceTDengine.GetBySerialNoAsync("864144050568123"); + return Ok(first); + } } } diff --git a/HealthMonitor.WebApi/Worker.cs b/HealthMonitor.WebApi/Worker.cs index 6283a2a..0bfcc94 100644 --- a/HealthMonitor.WebApi/Worker.cs +++ b/HealthMonitor.WebApi/Worker.cs @@ -57,386 +57,342 @@ namespace HealthMonitor.WebApi public override Task StartAsync(CancellationToken cancellationToken) { - _logger.LogInformation("------StartAsync"); + //_logger.LogInformation("------StartAsync"); _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - // 创建消费者 - // _tdEngineDataSubcribe.CreateConsumer(); return base.StartAsync(cancellationToken); } public override Task StopAsync(CancellationToken cancellationToken) { - _logger.LogInformation("------StopAsync"); + //_logger.LogInformation("------StopAsync"); _tokenSource.Cancel(); //停止工作线程 - // 关闭消费者 - // _tdEngineDataSubcribe.CloseConsumer(); return base.StopAsync(cancellationToken); } - - protected override Task ExecuteAsync(CancellationToken stoppingToken) + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - // var processor = _services.GetService(); - TaskFactory factory = new(_tokenSource.Token); - factory.StartNew(async () => + var tasks = new[] { - if (_tokenSource.IsCancellationRequested) - _logger.LogWarning("Worker exit"); - - _logger.LogInformation("------ResolveAsync"); - while (!_tokenSource.IsCancellationRequested) + Task.Run(async () => { - // - await _processor.ResolveAsync().ConfigureAwait(false); - // await _tdEngineDataSubcribe.ProcessMsg(); - } - - }, TaskCreationOptions.LongRunning); + _logger.LogInformation("解析器启动"); + while (!stoppingToken.IsCancellationRequested) + { + await _processor.ResolveAsync().ConfigureAwait(false); + } + }, stoppingToken), - factory.StartNew(() => - { - _logger.LogInformation("------_tdEngineDataSubcribe"); - while (!_tokenSource.IsCancellationRequested) + Task.Run(() => { - Console.WriteLine("test"); - _tdEngineDataSubcribe.BeginListen(_tokenSource.Token); - } - }, TaskCreationOptions.LongRunning); - - Task.Run(() => - _serviceEtcd.WacthKeysWithPrefixResponseAsync($"health_moniter/schedule_push", WatchEvents) - , stoppingToken); - - // watch - //factory.StartNew(() => - //{ - // while (!_tokenSource.IsCancellationRequested) - // { - // //_serviceEtcd.WacthKeysWithPrefixAsync($"health_moniter/schedule_push", watchEvents => WatchEvents(watchEvents)); - // _serviceEtcd.WacthKeysWithPrefixResponseAsync($"health_moniter/schedule_push", WatchEvents); - // } - //}, TaskCreationOptions.LongRunning); - - // _serviceEtcd.WacthKeysWithPrefixResponseAsync($"health_moniter/schedule_push", WatchEvents); - return Task.Delay(1000, _tokenSource.Token); - - } - - private void WatchEvents(WatchEvent[] response) - { - foreach (WatchEvent e1 in response) - { - // Console.WriteLine($"{nameof(WatchEventsAsync)} --- {e1.Key}:{e1.Value}:{e1.Type}"); + _logger.LogInformation("TDengine 订阅启动"); + while (!stoppingToken.IsCancellationRequested) + { + _tdEngineDataSubcribe.BeginListen(stoppingToken); + } + }, stoppingToken), - switch (e1.Type.ToString()) - { - //case "Put": - // // 获取时间点计算TTL - // break; + Task.Run(() => + _serviceEtcd.WacthKeysWithPrefixResponseAsync("health_moniter/schedule_push", WatchEvents), + stoppingToken) + }; - case "Delete": - // TTL到了重新计算TTL,下发 - Console.WriteLine($"--- {e1.Key}:{e1.Value}:{e1.Type}"); - break; - } - } + await Task.WhenAll(tasks); } - private void WatchEvents(WatchResponse response) { - - response.Events.ToList().ForEach(async e => + + response.Events.ToList().ForEach(async e => + { + try { - try + switch (e.Type.ToString()) { - switch (e.Type.ToString()) - { - case "Put": - // 获取时间点计算TTL - Console.BackgroundColor = ConsoleColor.Blue; - Console.WriteLine($"--- {e.Type}--{e.Kv.Key.ToStringUtf8()}--{e.Kv.Value.ToStringUtf8()}---{DateTime.Now}"); - Console.BackgroundColor = ConsoleColor.Black; - break; - - case "Delete": - // TTL到了重新计算TTL,下发 - Console.BackgroundColor = ConsoleColor.Green; - Console.WriteLine($"--- {e.Type}--{e.Kv.Key.ToStringUtf8()}--{e.Kv.Value.ToStringUtf8()}---{DateTime.Now}"); - - // var key = $"health_moniter/schedule_push/imei/{bp.Serialno}"; - var key = e.Kv.Key.ToStringUtf8(); - var imeiDel = key.Split('/')[3]; - var schedule_push = await _serviceEtcd.GetValAsync(key).ConfigureAwait(false); - if (string.IsNullOrWhiteSpace(schedule_push)) + case "Put": + // 获取时间点计算TTL + Console.BackgroundColor = ConsoleColor.Blue; + Console.WriteLine($"--- {e.Type}--{e.Kv.Key.ToStringUtf8()}--{e.Kv.Value.ToStringUtf8()}---{DateTime.Now}"); + Console.BackgroundColor = ConsoleColor.Black; + break; + + case "Delete": + // TTL到了重新计算TTL,下发 + Console.BackgroundColor = ConsoleColor.Green; + Console.WriteLine($"--- {e.Type}--{e.Kv.Key.ToStringUtf8()}--{e.Kv.Value.ToStringUtf8()}---{DateTime.Now}"); + + // var key = $"health_moniter/schedule_push/imei/{bp.Serialno}"; + var key = e.Kv.Key.ToStringUtf8(); + var imeiDel = key.Split('/')[3]; + var schedule_push = await _serviceEtcd.GetValAsync(key).ConfigureAwait(false); + if (string.IsNullOrWhiteSpace(schedule_push)) + { + int systolicInc; + int diastolicInc; + + int systolicRefValue; + int diastolicRefValue; + + decimal systolicAvg; + decimal diastolicAvg; + + int systolicMax = 0; + int diastolicMax = 0; + + // 统计时间 + //DateTime endTime = DateTime.Now; //测试 + DateTime statStartTime = DateTime.Now; + + + // 最小值 + int systolicMin = 0; + int diastolicMin = 0; + + // 偏移参数 + var avgOffset = 0.25M; + var systolicAvgOffset = avgOffset; + var diastolicAvgOffset = avgOffset; + + // 最后一次下发值 + int lastPushSystolicInc = 0; + int lastPushDiastolicInc = 0; + + + var startTime = DateTime.Now; + // 下发增量值 + #region 统计定时下发增量值 + //var last = await _serviceTDengine.GetLastAsync("stb_hm_bloodpress_stats_inc", $"serialno='{imeiDel}' order by last_update desc"); + //var ts = last?[0]; + + // 最后一条血压数据 + var condition = $"serialno='{imeiDel}' order by last_update desc"; + var field = "last_row(*)"; + var lastHmBpResponse = await _serviceTDengine.ExecuteSelectRestResponseAsync("stb_hm_bloodpress", condition, field); + var lastHmBpParser = JsonConvert.DeserializeObject>(lastHmBpResponse!); + var lastHmBp = lastHmBpParser?.Select().FirstOrDefault(); + //if (lastHmBpParser?.Select()?.ToList().Count < 2) + //{ + // _logger.LogInformation($"{imeiDel} -- {nameof(Worker)} --{nameof(WatchEvents)} -- 血压数据条目不足"); + // break; + //} + + // 7 天有效数据 + if (lastHmBp?.Timestamp.AddDays(7) > DateTime.Now) { - int systolicInc; - int diastolicInc; - - int systolicRefValue; - int diastolicRefValue; - - decimal systolicAvg; - decimal diastolicAvg; - - int systolicMax = 0; - int diastolicMax = 0; - - // 统计时间 - //DateTime endTime = DateTime.Now; //测试 - DateTime statStartTime = DateTime.Now; - - - // 最小值 - int systolicMin = 0; - int diastolicMin = 0; - - // 偏移参数 - var avgOffset = 0.25M; - var systolicAvgOffset = avgOffset; - var diastolicAvgOffset = avgOffset; - - // 最后一次下发值 - int lastPushSystolicInc = 0; - int lastPushDiastolicInc = 0; - - - var startTime = DateTime.Now; - // 下发增量值 - #region 统计定时下发增量值 - //var last = await _serviceTDengine.GetLastAsync("stb_hm_bloodpress_stats_inc", $"serialno='{imeiDel}' order by last_update desc"); - //var ts = last?[0]; - - // 最后一条血压数据 - var condition = $"serialno='{imeiDel}' order by last_update desc"; - var field = "last_row(*)"; - var lastHmBpResponse = await _serviceTDengine.ExecuteSelectRestResponseAsync("stb_hm_bloodpress", condition, field); - var lastHmBpParser = JsonConvert.DeserializeObject>(lastHmBpResponse!); - var lastHmBp = lastHmBpParser?.Select().FirstOrDefault(); - //if (lastHmBpParser?.Select()?.ToList().Count < 2) - //{ - // _logger.LogInformation($"{imeiDel} -- {nameof(Worker)} --{nameof(WatchEvents)} -- 血压数据条目不足"); - // break; - //} - - // 7 天有效数据 - if (lastHmBp?.Timestamp.AddDays(7) > DateTime.Now) + // 计算增量值 + condition = $"serialno='{imeiDel}' order by ts desc"; + var lastPushResponse = await _serviceTDengine.ExecuteSelectRestResponseAsync("stb_hm_bp_push_ref_inc_value", condition, field); + if (lastPushResponse == null) + { + _logger.LogInformation($"{imeiDel}--没有下发记录"); + break; + } + var lastPushParser = JsonConvert.DeserializeObject>(lastPushResponse); + var lastPush = lastPushParser!.Select().FirstOrDefault(); + // 有下推记录 + if (lastPush != null) + { + systolicRefValue = lastPush!.SystolicRefValue; + diastolicRefValue = lastPush!.DiastolicRefValue; + lastPushSystolicInc = lastPush!.SystolicIncValue; + lastPushDiastolicInc = lastPush!.DiastolicIncValue; + condition = $"ts between '{lastPush?.Timestamp:yyyy-MM-dd HH:mm:ss.fff}' and '{startTime:yyyy-MM-dd HH:mm:ss.fff}' " + + $"and serialno='{imeiDel}' " + + $"and is_display = true"; + // 使用最近一次的下推时间作为统计的开始时间 + statStartTime = lastPush!.Timestamp; + } + // 没有下推记录(历史遗留数据),没有初始的测量值产生的平均值(测量值=平均值) + else { - // 计算增量值 - condition = $"serialno='{imeiDel}' order by ts desc"; - var lastPushResponse = await _serviceTDengine.ExecuteSelectRestResponseAsync("stb_hm_bp_push_ref_inc_value", condition, field); - if (lastPushResponse == null) + #region 获取个人信息 + + var person = await _personCacheMgr.GetDeviceGpsPersonCacheBySerialNoAsync(Guid.NewGuid().ToString(), imeiDel).ConfigureAwait(false); + //验证这个信息是否存在 + if (person == null || person?.Person.BornDate == null) { - _logger.LogInformation($"{imeiDel}--没有下发记录"); + _logger.LogWarning($"{nameof(Worker)}--{imeiDel} 验证个人信息,找不到个人信息,跳过此消息"); break; } - var lastPushParser = JsonConvert.DeserializeObject>(lastPushResponse); - var lastPush = lastPushParser!.Select().FirstOrDefault(); - // 有下推记录 - if (lastPush != null) + // 验证年龄是否在范围 (2 - 120) + var age = SafeType.SafeInt(DateTime.Today.Year - person?.Person.BornDate!.Value.Year!); + if (age < 2 || age > 120) { - systolicRefValue = lastPush!.SystolicRefValue; - diastolicRefValue = lastPush!.DiastolicRefValue; - lastPushSystolicInc = lastPush!.SystolicIncValue; - lastPushDiastolicInc = lastPush!.DiastolicIncValue; - condition = $"ts between '{lastPush?.Timestamp:yyyy-MM-dd HH:mm:ss.fff}' and '{startTime:yyyy-MM-dd HH:mm:ss.fff}' " + - $"and serialno='{imeiDel}' " + - $"and is_display = true"; - // 使用最近一次的下推时间作为统计的开始时间 - statStartTime= lastPush!.Timestamp; + _logger.LogWarning($"{nameof(Worker)}--{imeiDel} 验证年龄,不在范围 (2 - 120)岁,跳过此消息"); + break; } - // 没有下推记录(历史遗留数据),没有初始的测量值产生的平均值(测量值=平均值) - else - { - #region 获取个人信息 - var person = await _personCacheMgr.GetDeviceGpsPersonCacheBySerialNoAsync(Guid.NewGuid().ToString(), imeiDel).ConfigureAwait(false); - //验证这个信息是否存在 - if (person == null || person?.Person.BornDate == null) - { - _logger.LogWarning($"{nameof(Worker)}--{imeiDel} 验证个人信息,找不到个人信息,跳过此消息"); - break; - } - // 验证年龄是否在范围 (2 - 120) - var age = SafeType.SafeInt(DateTime.Today.Year - person?.Person.BornDate!.Value.Year!); - if (age < 2 || age > 120) - { - _logger.LogWarning($"{nameof(Worker)}--{imeiDel} 验证年龄,不在范围 (2 - 120)岁,跳过此消息"); - break; - } - - var gender = person?.Person.Gender == true ? 1 : 2; - var isHypertension = SafeType.SafeBool(person?.Person.Ishypertension!); - var height = SafeType.SafeDouble(person?.Person.Height!); - var weight = SafeType.SafeDouble(person?.Person.Weight!); - - #endregion - - #region 初始化常规血压标定值标定值 - var bpRef = await _bpRefValCacheManager.GetBloodPressReferenceValueAsync(age, gender, isHypertension); - //systolicRefValue = bpRef!.Systolic;//? - //diastolicRefValue = bpRef!.Diastolic;//? - #endregion + var gender = person?.Person.Gender == true ? 1 : 2; + var isHypertension = SafeType.SafeBool(person?.Person.Ishypertension!); + var height = SafeType.SafeDouble(person?.Person.Height!); + var weight = SafeType.SafeDouble(person?.Person.Weight!); + + #endregion + + #region 初始化常规血压标定值标定值 + var bpRef = await _bpRefValCacheManager.GetBloodPressReferenceValueAsync(age, gender, isHypertension); + //systolicRefValue = bpRef!.Systolic;//? + //diastolicRefValue = bpRef!.Diastolic;//? + #endregion + + systolicRefValue = bpRef!.Systolic; + diastolicRefValue = bpRef!.Diastolic; + lastPushSystolicInc = 0; + lastPushDiastolicInc = 0; + condition = $"ts <= '{startTime:yyyy-MM-dd HH:mm:ss.fff}' " + + $"and serialno='{imeiDel}' " + + $"and is_display = true"; + } - systolicRefValue = bpRef!.Systolic; - diastolicRefValue = bpRef!.Diastolic; - lastPushSystolicInc = 0; - lastPushDiastolicInc = 0; - condition = $"ts <= '{startTime:yyyy-MM-dd HH:mm:ss.fff}' " + - $"and serialno='{imeiDel}' " + - $"and is_display = true"; - } + var hmBpResponse = await _serviceTDengine.ExecuteSelectRestResponseAsync("stb_hm_bloodpress", condition); + var hmBpParser = JsonConvert.DeserializeObject>(hmBpResponse!); + var hmBp = hmBpParser?.Select(); + //if (hmBp?.ToList().Count < 2) + // 1. 判断数据样本数量 + if (hmBpParser!.Rows < 5) + { + _logger.LogInformation($"{imeiDel} -- {nameof(Worker)} --{nameof(WatchEvents)} -- 统计定时下发,计算增量值的数据条目不足:{hmBpParser!.Rows} < 5"); + _logger.LogInformation($"{imeiDel} -- {nameof(Worker)} --{nameof(WatchEvents)} 没有足够的数据样本,不会定时下发"); + break; + } + // 没有下推记录重新计算统计时间 + if (lastPush == null) + { + var firstHmBp = hmBpParser?.Select(i => i).OrderBy(i => i.Timestamp).FirstOrDefault(); + statStartTime = firstHmBp!.Timestamp; + } - var hmBpResponse = await _serviceTDengine.ExecuteSelectRestResponseAsync("stb_hm_bloodpress", condition); - var hmBpParser = JsonConvert.DeserializeObject>(hmBpResponse!); - var hmBp = hmBpParser?.Select(); - //if (hmBp?.ToList().Count < 2) - // 1. 判断数据样本数量 - if (hmBpParser!.Rows < 5) - { - _logger.LogInformation($"{imeiDel} -- {nameof(Worker)} --{nameof(WatchEvents)} -- 统计定时下发,计算增量值的数据条目不足:{hmBpParser!.Rows} < 5"); - _logger.LogInformation($"{imeiDel} -- {nameof(Worker)} --{nameof(WatchEvents)} 没有足够的数据样本,不会定时下发"); - break; - } - // 没有下推记录重新计算统计时间 - if (lastPush == null) - { - var firstHmBp = hmBpParser?.Select(i=>i).OrderBy(i=>i.Timestamp).FirstOrDefault(); - statStartTime = firstHmBp!.Timestamp; - } + // NewMethod(systolicRefValue, hmBpParser); - // NewMethod(systolicRefValue, hmBpParser); + // 最大值 + //systolicMax = (int)hmBpParser?.Select(i => i.SystolicValue).Max()!; + //diastolicMax = (int)hmBpParser?.Select(i => i.DiastolicValue).Max()!; + //// 最小值 + //systolicMin = (int)hmBpParser?.Select(i => i.SystolicValue).Min()!; + //diastolicMin = (int)hmBpParser?.Select(i => i.DiastolicValue).Min()!; - // 最大值 - //systolicMax = (int)hmBpParser?.Select(i => i.SystolicValue).Max()!; - //diastolicMax = (int)hmBpParser?.Select(i => i.DiastolicValue).Max()!; - //// 最小值 - //systolicMin = (int)hmBpParser?.Select(i => i.SystolicValue).Min()!; - //diastolicMin = (int)hmBpParser?.Select(i => i.DiastolicValue).Min()!; + //systolicAvg = (int)(hmBpParser?.AverageAfterRemovingOneMinMaxRef(i => i.SystolicValue, SafeType.SafeInt(systolicRefValue!)))!; + //diastolicAvg = (int)(hmBpParser?.AverageAfterRemovingOneMinMaxRef(i => i.DiastolicValue, SafeType.SafeInt(diastolicRefValue!)))!; - //systolicAvg = (int)(hmBpParser?.AverageAfterRemovingOneMinMaxRef(i => i.SystolicValue, SafeType.SafeInt(systolicRefValue!)))!; - //diastolicAvg = (int)(hmBpParser?.AverageAfterRemovingOneMinMaxRef(i => i.DiastolicValue, SafeType.SafeInt(diastolicRefValue!)))!; + var avgs = _serviceTDengine.AverageAfterRemovingOneMinMaxRef(hmBpParser!); + systolicAvg = avgs[0]; + diastolicAvg = avgs[1]; - var avgs = _serviceTDengine.AverageAfterRemovingOneMinMaxRef(hmBpParser!); - systolicAvg = avgs[0]; - diastolicAvg = avgs[1]; + // 2. 判断能否计算增量值 + if (systolicAvg.Equals(0)) + { + _logger.LogInformation($"{imeiDel}--{nameof(Worker)}--计算平均值" + + $"\n currentSystolicAvg:{systolicAvg} -- lastPushSystolicInc:{lastPushSystolicInc}" + + $"\n currentDiastolicInc:{diastolicAvg} -- lastPushDiastolicInc:{lastPushDiastolicInc}"); + _logger.LogInformation($"{imeiDel}--{nameof(Worker)} 没有足够的数据样本计算平均值,不会定时下发"); + break; + } + // 除最大值和最小值后的平均值与标定值差值少于4后(当天计算出该结果则也不产生增量调整),就不再进行增量值调整了。 + if (systolicRefValue - systolicAvg < 4) + { + _logger.LogInformation($"diastolic 舒张压 {imeiDel}除最大值和最小值后的平均值:{diastolicAvg}与标定值:{diastolicRefValue}\n systolic 收缩压 {imeiDel}除最大值和最小值后的平均值:{systolicAvg}与标定值:{systolicRefValue}的差值(标定值-平均值)少于4后,systolic 收缩压 不再进行增量值调整"); + break; + } - // 2. 判断能否计算增量值 - if (systolicAvg.Equals(0)) - { - _logger.LogInformation($"{imeiDel}--{nameof(Worker)}--计算平均值" + - $"\n currentSystolicAvg:{systolicAvg} -- lastPushSystolicInc:{lastPushSystolicInc}" + - $"\n currentDiastolicInc:{diastolicAvg} -- lastPushDiastolicInc:{lastPushDiastolicInc}"); - _logger.LogInformation($"{imeiDel}--{nameof(Worker)} 没有足够的数据样本计算平均值,不会定时下发"); - break; - } - // 除最大值和最小值后的平均值与标定值差值少于4后(当天计算出该结果则也不产生增量调整),就不再进行增量值调整了。 - if (systolicRefValue - systolicAvg < 4) - { - _logger.LogInformation($"diastolic 舒张压 {imeiDel}除最大值和最小值后的平均值:{diastolicAvg}与标定值:{diastolicRefValue}\n systolic 收缩压 {imeiDel}除最大值和最小值后的平均值:{systolicAvg}与标定值:{systolicRefValue}的差值(标定值-平均值)少于4后,systolic 收缩压 不再进行增量值调整"); - break; - } + if (diastolicRefValue - diastolicAvg < 4) + { + _logger.LogInformation($"systolic 收缩压 {imeiDel}除最大值和最小值后的平均值:{systolicAvg}与标定值:{systolicRefValue}\n diastolic 舒张压 {imeiDel}除最大值和最小值后的平均值:{diastolicAvg}与标定值:{diastolicRefValue}的差值(标定值-平均值)少于4后,diastolic 舒张压 不再进行增量值调整"); + break; + } - if (diastolicRefValue - diastolicAvg < 4) - { - _logger.LogInformation($"systolic 收缩压 {imeiDel}除最大值和最小值后的平均值:{systolicAvg}与标定值:{systolicRefValue}\n diastolic 舒张压 {imeiDel}除最大值和最小值后的平均值:{diastolicAvg}与标定值:{diastolicRefValue}的差值(标定值-平均值)少于4后,diastolic 舒张压 不再进行增量值调整"); - break; - } + // 增量值=(标定值-平均值)* 0.25 + var currentSystolicInc = (int)((systolicRefValue - systolicAvg) * systolicAvgOffset)!; + var currentDiastolicInc = (int)((diastolicRefValue - diastolicAvg) * diastolicAvgOffset)!; - // 增量值=(标定值-平均值)* 0.25 - var currentSystolicInc = (int)((systolicRefValue - systolicAvg) * systolicAvgOffset)!; - var currentDiastolicInc = (int)((diastolicRefValue - diastolicAvg) * diastolicAvgOffset)!; + // 累计增量 + systolicInc = currentSystolicInc + lastPushSystolicInc; + diastolicInc = currentDiastolicInc + lastPushDiastolicInc; - // 累计增量 - systolicInc = currentSystolicInc + lastPushSystolicInc; - diastolicInc = currentDiastolicInc + lastPushDiastolicInc; + _logger.LogInformation($"{imeiDel}--{nameof(Worker)}--计算增量值" + + $"\n {imeiDel} -- systolicAvg:{systolicAvg}-- systolicInc:{systolicInc}-- currentSystolicInc:{currentSystolicInc} -- lastPushSystolicInc:{lastPushSystolicInc}" + + $"\n {imeiDel} -- diastolicAvg:{diastolicAvg}-- diastolicInc:{diastolicInc} --currentDiastolicInc:{currentDiastolicInc} -- lastPushDiastolicInc:{lastPushDiastolicInc}"); + _logger.LogInformation($"{imeiDel}--{nameof(Worker)}-- 定时校准,发给设备的绝对增量值=(上次绝对增量值+新数据的增量值)"); - _logger.LogInformation($"{imeiDel}--{nameof(Worker)}--计算增量值" + - $"\n {imeiDel} -- systolicAvg:{systolicAvg}-- systolicInc:{systolicInc}-- currentSystolicInc:{currentSystolicInc} -- lastPushSystolicInc:{lastPushSystolicInc}" + - $"\n {imeiDel} -- diastolicAvg:{diastolicAvg}-- diastolicInc:{diastolicInc} --currentDiastolicInc:{currentDiastolicInc} -- lastPushDiastolicInc:{lastPushDiastolicInc}"); - _logger.LogInformation($"{imeiDel}--{nameof(Worker)}-- 定时校准,发给设备的绝对增量值=(上次绝对增量值+新数据的增量值)"); + _logger.LogInformation($"{nameof(Worker)} 开启血压标定值下发: {_configBoodPressResolver.EnableBPRefPush}"); + if (_configBoodPressResolver.EnableBPRefPush) + // if (false) // 临时关闭 + { + BloodPressCalibrationConfigModel bpIncData = new() + { - _logger.LogInformation($"{nameof(Worker)} 开启血压标定值下发: {_configBoodPressResolver.EnableBPRefPush}"); - if (_configBoodPressResolver.EnableBPRefPush) - // if (false) // 临时关闭 + Imei = imeiDel, + SystolicRefValue = SafeType.SafeInt(((int)systolicRefValue!)), //收缩压标定值,值为0 表示不生效 + DiastolicRefValue = SafeType.SafeInt(((int)diastolicRefValue!)), //舒张压标定值,值为0表示不生效 + SystolicIncValue = SafeType.SafeInt(((int)systolicInc!)), //收缩压显示增量,值为0 表示不生效 + DiastolicIncValue = SafeType.SafeInt(((int)diastolicInc!)) //舒张压显示增量,值为0 表示不生效 + }; + //var pushedBP = await _serviceIotWebApi.SetBloodPressCalibrationConfigAsync(bpIncData).ConfigureAwait(false); + var response = await _serviceIotWebApi.SetBloodPressCalibrationConfig2Async(bpIncData).ConfigureAwait(false); + var pushedBP = response.Flag; + if (pushedBP) { - BloodPressCalibrationConfigModel bpIncData = new() - { + #region 保存下推记录 stb_hm_bp_push_ref_inc_value + var sql = $"INSERT INTO health_monitor.hm_bp_push_ref_inc_value_{imeiDel.Substring(imeiDel.Length - 2)} " + + $"USING health_monitor.stb_hm_bp_push_ref_inc_value " + + $"TAGS ('{imeiDel.Substring(imeiDel.Length - 2)}') " + + $"VALUES(" + + $"'{startTime:yyyy-MM-dd HH:mm:ss.fff}'," + + $"'{imeiDel}'," + + $"{bpIncData.SystolicRefValue}," + + $"{bpIncData.DiastolicRefValue}," + + $"{bpIncData.SystolicIncValue}," + + $"{bpIncData.DiastolicIncValue}," + + $"{false}," + + $"{systolicAvg}," + + $"{diastolicAvg}," + + $"{systolicAvgOffset}," + + $"{diastolicAvgOffset}," + + $"'{statStartTime:yyyy-MM-dd HH:mm:ss.fff}'," + + $"'{startTime:yyyy-MM-dd HH:mm:ss.fff}'" + + $")"; + _serviceTDengine.ExecuteInsertSQL(sql); + #endregion - Imei = imeiDel, - SystolicRefValue = SafeType.SafeInt(((int)systolicRefValue!)), //收缩压标定值,值为0 表示不生效 - DiastolicRefValue = SafeType.SafeInt(((int)diastolicRefValue!)), //舒张压标定值,值为0表示不生效 - SystolicIncValue = SafeType.SafeInt(((int)systolicInc!)), //收缩压显示增量,值为0 表示不生效 - DiastolicIncValue = SafeType.SafeInt(((int)diastolicInc!)) //舒张压显示增量,值为0 表示不生效 - }; - //var pushedBP = await _serviceIotWebApi.SetBloodPressCalibrationConfigAsync(bpIncData).ConfigureAwait(false); - var response = await _serviceIotWebApi.SetBloodPressCalibrationConfig2Async(bpIncData).ConfigureAwait(false); - var pushedBP = response.Flag; - if (pushedBP) - { - #region 保存下推记录 stb_hm_bp_push_ref_inc_value - var sql = $"INSERT INTO health_monitor.hm_bp_push_ref_inc_value_{imeiDel.Substring(imeiDel.Length - 2)} " + - $"USING health_monitor.stb_hm_bp_push_ref_inc_value " + - $"TAGS ('{imeiDel.Substring(imeiDel.Length - 2)}') " + - $"VALUES(" + - $"'{startTime:yyyy-MM-dd HH:mm:ss.fff}'," + - $"'{imeiDel}'," + - $"{bpIncData.SystolicRefValue}," + - $"{bpIncData.DiastolicRefValue}," + - $"{bpIncData.SystolicIncValue}," + - $"{bpIncData.DiastolicIncValue}," + - $"{false}," + - $"{systolicAvg}," + - $"{diastolicAvg}," + - $"{systolicAvgOffset}," + - $"{diastolicAvgOffset}," + - $"'{statStartTime:yyyy-MM-dd HH:mm:ss.fff}'," + - $"'{startTime:yyyy-MM-dd HH:mm:ss.fff}'" + - $")"; - _serviceTDengine.ExecuteInsertSQL(sql); - #endregion - - #region 注册定时下发 - // 注册下次下推 - var endTime = DateTime.Now; + #region 注册定时下发 + // 注册下次下推 + var endTime = DateTime.Now; #if DEBUG - //long ttl = (long)((60 * 1000-(endTime-startTime).TotalMilliseconds)/1000); - //await _serviceEtcd.PutValAsync(key, imeiDel,ttl, false).ConfigureAwait(false); + //long ttl = (long)((60 * 1000-(endTime-startTime).TotalMilliseconds)/1000); + //await _serviceEtcd.PutValAsync(key, imeiDel,ttl, false).ConfigureAwait(false); - var interval = 0; - // 获取当前时间 - DateTime now = DateTime.Now; + var interval = 0; + // 获取当前时间 + DateTime now = DateTime.Now; - // 计算距离下一个$interval天后的8点的时间间隔 - DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute + 2, 0).AddDays(interval); - TimeSpan timeUntilNextRun = nextRunTime - now; + // 计算距离下一个$interval天后的8点的时间间隔 + DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute + 2, 0).AddDays(interval); + TimeSpan timeUntilNextRun = nextRunTime - now; - // 如果当前时间已经超过了8点,将等待到明天后的8点 - if (timeUntilNextRun < TimeSpan.Zero) - { - timeUntilNextRun = timeUntilNextRun.Add(TimeSpan.FromMinutes(1)); - nextRunTime += timeUntilNextRun; - } + // 如果当前时间已经超过了8点,将等待到明天后的8点 + if (timeUntilNextRun < TimeSpan.Zero) + { + timeUntilNextRun = timeUntilNextRun.Add(TimeSpan.FromMinutes(1)); + nextRunTime += timeUntilNextRun; + } - // var ttl = timeUntilNextRun.TotalMilliseconds; - long ttl = (long)((timeUntilNextRun.TotalMilliseconds - (endTime - startTime).TotalMilliseconds) / 1000); - var data = new - { - imei = imeiDel, - create_time = now.ToString("yyyy-MM-dd HH:mm:ss"), - ttl, - next_run_time = nextRunTime.ToString("yyyy-MM-dd HH:mm:ss") - }; - var result = JsonConvert.SerializeObject(data); + // var ttl = timeUntilNextRun.TotalMilliseconds; + long ttl = (long)((timeUntilNextRun.TotalMilliseconds - (endTime - startTime).TotalMilliseconds) / 1000); + var data = new + { + imei = imeiDel, + create_time = now.ToString("yyyy-MM-dd HH:mm:ss"), + ttl, + next_run_time = nextRunTime.ToString("yyyy-MM-dd HH:mm:ss") + }; + var result = JsonConvert.SerializeObject(data); - await _serviceEtcd.PutValAsync(key, result, ttl, false).ConfigureAwait(false); + await _serviceEtcd.PutValAsync(key, result, ttl, false).ConfigureAwait(false); #else @@ -468,35 +424,93 @@ namespace HealthMonitor.WebApi var result = JsonConvert.SerializeObject(data); await _serviceEtcd.PutValAsync(key, result, ttl, false).ConfigureAwait(false); #endif - #endregion + #endregion - } - else - { - _logger.LogInformation($"错误响应,没有下推数据:{response.Message}"); - } + } + else + { + _logger.LogInformation($"错误响应,没有下推数据:{response.Message}"); } } - else - { - _logger.LogInformation($"向{imeiDel}统计数据已经失效"); - } - #endregion + } + else + { + _logger.LogInformation($"向{imeiDel}统计数据已经失效"); + } + #endregion - } + } - break; - } + break; } - catch (Exception ex) - { - _logger.LogInformation($"{nameof(WatchEvents)},出错: |{ex.Message}|{ex.StackTrace}"); - } - + } + catch (Exception ex) + { + _logger.LogInformation($"{nameof(WatchEvents)},出错: |{ex.Message}|{ex.StackTrace}"); + } + + + }); + + } + /** + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + TaskFactory factory = new(_tokenSource.Token); + factory.StartNew(async () => + { + if (_tokenSource.IsCancellationRequested) + _logger.LogWarning("Worker exit"); + + _logger.LogInformation("------ResolveAsync"); + while (!_tokenSource.IsCancellationRequested) + { + + await _processor.ResolveAsync().ConfigureAwait(false); + } + + }, TaskCreationOptions.LongRunning); + + factory.StartNew(() => + { + _logger.LogInformation("------_tdEngineDataSubcribe"); + while (!_tokenSource.IsCancellationRequested) + { + _tdEngineDataSubcribe.BeginListen(_tokenSource.Token); + } + }, TaskCreationOptions.LongRunning); + + Task.Run(() => + _serviceEtcd.WacthKeysWithPrefixResponseAsync($"health_moniter/schedule_push", WatchEvents) + , stoppingToken); + return Task.Delay(1000, _tokenSource.Token); + + } + **/ + + /** + private void WatchEvents(WatchEvent[] response) + { + foreach (WatchEvent e1 in response) + { + // Console.WriteLine($"{nameof(WatchEventsAsync)} --- {e1.Key}:{e1.Value}:{e1.Type}"); + + switch (e1.Type.ToString()) + { + //case "Put": + // // 获取时间点计算TTL + // break; - }); - + case "Delete": + // TTL到了重新计算TTL,下发 + Console.WriteLine($"--- {e1.Key}:{e1.Value}:{e1.Type}"); + break; + } + } } + */ + + } }