Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

398 linhas
16KB

  1. using HealthMonitor.Common;
  2. using HealthMonitor.Common.helper;
  3. using HealthMonitor.Model.Config;
  4. using HealthMonitor.Service.Biz.db.Dto;
  5. using Microsoft.EntityFrameworkCore.Metadata.Internal;
  6. using Microsoft.Extensions.Logging;
  7. using Microsoft.Extensions.Options;
  8. using Newtonsoft.Json;
  9. using System;
  10. using System.Collections.Generic;
  11. using System.Linq;
  12. using System.Text;
  13. using System.Threading.Tasks;
  14. using TDengineDriver;
  15. using TDengineDriver.Impl;
  16. namespace HealthMonitor.Service.Biz.db
  17. {
  18. public class TDengineService
  19. {
  /// <summary>Logger used for connection and query diagnostics.</summary>
  private readonly ILogger<TDengineService> _logger;

  /// <summary>HTTP helper used to post SQL to the TDengine REST endpoint.</summary>
  private readonly HttpHelper _httpHelper;

  /// <summary>Connection settings read by this service (host, ports, credentials, database, REST token).</summary>
  private readonly TDengineServiceConfig _configTDengineService;

  /// <summary>
  /// Creates the service from its injected dependencies.
  /// </summary>
  /// <param name="logger">Logger for diagnostics.</param>
  /// <param name="configTDengineService">Options wrapper whose <c>Value</c> holds the TDengine settings.</param>
  /// <param name="httpHelper">HTTP helper used by the REST-based methods.</param>
  /// <exception cref="ArgumentNullException">
  /// A dependency (or the options value) is <c>null</c>. Failing here gives a clear error instead of a
  /// <see cref="NullReferenceException"/> on first use.
  /// </exception>
  public TDengineService(ILogger<TDengineService> logger,
      IOptions<TDengineServiceConfig> configTDengineService,
      HttpHelper httpHelper)
  {
      _logger = logger ?? throw new ArgumentNullException(nameof(logger));
      _configTDengineService = configTDengineService?.Value
          ?? throw new ArgumentNullException(nameof(configTDengineService));
      _httpHelper = httpHelper ?? throw new ArgumentNullException(nameof(httpHelper));
  }
  /// <summary>
  /// Opens a native TDengine connection using the configured host, user, password, database and port.
  /// </summary>
  /// <returns>
  /// The handle returned by <c>TDengine.Connect</c>. It is <see cref="IntPtr.Zero"/> when the connection
  /// failed; the failure is only logged, no exception is thrown, so callers must check the handle.
  /// </returns>
  public IntPtr Connection()
  {
      var settings = _configTDengineService;
      short port = settings.Port;

      IntPtr handle = TDengine.Connect(settings.Host, settings.UserName, settings.Password, settings.DB, port);

      if (handle != IntPtr.Zero)
      {
          _logger.LogInformation("连接 TDengine 成功....");
          return handle;
      }

      _logger.LogError("连接 TDengine 失败....");
      return handle;
  }
  /// <summary>
  /// Executes a SQL statement on an already opened connection and logs the outcome.
  /// </summary>
  /// <param name="conn">Connection handle, e.g. the value returned by <see cref="Connection"/>.</param>
  /// <param name="sql">Statement to execute.</param>
  /// <remarks>
  /// Failures are logged, not thrown. The native result handle is released on every path, including
  /// failed statements that still returned a handle. The SQL text is passed as a log argument rather
  /// than used as a format string, so braces inside the statement cannot break formatting.
  /// </remarks>
  public void ExecuteSQL(IntPtr conn, string sql)
  {
      IntPtr res = TDengine.Query(conn, sql);
      try
      {
          if (res == IntPtr.Zero)
          {
              _logger.LogError("{Sql} failure, ", sql);
              return;
          }

          if (TDengine.ErrorNo(res) != 0)
          {
              _logger.LogError("{Sql} failure, reason:{Reason}", sql, TDengine.Error(res));
              return;
          }

          _logger.LogInformation("{Sql} success, {AffectedRows} rows affected", sql, TDengine.AffectRows(res));
      }
      finally
      {
          // The result must be freed even when the statement failed, otherwise it leaks.
          if (res != IntPtr.Zero)
          {
              TDengine.FreeResult(res);
          }
      }
  }
  /// <summary>
  /// Executes a query on an already opened connection and logs the column metadata and the returned rows.
  /// </summary>
  /// <param name="conn">Connection handle, e.g. the value returned by <see cref="Connection"/>.</param>
  /// <param name="sql">Query to execute.</param>
  /// <remarks>
  /// Failures are logged, not thrown. The native result handle is released on every path. Rows are
  /// logged as one entry per row (cells joined), instead of one log entry per cell.
  /// </remarks>
  public void ExecuteQuerySQL(IntPtr conn, string sql)
  {
      IntPtr res = TDengine.Query(conn, sql);
      try
      {
          if (res == IntPtr.Zero)
          {
              _logger.LogError("{Sql} failure, ", sql);
              return;
          }

          if (TDengine.ErrorNo(res) != 0)
          {
              _logger.LogError("{Sql} failure, reason:{Reason}", sql, TDengine.Error(res));
              return;
          }

          _logger.LogInformation("{Sql} success, {AffectedRows} rows affected", sql, TDengine.AffectRows(res));

          List<TDengineMeta> resMeta = LibTaos.GetMeta(res);
          List<object> resData = LibTaos.GetData(res);

          foreach (var meta in resMeta)
          {
              _logger.LogInformation("\t|{Name} {TypeName} ({Size})\t|", meta.name, meta.TypeName(), meta.size);
          }

          // resData is a flat list of cells; every resMeta.Count consecutive cells form one row.
          int columnCount = resMeta.Count;
          if (columnCount == 0)
          {
              return;
          }

          for (int start = 0; start < resData.Count; start += columnCount)
          {
              int length = Math.Min(columnCount, resData.Count - start);
              IEnumerable<string> cells = resData
                  .GetRange(start, length)
                  .Select(cell => cell?.ToString() ?? "NULL");
              _logger.LogInformation("|{Row} \t", string.Join(" \t|", cells));
          }
      }
      finally
      {
          // The result must be freed even when the query failed, otherwise it leaks.
          if (res != IntPtr.Zero)
          {
              TDengine.FreeResult(res);
          }
      }
  }
  /// <summary>
  /// Verifies that a native result carries no error code.
  /// </summary>
  /// <param name="conn">Connection handle; not used by this method.</param>
  /// <param name="res">Result handle to inspect.</param>
  /// <param name="errorMsg">Prefix of the exception message raised on failure.</param>
  /// <exception cref="Exception">
  /// <c>TDengine.ErrorNo(res)</c> is non-zero; the message is <paramref name="errorMsg"/> followed by
  /// the driver's error text. The result handle is not freed here.
  /// </exception>
  public void CheckRes(IntPtr conn, IntPtr res, String errorMsg)
  {
      bool succeeded = TDengine.ErrorNo(res) == 0;
      if (succeeded)
      {
          return;
      }

      string reason = TDengine.Error(res);
      throw new Exception(errorMsg + " since: " + reason);
  }
  /// <summary>
  /// Opens a connection, executes an INSERT statement, logs the number of affected rows and closes
  /// the connection again.
  /// </summary>
  /// <param name="sql">
  /// Complete INSERT statement, e.g.
  /// <c>INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.3, 219, 0.31)</c>.
  /// </param>
  /// <exception cref="Exception">Raised by <see cref="CheckRes"/> when the driver reports an error.</exception>
  /// <remarks>
  /// The result handle is released in <c>finally</c>, so it is also freed when <see cref="CheckRes"/> throws.
  /// </remarks>
  public void ExecuteInsertSQL(string sql)
  {
      var conn = Connection();
      IntPtr res = IntPtr.Zero;
      try
      {
          res = TDengine.Query(conn, sql);
          CheckRes(conn, res, "failed to insert data");

          int affectedRows = TDengine.AffectRows(res);
          _logger.LogInformation("affectedRows {affectedRows}", affectedRows);
      }
      finally
      {
          if (res != IntPtr.Zero)
          {
              TDengine.FreeResult(res);
          }

          // Only close handles that were actually opened.
          if (conn != IntPtr.Zero)
          {
              TDengine.Close(conn);
          }
      }
  }
  133. #region TDengine.Connector async query
  // Delegates handed to the native driver are cached in fields: a delegate that is only referenced
  // from a local can be garbage-collected while native code still holds its function pointer.
  private QueryAsyncCallback? _queryCallbackDelegate;
  private FetchRawBlockAsyncCallback? _fetchCallbackDelegate;

  /// <summary>Single, rooted delegate instance for <see cref="QueryCallback"/>.</summary>
  private QueryAsyncCallback QueryCallbackDelegate =>
      System.Threading.LazyInitializer.EnsureInitialized(
          ref _queryCallbackDelegate,
          () => new QueryAsyncCallback(QueryCallback));

  /// <summary>Single, rooted delegate instance for <see cref="FetchRawBlockCallback"/>.</summary>
  private FetchRawBlockAsyncCallback FetchCallbackDelegate =>
      System.Threading.LazyInitializer.EnsureInitialized(
          ref _fetchCallbackDelegate,
          () => new FetchRawBlockAsyncCallback(FetchRawBlockCallback));

  /// <summary>
  /// Callback invoked by the driver when an asynchronous query has been executed.
  /// On success it starts fetching the first raw block; on failure it logs the code and frees the result.
  /// </summary>
  /// <param name="param">Opaque value passed through from <c>TDengine.QueryAsync</c>.</param>
  /// <param name="taosRes">Result handle of the query.</param>
  /// <param name="code">Status code; <c>0</c> means success.</param>
  public void QueryCallback(IntPtr param, IntPtr taosRes, int code)
  {
      if (code == 0 && taosRes != IntPtr.Zero)
      {
          TDengine.FetchRawBlockAsync(taosRes, FetchCallbackDelegate, param);
          return;
      }

      _logger.LogError("async query data failed, failed code {code}", code);

      // A failed query can still hand back a result handle; release it so it does not leak.
      if (taosRes != IntPtr.Zero)
      {
          TDengine.FreeResult(taosRes);
      }
  }

  /// <summary>
  /// Callback invoked by the driver for every fetched raw block. It logs the column metadata and the
  /// rows of the block and requests the next block, until <paramref name="numOfRows"/> is no greater
  /// than 0, at which point the result is freed.
  /// </summary>
  /// <param name="param">Opaque value passed through from the fetch request.</param>
  /// <param name="taosRes">Result handle being iterated.</param>
  /// <param name="numOfRows">Rows in this block; <c>0</c> when finished, negative on error.</param>
  public void FetchRawBlockCallback(IntPtr param, IntPtr taosRes, int numOfRows)
  {
      if (numOfRows <= 0)
      {
          if (numOfRows == 0)
          {
              _logger.LogInformation("async retrieve complete.");
          }
          else
          {
              _logger.LogError("FetchRawBlockCallback callback error, error code {numOfRows}", numOfRows);
          }

          TDengine.FreeResult(taosRes);
          return;
      }

      _logger.LogInformation("{numOfRows} rows async retrieved", numOfRows);

      IntPtr pdata = TDengine.GetRawBlock(taosRes);
      List<TDengineMeta> metaList = TDengine.FetchFields(taosRes);
      List<object> dataList = LibTaos.ReadRawBlock(pdata, metaList, numOfRows);

      foreach (var meta in metaList)
      {
          _logger.LogInformation("{Name} {Type}({Size}) \t|", meta.name, meta.type, meta.size);
      }

      // dataList is a flat list of cells; log one entry per row. The previous per-cell loop logged
      // the first cell of every row after the first one twice.
      int columnCount = metaList.Count;
      if (columnCount > 0)
      {
          for (int start = 0; start < dataList.Count; start += columnCount)
          {
              int length = Math.Min(columnCount, dataList.Count - start);
              IEnumerable<string> cells = dataList
                  .GetRange(start, length)
                  .Select(cell => cell?.ToString() ?? "NULL");
              _logger.LogInformation("{Row}\t|", string.Join("\t|", cells));
          }
      }

      TDengine.FetchRawBlockAsync(taosRes, FetchCallbackDelegate, param);
  }

  /// <summary>
  /// Opens a connection and starts an asynchronous query; results are logged by
  /// <see cref="QueryCallback"/> and <see cref="FetchRawBlockCallback"/>.
  /// </summary>
  /// <param name="sql">Query to execute.</param>
  /// <remarks>
  /// NOTE(review): the connection opened here is never closed, because the query completes later on
  /// the driver's callback; confirm who should own and close it.
  /// </remarks>
  public void ExecuteQueryAsync(string sql)
  {
      var conn = Connection();
      if (conn == IntPtr.Zero)
      {
          // Connection() has already logged the failure; do not hand a null handle to the driver.
          return;
      }

      TDengine.QueryAsync(conn, sql, QueryCallbackDelegate, IntPtr.Zero);
  }
  189. //public void ExecuteQuery(string sql)
  190. //{
  191. // var conn = Connection();
  192. // QueryAsyncCallback queryAsyncCallback = new QueryAsyncCallback(QueryCallback);
  193. // TDengine.QueryAsync(conn, sql, queryAsyncCallback, IntPtr.Zero);
  194. //}
  /// <summary>
  /// Queries <c>MAX</c> and <c>MIN</c> of a column through the native connector.
  /// </summary>
  /// <param name="field">Column name.</param>
  /// <param name="tbName">Table name inside the configured database.</param>
  /// <param name="condition">Optional filter; when null or blank no <c>WHERE</c> clause is emitted.</param>
  /// <returns>
  /// An <see cref="Aggregate"/> with <c>Max</c> and <c>Min</c>; both are 0 when the query failed or
  /// did not return both values.
  /// </returns>
  /// <remarks>
  /// The SQL is assembled from the arguments as text. Only pass trusted identifiers and conditions;
  /// never forward raw user input.
  /// </remarks>
  public Aggregate GetAggregateValue(string field, string tbName, string? condition)
  {
      var where = string.IsNullOrWhiteSpace(condition) ? string.Empty : $" WHERE {condition}";
      var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName}{where}";

      List<int> data = QueryIntValues(sql);

      // Both values are required; a shorter result would otherwise index out of range.
      bool hasBoth = data.Count >= 2;
      return new Aggregate
      {
          Max = hasBoth ? data[0] : 0,
          Min = hasBoth ? data[1] : 0,
      };
  }

  /// <summary>
  /// Queries the average of a column while excluding rows equal to the column's maximum or minimum.
  /// </summary>
  /// <param name="field">Column name.</param>
  /// <param name="tbName">Table name inside the configured database.</param>
  /// <param name="condition">Optional filter applied to both the MAX/MIN and the AVG query.</param>
  /// <returns>The average converted with <c>SafeType.SafeInt</c>, or 0 when nothing was returned.</returns>
  /// <remarks>
  /// The SQL is assembled from the arguments as text. Only pass trusted identifiers and conditions.
  /// </remarks>
  public int GetAvgExceptMaxMinValue(string field, string tbName, string? condition)
  {
      var aggregate = GetAggregateValue(field, tbName, condition);

      var bounds = $"{field} < {aggregate.Max} and {field} > {aggregate.Min}";
      var where = string.IsNullOrWhiteSpace(condition)
          ? $" WHERE {bounds}"
          : $" WHERE {condition} AND {bounds}";
      var sqlAvg = $"SELECT AVG({field}) FROM {_configTDengineService.DB}.{tbName}{where}";

      List<int> data = QueryIntValues(sqlAvg);
      return data.Count == 0 ? 0 : data[0];
  }

  /// <summary>
  /// Runs a query on a fresh connection and returns every cell converted with <c>SafeType.SafeInt</c>.
  /// Failures are logged and yield an empty list; the result and the connection are always released.
  /// </summary>
  private List<int> QueryIntValues(string sql)
  {
      List<int> values = new();

      var conn = Connection();
      if (conn == IntPtr.Zero)
      {
          // Connection() has already logged the failure.
          return values;
      }

      IntPtr res = IntPtr.Zero;
      try
      {
          res = TDengine.Query(conn, sql);
          if (res == IntPtr.Zero)
          {
              _logger.LogError("{Sql} failure, ", sql);
              return values;
          }

          if (TDengine.ErrorNo(res) != 0)
          {
              _logger.LogError("{Sql} failure, reason:{Reason}", sql, TDengine.Error(res));
              return values;
          }

          _logger.LogInformation("{Sql} success, {AffectedRows} rows affected", sql, TDengine.AffectRows(res));

          List<TDengineMeta> resMeta = LibTaos.GetMeta(res);
          List<object> resData = LibTaos.GetData(res);
          foreach (var meta in resMeta)
          {
              _logger.LogInformation("\t|{Name} {TypeName} ({Size})\t|", meta.name, meta.TypeName(), meta.size);
          }

          foreach (var cell in resData)
          {
              values.Add(SafeType.SafeInt(cell));
          }

          return values;
      }
      finally
      {
          // Free the result even when the query failed, then close the connection.
          if (res != IntPtr.Zero)
          {
              TDengine.FreeResult(res);
          }

          TDengine.Close(conn);
      }
  }
  279. #endregion
  280. #region RestAPI
  /// <summary>
  /// Posts a SQL statement to the TDengine REST endpoint
  /// (<c>http://{host}:{restPort}/rest/sql/{db}</c>) and reports whether it succeeded.
  /// </summary>
  /// <param name="sql">Statement to execute.</param>
  /// <returns>
  /// <c>true</c> when the response's <c>Code</c> is 0; <c>false</c> when there was no response, the
  /// response could not be parsed, or the code is non-zero.
  /// </returns>
  public async Task<bool> GernalRestSql(string sql)
  {
      var url = $"http://{_configTDengineService.Host}:{_configTDengineService.RestPort}/rest/sql/{_configTDengineService.DB}";
      List<KeyValuePair<string, string>> headers = new()
      {
          new KeyValuePair<string, string>("Authorization", "Basic " + _configTDengineService.Token)
      };

      var result = await _httpHelper.HttpToPostAsync(url, sql, headers).ConfigureAwait(false);

      // Check for a missing response BEFORE deserializing: DeserializeObject throws on null input.
      if (result is null)
      {
          _logger.LogError("{Method},TDengine 服务器IP:{Host} 错误,请联系运维人员", nameof(GernalRestSql), _configTDengineService.Host);
          return false;
      }

      TDengineRestResBase? res;
      try
      {
          res = JsonConvert.DeserializeObject<TDengineRestResBase>(result);
      }
      catch (JsonException ex)
      {
          // A body that is not valid JSON is treated as a failed statement.
          _logger.LogWarning(ex, "{Method},SQL 语句执行失败||{Sql}", nameof(GernalRestSql), sql);
          return false;
      }

      if (res?.Code == 0)
      {
          _logger.LogInformation("{Method},SQL 语句执行成功|{Sql}", nameof(GernalRestSql), sql);
          return true;
      }

      _logger.LogWarning("{Method},SQL 语句执行失败||{Sql}", nameof(GernalRestSql), sql);
      return false;
  }
  /// <summary>
  /// Posts a SQL statement to the TDengine REST endpoint and hands back the raw response text.
  /// </summary>
  /// <param name="sql">Statement to send as the request body.</param>
  /// <returns>Whatever <c>HttpToPostAsync</c> returned for the request; may be <c>null</c>.</returns>
  public async Task<string?> GernalRestSqlResTextAsync(string sql)
  {
      var settings = _configTDengineService;
      var endpoint = $"http://{settings.Host}:{settings.RestPort}/rest/sql/{settings.DB}";

      // Basic authentication header built from the configured token.
      var authorization = new KeyValuePair<string, string>("Authorization", "Basic " + settings.Token);
      var requestHeaders = new List<KeyValuePair<string, string>> { authorization };

      return await _httpHelper.HttpToPostAsync(endpoint, sql, requestHeaders).ConfigureAwait(false);
  }
  /// <summary>
  /// Queries <c>MAX</c> and <c>MIN</c> of a column through the REST endpoint.
  /// </summary>
  /// <param name="field">Column name.</param>
  /// <param name="tbName">Table name inside the configured database.</param>
  /// <param name="condition">Optional filter; when null or blank no <c>WHERE</c> clause is emitted.</param>
  /// <returns>
  /// An <see cref="Aggregate"/> with <c>Max</c> and <c>Min</c> taken from the first returned row; both
  /// are 0 when no rows were returned or the request failed.
  /// </returns>
  /// <remarks>
  /// The SQL is assembled from the arguments as text. Only pass trusted identifiers and conditions.
  /// </remarks>
  public async Task<Aggregate> GetAggregateValueAsync(string field,string tbName,string? condition)
  {
      var where = string.IsNullOrWhiteSpace(condition) ? string.Empty : $" WHERE {condition}";
      var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName}{where}";

      List<dynamic> data = await QueryRestDataAsync(sql).ConfigureAwait(false);
      return new Aggregate
      {
          Max = data.Count.Equals(0) ? 0 : data[0][0],
          Min = data.Count.Equals(0) ? 0 : data[0][1],
      };
  }

  /// <summary>
  /// Queries, through the REST endpoint, the average of a column while excluding rows equal to the
  /// column's maximum or minimum.
  /// </summary>
  /// <param name="field">Column name.</param>
  /// <param name="tbName">Table name inside the configured database.</param>
  /// <param name="condition">Optional filter applied to both the MAX/MIN and the AVG query.</param>
  /// <returns>The average converted to <see cref="int"/>, or 0 when no rows were returned.</returns>
  /// <remarks>
  /// The SQL is assembled from the arguments as text. Only pass trusted identifiers and conditions.
  /// </remarks>
  public async Task<int> GetAvgExceptMaxMinValueAsync(string field, string tbName, string? condition)
  {
      var baseWhere = string.IsNullOrWhiteSpace(condition) ? string.Empty : $" WHERE {condition}";
      var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName}{baseWhere}";
      List<dynamic> data = await QueryRestDataAsync(sql).ConfigureAwait(false);

      // Without a MAX/MIN row both bounds fall back to 0, as before.
      var bounds = $"{field} < { (data.Count.Equals(0)? 0: data[0][0]) } and {field} > {(data.Count.Equals(0) ? 0 : data[0][1])}";
      var avgWhere = string.IsNullOrWhiteSpace(condition)
          ? $" WHERE {bounds}"
          : $" WHERE {condition} AND {bounds}";
      var sqlAvg = $"SELECT AVG({field}) FROM {_configTDengineService.DB}.{tbName}{avgWhere}";

      data = await QueryRestDataAsync(sqlAvg).ConfigureAwait(false);
      return data.Count.Equals(0)?0:(int)data[0][0];
  }

  /// <summary>
  /// Sends a query to the REST endpoint and returns the <c>Data</c> rows of the parsed response.
  /// A missing response, an unparsable body or a response without data yields an empty list, so
  /// callers never dereference <c>null</c>.
  /// </summary>
  private async Task<List<dynamic>> QueryRestDataAsync(string sql)
  {
      var result = await GernalRestSqlResTextAsync(sql).ConfigureAwait(false);
      if (string.IsNullOrWhiteSpace(result))
      {
          _logger.LogError("{Method},no response from TDengine REST endpoint|{Sql}", nameof(QueryRestDataAsync), sql);
          return new List<dynamic>();
      }

      try
      {
          var res = JsonConvert.DeserializeObject<TDengineRestResBase>(result);
          List<dynamic>? rows = res?.Data;
          return rows ?? new List<dynamic>();
      }
      catch (JsonException ex)
      {
          _logger.LogError(ex, "{Method},response is not valid JSON|{Sql}", nameof(QueryRestDataAsync), sql);
          return new List<dynamic>();
      }
  }
  345. #endregion
  346. }
  347. }