You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

563 satır
23KB

  1. using HealthMonitor.Common;
  2. using HealthMonitor.Common.helper;
  3. using HealthMonitor.Model.Config;
  4. using HealthMonitor.Model.Service.Mapper;
  5. using HealthMonitor.Service.Biz.db.Dto;
  6. using HealthMonitor.Util.Models;
  7. using Microsoft.EntityFrameworkCore.Metadata.Internal;
  8. using Microsoft.Extensions.Logging;
  9. using Microsoft.Extensions.Options;
  10. using Newtonsoft.Json;
  11. using Newtonsoft.Json.Linq;
  12. using System;
  13. using System.Collections.Generic;
  14. using System.Linq;
  15. using System.Text;
  16. using System.Threading.Tasks;
  17. using System.Xml.Linq;
  18. using TDengineDriver;
  19. using TDengineDriver.Impl;
  20. namespace HealthMonitor.Service.Biz.db
  21. {
  22. public class TDengineService
  23. {
  24. private readonly ILogger<TDengineService> _logger;
  25. private readonly HttpHelper _httpHelper=default!;
  26. private readonly TDengineServiceConfig _configTDengineService;
  27. public TDengineService(ILogger<TDengineService> logger,
  28. IOptions<TDengineServiceConfig> configTDengineService,
  29. HttpHelper httpHelper
  30. )
  31. {
  32. _logger = logger;
  33. _configTDengineService = configTDengineService.Value;
  34. _httpHelper = httpHelper;
  35. }
  36. public IntPtr Connection()
  37. {
  38. string host = _configTDengineService.Host;
  39. string user = _configTDengineService.UserName;
  40. string db = _configTDengineService.DB;
  41. short port = _configTDengineService.Port;
  42. string password = _configTDengineService.Password;
  43. //#if DEBUG
  44. // //string configDir = "C:/TDengine/cfg";
  45. // //TDengine.Options((int)TDengineInitOption.TSDB_OPTION_CONFIGDIR, configDir);
  46. // TDengine.Options((int)TDengineInitOption.TSDB_OPTION_TIMEZONE, "Asia/Beijing");
  47. //#endif
  48. IntPtr conn = TDengine.Connect(host, user, password, db, port);
  49. if (conn == IntPtr.Zero)
  50. {
  51. _logger.LogError($"连接 TDengine 失败....");
  52. }
  53. else
  54. {
  55. _logger.LogInformation($"连接 TDengine 成功....");
  56. }
  57. return conn;
  58. }
  59. public void ExecuteSQL(IntPtr conn, string sql)
  60. {
  61. IntPtr res = TDengine.Query(conn, sql);
  62. // Check if query success
  63. if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0))
  64. {
  65. Console.Write(sql + " failure, ");
  66. // Get error message while Res is a not null pointer.
  67. if (res != IntPtr.Zero)
  68. {
  69. Console.Write("reason:" + TDengine.Error(res));
  70. }
  71. }
  72. else
  73. {
  74. Console.Write(sql + " success, {0} rows affected", TDengine.AffectRows(res));
  75. //... do something with res ...
  76. // Important: need to free result to avoid memory leak.
  77. TDengine.FreeResult(res);
  78. }
  79. }
  80. public void ExecuteQuerySQL(IntPtr conn, string sql)
  81. {
  82. IntPtr res = TDengine.Query(conn, sql);
  83. // Check if query success
  84. if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0))
  85. {
  86. Console.Write(sql + " failure, ");
  87. // Get error message while Res is a not null pointer.
  88. if (res != IntPtr.Zero)
  89. {
  90. Console.Write("reason:" + TDengine.Error(res));
  91. }
  92. }
  93. else
  94. {
  95. Console.Write(sql + " success, {0} rows affected", TDengine.AffectRows(res));
  96. //... do something with res ...
  97. List<TDengineDriver.TDengineMeta> resMeta = LibTaos.GetMeta(res);
  98. List<object> resData = LibTaos.GetData(res);
  99. foreach (var meta in resMeta)
  100. {
  101. _logger.LogInformation("\t|{meta.name} {meta.TypeName()} ({meta.size})\t|", meta.name, meta.TypeName(), meta.size);
  102. }
  103. for (int i = 0; i < resData.Count; i++)
  104. {
  105. _logger.LogInformation($"|{resData[i].ToString()} \t");
  106. if (((i + 1) % resMeta.Count == 0))
  107. {
  108. _logger.LogInformation("");
  109. }
  110. }
  111. // Important: need to free result to avoid memory leak.
  112. TDengine.FreeResult(res);
  113. }
  114. }
  115. public void CheckRes(IntPtr conn, IntPtr res, String errorMsg)
  116. {
  117. if (TDengine.ErrorNo(res) != 0)
  118. {
  119. throw new Exception($"{errorMsg} since: {TDengine.Error(res)}");
  120. }
  121. }
  122. public void ExecuteInsertSQL(string sql)
  123. {
  124. var conn = Connection();
  125. try
  126. {
  127. //sql = "INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000) " +
  128. // "d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000) " +
  129. // "d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000)('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000) " +
  130. // "d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000)('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)";
  131. //#if DEBUG
  132. // //string configDir = "C:/TDengine/cfg";
  133. // //TDengine.Options((int)TDengineInitOption.TSDB_OPTION_CONFIGDIR, configDir);
  134. // TDengine.Options((int)TDengineInitOption.TSDB_OPTION_TIMEZONE, "Asia/Beijing");
  135. //#endif
  136. _logger.LogInformation($"Insert SQL: {sql}");
  137. IntPtr res = TDengine.Query(conn, sql);
  138. CheckRes(conn, res, "failed to insert data");
  139. int affectedRows = TDengine.AffectRows(res);
  140. _logger.LogInformation("affectedRows {affectedRows}" , affectedRows);
  141. TDengine.FreeResult(res);
  142. }
  143. finally
  144. {
  145. TDengine.Close(conn);
  146. }
  147. }
  148. #region TDengine.Connector async query
  149. public void QueryCallback(IntPtr param, IntPtr taosRes, int code)
  150. {
  151. if (code == 0 && taosRes != IntPtr.Zero)
  152. {
  153. FetchRawBlockAsyncCallback fetchRowAsyncCallback = new FetchRawBlockAsyncCallback(FetchRawBlockCallback);
  154. TDengine.FetchRawBlockAsync(taosRes, fetchRowAsyncCallback, param);
  155. }
  156. else
  157. {
  158. _logger.LogInformation("async query data failed, failed code {code}",code);
  159. }
  160. }
  161. // Iteratively call this interface until "numOfRows" is no greater than 0.
  162. public void FetchRawBlockCallback(IntPtr param, IntPtr taosRes, int numOfRows)
  163. {
  164. if (numOfRows > 0)
  165. {
  166. _logger.LogInformation("{numOfRows} rows async retrieved", numOfRows);
  167. IntPtr pdata = TDengine.GetRawBlock(taosRes);
  168. List<TDengineMeta> metaList = TDengine.FetchFields(taosRes);
  169. List<object> dataList = LibTaos.ReadRawBlock(pdata, metaList, numOfRows);
  170. for (int i = 0; i < metaList.Count; i++)
  171. {
  172. _logger.LogInformation("{0} {1}({2}) \t|", metaList[i].name, metaList[i].type, metaList[i].size);
  173. }
  174. _logger.LogInformation("");
  175. for (int i = 0; i < dataList.Count; i++)
  176. {
  177. if (i != 0 && i % metaList.Count == 0)
  178. {
  179. _logger.LogInformation("{dataList[i]}\t|", dataList[i]);
  180. }
  181. _logger.LogInformation("{dataList[i]}\t|", dataList[i]);
  182. }
  183. TDengine.FetchRawBlockAsync(taosRes, FetchRawBlockCallback, param);
  184. }
  185. else
  186. {
  187. if (numOfRows == 0)
  188. {
  189. _logger.LogInformation("async retrieve complete.");
  190. }
  191. else
  192. {
  193. _logger.LogInformation("FetchRawBlockCallback callback error, error code {numOfRows}", numOfRows);
  194. }
  195. TDengine.FreeResult(taosRes);
  196. }
  197. }
  198. public void ExecuteQueryAsync(string sql)
  199. {
  200. var conn = Connection();
  201. QueryAsyncCallback queryAsyncCallback = new QueryAsyncCallback(QueryCallback);
  202. TDengine.QueryAsync(conn, sql, queryAsyncCallback, IntPtr.Zero);
  203. }
  204. //public void ExecuteQuery(string sql)
  205. //{
  206. // var conn = Connection();
  207. // QueryAsyncCallback queryAsyncCallback = new QueryAsyncCallback(QueryCallback);
  208. // TDengine.QueryAsync(conn, sql, queryAsyncCallback, IntPtr.Zero);
  209. //}
  210. public Aggregate GetAggregateValue(string field, string tbName, string? condition)
  211. {
  212. List<int> data = new();
  213. var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  214. var conn = Connection();
  215. try
  216. {
  217. IntPtr res = TDengine.Query(conn, sql);
  218. // Check if query success
  219. if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0))
  220. {
  221. Console.Write(sql + " failure, ");
  222. // Get error message while Res is a not null pointer.
  223. if (res != IntPtr.Zero)
  224. {
  225. Console.Write("reason:" + TDengine.Error(res));
  226. }
  227. }
  228. else
  229. {
  230. Console.Write(sql + " success, {0} rows affected", TDengine.AffectRows(res));
  231. //... do something with res ...
  232. List<TDengineMeta> resMeta = LibTaos.GetMeta(res);
  233. List<object> resData = LibTaos.GetData(res);
  234. foreach (var meta in resMeta)
  235. {
  236. Console.Write($"\t|{meta.name} {meta.TypeName()} ({meta.size})\t|");
  237. }
  238. resData.ForEach(x => data.Add(SafeType.SafeInt(x)));
  239. // Important: need to free result to avoid memory leak.
  240. TDengine.FreeResult(res);
  241. }
  242. }
  243. finally
  244. {
  245. TDengine.Close(conn);
  246. }
  247. return new Aggregate
  248. {
  249. Max = data.Count.Equals(0) ? 0 : data[0],
  250. Min = data.Count.Equals(0) ? 0 : data[1],
  251. };
  252. }
  253. public int GetAvgExceptMaxMinValue(string field, string tbName, string? condition)
  254. {
  255. List<int> data = new();
  256. var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  257. var aggregate= GetAggregateValue(field, tbName, condition);
  258. var sqlAvg = $"SELECT AVG({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition} AND {field} < {aggregate.Max} and {field} > {aggregate.Min}";
  259. var conn = Connection();
  260. try
  261. {
  262. IntPtr res = TDengine.Query(conn, sqlAvg);
  263. // Check if query success
  264. if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0))
  265. {
  266. Console.Write(sqlAvg + " failure, ");
  267. // Get error message while Res is a not null pointer.
  268. if (res != IntPtr.Zero)
  269. {
  270. Console.Write("reason:" + TDengine.Error(res));
  271. }
  272. }
  273. else
  274. {
  275. Console.Write(sqlAvg + " success, {0} rows affected", TDengine.AffectRows(res));
  276. //... do something with res ...
  277. List<TDengineMeta> resMeta = LibTaos.GetMeta(res);
  278. List<object> resData = LibTaos.GetData(res);
  279. foreach (var meta in resMeta)
  280. {
  281. Console.Write($"\t|{meta.name} {meta.TypeName()} ({meta.size})\t|");
  282. }
  283. resData.ForEach(x => data.Add(SafeType.SafeInt(x)));
  284. // Important: need to free result to avoid memory leak.
  285. TDengine.FreeResult(res);
  286. }
  287. }
  288. finally
  289. {
  290. TDengine.Close(conn);
  291. }
  292. return data.Count.Equals(0) ? 0 : data[0];
  293. }
  294. #endregion
  295. #region RestAPI
  296. public async Task<string?> ExecuteQuerySQLRestResponse(string sql)
  297. {
  298. var url = $"http://{_configTDengineService.Host}:{_configTDengineService.RestPort}/rest/sql/{_configTDengineService.DB}";
  299. List<KeyValuePair<string, string>> headers = new()
  300. {
  301. new KeyValuePair<string, string>("Authorization", "Basic " + _configTDengineService.Token)
  302. };
  303. var result = await _httpHelper.HttpToPostAsync(url, sql, headers).ConfigureAwait(false);
  304. return result;
  305. }
  306. public async Task<string?> ExecuteSelectRestResponseAsync( string tbName, string condition="1", string field = "*")
  307. {
  308. var url = $"http://{_configTDengineService.Host}:{_configTDengineService.RestPort}/rest/sql/{_configTDengineService.DB}";
  309. var sql = $"SELECT {field} FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  310. List<KeyValuePair<string, string>> headers = new()
  311. {
  312. new KeyValuePair<string, string>("Authorization", "Basic " + _configTDengineService.Token)
  313. };
  314. _logger.LogInformation($"{nameof(ExecuteSelectRestResponseAsync)} --- SQL 语句执行 {sql}");
  315. var result = await _httpHelper.HttpToPostAsync(url, sql, headers).ConfigureAwait(false);
  316. return result;
  317. }
  318. public async Task<bool> GernalRestSql(string sql)
  319. {
  320. //"http://{server}:{port}/rest/sql/{db}"
  321. var url = $"http://{_configTDengineService.Host}:{_configTDengineService.RestPort}/rest/sql/{_configTDengineService.DB}";
  322. List<KeyValuePair<string, string>> headers = new()
  323. {
  324. new KeyValuePair<string, string>("Authorization", "Basic " + _configTDengineService.Token)
  325. };
  326. var result = await _httpHelper.HttpToPostAsync(url, sql, headers).ConfigureAwait(false);
  327. var res = JsonConvert.DeserializeObject<TDengineRestResBase>(result!);
  328. if (result != null)
  329. {
  330. if (res?.Code == 0)
  331. {
  332. _logger.LogInformation($"{nameof(GernalRestSql)},SQL 语句执行成功|{sql}");
  333. return true;
  334. }
  335. else
  336. {
  337. _logger.LogWarning($"{nameof(GernalRestSql)},SQL 语句执行失败||{sql}");
  338. return false;
  339. }
  340. }
  341. else
  342. {
  343. _logger.LogError($"{nameof(GernalRestSql)},TDengine 服务器IP:{_configTDengineService.Host} 错误,请联系运维人员");
  344. return false;
  345. }
  346. //return res.Code==0;
  347. }
  348. public async Task<string?> GernalRestSqlResTextAsync(string sql)
  349. {
  350. _logger.LogInformation($"执行 SQL: {nameof(GernalRestSqlResTextAsync)}--{sql}");
  351. var url = $"http://{_configTDengineService.Host}:{_configTDengineService.RestPort}/rest/sql/{_configTDengineService.DB}";
  352. List<KeyValuePair<string, string>> headers = new()
  353. {
  354. new KeyValuePair<string, string>("Authorization", "Basic " + _configTDengineService.Token)
  355. };
  356. var result = await _httpHelper.HttpToPostAsync(url, sql, headers).ConfigureAwait(false);
  357. return result;
  358. }
  359. /** 取消
  360. /// <summary>
  361. /// 最大值,最小值聚合
  362. /// </summary>
  363. /// <param name="field"></param>
  364. /// <param name="tbName"></param>
  365. /// <param name="condition"></param>
  366. /// <returns></returns>
  367. public async Task<Aggregate> GetAggregateValueAsync(string field,string tbName,string? condition)
  368. {
  369. var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  370. var result = await GernalRestSqlResTextAsync(sql);
  371. var res = JsonConvert.DeserializeObject<Aggregate>(result!);
  372. List<dynamic> data = res?.Data!;
  373. return new Aggregate
  374. {
  375. Max = data.Count.Equals(0) ? 0 : data[0][0],
  376. Min = data.Count.Equals(0) ? 0 : data[0][1],
  377. };
  378. }
  379. /// <summary>
  380. /// 去除最大值和最小值后的平均值
  381. /// </summary>
  382. /// <param name="field"></param>
  383. /// <param name="tbName"></param>
  384. /// <param name="condition"></param>
  385. /// <returns></returns>
  386. public async Task<int> GetAvgExceptMaxMinValueAsync(string field, string tbName, string? condition)
  387. {
  388. var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  389. var result = await GernalRestSqlResTextAsync(sql);
  390. var res = JsonConvert.DeserializeObject<TDengineRestResBase>(result!);
  391. _logger.LogInformation($"最大小值:{sql}");
  392. List<dynamic> data = res?.Data!;
  393. var sqlAvg = $"SELECT AVG({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition} AND {field} < { (data.Count.Equals(0)? 0: data[0][0]) } and {field} > {(data.Count.Equals(0) ? 0 : data[0][1])}";
  394. result = await GernalRestSqlResTextAsync(sqlAvg);
  395. _logger.LogInformation($"sqlAvg:{sqlAvg}");
  396. res = JsonConvert.DeserializeObject<TDengineRestResBase>(result!);
  397. data = res?.Data!;
  398. return data.Count.Equals(0)?0:(int)data[0][0];
  399. }
  400. /// <summary>
  401. /// 获取最后的记录
  402. /// </summary>
  403. /// <param name="tbName"></param>
  404. /// <param name="condition"></param>
  405. /// <returns></returns>
  406. public async Task<JArray?> GetLastAsync(string tbName, string? condition)
  407. {
  408. var sql = $"SELECT last_row(*) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  409. var result = await GernalRestSqlResTextAsync(sql);
  410. var res = JsonConvert.DeserializeObject<TDengineRestResBase>(result!);
  411. if(res?.Data?.Count==0) return null;
  412. List<dynamic> data = res?.Data!;
  413. return data[0] as JArray;
  414. }
  415. public async Task<int> GetCount(string tbName, string? condition)
  416. {
  417. var sql = $"SELECT count(ts) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  418. var result = await GernalRestSqlResTextAsync(sql);
  419. var res = JsonConvert.DeserializeObject<TDengineRestResBase>(result!);
  420. List<dynamic> data = res?.Data!;
  421. return SafeType.SafeInt(data[0][0]);
  422. }
  423. */
  424. #endregion
  425. /// <summary>
  426. /// 平均值算法(去除最大值,最小值和大于标定值的平均值)
  427. /// </summary>
  428. /// <param name="numToRemove"></param>
  429. /// <param name="collection"></param>
  430. /// <param name="max"></param>
  431. /// <param name="min"></param>
  432. /// <returns></returns>
  433. public static decimal AverageAfterRemovingOneMinMaxRef(List<int> collection, int max, int min,int refValue)
  434. {
  435. collection.Remove(max);
  436. collection.Remove(min);
  437. collection.RemoveAll(_ => _ > refValue);
  438. if (collection.Count < 2)
  439. {
  440. throw new ArgumentException($"数据集{collection.ToArray()},去掉一个最大值 {max}和一个最小值{min},异常值(大于标定值{refValue}),后数据值不足");
  441. }
  442. return (decimal)collection.Average(x => x);
  443. //var values = ParseData.Select(valueSelector).ToList();
  444. //collection = values.Select(i => (int)i).ToArray();
  445. //if (values.Count <= 2)
  446. //{
  447. // throw new ArgumentException("Not enough elements to remove.");
  448. //}
  449. //// Remove the specified number of minimum and maximum values
  450. ////values.RemoveAll(_ => _ == values.Min());
  451. ////values.RemoveAll(_ => _ == values.Max());
  452. //max = (int)values.Max();
  453. //min = (int)values.Min();
  454. //values.Remove(max);
  455. //values.Remove(min);
  456. //// Remove values less than the specified threshold
  457. //values.RemoveAll(_ => _ > numToRemove);
  458. //// Calculate and return the average of the remaining values
  459. //return values.Average();
  460. }
  461. /// <summary>
  462. ///
  463. /// </summary>
  464. /// <param name="systolicRefValue"></param>
  465. /// <param name="hmBpParser"></param>
  466. /// <returns></returns>
  467. public decimal[] AverageAfterRemovingOneMinMaxRef(int systolicRefValue, ParseTDengineRestResponse<BloodPressureModel>? hmBpParser)
  468. {
  469. var sortedList = hmBpParser?.Select(i => i)
  470. .Where(i => i.IsDisplay.Equals(true))
  471. .OrderByDescending(i => i.SystolicValue)
  472. .ThenByDescending(i => i.DiastolicValue)
  473. .ToList();
  474. _logger.LogInformation($"计算时间段排列数据集:{JsonConvert.SerializeObject(sortedList)}");
  475. // 去除最大值和最小值各一个(列表的头和尾)
  476. var trimmedList = sortedList?
  477. .Skip(1)
  478. .Take(sortedList.Count - 2)
  479. .ToList();
  480. _logger.LogInformation($"计算去除最大值和最小值各一个数据集:{JsonConvert.SerializeObject(trimmedList)}");
  481. // 去除异常值
  482. var filteredList = trimmedList?.Where(bp => bp.SystolicValue < SafeType.SafeInt(systolicRefValue!)).ToList();
  483. _logger.LogInformation($"计算除异常值个数据集:{JsonConvert.SerializeObject(filteredList)}");
  484. if (filteredList?.Count < 2)
  485. {
  486. // throw new ArgumentException("数据不够不能计算");
  487. // 平均值为0,说明数据不足,不能计算增量值
  488. return new decimal[] { 0M, 0M };
  489. }
  490. var systolicAvg = filteredList?.Select(bp => bp.SystolicValue).Average();
  491. var diastolicAvg = filteredList?.Select(bp => bp.DiastolicValue).Average();
  492. return new decimal[] { (decimal)systolicAvg!, (decimal)diastolicAvg! };
  493. }
  494. }
  495. }