Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

949 lines
39KB

  1. using HealthMonitor.Common;
  2. using HealthMonitor.Common.helper;
  3. using HealthMonitor.Model.Config;
  4. using HealthMonitor.Model.Service.Mapper;
  5. using HealthMonitor.Service.Biz.db.Dto;
  6. using HealthMonitor.Util.Models;
  7. using Microsoft.EntityFrameworkCore.Metadata.Internal;
  8. using Microsoft.Extensions.Logging;
  9. using Microsoft.Extensions.Options;
  10. using Newtonsoft.Json;
  11. using Newtonsoft.Json.Linq;
  12. using SqlSugar;
  13. using SqlSugar.DbConvert;
  14. using SqlSugar.TDengine;
  15. using System;
  16. using System.Collections.Generic;
  17. using System.Data;
  18. using System.Linq;
  19. using System.Linq.Expressions;
  20. using System.Reflection;
  21. using System.Text;
  22. using System.Threading.Tasks;
  23. using System.Xml.Linq;
  24. using TDengineDriver;
  25. using TDengineDriver.Impl;
  26. using TDengineTMQ;
  27. namespace HealthMonitor.Service.Biz.db
  28. {
  29. public class TDengineService
  30. {
  31. private readonly ILogger<TDengineService> _logger;
  32. private readonly HttpHelper _httpHelper=default!;
  33. private readonly TDengineServiceConfig _configTDengineService;
  34. private readonly SqlSugarClient _clientSqlSugar;
  35. public TDengineService(ILogger<TDengineService> logger,
  36. IOptions<TDengineServiceConfig> configTDengineService,
  37. HttpHelper httpHelper
  38. )
  39. {
  40. _logger = logger;
  41. _configTDengineService = configTDengineService.Value;
  42. _httpHelper = httpHelper;
  43. _clientSqlSugar = new SqlSugarClient(new ConnectionConfig()
  44. {
  45. DbType = SqlSugar.DbType.TDengine,
  46. ConnectionString = $"Host={_configTDengineService.Host};Port={_configTDengineService.Port};Username={_configTDengineService.UserName};Password={_configTDengineService.Password};Database={_configTDengineService.DB};TsType=config_ns",
  47. IsAutoCloseConnection = true,
  48. AopEvents = new AopEvents
  49. {
  50. OnLogExecuting = (sql, p) =>
  51. {
  52. Console.WriteLine(SqlSugar.UtilMethods.GetNativeSql(sql, p));
  53. }
  54. },
  55. ConfigureExternalServices = new ConfigureExternalServices()
  56. {
  57. EntityService = (property, column) =>
  58. {
  59. if (column.SqlParameterDbType == null)
  60. {
  61. column.SqlParameterDbType = typeof(CommonPropertyConvert);
  62. }
  63. }
  64. },
  65. MoreSettings=new ConnMoreSettings()
  66. {
  67. PgSqlIsAutoToLower = false,
  68. PgSqlIsAutoToLowerCodeFirst = false,
  69. }
  70. });
  71. }
  72. public IntPtr Connection()
  73. {
  74. string host = _configTDengineService.Host;
  75. string user = _configTDengineService.UserName;
  76. string db = _configTDengineService.DB;
  77. short port = _configTDengineService.Port;
  78. string password = _configTDengineService.Password;
  79. //#if DEBUG
  80. // //string configDir = "C:/TDengine/cfg";
  81. // //TDengine.Options((int)TDengineInitOption.TSDB_OPTION_CONFIGDIR, configDir);
  82. // TDengine.Options((int)TDengineInitOption.TSDB_OPTION_TIMEZONE, "Asia/Beijing");
  83. //#endif
  84. IntPtr conn = TDengine.Connect(host, user, password, db, port);
  85. if (conn == IntPtr.Zero)
  86. {
  87. _logger.LogError($"连接 TDengine 失败....");
  88. }
  89. else
  90. {
  91. _logger.LogInformation($"连接 TDengine 成功....");
  92. }
  93. return conn;
  94. }
  95. public void ExecuteSQL(IntPtr conn, string sql)
  96. {
  97. IntPtr res = TDengine.Query(conn, sql);
  98. // Check if query success
  99. if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0))
  100. {
  101. Console.Write(sql + " failure, ");
  102. // Get error message while Res is a not null pointer.
  103. if (res != IntPtr.Zero)
  104. {
  105. Console.Write("reason:" + TDengine.Error(res));
  106. }
  107. }
  108. else
  109. {
  110. Console.Write(sql + " success, {0} rows affected", TDengine.AffectRows(res));
  111. //... do something with res ...
  112. // Important: need to free result to avoid memory leak.
  113. TDengine.FreeResult(res);
  114. }
  115. }
  116. public void ExecuteQuerySQL(IntPtr conn, string sql)
  117. {
  118. IntPtr res = TDengine.Query(conn, sql);
  119. // Check if query success
  120. if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0))
  121. {
  122. Console.Write(sql + " failure, ");
  123. // Get error message while Res is a not null pointer.
  124. if (res != IntPtr.Zero)
  125. {
  126. Console.Write("reason:" + TDengine.Error(res));
  127. }
  128. }
  129. else
  130. {
  131. Console.Write(sql + " success, {0} rows affected", TDengine.AffectRows(res));
  132. //... do something with res ...
  133. List<TDengineDriver.TDengineMeta> resMeta = LibTaos.GetMeta(res);
  134. List<object> resData = LibTaos.GetData(res);
  135. foreach (var meta in resMeta)
  136. {
  137. _logger.LogInformation("\t|{meta.name} {meta.TypeName()} ({meta.size})\t|", meta.name, meta.TypeName(), meta.size);
  138. }
  139. for (int i = 0; i < resData.Count; i++)
  140. {
  141. _logger.LogInformation($"|{resData[i].ToString()} \t");
  142. if (((i + 1) % resMeta.Count == 0))
  143. {
  144. _logger.LogInformation("");
  145. }
  146. }
  147. // Important: need to free result to avoid memory leak.
  148. TDengine.FreeResult(res);
  149. }
  150. }
  151. public void CheckRes(IntPtr conn, IntPtr res, String errorMsg)
  152. {
  153. if (TDengine.ErrorNo(res) != 0)
  154. {
  155. throw new Exception($"{errorMsg} since: {TDengine.Error(res)}");
  156. }
  157. }
  158. public void ExecuteInsertSQL(string sql)
  159. {
  160. var conn = Connection();
  161. try
  162. {
  163. //sql = "INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000) " +
  164. // "d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000) " +
  165. // "d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000)('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000) " +
  166. // "d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000)('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)";
  167. //#if DEBUG
  168. // //string configDir = "C:/TDengine/cfg";
  169. // //TDengine.Options((int)TDengineInitOption.TSDB_OPTION_CONFIGDIR, configDir);
  170. // TDengine.Options((int)TDengineInitOption.TSDB_OPTION_TIMEZONE, "Asia/Beijing");
  171. //#endif
  172. _logger.LogInformation($"Insert SQL: {sql}");
  173. IntPtr res = TDengine.Query(conn, sql);
  174. CheckRes(conn, res, "failed to insert data");
  175. int affectedRows = TDengine.AffectRows(res);
  176. _logger.LogInformation("affectedRows {affectedRows}" , affectedRows);
  177. TDengine.FreeResult(res);
  178. }
  179. finally
  180. {
  181. TDengine.Close(conn);
  182. }
  183. }
  184. #region TDengine.Connector async query
  185. public void QueryCallback(IntPtr param, IntPtr taosRes, int code)
  186. {
  187. if (code == 0 && taosRes != IntPtr.Zero)
  188. {
  189. FetchRawBlockAsyncCallback fetchRowAsyncCallback = new FetchRawBlockAsyncCallback(FetchRawBlockCallback);
  190. TDengine.FetchRawBlockAsync(taosRes, fetchRowAsyncCallback, param);
  191. }
  192. else
  193. {
  194. _logger.LogInformation("async query data failed, failed code {code}",code);
  195. }
  196. }
  197. // Iteratively call this interface until "numOfRows" is no greater than 0.
  198. public void FetchRawBlockCallback(IntPtr param, IntPtr taosRes, int numOfRows)
  199. {
  200. if (numOfRows > 0)
  201. {
  202. _logger.LogInformation("{numOfRows} rows async retrieved", numOfRows);
  203. IntPtr pdata = TDengine.GetRawBlock(taosRes);
  204. List<TDengineMeta> metaList = TDengine.FetchFields(taosRes);
  205. List<object> dataList = LibTaos.ReadRawBlock(pdata, metaList, numOfRows);
  206. for (int i = 0; i < metaList.Count; i++)
  207. {
  208. _logger.LogInformation("{0} {1}({2}) \t|", metaList[i].name, metaList[i].type, metaList[i].size);
  209. }
  210. _logger.LogInformation("");
  211. for (int i = 0; i < dataList.Count; i++)
  212. {
  213. if (i != 0 && i % metaList.Count == 0)
  214. {
  215. _logger.LogInformation("{dataList[i]}\t|", dataList[i]);
  216. }
  217. _logger.LogInformation("{dataList[i]}\t|", dataList[i]);
  218. }
  219. TDengine.FetchRawBlockAsync(taosRes, FetchRawBlockCallback, param);
  220. }
  221. else
  222. {
  223. if (numOfRows == 0)
  224. {
  225. _logger.LogInformation("async retrieve complete.");
  226. }
  227. else
  228. {
  229. _logger.LogInformation("FetchRawBlockCallback callback error, error code {numOfRows}", numOfRows);
  230. }
  231. TDengine.FreeResult(taosRes);
  232. }
  233. }
  234. public void ExecuteQueryAsync(string sql)
  235. {
  236. var conn = Connection();
  237. QueryAsyncCallback queryAsyncCallback = new QueryAsyncCallback(QueryCallback);
  238. TDengine.QueryAsync(conn, sql, queryAsyncCallback, IntPtr.Zero);
  239. }
  240. //public void ExecuteQuery(string sql)
  241. //{
  242. // var conn = Connection();
  243. // QueryAsyncCallback queryAsyncCallback = new QueryAsyncCallback(QueryCallback);
  244. // TDengine.QueryAsync(conn, sql, queryAsyncCallback, IntPtr.Zero);
  245. //}
  246. public Aggregate GetAggregateValue(string field, string tbName, string? condition)
  247. {
  248. List<int> data = new();
  249. var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  250. var conn = Connection();
  251. try
  252. {
  253. IntPtr res = TDengine.Query(conn, sql);
  254. // Check if query success
  255. if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0))
  256. {
  257. Console.Write(sql + " failure, ");
  258. // Get error message while Res is a not null pointer.
  259. if (res != IntPtr.Zero)
  260. {
  261. Console.Write("reason:" + TDengine.Error(res));
  262. }
  263. }
  264. else
  265. {
  266. Console.Write(sql + " success, {0} rows affected", TDengine.AffectRows(res));
  267. //... do something with res ...
  268. List<TDengineMeta> resMeta = LibTaos.GetMeta(res);
  269. List<object> resData = LibTaos.GetData(res);
  270. foreach (var meta in resMeta)
  271. {
  272. Console.Write($"\t|{meta.name} {meta.TypeName()} ({meta.size})\t|");
  273. }
  274. resData.ForEach(x => data.Add(SafeType.SafeInt(x)));
  275. // Important: need to free result to avoid memory leak.
  276. TDengine.FreeResult(res);
  277. }
  278. }
  279. finally
  280. {
  281. TDengine.Close(conn);
  282. }
  283. return new Aggregate
  284. {
  285. Max = data.Count.Equals(0) ? 0 : data[0],
  286. Min = data.Count.Equals(0) ? 0 : data[1],
  287. };
  288. }
  289. public int GetAvgExceptMaxMinValue(string field, string tbName, string? condition)
  290. {
  291. List<int> data = new();
  292. var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  293. var aggregate= GetAggregateValue(field, tbName, condition);
  294. var sqlAvg = $"SELECT AVG({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition} AND {field} < {aggregate.Max} and {field} > {aggregate.Min}";
  295. var conn = Connection();
  296. try
  297. {
  298. IntPtr res = TDengine.Query(conn, sqlAvg);
  299. // Check if query success
  300. if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0))
  301. {
  302. Console.Write(sqlAvg + " failure, ");
  303. // Get error message while Res is a not null pointer.
  304. if (res != IntPtr.Zero)
  305. {
  306. Console.Write("reason:" + TDengine.Error(res));
  307. }
  308. }
  309. else
  310. {
  311. Console.Write(sqlAvg + " success, {0} rows affected", TDengine.AffectRows(res));
  312. //... do something with res ...
  313. List<TDengineMeta> resMeta = LibTaos.GetMeta(res);
  314. List<object> resData = LibTaos.GetData(res);
  315. foreach (var meta in resMeta)
  316. {
  317. Console.Write($"\t|{meta.name} {meta.TypeName()} ({meta.size})\t|");
  318. }
  319. resData.ForEach(x => data.Add(SafeType.SafeInt(x)));
  320. // Important: need to free result to avoid memory leak.
  321. TDengine.FreeResult(res);
  322. }
  323. }
  324. finally
  325. {
  326. TDengine.Close(conn);
  327. }
  328. return data.Count.Equals(0) ? 0 : data[0];
  329. }
  330. #endregion
  331. #region RestAPI
  332. public async Task<string?> ExecuteQuerySQLRestResponse(string sql)
  333. {
  334. var url = $"http://{_configTDengineService.Host}:{_configTDengineService.RestPort}/rest/sql/{_configTDengineService.DB}";
  335. List<KeyValuePair<string, string>> headers = new()
  336. {
  337. new KeyValuePair<string, string>("Authorization", "Basic " + _configTDengineService.Token)
  338. };
  339. var result = await _httpHelper.HttpToPostAsync(url, sql, headers).ConfigureAwait(false);
  340. return result;
  341. }
  342. public async Task<string?> ExecuteSelectRestResponseAsync( string tbName, string condition="1", string field = "*")
  343. {
  344. var url = $"http://{_configTDengineService.Host}:{_configTDengineService.RestPort}/rest/sql/{_configTDengineService.DB}";
  345. var sql = $"SELECT {field} FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  346. List<KeyValuePair<string, string>> headers = new()
  347. {
  348. new KeyValuePair<string, string>("Authorization", "Basic " + _configTDengineService.Token)
  349. };
  350. _logger.LogInformation($"{nameof(ExecuteSelectRestResponseAsync)} --- SQL 语句执行 {sql}");
  351. var result = await _httpHelper.HttpToPostAsync(url, sql, headers).ConfigureAwait(false);
  352. return result;
  353. }
  354. public async Task<bool> GernalRestSql(string sql)
  355. {
  356. //"http://{server}:{port}/rest/sql/{db}"
  357. var url = $"http://{_configTDengineService.Host}:{_configTDengineService.RestPort}/rest/sql/{_configTDengineService.DB}";
  358. List<KeyValuePair<string, string>> headers = new()
  359. {
  360. new KeyValuePair<string, string>("Authorization", "Basic " + _configTDengineService.Token)
  361. };
  362. var result = await _httpHelper.HttpToPostAsync(url, sql, headers).ConfigureAwait(false);
  363. var res = JsonConvert.DeserializeObject<TDengineRestResBase>(result!);
  364. if (result != null)
  365. {
  366. if (res?.Code == 0)
  367. {
  368. _logger.LogInformation($"{nameof(GernalRestSql)},SQL 语句执行成功|{sql}");
  369. return true;
  370. }
  371. else
  372. {
  373. _logger.LogWarning($"{nameof(GernalRestSql)},SQL 语句执行失败||{sql}");
  374. return false;
  375. }
  376. }
  377. else
  378. {
  379. _logger.LogError($"{nameof(GernalRestSql)},TDengine 服务器IP:{_configTDengineService.Host} 错误,请联系运维人员");
  380. return false;
  381. }
  382. //return res.Code==0;
  383. }
  384. public async Task<string?> GernalRestSqlResTextAsync(string sql)
  385. {
  386. _logger.LogInformation($"执行 SQL: {nameof(GernalRestSqlResTextAsync)}--{sql}");
  387. var url = $"http://{_configTDengineService.Host}:{_configTDengineService.RestPort}/rest/sql/{_configTDengineService.DB}";
  388. List<KeyValuePair<string, string>> headers = new()
  389. {
  390. new KeyValuePair<string, string>("Authorization", "Basic " + _configTDengineService.Token)
  391. };
  392. var result = await _httpHelper.HttpToPostAsync(url, sql, headers).ConfigureAwait(false);
  393. return result;
  394. }
  395. /** 取消
  396. /// <summary>
  397. /// 最大值,最小值聚合
  398. /// </summary>
  399. /// <param name="field"></param>
  400. /// <param name="tbName"></param>
  401. /// <param name="condition"></param>
  402. /// <returns></returns>
  403. public async Task<Aggregate> GetAggregateValueAsync(string field,string tbName,string? condition)
  404. {
  405. var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  406. var result = await GernalRestSqlResTextAsync(sql);
  407. var res = JsonConvert.DeserializeObject<Aggregate>(result!);
  408. List<dynamic> data = res?.Data!;
  409. return new Aggregate
  410. {
  411. Max = data.Count.Equals(0) ? 0 : data[0][0],
  412. Min = data.Count.Equals(0) ? 0 : data[0][1],
  413. };
  414. }
  415. /// <summary>
  416. /// 去除最大值和最小值后的平均值
  417. /// </summary>
  418. /// <param name="field"></param>
  419. /// <param name="tbName"></param>
  420. /// <param name="condition"></param>
  421. /// <returns></returns>
  422. public async Task<int> GetAvgExceptMaxMinValueAsync(string field, string tbName, string? condition)
  423. {
  424. var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  425. var result = await GernalRestSqlResTextAsync(sql);
  426. var res = JsonConvert.DeserializeObject<TDengineRestResBase>(result!);
  427. _logger.LogInformation($"最大小值:{sql}");
  428. List<dynamic> data = res?.Data!;
  429. var sqlAvg = $"SELECT AVG({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition} AND {field} < { (data.Count.Equals(0)? 0: data[0][0]) } and {field} > {(data.Count.Equals(0) ? 0 : data[0][1])}";
  430. result = await GernalRestSqlResTextAsync(sqlAvg);
  431. _logger.LogInformation($"sqlAvg:{sqlAvg}");
  432. res = JsonConvert.DeserializeObject<TDengineRestResBase>(result!);
  433. data = res?.Data!;
  434. return data.Count.Equals(0)?0:(int)data[0][0];
  435. }
  436. /// <summary>
  437. /// 获取最后的记录
  438. /// </summary>
  439. /// <param name="tbName"></param>
  440. /// <param name="condition"></param>
  441. /// <returns></returns>
  442. public async Task<JArray?> GetLastAsync(string tbName, string? condition)
  443. {
  444. var sql = $"SELECT last_row(*) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  445. var result = await GernalRestSqlResTextAsync(sql);
  446. var res = JsonConvert.DeserializeObject<TDengineRestResBase>(result!);
  447. if(res?.Data?.Count==0) return null;
  448. List<dynamic> data = res?.Data!;
  449. return data[0] as JArray;
  450. }
  451. public async Task<int> GetCount(string tbName, string? condition)
  452. {
  453. var sql = $"SELECT count(ts) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  454. var result = await GernalRestSqlResTextAsync(sql);
  455. var res = JsonConvert.DeserializeObject<TDengineRestResBase>(result!);
  456. List<dynamic> data = res?.Data!;
  457. return SafeType.SafeInt(data[0][0]);
  458. }
  459. */
  460. #endregion
  461. /// <summary>
  462. /// 平均值算法(去除最大值,最小值和大于标定值的平均值)
  463. /// </summary>
  464. /// <param name="numToRemove"></param>
  465. /// <param name="collection"></param>
  466. /// <param name="max"></param>
  467. /// <param name="min"></param>
  468. /// <returns></returns>
  469. public static decimal AverageAfterRemovingOneMinMaxRef(List<int> collection, int max, int min,int refValue)
  470. {
  471. collection.Remove(max);
  472. collection.Remove(min);
  473. collection.RemoveAll(_ => _ > refValue);
  474. if (collection.Count < 2)
  475. {
  476. throw new ArgumentException($"数据集{collection.ToArray()},去掉一个最大值 {max}和一个最小值{min},异常值(大于标定值{refValue}),后数据值不足");
  477. }
  478. return (decimal)collection.Average(x => x);
  479. //var values = ParseData.Select(valueSelector).ToList();
  480. //collection = values.Select(i => (int)i).ToArray();
  481. //if (values.Count <= 2)
  482. //{
  483. // throw new ArgumentException("Not enough elements to remove.");
  484. //}
  485. //// Remove the specified number of minimum and maximum values
  486. ////values.RemoveAll(_ => _ == values.Min());
  487. ////values.RemoveAll(_ => _ == values.Max());
  488. //max = (int)values.Max();
  489. //min = (int)values.Min();
  490. //values.Remove(max);
  491. //values.Remove(min);
  492. //// Remove values less than the specified threshold
  493. //values.RemoveAll(_ => _ > numToRemove);
  494. //// Calculate and return the average of the remaining values
  495. //return values.Average();
  496. }
  497. /// <summary>
  498. /// 去除最大值和最小值各一个(列表的头和尾),再去除异常值
  499. /// </summary>
  500. /// <param name="systolicRefValue"></param>
  501. /// <param name="hmBpParser"></param>
  502. /// <returns></returns>
  503. public decimal[] AverageAfterRemovingOneMinMaxRef(int systolicRefValue, ParseTDengineRestResponse<BloodPressureModel>? hmBpParser)
  504. {
  505. var sortedList = hmBpParser?.Select(i => i)
  506. .Where(i => i.IsDisplay.Equals(true))
  507. .OrderByDescending(i => i.SystolicValue)
  508. .ThenByDescending(i => i.DiastolicValue)
  509. .ToList();
  510. _logger.LogInformation($"计算时间段排列数据集:{JsonConvert.SerializeObject(sortedList)}");
  511. // 去除最大值和最小值各一个(列表的头和尾)
  512. var trimmedList = sortedList?
  513. .Skip(1)
  514. .Take(sortedList.Count - 2)
  515. .ToList();
  516. _logger.LogInformation($"计算去除最大值和最小值各一个数据集:{JsonConvert.SerializeObject(trimmedList)}");
  517. // 去除异常值
  518. var filteredList = trimmedList?.Where(bp => bp.SystolicValue < SafeType.SafeInt(systolicRefValue!)).ToList();
  519. _logger.LogInformation($"计算除异常值个数据集:{JsonConvert.SerializeObject(filteredList)}");
  520. if (filteredList?.Count < 2)
  521. {
  522. // throw new ArgumentException("数据不够不能计算");
  523. // 平均值为0,说明数据不足,不能计算增量值
  524. return new decimal[] { 0M, 0M };
  525. }
  526. var systolicAvg = filteredList?.Select(bp => bp.SystolicValue).Average();
  527. var diastolicAvg = filteredList?.Select(bp => bp.DiastolicValue).Average();
  528. return new decimal[] { (decimal)systolicAvg!, (decimal)diastolicAvg! };
  529. }
  530. /// <summary>
  531. /// 去除最大值和最小值各一个(列表的头和尾)
  532. /// </summary>
  533. /// <param name="systolicRefValue"></param>
  534. /// <param name="hmBpParser"></param>
  535. /// <returns></returns>
  536. public decimal[] AverageAfterRemovingOneMinMaxRef(ParseTDengineRestResponse<BloodPressureModel>? hmBpParser)
  537. {
  538. var sortedList = hmBpParser?.Select(i => i)
  539. .Where(i => i.IsDisplay.Equals(true))
  540. .OrderByDescending(i => i.SystolicValue)
  541. .ThenByDescending(i => i.DiastolicValue)
  542. .ToList();
  543. _logger.LogInformation($"计算时间段排列数据集:{JsonConvert.SerializeObject(sortedList)}");
  544. // 去除最大值和最小值各一个(列表的头和尾)
  545. var trimmedList = sortedList?
  546. .Skip(1)
  547. .Take(sortedList.Count - 2)
  548. .ToList();
  549. _logger.LogInformation($"计算去除最大值和最小值各一个数据集:{JsonConvert.SerializeObject(trimmedList)}");
  550. var filteredList = trimmedList?.ToList();
  551. _logger.LogInformation($"计算除异常值个数据集:{JsonConvert.SerializeObject(filteredList)}");
  552. if (filteredList?.Count < 2)
  553. {
  554. // throw new ArgumentException("数据不够不能计算");
  555. // 平均值为0,说明数据不足,不能计算增量值
  556. return new decimal[] { 0M, 0M };
  557. }
  558. var systolicAvg = filteredList?.Select(bp => bp.SystolicValue).Average();
  559. var diastolicAvg = filteredList?.Select(bp => bp.DiastolicValue).Average();
  560. return new decimal[] { (decimal)systolicAvg!, (decimal)diastolicAvg! };
  561. }
  562. #region SqlSugarClient
  563. /**
  564. public async Task InsertFetalHeartRateAsync()
  565. {
  566. var tableName = typeof(FetalHeartRateModel)
  567. .GetCustomAttribute<STableAttribute>()?
  568. .STableName;
  569. if (tableName == null)
  570. {
  571. throw new InvalidOperationException("STableAttribute not found on FetalHeartRateModel class.");
  572. }
  573. await _clientSqlSugar.Ado.ExecuteCommandAsync($"create table IF NOT EXISTS hm_fhr_00 using {tableName} tags('00')");
  574. _clientSqlSugar.Insertable(new FetalHeartRateModel()
  575. {
  576. Timestamp = DateTime.Now,
  577. CreateTime = DateTime.Now,
  578. FetalHeartRate = 90,
  579. FetalHeartRateId = Guid.NewGuid().ToString("D"),
  580. IsDisplay = false,
  581. Method = 1,
  582. PersonId = Guid.NewGuid().ToString("D"),
  583. MessageId = Guid.NewGuid().ToString("D"),
  584. SerialNumber = Guid.NewGuid().ToString("D"),
  585. DeviceKey = Guid.NewGuid().ToString("D"),
  586. SerialTailNumber = "00",
  587. LastUpdate = DateTime.Now,
  588. }).AS("hm_fhr_00").ExecuteCommand();
  589. }
  590. public Task<FetalHeartRateModel> GetFirst()
  591. {
  592. var tableName = typeof(FetalHeartRateModel)
  593. .GetCustomAttribute<STableAttribute>()?
  594. .STableName;
  595. var first = _clientSqlSugar
  596. .Queryable<FetalHeartRateModel>()
  597. .AS(tableName)
  598. .OrderByDescending(x => x.Timestamp).FirstAsync();
  599. return first;
  600. }
  601. */
  602. /// <summary>
  603. /// 插入记录
  604. /// </summary>
  605. /// <typeparam name="T"></typeparam>
  606. /// <param name="model"></param>
  607. /// <param name="tbName">子表名称</param>
  608. /// <returns></returns>
  609. /// <exception cref="InvalidOperationException"></exception>
  610. public async Task InsertAsync<T>(string tbName, T model)
  611. {
  612. var stbName = typeof(T)
  613. .GetCustomAttribute<STableAttribute>()?
  614. .STableName;
  615. if (stbName == null)
  616. {
  617. throw new InvalidOperationException($"STableAttribute not found on {nameof(T)} class.");
  618. }
  619. var tailNo = typeof(T).GetProperty("SerialTailNumber")?.GetValue(model)?.ToString();
  620. if (string.IsNullOrEmpty(tailNo))
  621. {
  622. throw new InvalidOperationException($"SerialNumberAttribute not found on {nameof(T)} class.");
  623. }
  624. var tbFullName = $"{tbName}_{tailNo}";
  625. await _clientSqlSugar.Ado.ExecuteCommandAsync($"create table IF NOT EXISTS {tbFullName} using {stbName} tags('{tailNo}')");
  626. //_clientSqlSugar.InsertableByDynamic(model).AS(tbFullName).ExecuteCommand();
  627. _clientSqlSugar.InsertableByObject(model).AS(tbFullName).ExecuteCommand();
  628. }
  629. public Task<T> GetFirstAsync<T>() where T : class
  630. {
  631. var tableName = typeof(T)
  632. .GetCustomAttribute<STableAttribute>()?
  633. .STableName;
  634. // 创建一个表示 Timestamp 属性的表达式
  635. var parameter = Expression.Parameter(typeof(T), "x");
  636. var property = Expression.Property(parameter, "Timestamp");
  637. var lambda = Expression.Lambda<Func<T, object>>(Expression.Convert(property, typeof(object)), parameter);
  638. var first = _clientSqlSugar
  639. .Queryable<T>()
  640. .AS(tableName)
  641. .OrderByDescending(lambda).FirstAsync();
  642. return first;
  643. }
  644. public async Task<List<T>> GetBySerialNoAsync<T>(string serialNo) where T : class
  645. {
  646. var tableName = typeof(T)
  647. .GetCustomAttribute<STableAttribute>()?
  648. .STableName;
  649. // 创建表示 Timestamp 属性的表达式
  650. var parameter = Expression.Parameter(typeof(T), "x");
  651. var timestampProperty = Expression.Property(parameter, "Timestamp");
  652. var timestampLambda = Expression.Lambda<Func<T, object>>(Expression.Convert(timestampProperty, typeof(object)), parameter);
  653. // 创建表示 SerialNo 属性的表达式
  654. var serialNoProperty = Expression.Property(parameter, "SerialNumber");
  655. var serialNoConstant = Expression.Constant(serialNo);
  656. var equalExpression = Expression.Equal(serialNoProperty, serialNoConstant);
  657. var serialNoLambda = Expression.Lambda<Func<T, bool>>(equalExpression, parameter);
  658. var res = await _clientSqlSugar
  659. .Queryable<T>()
  660. .AS(tableName)
  661. .Where(serialNoLambda)
  662. .OrderByDescending(timestampLambda).ToListAsync();
  663. return res;
  664. }
  665. #endregion
  666. #region 胎心算法
  667. /// <summary>
  668. /// 获取孕妇心率众数
  669. /// </summary>
  670. /// <param name="serialNo"></param>
  671. /// <param name="days"></param>
  672. /// <returns></returns>
  673. public async Task<int> GetPregnancyHeartRateModeAsync(string serialNo,int days=7)
  674. {
  675. var tableName = typeof(PregnancyHeartRateModel)
  676. .GetCustomAttribute<STableAttribute>()?
  677. .STableName;
  678. var res = await _clientSqlSugar
  679. .Queryable<PregnancyHeartRateModel>()
  680. .AS(tableName)
  681. .Where(i=>i.SerialNumber.Equals(serialNo))
  682. .Where(i => i.Timestamp > DateTime.Now.AddDays(-days))
  683. //.OrderByDescending(i => i.PregnancyHeartRate)
  684. .Select(i =>i.PregnancyHeartRate)
  685. .ToListAsync();
  686. // 心率数据量必须30个以上才进行计算
  687. if (res.Count < 30) return 0;
  688. // 计算众数
  689. var mode = res.GroupBy(n => n)
  690. .OrderByDescending(g => g.Count())
  691. .First()
  692. .Key;
  693. Console.WriteLine("众数是: " + mode);
  694. // 如果有多个众数的情况
  695. var maxCount = res.GroupBy(n => n)
  696. .Max(g => g.Count());
  697. var modes = res.GroupBy(n => n)
  698. .Where(g => g.Count() == maxCount)
  699. .Select(g => g.Key)
  700. .ToList();
  701. // 多个众数,选择最接近平均数或中位数的众数
  702. if (modes.Count>1)
  703. {
  704. // 计算平均值
  705. double average = res.Average();
  706. Console.WriteLine("平均值是: " + average);
  707. // 计算中位数
  708. double median;
  709. int count = res.Count;
  710. var sortedRes = res.OrderBy(n => n).ToList();
  711. if (count % 2 == 0)
  712. {
  713. // 偶数个元素,取中间两个数的平均值
  714. median = (sortedRes[count / 2 - 1] + sortedRes[count / 2]) / 2.0;
  715. }
  716. else
  717. {
  718. // 奇数个元素,取中间的数
  719. median = sortedRes[count / 2];
  720. }
  721. Console.WriteLine("中位数是: " + median);
  722. // 找出最接近平均值的众数
  723. //var closestToAverage = modes.OrderBy(m => Math.Abs(m - average)).First();
  724. //Console.WriteLine("最接近平均值的众数是: " + closestToAverage);
  725. // 找出最接近中位数的众数
  726. var closestToMedian = modes.OrderBy(m => Math.Abs(m - median)).First();
  727. Console.WriteLine("最接近中位数的众数是: " + closestToMedian);
  728. mode = closestToMedian;
  729. }
  730. return mode;
  731. }
  732. /// <summary>
  733. /// 计算个人一般心率
  734. /// </summary>
  735. /// <param name="serialNo"></param>
  736. /// <param name="days"></param>
  737. /// <param name="percentage"></param>
  738. /// <returns></returns>
  739. public async Task<PregnancyCommonHeartRateModel?> InitPregnancyCommonHeartRateModeAsync(string serialNo, int days = 7,int percentage=90)
  740. {
  741. var tableName = typeof(PregnancyHeartRateModel)
  742. .GetCustomAttribute<STableAttribute>()?
  743. .STableName;
  744. var collection = await _clientSqlSugar
  745. .Queryable<PregnancyHeartRateModel>()
  746. .AS(tableName)
  747. .Where(i => i.SerialNumber.Equals(serialNo))
  748. .Where(i => i.Timestamp > DateTime.Now.AddDays(-days))
  749. .OrderByDescending(i => i.Timestamp)
  750. .ToArrayAsync();
  751. var res = collection
  752. .Select(i => i.PregnancyHeartRate).ToList();
  753. // 心率数据量必须30个以上才进行计算
  754. if (res.Count < 30)
  755. {
  756. _logger.LogInformation($"{serialNo} 心率数据不足,无法计算其众数");
  757. return null;
  758. }
  759. #region 计算众数
  760. var mode = res.GroupBy(n => n)
  761. .OrderByDescending(g => g.Count())
  762. .First()
  763. .Key;
  764. Console.WriteLine("众数是: " + mode);
  765. // 如果有多个众数的情况
  766. var maxCount = res.GroupBy(n => n)
  767. .Max(g => g.Count());
  768. var modes = res.GroupBy(n => n)
  769. .Where(g => g.Count() == maxCount)
  770. .Select(g => g.Key)
  771. .ToList();
  772. // 多个众数,选择最接近平均数或中位数的众数
  773. if (modes.Count > 1)
  774. {
  775. // 计算平均值
  776. double average = res.Average();
  777. Console.WriteLine("平均值是: " + average);
  778. // 计算中位数
  779. double median;
  780. int count = res.Count;
  781. var sortedRes = res.OrderBy(n => n).ToList();
  782. if (count % 2 == 0)
  783. {
  784. // 偶数个元素,取中间两个数的平均值
  785. median = (sortedRes[count / 2 - 1] + sortedRes[count / 2]) / 2.0;
  786. }
  787. else
  788. {
  789. // 奇数个元素,取中间的数
  790. median = sortedRes[count / 2];
  791. }
  792. Console.WriteLine("中位数是: " + median);
  793. // 找出最接近平均值的众数
  794. //var closestToAverage = modes.OrderBy(m => Math.Abs(m - average)).First();
  795. //Console.WriteLine("最接近平均值的众数是: " + closestToAverage);
  796. // 找出最接近中位数的众数
  797. var closestToMedian = modes.OrderBy(m => Math.Abs(m - median)).First();
  798. Console.WriteLine("最接近中位数的众数是: " + closestToMedian);
  799. mode = closestToMedian;
  800. }
  801. #endregion
  802. // 计算需要的数量
  803. int requiredCount = (int)(res.Count * 0.8);
  804. // 从原始数据集中获取最接近众数的元素
  805. var closestToModeData = res.OrderBy(n => Math.Abs(n - mode))
  806. .Take(requiredCount)
  807. .ToList();
  808. // 输出新数据集
  809. Console.WriteLine("新数据集: " + string.Join(", ", closestToModeData));
  810. Console.WriteLine("新数据集的数量: " + closestToModeData.Count);
  811. return new PregnancyCommonHeartRateModel()
  812. {
  813. Timestamp = DateTime.Now,
  814. PersonId = collection.First().DeviceKey,
  815. DeviceKey = collection.First().DeviceKey,
  816. SerialNumber = collection.First().SerialNumber,
  817. Mode = mode,
  818. Percentage= percentage,
  819. MaxValue= closestToModeData.Max(),
  820. MinValue= closestToModeData.Min(),
  821. OriginalMaxValue=res.Max(),
  822. OriginalMinValue= res.Min(),
  823. CreateTime = DateTime.Now,
  824. StatStartTime = collection.OrderBy(i=>i.Timestamp).Select(i=>i.Timestamp).First(),
  825. StatEndTime= collection.OrderBy(i => i.Timestamp).Select(i => i.Timestamp).Last(),
  826. };
  827. }
  828. #endregion
  829. }
  830. }