Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

519 lines
21KB

  1. using HealthMonitor.Common;
  2. using HealthMonitor.Common.helper;
  3. using HealthMonitor.Model.Config;
  4. using HealthMonitor.Service.Biz.db.Dto;
  5. using HealthMonitor.Util.Models;
  6. using Microsoft.EntityFrameworkCore.Metadata.Internal;
  7. using Microsoft.Extensions.Logging;
  8. using Microsoft.Extensions.Options;
  9. using Newtonsoft.Json;
  10. using Newtonsoft.Json.Linq;
  11. using System;
  12. using System.Collections.Generic;
  13. using System.Linq;
  14. using System.Text;
  15. using System.Threading.Tasks;
  16. using System.Xml.Linq;
  17. using TDengineDriver;
  18. using TDengineDriver.Impl;
  19. namespace HealthMonitor.Service.Biz.db
  20. {
  21. public class TDengineService
  22. {
  23. private readonly ILogger<TDengineService> _logger;
  24. private readonly HttpHelper _httpHelper=default!;
  25. private readonly TDengineServiceConfig _configTDengineService;
  26. public TDengineService(ILogger<TDengineService> logger,
  27. IOptions<TDengineServiceConfig> configTDengineService,
  28. HttpHelper httpHelper
  29. )
  30. {
  31. _logger = logger;
  32. _configTDengineService = configTDengineService.Value;
  33. _httpHelper = httpHelper;
  34. }
  35. public IntPtr Connection()
  36. {
  37. string host = _configTDengineService.Host;
  38. string user = _configTDengineService.UserName;
  39. string db = _configTDengineService.DB;
  40. short port = _configTDengineService.Port;
  41. string password = _configTDengineService.Password;
  42. //#if DEBUG
  43. // //string configDir = "C:/TDengine/cfg";
  44. // //TDengine.Options((int)TDengineInitOption.TSDB_OPTION_CONFIGDIR, configDir);
  45. // TDengine.Options((int)TDengineInitOption.TSDB_OPTION_TIMEZONE, "Asia/Beijing");
  46. //#endif
  47. IntPtr conn = TDengine.Connect(host, user, password, db, port);
  48. if (conn == IntPtr.Zero)
  49. {
  50. _logger.LogError($"连接 TDengine 失败....");
  51. }
  52. else
  53. {
  54. _logger.LogInformation($"连接 TDengine 成功....");
  55. }
  56. return conn;
  57. }
  58. public void ExecuteSQL(IntPtr conn, string sql)
  59. {
  60. IntPtr res = TDengine.Query(conn, sql);
  61. // Check if query success
  62. if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0))
  63. {
  64. Console.Write(sql + " failure, ");
  65. // Get error message while Res is a not null pointer.
  66. if (res != IntPtr.Zero)
  67. {
  68. Console.Write("reason:" + TDengine.Error(res));
  69. }
  70. }
  71. else
  72. {
  73. Console.Write(sql + " success, {0} rows affected", TDengine.AffectRows(res));
  74. //... do something with res ...
  75. // Important: need to free result to avoid memory leak.
  76. TDengine.FreeResult(res);
  77. }
  78. }
  79. public void ExecuteQuerySQL(IntPtr conn, string sql)
  80. {
  81. IntPtr res = TDengine.Query(conn, sql);
  82. // Check if query success
  83. if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0))
  84. {
  85. Console.Write(sql + " failure, ");
  86. // Get error message while Res is a not null pointer.
  87. if (res != IntPtr.Zero)
  88. {
  89. Console.Write("reason:" + TDengine.Error(res));
  90. }
  91. }
  92. else
  93. {
  94. Console.Write(sql + " success, {0} rows affected", TDengine.AffectRows(res));
  95. //... do something with res ...
  96. List<TDengineDriver.TDengineMeta> resMeta = LibTaos.GetMeta(res);
  97. List<object> resData = LibTaos.GetData(res);
  98. foreach (var meta in resMeta)
  99. {
  100. _logger.LogInformation("\t|{meta.name} {meta.TypeName()} ({meta.size})\t|", meta.name, meta.TypeName(), meta.size);
  101. }
  102. for (int i = 0; i < resData.Count; i++)
  103. {
  104. _logger.LogInformation($"|{resData[i].ToString()} \t");
  105. if (((i + 1) % resMeta.Count == 0))
  106. {
  107. _logger.LogInformation("");
  108. }
  109. }
  110. // Important: need to free result to avoid memory leak.
  111. TDengine.FreeResult(res);
  112. }
  113. }
  114. public void CheckRes(IntPtr conn, IntPtr res, String errorMsg)
  115. {
  116. if (TDengine.ErrorNo(res) != 0)
  117. {
  118. throw new Exception($"{errorMsg} since: {TDengine.Error(res)}");
  119. }
  120. }
  121. public void ExecuteInsertSQL(string sql)
  122. {
  123. var conn = Connection();
  124. try
  125. {
  126. //sql = "INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000) " +
  127. // "d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000) " +
  128. // "d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000)('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000) " +
  129. // "d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000)('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)";
  130. //#if DEBUG
  131. // //string configDir = "C:/TDengine/cfg";
  132. // //TDengine.Options((int)TDengineInitOption.TSDB_OPTION_CONFIGDIR, configDir);
  133. // TDengine.Options((int)TDengineInitOption.TSDB_OPTION_TIMEZONE, "Asia/Beijing");
  134. //#endif
  135. IntPtr res = TDengine.Query(conn, sql);
  136. CheckRes(conn, res, "failed to insert data");
  137. int affectedRows = TDengine.AffectRows(res);
  138. _logger.LogInformation("affectedRows {affectedRows}" , affectedRows);
  139. TDengine.FreeResult(res);
  140. }
  141. finally
  142. {
  143. TDengine.Close(conn);
  144. }
  145. }
  146. #region TDengine.Connector async query
  147. public void QueryCallback(IntPtr param, IntPtr taosRes, int code)
  148. {
  149. if (code == 0 && taosRes != IntPtr.Zero)
  150. {
  151. FetchRawBlockAsyncCallback fetchRowAsyncCallback = new FetchRawBlockAsyncCallback(FetchRawBlockCallback);
  152. TDengine.FetchRawBlockAsync(taosRes, fetchRowAsyncCallback, param);
  153. }
  154. else
  155. {
  156. _logger.LogInformation("async query data failed, failed code {code}",code);
  157. }
  158. }
  159. // Iteratively call this interface until "numOfRows" is no greater than 0.
  160. public void FetchRawBlockCallback(IntPtr param, IntPtr taosRes, int numOfRows)
  161. {
  162. if (numOfRows > 0)
  163. {
  164. _logger.LogInformation("{numOfRows} rows async retrieved", numOfRows);
  165. IntPtr pdata = TDengine.GetRawBlock(taosRes);
  166. List<TDengineMeta> metaList = TDengine.FetchFields(taosRes);
  167. List<object> dataList = LibTaos.ReadRawBlock(pdata, metaList, numOfRows);
  168. for (int i = 0; i < metaList.Count; i++)
  169. {
  170. _logger.LogInformation("{0} {1}({2}) \t|", metaList[i].name, metaList[i].type, metaList[i].size);
  171. }
  172. _logger.LogInformation("");
  173. for (int i = 0; i < dataList.Count; i++)
  174. {
  175. if (i != 0 && i % metaList.Count == 0)
  176. {
  177. _logger.LogInformation("{dataList[i]}\t|", dataList[i]);
  178. }
  179. _logger.LogInformation("{dataList[i]}\t|", dataList[i]);
  180. }
  181. TDengine.FetchRawBlockAsync(taosRes, FetchRawBlockCallback, param);
  182. }
  183. else
  184. {
  185. if (numOfRows == 0)
  186. {
  187. _logger.LogInformation("async retrieve complete.");
  188. }
  189. else
  190. {
  191. _logger.LogInformation("FetchRawBlockCallback callback error, error code {numOfRows}", numOfRows);
  192. }
  193. TDengine.FreeResult(taosRes);
  194. }
  195. }
  196. public void ExecuteQueryAsync(string sql)
  197. {
  198. var conn = Connection();
  199. QueryAsyncCallback queryAsyncCallback = new QueryAsyncCallback(QueryCallback);
  200. TDengine.QueryAsync(conn, sql, queryAsyncCallback, IntPtr.Zero);
  201. }
  202. //public void ExecuteQuery(string sql)
  203. //{
  204. // var conn = Connection();
  205. // QueryAsyncCallback queryAsyncCallback = new QueryAsyncCallback(QueryCallback);
  206. // TDengine.QueryAsync(conn, sql, queryAsyncCallback, IntPtr.Zero);
  207. //}
  208. public Aggregate GetAggregateValue(string field, string tbName, string? condition)
  209. {
  210. List<int> data = new();
  211. var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  212. var conn = Connection();
  213. try
  214. {
  215. IntPtr res = TDengine.Query(conn, sql);
  216. // Check if query success
  217. if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0))
  218. {
  219. Console.Write(sql + " failure, ");
  220. // Get error message while Res is a not null pointer.
  221. if (res != IntPtr.Zero)
  222. {
  223. Console.Write("reason:" + TDengine.Error(res));
  224. }
  225. }
  226. else
  227. {
  228. Console.Write(sql + " success, {0} rows affected", TDengine.AffectRows(res));
  229. //... do something with res ...
  230. List<TDengineMeta> resMeta = LibTaos.GetMeta(res);
  231. List<object> resData = LibTaos.GetData(res);
  232. foreach (var meta in resMeta)
  233. {
  234. Console.Write($"\t|{meta.name} {meta.TypeName()} ({meta.size})\t|");
  235. }
  236. resData.ForEach(x => data.Add(SafeType.SafeInt(x)));
  237. // Important: need to free result to avoid memory leak.
  238. TDengine.FreeResult(res);
  239. }
  240. }
  241. finally
  242. {
  243. TDengine.Close(conn);
  244. }
  245. return new Aggregate
  246. {
  247. Max = data.Count.Equals(0) ? 0 : data[0],
  248. Min = data.Count.Equals(0) ? 0 : data[1],
  249. };
  250. }
  251. public int GetAvgExceptMaxMinValue(string field, string tbName, string? condition)
  252. {
  253. List<int> data = new();
  254. var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  255. var aggregate= GetAggregateValue(field, tbName, condition);
  256. var sqlAvg = $"SELECT AVG({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition} AND {field} < {aggregate.Max} and {field} > {aggregate.Min}";
  257. var conn = Connection();
  258. try
  259. {
  260. IntPtr res = TDengine.Query(conn, sqlAvg);
  261. // Check if query success
  262. if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0))
  263. {
  264. Console.Write(sqlAvg + " failure, ");
  265. // Get error message while Res is a not null pointer.
  266. if (res != IntPtr.Zero)
  267. {
  268. Console.Write("reason:" + TDengine.Error(res));
  269. }
  270. }
  271. else
  272. {
  273. Console.Write(sqlAvg + " success, {0} rows affected", TDengine.AffectRows(res));
  274. //... do something with res ...
  275. List<TDengineMeta> resMeta = LibTaos.GetMeta(res);
  276. List<object> resData = LibTaos.GetData(res);
  277. foreach (var meta in resMeta)
  278. {
  279. Console.Write($"\t|{meta.name} {meta.TypeName()} ({meta.size})\t|");
  280. }
  281. resData.ForEach(x => data.Add(SafeType.SafeInt(x)));
  282. // Important: need to free result to avoid memory leak.
  283. TDengine.FreeResult(res);
  284. }
  285. }
  286. finally
  287. {
  288. TDengine.Close(conn);
  289. }
  290. return data.Count.Equals(0) ? 0 : data[0];
  291. }
  292. #endregion
  293. #region RestAPI
  294. public async Task<string?> ExecuteQuerySQLRestResponse(string sql)
  295. {
  296. var url = $"http://{_configTDengineService.Host}:{_configTDengineService.RestPort}/rest/sql/{_configTDengineService.DB}";
  297. List<KeyValuePair<string, string>> headers = new()
  298. {
  299. new KeyValuePair<string, string>("Authorization", "Basic " + _configTDengineService.Token)
  300. };
  301. var result = await _httpHelper.HttpToPostAsync(url, sql, headers).ConfigureAwait(false);
  302. return result;
  303. }
  304. public async Task<string?> ExecuteSelectRestResponseAsync( string tbName, string condition="1", string field = "*")
  305. {
  306. var url = $"http://{_configTDengineService.Host}:{_configTDengineService.RestPort}/rest/sql/{_configTDengineService.DB}";
  307. var sql = $"SELECT {field} FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  308. List<KeyValuePair<string, string>> headers = new()
  309. {
  310. new KeyValuePair<string, string>("Authorization", "Basic " + _configTDengineService.Token)
  311. };
  312. var result = await _httpHelper.HttpToPostAsync(url, sql, headers).ConfigureAwait(false);
  313. return result;
  314. }
  315. public async Task<bool> GernalRestSql(string sql)
  316. {
  317. //"http://{server}:{port}/rest/sql/{db}"
  318. var url = $"http://{_configTDengineService.Host}:{_configTDengineService.RestPort}/rest/sql/{_configTDengineService.DB}";
  319. List<KeyValuePair<string, string>> headers = new()
  320. {
  321. new KeyValuePair<string, string>("Authorization", "Basic " + _configTDengineService.Token)
  322. };
  323. var result = await _httpHelper.HttpToPostAsync(url, sql, headers).ConfigureAwait(false);
  324. var res = JsonConvert.DeserializeObject<TDengineRestResBase>(result!);
  325. if (result != null)
  326. {
  327. if (res?.Code == 0)
  328. {
  329. _logger.LogInformation($"{nameof(GernalRestSql)},SQL 语句执行成功|{sql}");
  330. return true;
  331. }
  332. else
  333. {
  334. _logger.LogWarning($"{nameof(GernalRestSql)},SQL 语句执行失败||{sql}");
  335. return false;
  336. }
  337. }
  338. else
  339. {
  340. _logger.LogError($"{nameof(GernalRestSql)},TDengine 服务器IP:{_configTDengineService.Host} 错误,请联系运维人员");
  341. return false;
  342. }
  343. //return res.Code==0;
  344. }
  345. public async Task<string?> GernalRestSqlResTextAsync(string sql)
  346. {
  347. _logger.LogInformation($"SQL: {nameof(GernalRestSqlResTextAsync)}--{sql}");
  348. var url = $"http://{_configTDengineService.Host}:{_configTDengineService.RestPort}/rest/sql/{_configTDengineService.DB}";
  349. List<KeyValuePair<string, string>> headers = new()
  350. {
  351. new KeyValuePair<string, string>("Authorization", "Basic " + _configTDengineService.Token)
  352. };
  353. var result = await _httpHelper.HttpToPostAsync(url, sql, headers).ConfigureAwait(false);
  354. return result;
  355. }
  356. /// <summary>
  357. /// 最大值,最小值聚合
  358. /// </summary>
  359. /// <param name="field"></param>
  360. /// <param name="tbName"></param>
  361. /// <param name="condition"></param>
  362. /// <returns></returns>
  363. public async Task<Aggregate> GetAggregateValueAsync(string field,string tbName,string? condition)
  364. {
  365. var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  366. var result = await GernalRestSqlResTextAsync(sql);
  367. var res = JsonConvert.DeserializeObject<Aggregate>(result!);
  368. List<dynamic> data = res?.Data!;
  369. return new Aggregate
  370. {
  371. Max = data.Count.Equals(0) ? 0 : data[0][0],
  372. Min = data.Count.Equals(0) ? 0 : data[0][1],
  373. };
  374. }
  375. /// <summary>
  376. /// 去除最大值和最小值后的平均值
  377. /// </summary>
  378. /// <param name="field"></param>
  379. /// <param name="tbName"></param>
  380. /// <param name="condition"></param>
  381. /// <returns></returns>
  382. public async Task<int> GetAvgExceptMaxMinValueAsync(string field, string tbName, string? condition)
  383. {
  384. var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  385. var result = await GernalRestSqlResTextAsync(sql);
  386. var res = JsonConvert.DeserializeObject<TDengineRestResBase>(result!);
  387. _logger.LogInformation($"最大小值:{sql}");
  388. List<dynamic> data = res?.Data!;
  389. var sqlAvg = $"SELECT AVG({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition} AND {field} < { (data.Count.Equals(0)? 0: data[0][0]) } and {field} > {(data.Count.Equals(0) ? 0 : data[0][1])}";
  390. result = await GernalRestSqlResTextAsync(sqlAvg);
  391. _logger.LogInformation($"sqlAvg:{sqlAvg}");
  392. res = JsonConvert.DeserializeObject<TDengineRestResBase>(result!);
  393. data = res?.Data!;
  394. return data.Count.Equals(0)?0:(int)data[0][0];
  395. }
  396. /// <summary>
  397. /// 获取最后的记录
  398. /// </summary>
  399. /// <param name="tbName"></param>
  400. /// <param name="condition"></param>
  401. /// <returns></returns>
  402. public async Task<JArray?> GetLastAsync(string tbName, string? condition)
  403. {
  404. var sql = $"SELECT last_row(*) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  405. var result = await GernalRestSqlResTextAsync(sql);
  406. var res = JsonConvert.DeserializeObject<TDengineRestResBase>(result!);
  407. if(res?.Data?.Count==0) return null;
  408. List<dynamic> data = res?.Data!;
  409. return data[0] as JArray;
  410. }
  411. public async Task<int> GetCount(string tbName, string? condition)
  412. {
  413. var sql = $"SELECT count(ts) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  414. var result = await GernalRestSqlResTextAsync(sql);
  415. var res = JsonConvert.DeserializeObject<TDengineRestResBase>(result!);
  416. List<dynamic> data = res?.Data!;
  417. return SafeType.SafeInt(data[0][0]);
  418. }
  419. #endregion
  420. /// <summary>
  421. /// 平均值算法(去除最大值,最小值和大于标定值的平均值)
  422. /// </summary>
  423. /// <param name="numToRemove"></param>
  424. /// <param name="collection"></param>
  425. /// <param name="max"></param>
  426. /// <param name="min"></param>
  427. /// <returns></returns>
  428. public static decimal AverageAfterRemovingOneMinMaxRef(List<int> collection, int max, int min,int refValue)
  429. {
  430. collection.Remove(max);
  431. collection.Remove(min);
  432. collection.RemoveAll(_ => _ > refValue);
  433. if (collection.Count < 2)
  434. {
  435. throw new ArgumentException($"数据集{collection.ToArray()},去掉一个最大值 {max}和一个最小值{min},异常值(大于标定值{refValue}),后数据值不足");
  436. }
  437. return (decimal)collection.Average(x => x);
  438. //var values = ParseData.Select(valueSelector).ToList();
  439. //collection = values.Select(i => (int)i).ToArray();
  440. //if (values.Count <= 2)
  441. //{
  442. // throw new ArgumentException("Not enough elements to remove.");
  443. //}
  444. //// Remove the specified number of minimum and maximum values
  445. ////values.RemoveAll(_ => _ == values.Min());
  446. ////values.RemoveAll(_ => _ == values.Max());
  447. //max = (int)values.Max();
  448. //min = (int)values.Min();
  449. //values.Remove(max);
  450. //values.Remove(min);
  451. //// Remove values less than the specified threshold
  452. //values.RemoveAll(_ => _ > numToRemove);
  453. //// Calculate and return the average of the remaining values
  454. //return values.Average();
  455. }
  456. }
  457. }