using HealthMonitor.Common;
using HealthMonitor.Common.helper;
using HealthMonitor.Model.Config;
using HealthMonitor.Model.Service.Mapper;
using HealthMonitor.Service.Biz.db.Dto;
using HealthMonitor.Util.Models;
using Microsoft.EntityFrameworkCore.Metadata.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using SqlSugar;
using SqlSugar.DbConvert;
using SqlSugar.TDengine;
using System;
using System.Collections.Generic;
using System.Data;
using System.Globalization;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using System.Xml.Linq;
using TDengineDriver;
using TDengineDriver.Impl;
using TDengineTMQ;
using HealthMonitor.Service.Cache;
using System.Text.RegularExpressions;
using Etcdserverpb;
using static Microsoft.EntityFrameworkCore.DbLoggerCategory;
using HealthMonitor.Core.Pipeline;
using static Dm.net.buffer.ByteArrayBuffer;

namespace HealthMonitor.Service.Biz.db
{
    /// <summary>
    /// Access layer for TDengine. It talks to the database in three ways: the native
    /// connector (<c>TDengine.*</c> / <c>LibTaos.*</c>), the REST endpoint
    /// (<c>/rest/sql/{db}</c>) and a <see cref="SqlSugarClient"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): in the reviewed copy of this file most generic type arguments were
    /// missing. They were restored wherever the surrounding code fixes them; the few
    /// that could not be determined are flagged with a NOTE(review) at the use site.
    /// </remarks>
    public class TDengineService
    {
        /// <summary>Timestamp literal format used when building DELETE statements.</summary>
        private const string TimestampFormat = "yyyy-MM-dd HH:mm:ss.fff";

        private readonly ILogger _logger;
        private readonly HttpHelper _httpHelper = default!;
        private readonly TDengineServiceConfig _configTDengineService;
        private readonly SqlSugarClient _clientSqlSugar;
        private readonly FhrPhrMapCacheManager _mgrFhrPhrMapCache;
        private readonly DeviceCacheManager _deviceCacheMgr;

        // Delegates handed to the native connector are kept in fields so they stay
        // reachable (and therefore are not garbage-collected) while native code may
        // still invoke them.
        private readonly QueryAsyncCallback _queryAsyncCallback;
        private readonly FetchRawBlockAsyncCallback _fetchRawBlockAsyncCallback;

        /// <summary>
        /// Creates the service and the SqlSugar client configured from
        /// <paramref name="configTDengineService"/>.
        /// </summary>
        /// <param name="logger">Logger used for all diagnostics of this service.</param>
        /// <param name="configTDengineService">Host, ports, credentials, database name and REST token.</param>
        /// <param name="httpHelper">HTTP helper used for the REST endpoint.</param>
        /// <param name="fhrPhrMapCacheManager">Cache of the fetal/pregnancy heart-rate map.</param>
        /// <param name="deviceCacheMgr">Cache of device watch configuration.</param>
        public TDengineService(ILogger<TDengineService> logger,
            IOptions<TDengineServiceConfig> configTDengineService,
            HttpHelper httpHelper,
            FhrPhrMapCacheManager fhrPhrMapCacheManager,
            DeviceCacheManager deviceCacheMgr
            )
        {
            _logger = logger;
            _configTDengineService = configTDengineService.Value;
            _httpHelper = httpHelper;
            _mgrFhrPhrMapCache = fhrPhrMapCacheManager;
            _deviceCacheMgr = deviceCacheMgr;
            _queryAsyncCallback = QueryCallback;
            _fetchRawBlockAsyncCallback = FetchRawBlockCallback;

            _clientSqlSugar = new SqlSugarClient(new ConnectionConfig()
            {
                DbType = SqlSugar.DbType.TDengine,
                ConnectionString = $"Host={_configTDengineService.Host};Port={_configTDengineService.Port};Username={_configTDengineService.UserName};Password={_configTDengineService.Password};Database={_configTDengineService.DB};TsType=config_ns",
                IsAutoCloseConnection = true,
                AopEvents = new AopEvents
                {
                    // Trace every statement SqlSugar executes (with parameters inlined).
                    OnLogExecuting = (sql, p) =>
                    {
                        _logger.LogDebug("{Sql}", SqlSugar.UtilMethods.GetNativeSql(sql, p));
                    }
                },
                ConfigureExternalServices = new ConfigureExternalServices()
                {
                    // Columns without an explicit converter fall back to CommonPropertyConvert.
                    EntityService = (property, column) =>
                    {
                        if (column.SqlParameterDbType == null)
                        {
                            column.SqlParameterDbType = typeof(CommonPropertyConvert);
                        }
                    }
                },
                MoreSettings = new ConnMoreSettings()
                {
                    PgSqlIsAutoToLower = false,
                    PgSqlIsAutoToLowerCodeFirst = false,
                }
            });
        }

        /// <summary>
        /// Opens a native TDengine connection using the configured host, port,
        /// credentials and database.
        /// </summary>
        /// <returns>
        /// The native connection handle, or <see cref="IntPtr.Zero"/> when the connection
        /// failed (the failure is logged). The caller owns the handle and must close it.
        /// </returns>
        public IntPtr Connection()
        {
            string host = _configTDengineService.Host;
            string user = _configTDengineService.UserName;
            string db = _configTDengineService.DB;
            short port = _configTDengineService.Port;
            string password = _configTDengineService.Password;

            IntPtr conn = TDengine.Connect(host, user, password, db, port);
            if (conn == IntPtr.Zero)
            {
                _logger.LogError("连接 TDengine 失败....");
            }
            else
            {
                _logger.LogInformation("连接 TDengine 成功....");
            }
            return conn;
        }

        /// <summary>
        /// Executes a statement on an existing native connection and logs the outcome.
        /// Failures are logged, not thrown.
        /// </summary>
        /// <param name="conn">Open native connection handle.</param>
        /// <param name="sql">Statement to execute.</param>
        public void ExecuteSQL(IntPtr conn, string sql)
        {
            IntPtr res = TDengine.Query(conn, sql);
            try
            {
                if (!IsQuerySuccessful(res))
                {
                    LogQueryFailure(sql, res);
                    return;
                }

                _logger.LogInformation("{Sql} success, {AffectedRows} rows affected", sql, TDengine.AffectRows(res));
            }
            finally
            {
                // The result must be released on the failure path as well.
                FreeResultIfAllocated(res);
            }
        }

        /// <summary>
        /// Executes a query on an existing native connection and logs the column
        /// metadata followed by one log entry per returned row.
        /// Failures are logged, not thrown.
        /// </summary>
        /// <param name="conn">Open native connection handle.</param>
        /// <param name="sql">Query to execute.</param>
        public void ExecuteQuerySQL(IntPtr conn, string sql)
        {
            IntPtr res = TDengine.Query(conn, sql);
            try
            {
                if (!IsQuerySuccessful(res))
                {
                    LogQueryFailure(sql, res);
                    return;
                }

                _logger.LogInformation("{Sql} success, {AffectedRows} rows affected", sql, TDengine.AffectRows(res));

                var resMeta = LibTaos.GetMeta(res);
                var resData = LibTaos.GetData(res);
                foreach (var meta in resMeta)
                {
                    _logger.LogInformation("\t|{Name} {TypeName} ({Size})\t|", meta.name, meta.TypeName(), meta.size);
                }
                LogRows(resData, resMeta.Count);
            }
            finally
            {
                FreeResultIfAllocated(res);
            }
        }

        /// <summary>
        /// Throws when the native result reports an error.
        /// </summary>
        /// <param name="conn">Connection the result belongs to (not used by the check).</param>
        /// <param name="res">Native result handle to inspect.</param>
        /// <param name="errorMsg">Prefix for the exception message.</param>
        /// <exception cref="InvalidOperationException">The result carries a non-zero error code.</exception>
        public void CheckRes(IntPtr conn, IntPtr res, String errorMsg)
        {
            if (TDengine.ErrorNo(res) != 0)
            {
                throw new InvalidOperationException($"{errorMsg} since: {TDengine.Error(res)}");
            }
        }

        /// <summary>
        /// Opens a connection, executes an INSERT statement, logs the affected row
        /// count and closes the connection again.
        /// </summary>
        /// <param name="sql">INSERT statement to execute.</param>
        /// <exception cref="InvalidOperationException">
        /// The connection could not be opened or the statement failed.
        /// </exception>
        public void ExecuteInsertSQL(string sql)
        {
            var conn = Connection();
            if (conn == IntPtr.Zero)
            {
                throw new InvalidOperationException("failed to insert data since: TDengine connection could not be established");
            }

            try
            {
                _logger.LogInformation("Insert SQL: {Sql}", sql);
                IntPtr res = TDengine.Query(conn, sql);
                try
                {
                    CheckRes(conn, res, "failed to insert data");
                    int affectedRows = TDengine.AffectRows(res);
                    _logger.LogInformation("affectedRows {affectedRows}", affectedRows);
                }
                finally
                {
                    // Also reached when CheckRes throws, so the result never leaks.
                    FreeResultIfAllocated(res);
                }
            }
            finally
            {
                TDengine.Close(conn);
            }
        }

        /// <summary>True when the native call returned a result without an error code.</summary>
        private static bool IsQuerySuccessful(IntPtr res)
        {
            return res != IntPtr.Zero && TDengine.ErrorNo(res) == 0;
        }

        /// <summary>Releases a native result handle if one was returned.</summary>
        private static void FreeResultIfAllocated(IntPtr res)
        {
            if (res != IntPtr.Zero)
            {
                TDengine.FreeResult(res);
            }
        }

        /// <summary>Logs a failed statement, including the native reason when a result handle exists.</summary>
        private void LogQueryFailure(string sql, IntPtr res)
        {
            if (res == IntPtr.Zero)
            {
                _logger.LogError("{Sql} failure, ", sql);
                return;
            }

            _logger.LogError("{Sql} failure, reason:{Reason}", sql, TDengine.Error(res));
        }

        /// <summary>
        /// Logs a flat, row-major list of cells as one log entry per row.
        /// </summary>
        /// <param name="cells">Cell values in row-major order.</param>
        /// <param name="columnCount">Number of cells that make up one row.</param>
        private void LogRows(System.Collections.IList cells, int columnCount)
        {
            if (columnCount <= 0)
            {
                return;
            }

            var row = new List<string>(columnCount);
            for (int i = 0; i < cells.Count; i++)
            {
                row.Add(cells[i]?.ToString() ?? "NULL");
                if (row.Count == columnCount || i == cells.Count - 1)
                {
                    _logger.LogInformation("|{Row}\t|", string.Join("\t|", row));
                    row.Clear();
                }
            }
        }

        #region TDengine.Connector async query

        /// <summary>
        /// Native callback invoked when an asynchronous query completes; on success it
        /// starts fetching the first raw block.
        /// </summary>
        /// <param name="param">Opaque value passed through from the query call.</param>
        /// <param name="taosRes">Native result handle.</param>
        /// <param name="code">Zero on success, otherwise the error code.</param>
        public void QueryCallback(IntPtr param, IntPtr taosRes, int code)
        {
            if (code == 0 && taosRes != IntPtr.Zero)
            {
                TDengine.FetchRawBlockAsync(taosRes, _fetchRawBlockAsyncCallback, param);
            }
            else
            {
                _logger.LogError("async query data failed, failed code {code}", code);
            }
        }

        /// <summary>
        /// Native callback invoked for every fetched raw block. It logs the block and
        /// requests the next one until <paramref name="numOfRows"/> is no greater than 0,
        /// at which point the result is freed.
        /// </summary>
        /// <param name="param">Opaque value passed through from the query call.</param>
        /// <param name="taosRes">Native result handle.</param>
        /// <param name="numOfRows">Rows in this block; 0 when done, negative on error.</param>
        public void FetchRawBlockCallback(IntPtr param, IntPtr taosRes, int numOfRows)
        {
            if (numOfRows > 0)
            {
                _logger.LogInformation("{numOfRows} rows async retrieved", numOfRows);
                IntPtr pdata = TDengine.GetRawBlock(taosRes);
                var metaList = TDengine.FetchFields(taosRes);
                var dataList = LibTaos.ReadRawBlock(pdata, metaList, numOfRows);

                foreach (var meta in metaList)
                {
                    _logger.LogInformation("{Name} {Type}({Size}) \t|", meta.name, meta.type, meta.size);
                }
                LogRows(dataList, metaList.Count);

                TDengine.FetchRawBlockAsync(taosRes, _fetchRawBlockAsyncCallback, param);
            }
            else
            {
                if (numOfRows == 0)
                {
                    _logger.LogInformation("async retrieve complete.");
                }
                else
                {
                    _logger.LogInformation("FetchRawBlockCallback callback error, error code {numOfRows}", numOfRows);
                }
                TDengine.FreeResult(taosRes);
            }
        }

        /// <summary>
        /// Opens a connection and starts an asynchronous query whose rows are logged by
        /// <see cref="FetchRawBlockCallback"/>.
        /// </summary>
        /// <param name="sql">Query to execute.</param>
        public void ExecuteQueryAsync(string sql)
        {
            var conn = Connection();
            if (conn == IntPtr.Zero)
            {
                // Connection() has already logged the failure.
                return;
            }

            // NOTE(review): the connection opened here is never closed by this class;
            // confirm who owns it once the asynchronous fetch has completed.
            TDengine.QueryAsync(conn, sql, _queryAsyncCallback, IntPtr.Zero);
        }

        /// <summary>
        /// Returns MAX and MIN of a column.
        /// </summary>
        /// <param name="field">Column to aggregate.</param>
        /// <param name="tbName">Table name (without database prefix).</param>
        /// <param name="condition">Raw WHERE condition; when null or blank no WHERE clause is emitted.</param>
        /// <returns>The aggregate; both values are 0 when the query fails or returns nothing.</returns>
        /// <remarks>
        /// The arguments are interpolated into the SQL text as-is, so they must come from
        /// trusted code, never from user input.
        /// </remarks>
        public Aggregate GetAggregateValue(string field, string tbName, string? condition)
        {
            var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName}{BuildWhereClause(condition)}";
            var data = QueryIntValues(sql);

            return new Aggregate
            {
                Max = data.Count > 0 ? data[0] : 0,
                Min = data.Count > 1 ? data[1] : 0,
            };
        }

        /// <summary>
        /// Returns the average of a column after excluding every row that equals the
        /// column's maximum or minimum.
        /// </summary>
        /// <param name="field">Column to average.</param>
        /// <param name="tbName">Table name (without database prefix).</param>
        /// <param name="condition">Raw WHERE condition; when null or blank only the max/min bounds are applied.</param>
        /// <returns>The average converted to int, or 0 when the query fails or returns nothing.</returns>
        /// <remarks>
        /// The arguments are interpolated into the SQL text as-is, so they must come from
        /// trusted code, never from user input.
        /// </remarks>
        public int GetAvgExceptMaxMinValue(string field, string tbName, string? condition)
        {
            var aggregate = GetAggregateValue(field, tbName, condition);
            var bounds = $"{field} < {aggregate.Max} and {field} > {aggregate.Min}";
            var filter = string.IsNullOrWhiteSpace(condition) ? bounds : $"{condition} AND {bounds}";
            var sqlAvg = $"SELECT AVG({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {filter}";

            var data = QueryIntValues(sqlAvg);
            return data.Count > 0 ? data[0] : 0;
        }

        /// <summary>Builds " WHERE condition", or an empty string when there is no condition.</summary>
        private static string BuildWhereClause(string? condition)
        {
            return string.IsNullOrWhiteSpace(condition) ? string.Empty : $" WHERE {condition}";
        }

        /// <summary>
        /// Runs a query on a short-lived native connection and returns every returned
        /// cell converted with <c>SafeType.SafeInt</c>. Returns an empty list when the
        /// connection or the query fails (the failure is logged).
        /// </summary>
        private List<int> QueryIntValues(string sql)
        {
            var values = new List<int>();
            var conn = Connection();
            if (conn == IntPtr.Zero)
            {
                return values;
            }

            try
            {
                IntPtr res = TDengine.Query(conn, sql);
                try
                {
                    if (!IsQuerySuccessful(res))
                    {
                        LogQueryFailure(sql, res);
                        return values;
                    }

                    _logger.LogInformation("{Sql} success, {AffectedRows} rows affected", sql, TDengine.AffectRows(res));
                    foreach (var meta in LibTaos.GetMeta(res))
                    {
                        _logger.LogDebug("\t|{Name} {TypeName} ({Size})\t|", meta.name, meta.TypeName(), meta.size);
                    }
                    foreach (var cell in LibTaos.GetData(res))
                    {
                        values.Add(SafeType.SafeInt(cell));
                    }
                }
                finally
                {
                    FreeResultIfAllocated(res);
                }
            }
            finally
            {
                TDengine.Close(conn);
            }

            return values;
        }

        #endregion

        #region RestAPI

        /// <summary>
        /// Posts a SQL statement to the REST endpoint and returns the raw response text.
        /// </summary>
        /// <param name="sql">Statement to execute.</param>
        /// <returns>The response body as returned by the HTTP helper.</returns>
        public async Task<string?> ExecuteQuerySQLRestResponse(string sql)
        {
            return await PostSqlAsync(sql).ConfigureAwait(false);
        }

        /// <summary>
        /// Runs <c>SELECT field FROM db.tbName WHERE condition</c> through the REST
        /// endpoint and returns the raw response text.
        /// </summary>
        /// <param name="tbName">Table name (without database prefix).</param>
        /// <param name="condition">Raw WHERE condition; defaults to "1".</param>
        /// <param name="field">Select list; defaults to "*".</param>
        /// <returns>The response body as returned by the HTTP helper.</returns>
        public async Task<string?> ExecuteSelectRestResponseAsync(string tbName, string condition = "1", string field = "*")
        {
            var sql = $"SELECT {field} FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
            _logger.LogInformation("{Method} --- SQL 语句执行 {Sql}", nameof(ExecuteSelectRestResponseAsync), sql);
            return await PostSqlAsync(sql).ConfigureAwait(false);
        }

        /// <summary>
        /// Posts a SQL statement to the REST endpoint and reports whether the server
        /// answered with code 0.
        /// </summary>
        /// <param name="sql">Statement to execute.</param>
        /// <returns>
        /// <c>true</c> when the response code is 0; <c>false</c> when the statement
        /// failed or no response was received.
        /// </returns>
        public async Task<bool> GernalRestSql(string sql)
        {
            var result = await PostSqlAsync(sql).ConfigureAwait(false);
            if (result == null)
            {
                _logger.LogError("{Method},TDengine 服务器IP:{Host} 错误,请联系运维人员", nameof(GernalRestSql), _configTDengineService.Host);
                return false;
            }

            // NOTE(review): the generic type argument of DeserializeObject (the REST
            // response DTO) is missing in the reviewed copy and could not be determined
            // from this file; restore it before compiling.
            var res = JsonConvert.DeserializeObject(result);
            if (res?.Code == 0)
            {
                _logger.LogInformation("{Method},SQL 语句执行成功|{Sql}", nameof(GernalRestSql), sql);
                return true;
            }

            _logger.LogWarning("{Method},SQL 语句执行失败||{Sql}", nameof(GernalRestSql), sql);
            return false;
        }

        /// <summary>
        /// Logs and posts a SQL statement to the REST endpoint and returns the raw
        /// response text.
        /// </summary>
        /// <param name="sql">Statement to execute.</param>
        /// <returns>The response body as returned by the HTTP helper.</returns>
        public async Task<string?> GernalRestSqlResTextAsync(string sql)
        {
            _logger.LogInformation("执行 SQL: {Method}--{Sql}", nameof(GernalRestSqlResTextAsync), sql);
            return await PostSqlAsync(sql).ConfigureAwait(false);
        }

        /// <summary>
        /// Posts <paramref name="sql"/> to <c>http://{host}:{restPort}/rest/sql/{db}</c>
        /// with the configured Basic authorization token.
        /// </summary>
        private async Task<string?> PostSqlAsync(string sql)
        {
            var url = $"http://{_configTDengineService.Host}:{_configTDengineService.RestPort}/rest/sql/{_configTDengineService.DB}";
            List<KeyValuePair<string, string>> headers = new()
            {
                new KeyValuePair<string, string>("Authorization", "Basic " + _configTDengineService.Token)
            };
            return await _httpHelper.HttpToPostAsync(url, sql, headers).ConfigureAwait(false);
        }

        #endregion

        /// <summary>
        /// Average after removing one occurrence of the maximum, one occurrence of the
        /// minimum and every value greater than the reference value.
        /// </summary>
        /// <param name="collection">Values to average. The list is modified in place.</param>
        /// <param name="max">Maximum value; one occurrence is removed.</param>
        /// <param name="min">Minimum value; one occurrence is removed.</param>
        /// <param name="refValue">Reference (calibration) value; values above it are removed.</param>
        /// <returns>The average of the remaining values.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="collection"/> is null.</exception>
        /// <exception cref="ArgumentException">Fewer than two values remain.</exception>
        public static decimal AverageAfterRemovingOneMinMaxRef(List<int> collection, int max, int min, int refValue)
        {
            if (collection is null)
            {
                throw new ArgumentNullException(nameof(collection));
            }

            // NOTE(review): the removals below change the caller's list; this is kept
            // because callers may rely on it - confirm before switching to a copy.
            collection.Remove(max);
            collection.Remove(min);
            collection.RemoveAll(value => value > refValue);

            if (collection.Count < 2)
            {
                var remaining = string.Join(",", collection);
                throw new ArgumentException($"数据集{remaining},去掉一个最大值 {max}和一个最小值{min},异常值(大于标定值{refValue}),后数据值不足");
            }

            return (decimal)collection.Average(x => x);
        }

        /// <summary>
        /// Averages systolic and diastolic values of the displayable records after
        /// dropping the first and last record of the descending ordering (one maximum
        /// and one minimum) and every record whose systolic value is not below the
        /// reference value.
        /// </summary>
        /// <param name="systolicRefValue">Systolic reference value; records at or above it are treated as outliers.</param>
        /// <param name="hmBpParser">Parsed blood-pressure records; may be null.</param>
        /// <returns>
        /// <c>{ systolicAverage, diastolicAverage }</c>, or <c>{ 0, 0 }</c> when fewer
        /// than two records remain (not enough data to compute an increment).
        /// </returns>
        // NOTE(review): ParseTDengineRestResponse may have lost a generic type argument
        // in the reviewed copy; it is left exactly as found.
        public decimal[] AverageAfterRemovingOneMinMaxRef(int systolicRefValue, ParseTDengineRestResponse? hmBpParser)
        {
            if (hmBpParser == null)
            {
                return new decimal[] { 0M, 0M };
            }

            var sortedList = hmBpParser.Select(i => i)
                .Where(i => i.IsDisplay.Equals(true))
                .OrderByDescending(i => i.SystolicValue)
                .ThenByDescending(i => i.DiastolicValue)
                .ToList();

            // Drop one maximum and one minimum: the head and the tail of the ordering.
            var trimmedList = sortedList
                .Skip(1)
                .Take(sortedList.Count - 2)
                .ToList();

            // Drop outliers.
            var refValue = SafeType.SafeInt(systolicRefValue);
            var filteredList = trimmedList.Where(bp => bp.SystolicValue < refValue).ToList();

            if (filteredList.Count < 2)
            {
                // An average of 0 signals that there is not enough data.
                return new decimal[] { 0M, 0M };
            }

            var systolicAvg = filteredList.Select(bp => bp.SystolicValue).Average();
            var diastolicAvg = filteredList.Select(bp => bp.DiastolicValue).Average();
            return new decimal[] { (decimal)systolicAvg, (decimal)diastolicAvg };
        }

        /// <summary>
        /// Averages systolic and diastolic values of the displayable records after
        /// dropping the first and last record of the descending ordering (one maximum
        /// and one minimum).
        /// </summary>
        /// <param name="hmBpParser">Parsed blood-pressure records; may be null.</param>
        /// <returns>
        /// <c>{ systolicAverage, diastolicAverage }</c>, or <c>{ 0, 0 }</c> when fewer
        /// than two records remain (not enough data to compute an increment).
        /// </returns>
        // NOTE(review): ParseTDengineRestResponse may have lost a generic type argument
        // in the reviewed copy; it is left exactly as found.
        public decimal[] AverageAfterRemovingOneMinMaxRef(ParseTDengineRestResponse? hmBpParser)
        {
            if (hmBpParser == null)
            {
                return new decimal[] { 0M, 0M };
            }

            var sortedList = hmBpParser.Select(i => i)
                .Where(i => i.IsDisplay.Equals(true))
                .OrderByDescending(i => i.SystolicValue)
                .ThenByDescending(i => i.DiastolicValue)
                .ToList();

            // Drop one maximum and one minimum: the head and the tail of the ordering.
            var trimmedList = sortedList
                .Skip(1)
                .Take(sortedList.Count - 2)
                .ToList();

            if (trimmedList.Count < 2)
            {
                // An average of 0 signals that there is not enough data.
                return new decimal[] { 0M, 0M };
            }

            var systolicAvg = trimmedList.Select(bp => bp.SystolicValue).Average();
            var diastolicAvg = trimmedList.Select(bp => bp.DiastolicValue).Average();
            return new decimal[] { (decimal)systolicAvg, (decimal)diastolicAvg };
        }

        #region SqlSugarClient

        /// <summary>
        /// Inserts a record into the sub-table <c>{tbName}_{SerialTailNumber}</c>,
        /// creating the sub-table from the model's super table when it does not exist.
        /// </summary>
        /// <typeparam name="T">Model type carrying an <c>STableAttribute</c> and a <c>SerialTailNumber</c> property.</typeparam>
        /// <param name="tbName">Sub-table name prefix.</param>
        /// <param name="model">Record to insert.</param>
        /// <exception cref="InvalidOperationException">
        /// The model type has no super-table name, or the record has no
        /// <c>SerialTailNumber</c> value.
        /// </exception>
        public async Task InsertAsync<T>(string tbName, T model)
        {
            var stbName = GetSTableName<T>();
            if (stbName == null)
            {
                throw new InvalidOperationException($"STableAttribute not found on {typeof(T).Name} class.");
            }

            var tailNo = typeof(T).GetProperty("SerialTailNumber")?.GetValue(model)?.ToString();
            if (string.IsNullOrEmpty(tailNo))
            {
                throw new InvalidOperationException($"SerialTailNumber is missing or empty on the {typeof(T).Name} instance.");
            }

            var tbFullName = $"{tbName}_{tailNo}";
            await _clientSqlSugar.Ado.ExecuteCommandAsync($"create table IF NOT EXISTS {tbFullName} using {stbName} tags('{tailNo}')");

            // NOTE(review): this insert is synchronous inside an async method; switch to
            // the asynchronous variant once its availability in the referenced SqlSugar
            // version is confirmed.
            _clientSqlSugar.InsertableByObject(model).AS(tbFullName).ExecuteCommand();
        }

        /// <summary>
        /// Returns the record with the greatest <c>Timestamp</c> of the model's super table.
        /// </summary>
        /// <typeparam name="T">Model type with a <c>Timestamp</c> property.</typeparam>
        public Task<T> GetLastAsync<T>() where T : class
        {
            return _clientSqlSugar
                .Queryable<T>()
                .AS(GetSTableName<T>())
                .OrderByDescending(BuildTimestampSelector<T>())
                .FirstAsync();
        }

        /// <summary>
        /// Returns the record with the greatest <c>Timestamp</c> for one serial number.
        /// </summary>
        /// <typeparam name="T">Model type with <c>Timestamp</c> and <c>SerialNumber</c> properties.</typeparam>
        /// <param name="serialNo">Serial number to filter on.</param>
        public Task<T> GetLastAsync<T>(string serialNo) where T : class
        {
            return _clientSqlSugar
                .Queryable<T>()
                .AS(GetSTableName<T>())
                .Where(BuildSerialNumberFilter<T>(serialNo))
                .OrderByDescending(BuildTimestampSelector<T>())
                .FirstAsync();
        }

        /// <summary>
        /// Returns all records of one serial number, newest first.
        /// </summary>
        /// <typeparam name="T">Model type with <c>Timestamp</c> and <c>SerialNumber</c> properties.</typeparam>
        /// <param name="serialNo">Serial number to filter on.</param>
        public async Task<List<T>> GetBySerialNoAsync<T>(string serialNo) where T : class
        {
            return await _clientSqlSugar
                .Queryable<T>()
                .AS(GetSTableName<T>())
                .Where(BuildSerialNumberFilter<T>(serialNo))
                .OrderByDescending(BuildTimestampSelector<T>())
                .ToListAsync();
        }

        /// <summary>
        /// Returns the records of one serial number whose <c>Timestamp</c> is later than
        /// <paramref name="days"/> days ago, newest first.
        /// </summary>
        /// <typeparam name="T">Model type with <c>Timestamp</c> and <c>SerialNumber</c> properties.</typeparam>
        /// <param name="serialNo">Serial number to filter on.</param>
        /// <param name="days">Size of the look-back window in days.</param>
        public async Task<List<T>> GetBySerialNoAsync<T>(string serialNo, int days) where T : class
        {
            var daysAgo = DateTime.Now.AddDays(-days);
            return await _clientSqlSugar
                .Queryable<T>()
                .AS(GetSTableName<T>())
                .Where(BuildSerialNumberFilter<T>(serialNo, daysAgo))
                .OrderByDescending(BuildTimestampSelector<T>())
                .ToListAsync();
        }

        /// <summary>
        /// Deletes the records of one serial number written during the last 365 days by
        /// issuing one DELETE per record timestamp against the super table.
        /// </summary>
        /// <typeparam name="T">Model type with <c>Timestamp</c> and <c>SerialNumber</c> properties.</typeparam>
        /// <param name="serialNo">Serial number whose records are deleted.</param>
        public async Task DeleteAllBySerialNoCMDAsync<T>(string serialNo) where T : class
        {
            var records = await GetBySerialNoAsync<T>(serialNo, 365);
            var stbName = GetSTableName<T>();
            PropertyInfo timestampProperty = typeof(T).GetProperty("Timestamp")!;

            // NOTE(review): the DELETE filters on the timestamp only, so rows of other
            // serial numbers stored in the same super table with the same millisecond
            // would be removed as well - confirm and add a serial/tag filter if needed.
            // The statements run one after another because a single SqlSugarClient must
            // not execute commands concurrently.
            foreach (var record in records)
            {
                var ts = (DateTime)timestampProperty.GetValue(record)!;
                var startTimestamp = ts.ToString(TimestampFormat, CultureInfo.InvariantCulture);
                var endTimestamp = ts.AddMilliseconds(1).ToString(TimestampFormat, CultureInfo.InvariantCulture);
                var sql = $"DELETE FROM {stbName} WHERE ts >= '{startTimestamp}' AND ts < '{endTimestamp}'";

                var affected = await _clientSqlSugar.Ado.ExecuteCommandAsync(sql);
                _logger.LogDebug("{Sql} affected {AffectedRows} rows", sql, affected);
            }
        }

        /// <summary>Super-table name declared on <typeparamref name="T"/>, or null when the attribute is absent.</summary>
        private static string? GetSTableName<T>()
        {
            return typeof(T).GetCustomAttribute<STableAttribute>()?.STableName;
        }

        /// <summary>Builds <c>x =&gt; (object)x.Timestamp</c> for ordering.</summary>
        private static Expression<Func<T, object>> BuildTimestampSelector<T>()
        {
            var parameter = Expression.Parameter(typeof(T), "x");
            var timestamp = Expression.Property(parameter, "Timestamp");
            return Expression.Lambda<Func<T, object>>(Expression.Convert(timestamp, typeof(object)), parameter);
        }

        /// <summary>
        /// Builds <c>x =&gt; x.SerialNumber == serialNo</c>, optionally combined with
        /// <c>x.Timestamp &gt; newerThan</c>.
        /// </summary>
        private static Expression<Func<T, bool>> BuildSerialNumberFilter<T>(string serialNo, DateTime? newerThan = null)
        {
            var parameter = Expression.Parameter(typeof(T), "x");
            Expression body = Expression.Equal(
                Expression.Property(parameter, "SerialNumber"),
                Expression.Constant(serialNo, typeof(string)));

            if (newerThan.HasValue)
            {
                var isNewer = Expression.GreaterThan(
                    Expression.Property(parameter, "Timestamp"),
                    Expression.Constant(newerThan.Value, typeof(DateTime)));
                body = Expression.AndAlso(body, isNewer);
            }

            return Expression.Lambda<Func<T, bool>>(body, parameter);
        }

        #endregion

        #region Fetal heart rate algorithm

        /// <summary>
        /// Builds the pregnancy heart-rate model for one device: loads the records of
        /// the last <paramref name="days"/> days, removes high-frequency samples,
        /// determines the mode and keeps the 90% of values closest to it.
        /// </summary>
        /// <param name="serialNo">Device serial number.</param>
        /// <param name="days">Size of the look-back window in days.</param>
        /// <param name="percentage">Percentage parameter (not read in the reviewed part of the method).</param>
        /// <param name="highFreqSampleInterval">Interval passed to the high-frequency filter.</param>
        // NOTE(review): this method continues beyond the reviewed excerpt. Its code is
        // left exactly as found (only re-wrapped, comments translated), including the
        // generic type arguments that appear to be missing from the return type,
        // GetCustomAttribute and Queryable.
        public async Task InitPregnancyCommonHeartRateModeAsync(string serialNo, int days = 7, int percentage = 90, int highFreqSampleInterval=0)
        {
            var tableName = typeof(PregnancyHeartRateModel)
                .GetCustomAttribute()?
                .STableName;
            var daysAgo = DateTime.Now.AddDays(-days);
            var collection = await _clientSqlSugar
                .Queryable()
                .AS(tableName)
                .Where(i => i.SerialNumber.Equals(serialNo))
                .Where(i => i.LastUpdate > daysAgo)
                .OrderByDescending(i => i.LastUpdate)
                .ToListAsync();

            // Remove high-frequency samples.
            var filteredCollection = GetNonFreqPregnancyHeartRate(collection, highFreqSampleInterval);

            // At least 30 heart-rate samples are required for the calculation.
            if (filteredCollection.Count < 30)
            {
                _logger.LogInformation($"{serialNo} 心率数据不足,无法计算其众数");
                return null;
            }

            var res = filteredCollection
                .Select(i => i.PregnancyHeartRate).ToList();

            var listRes = filteredCollection.Select(i => new { last_update=i.LastUpdate.ToString("yyyy-MM-dd HH:mm:ss"),heart_rate=i.PregnancyHeartRate }).ToList();
            _logger.LogInformation($"{serialNo} 去除高频数据后的数据集: " + string.Join(", ", res));

            #region Compute the mode
            var mode = res.GroupBy(n => n)
                .OrderByDescending(g => g.Count())
                .First()
                .Key;
            _logger.LogInformation("众数是: " + mode);

            // Handle the case of several modes.
            var maxCount = res.GroupBy(n => n)
                .Max(g => g.Count());
            var modes = res.GroupBy(n => n)
                .Where(g => g.Count() == maxCount)
                .Select(g => g.Key)
                .ToList();

            // Several modes: pick the one closest to the median.
            if (modes.Count > 1)
            {
                // Compute the average.
                double average = res.Average();
                Console.WriteLine("平均值是: " + average);

                // Compute the median.
                double median;
                int count = res.Count;
                var sortedRes = res.OrderBy(n => n).ToList();
                if (count % 2 == 0)
                {
                    // Even number of elements: average of the two middle values.
                    median = (sortedRes[count / 2 - 1] + sortedRes[count / 2]) / 2.0;
                }
                else
                {
                    // Odd number of elements: the middle value.
                    median = sortedRes[count / 2];
                }
                _logger.LogInformation($"{serialNo} 中位数是: " + median);

                // Find the mode closest to the median.
                var closestToMedian = modes.OrderBy(m => Math.Abs(m - median)).First();
                _logger.LogInformation($"{serialNo} 最接近中位数的众数是: " + closestToMedian);
                mode = closestToMedian;
            }
            #endregion

            // Number of values to keep.
            int requiredCount = (int)(res.Count * 0.9);

            // Take the values closest to the mode from the original data set.
            var closestToModeData = res.OrderBy(n => Math.Abs(n - mode))
                .Take(requiredCount)
                .ToList();

            // Log the new data set.
            _logger.LogInformation($"{serialNo} 新数据集: " + string.Join(", ", closestToModeData));
            _logger.LogInformation($"{serialNo} 新数据集的数量: {closestToModeData.Count},最大值: {closestToModeData.Max()},最小值 {closestToModeData.Min()},原始最大值: {res.Max()},原始最小值 {res.Min()}" );

            var fhrMap = _mgrFhrPhrMapCache.GetHeartRatesMap();
            var watchConfig = await _deviceCacheMgr.GetGpsDeviceWatchConfigCacheObjectBySerialNoAsync(serialNo, "0067");
            if (watchConfig == null)
            {
                return null;
            }

            // Example: "EDOC": "1720860180652". Now - (EDOC - 280 days) = time pregnant.
            var edoc = DateTimeUtil.ToDateTime(watchConfig["EDOC"]!.ToString());
            int pregnancyWeek = (DateTime.Now - edoc.AddDays(-280)).Days / 7;
            _logger.LogInformation($"IMEI {serialNo},EDOC:{edoc},NOW:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")},SinceNOW:{edoc.AddDays(-280).ToString("yyyy-MM-dd HH:mm:ss")},怀孕周数 {pregnancyWeek}");

            float statMaxValueFprCoefficient = 0f;
            float statMinValueFprCoefficient = 0f;
            float StatModeAvgFprCoefficient = 0f;

            // Disabled rule: if both the maximum and the minimum lie within 60~100,
            // send fixed values as the high-frequency thresholds.
            var maxValue = closestToModeData.Max();
            var minValue = closestToModeData.Min();
            //if ((maxValue >= 60 && maxValue <= 100) && (minValue >= 60 && minValue <= 100))
            //{
            //    minValue = 60;
            //    maxValue = 100;
            //}

            // Widen the range on both sides of the maximum and the minimum.
            // The minimum must not be greater than 60.
            minValue = minValue > 60 ?
60 : minValue; // 最大值不能少于100 maxValue= maxValue < 100 ? 100 : maxValue; // 12-45周之间 if (pregnancyWeek >= 12 && pregnancyWeek <= 45) { var map = fhrMap .Where(i => i.PregnancyPeriod![0] <= pregnancyWeek && i.PregnancyPeriod[1] >= pregnancyWeek //&&i.PregnancyHeartRateRange![0] <= mode && i.PregnancyHeartRateRange[1] >= mode ) .FirstOrDefault(); if (map != null) { //statMaxValueFprCoefficient = (float)Math.Round((decimal)map.FetalHeartRateRange![1] / closestToModeData.Max(), 3); //statMinValueFprCoefficient = (float)Math.Round((decimal)map.FetalHeartRateRange[0] / closestToModeData.Min(), 3); statMaxValueFprCoefficient = (float)Math.Round((decimal)map.FetalHeartRateRange![1] / maxValue, 3); statMinValueFprCoefficient = (float)Math.Round((decimal)map.FetalHeartRateRange[0] / minValue, 3); StatModeAvgFprCoefficient = (float)Math.Round((decimal)map.FetalHeartRateAverage / mode, 3); } } return new PregnancyCommonHeartRateModel() { Timestamp = DateTime.Now, PersonId = collection.First().DeviceKey, DeviceKey = collection.First().DeviceKey, SerialNumber = collection.First().SerialNumber, Mode = mode, Percentage = percentage, MaxValue = maxValue, MinValue = minValue, OriginalMaxValue = res.Max(), OriginalMinValue = res.Min(), CreateTime = DateTime.Now, StatStartTime = collection.OrderBy(i => i.Timestamp).Select(i => i.Timestamp).First(), StatEndTime = collection.OrderBy(i => i.Timestamp).Select(i => i.Timestamp).Last(), StatMaxValueFprCoefficient = statMaxValueFprCoefficient, StatMinValueFprCoefficient = statMinValueFprCoefficient, StatModeAvgFprCoefficient = StatModeAvgFprCoefficient, Remark = string.Empty, SerialTailNumber = serialNo.Substring(serialNo.Length - 2) }; } /// /// 去除高频数据 /// /// /// /// private static List GetNonFreqPregnancyHeartRate(List phr, int highFreqSampleInterval) { //phr = phr.OrderByDescending(i => i.LastUpdate).ToList(); //var result = new List(); //PregnancyHeartRateModel? 
previousItem = null; //foreach (var item in phr) //{ // if (previousItem != null) // { // var timeNextDiff =(previousItem!.LastUpdate - item.LastUpdate).TotalSeconds; // if (timeNextDiff > highFreqSampleInterval) // { // result.Add(previousItem); // } // } // previousItem = item; //} //// 添加上一个 //if (previousItem != null) //{ // result.Add(previousItem); //} //return result; #region 反向 var phr1 = phr.OrderByDescending(i => i.LastUpdate).ToList(); var result = new List(); PregnancyHeartRateModel? previousItem1 = null; foreach (var item in phr1) { if (previousItem1 != null) { var timeNextDiff = (previousItem1!.LastUpdate - item.LastUpdate).TotalSeconds; if (timeNextDiff > highFreqSampleInterval) { result.Add(previousItem1); } } previousItem1 = item; } // 添加上一个 if (previousItem1 != null) { result.Add(previousItem1); } #endregion #region 正向 var phr2 = phr.OrderByDescending(i => i.LastUpdate).ToList(); ; var freqCollection = new List(); PregnancyHeartRateModel? previousItem = null; foreach (var item in phr2) { if (previousItem != null) { var timeNextDiff = (previousItem!.LastUpdate - item.LastUpdate).TotalSeconds; if (timeNextDiff <= highFreqSampleInterval) { freqCollection.Add(item); } } previousItem = item; } //去除高频 foreach (var item in freqCollection) { phr2.Remove(item); } #endregion // 交集 var commonElements = phr2.Intersect(result).ToList(); return commonElements; } /// /// 获取高频数据 /// /// /// /// private static List GetFreqPregnancyHeartRate(List phr, int highFreqSampleInterval) { phr = phr.OrderByDescending(i => i.LastUpdate).ToList(); var freqCollection = new List(); PregnancyHeartRateModel? 
previousItem = null; foreach (var item in phr) { if (previousItem != null) { var timeNextDiff = (previousItem.LastUpdate - item.LastUpdate).TotalSeconds; if (timeNextDiff <= highFreqSampleInterval) { freqCollection.Add(previousItem); } } previousItem = item; } // 检查最后一条是否高频 if (previousItem != null && (phr.Last().LastUpdate - previousItem.LastUpdate).TotalSeconds <= highFreqSampleInterval) { freqCollection.Add(previousItem); } return freqCollection; } /// /// 胎心算法 /// 1、增量系数 = (正常胎心最大值 - 正常胎心最小值) / (阀值最大值 - 阀值最小值) /// 2、胎心值 = 正常胎心最小值 + 增量系数 * (心率值 - 阀值最小值) /// /// /// /// public async Task GetFetalHeartRateAsync(string serialNo,int heartrateValue) { var fhrMap = _mgrFhrPhrMapCache.GetHeartRatesMap(); var watchConfig = await _deviceCacheMgr.GetGpsDeviceWatchConfigCacheObjectBySerialNoAsync(serialNo, "0067"); if (watchConfig == null) { return 0; } var edoc = DateTimeUtil.ToDateTime(watchConfig["EDOC"]!.ToString()); int pregnancyWeek = (DateTime.Now - edoc.AddDays(-280)).Days / 7; // 12-45周之间 if (pregnancyWeek >= 12 && pregnancyWeek <= 45) { var map = fhrMap .Where(i => i.PregnancyPeriod![0] <= pregnancyWeek && i.PregnancyPeriod[1] >= pregnancyWeek //&&i.PregnancyHeartRateRange![0] <= mode && i.PregnancyHeartRateRange[1] >= mode ) .FirstOrDefault(); var NormaletalHeartRateMin = map?.FetalHeartRateRange![0]; var NormaletalHeartRateMax = map?.FetalHeartRateRange![1]; // 触发高频监测的心率上限值 var triggerHighFreqHigh = (int)watchConfig["triggerHighFreqHigh"]!; // 触发高频监测的心率下限值 var triggerHighFreqLow = (int)watchConfig["triggerHighFreqLow"]!; //增量系数 = (正常胎心最大值 - 正常胎心最小值) / (阀值最大值 - 阀值最小值) var coefficient = (NormaletalHeartRateMax - NormaletalHeartRateMin) / (triggerHighFreqHigh - triggerHighFreqLow); //胎心值 = 正常胎心最小值 + 增量系数 * (心率值 - 阀值最小值) var fetalHeartRate = NormaletalHeartRateMin + coefficient * (heartrateValue - triggerHighFreqLow); _logger.LogInformation($"{serialNo} 孕周:{pregnancyWeek}, 正常胎心最大值:{NormaletalHeartRateMax},正常胎心最小值:{NormaletalHeartRateMin},阀值最大值:{triggerHighFreqHigh} 
,阀值最小值:{triggerHighFreqLow}, 增量系数:{coefficient}"); return SafeType.SafeInt(fetalHeartRate!); } return 0; } #endregion } }