Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

TDengineService.cs 17KB

1 ano atrás
1 ano atrás
1 ano atrás
1 ano atrás
1 ano atrás
1 ano atrás

  1. using HealthMonitor.Common;
  2. using HealthMonitor.Common.helper;
  3. using HealthMonitor.Model.Config;
  4. using HealthMonitor.Service.Biz.db.Dto;
  5. using Microsoft.EntityFrameworkCore.Metadata.Internal;
  6. using Microsoft.Extensions.Logging;
  7. using Microsoft.Extensions.Options;
  8. using Newtonsoft.Json;
  9. using Newtonsoft.Json.Linq;
  10. using System;
  11. using System.Collections.Generic;
  12. using System.Linq;
  13. using System.Text;
  14. using System.Threading.Tasks;
  15. using System.Xml.Linq;
  16. using TDengineDriver;
  17. using TDengineDriver.Impl;
  18. namespace HealthMonitor.Service.Biz.db
  19. {
  20. public class TDengineService
  21. {
  22. private readonly ILogger<TDengineService> _logger;
  23. private readonly HttpHelper _httpHelper=default!;
  24. private readonly TDengineServiceConfig _configTDengineService;
  25. public TDengineService(ILogger<TDengineService> logger,
  26. IOptions<TDengineServiceConfig> configTDengineService,
  27. HttpHelper httpHelper
  28. )
  29. {
  30. _logger = logger;
  31. _configTDengineService = configTDengineService.Value;
  32. _httpHelper = httpHelper;
  33. }
  34. public IntPtr Connection()
  35. {
  36. string host = _configTDengineService.Host;
  37. string user = _configTDengineService.UserName;
  38. string db = _configTDengineService.DB;
  39. short port = _configTDengineService.Port;
  40. string password = _configTDengineService.Password;
  41. //#if DEBUG
  42. // //string configDir = "C:/TDengine/cfg";
  43. // //TDengine.Options((int)TDengineInitOption.TSDB_OPTION_CONFIGDIR, configDir);
  44. // TDengine.Options((int)TDengineInitOption.TSDB_OPTION_TIMEZONE, "Asia/Beijing");
  45. //#endif
  46. IntPtr conn = TDengine.Connect(host, user, password, db, port);
  47. if (conn == IntPtr.Zero)
  48. {
  49. _logger.LogError($"连接 TDengine 失败....");
  50. }
  51. else
  52. {
  53. _logger.LogInformation($"连接 TDengine 成功....");
  54. }
  55. return conn;
  56. }
  57. public void ExecuteSQL(IntPtr conn, string sql)
  58. {
  59. IntPtr res = TDengine.Query(conn, sql);
  60. // Check if query success
  61. if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0))
  62. {
  63. Console.Write(sql + " failure, ");
  64. // Get error message while Res is a not null pointer.
  65. if (res != IntPtr.Zero)
  66. {
  67. Console.Write("reason:" + TDengine.Error(res));
  68. }
  69. }
  70. else
  71. {
  72. Console.Write(sql + " success, {0} rows affected", TDengine.AffectRows(res));
  73. //... do something with res ...
  74. // Important: need to free result to avoid memory leak.
  75. TDengine.FreeResult(res);
  76. }
  77. }
  78. public void ExecuteQuerySQL(IntPtr conn, string sql)
  79. {
  80. IntPtr res = TDengine.Query(conn, sql);
  81. // Check if query success
  82. if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0))
  83. {
  84. Console.Write(sql + " failure, ");
  85. // Get error message while Res is a not null pointer.
  86. if (res != IntPtr.Zero)
  87. {
  88. Console.Write("reason:" + TDengine.Error(res));
  89. }
  90. }
  91. else
  92. {
  93. Console.Write(sql + " success, {0} rows affected", TDengine.AffectRows(res));
  94. //... do something with res ...
  95. List<TDengineDriver.TDengineMeta> resMeta = LibTaos.GetMeta(res);
  96. List<object> resData = LibTaos.GetData(res);
  97. foreach (var meta in resMeta)
  98. {
  99. _logger.LogInformation("\t|{meta.name} {meta.TypeName()} ({meta.size})\t|", meta.name, meta.TypeName(), meta.size);
  100. }
  101. for (int i = 0; i < resData.Count; i++)
  102. {
  103. _logger.LogInformation($"|{resData[i].ToString()} \t");
  104. if (((i + 1) % resMeta.Count == 0))
  105. {
  106. _logger.LogInformation("");
  107. }
  108. }
  109. // Important: need to free result to avoid memory leak.
  110. TDengine.FreeResult(res);
  111. }
  112. }
  113. public void CheckRes(IntPtr conn, IntPtr res, String errorMsg)
  114. {
  115. if (TDengine.ErrorNo(res) != 0)
  116. {
  117. throw new Exception($"{errorMsg} since: {TDengine.Error(res)}");
  118. }
  119. }
  120. public void ExecuteInsertSQL(string sql)
  121. {
  122. var conn = Connection();
  123. try
  124. {
  125. //sql = "INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000) " +
  126. // "d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000) " +
  127. // "d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000)('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000) " +
  128. // "d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000)('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)";
  129. //#if DEBUG
  130. // //string configDir = "C:/TDengine/cfg";
  131. // //TDengine.Options((int)TDengineInitOption.TSDB_OPTION_CONFIGDIR, configDir);
  132. // TDengine.Options((int)TDengineInitOption.TSDB_OPTION_TIMEZONE, "Asia/Beijing");
  133. //#endif
  134. IntPtr res = TDengine.Query(conn, sql);
  135. CheckRes(conn, res, "failed to insert data");
  136. int affectedRows = TDengine.AffectRows(res);
  137. _logger.LogInformation("affectedRows {affectedRows}" , affectedRows);
  138. TDengine.FreeResult(res);
  139. }
  140. finally
  141. {
  142. TDengine.Close(conn);
  143. }
  144. }
  145. #region TDengine.Connector async query
  146. public void QueryCallback(IntPtr param, IntPtr taosRes, int code)
  147. {
  148. if (code == 0 && taosRes != IntPtr.Zero)
  149. {
  150. FetchRawBlockAsyncCallback fetchRowAsyncCallback = new FetchRawBlockAsyncCallback(FetchRawBlockCallback);
  151. TDengine.FetchRawBlockAsync(taosRes, fetchRowAsyncCallback, param);
  152. }
  153. else
  154. {
  155. _logger.LogInformation("async query data failed, failed code {code}",code);
  156. }
  157. }
  158. // Iteratively call this interface until "numOfRows" is no greater than 0.
  159. public void FetchRawBlockCallback(IntPtr param, IntPtr taosRes, int numOfRows)
  160. {
  161. if (numOfRows > 0)
  162. {
  163. _logger.LogInformation("{numOfRows} rows async retrieved", numOfRows);
  164. IntPtr pdata = TDengine.GetRawBlock(taosRes);
  165. List<TDengineMeta> metaList = TDengine.FetchFields(taosRes);
  166. List<object> dataList = LibTaos.ReadRawBlock(pdata, metaList, numOfRows);
  167. for (int i = 0; i < metaList.Count; i++)
  168. {
  169. _logger.LogInformation("{0} {1}({2}) \t|", metaList[i].name, metaList[i].type, metaList[i].size);
  170. }
  171. _logger.LogInformation("");
  172. for (int i = 0; i < dataList.Count; i++)
  173. {
  174. if (i != 0 && i % metaList.Count == 0)
  175. {
  176. _logger.LogInformation("{dataList[i]}\t|", dataList[i]);
  177. }
  178. _logger.LogInformation("{dataList[i]}\t|", dataList[i]);
  179. }
  180. TDengine.FetchRawBlockAsync(taosRes, FetchRawBlockCallback, param);
  181. }
  182. else
  183. {
  184. if (numOfRows == 0)
  185. {
  186. _logger.LogInformation("async retrieve complete.");
  187. }
  188. else
  189. {
  190. _logger.LogInformation("FetchRawBlockCallback callback error, error code {numOfRows}", numOfRows);
  191. }
  192. TDengine.FreeResult(taosRes);
  193. }
  194. }
  195. public void ExecuteQueryAsync(string sql)
  196. {
  197. var conn = Connection();
  198. QueryAsyncCallback queryAsyncCallback = new QueryAsyncCallback(QueryCallback);
  199. TDengine.QueryAsync(conn, sql, queryAsyncCallback, IntPtr.Zero);
  200. }
  201. //public void ExecuteQuery(string sql)
  202. //{
  203. // var conn = Connection();
  204. // QueryAsyncCallback queryAsyncCallback = new QueryAsyncCallback(QueryCallback);
  205. // TDengine.QueryAsync(conn, sql, queryAsyncCallback, IntPtr.Zero);
  206. //}
  207. public Aggregate GetAggregateValue(string field, string tbName, string? condition)
  208. {
  209. List<int> data = new();
  210. var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  211. var conn = Connection();
  212. try
  213. {
  214. IntPtr res = TDengine.Query(conn, sql);
  215. // Check if query success
  216. if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0))
  217. {
  218. Console.Write(sql + " failure, ");
  219. // Get error message while Res is a not null pointer.
  220. if (res != IntPtr.Zero)
  221. {
  222. Console.Write("reason:" + TDengine.Error(res));
  223. }
  224. }
  225. else
  226. {
  227. Console.Write(sql + " success, {0} rows affected", TDengine.AffectRows(res));
  228. //... do something with res ...
  229. List<TDengineMeta> resMeta = LibTaos.GetMeta(res);
  230. List<object> resData = LibTaos.GetData(res);
  231. foreach (var meta in resMeta)
  232. {
  233. Console.Write($"\t|{meta.name} {meta.TypeName()} ({meta.size})\t|");
  234. }
  235. resData.ForEach(x => data.Add(SafeType.SafeInt(x)));
  236. // Important: need to free result to avoid memory leak.
  237. TDengine.FreeResult(res);
  238. }
  239. }
  240. finally
  241. {
  242. TDengine.Close(conn);
  243. }
  244. return new Aggregate
  245. {
  246. Max = data.Count.Equals(0) ? 0 : data[0],
  247. Min = data.Count.Equals(0) ? 0 : data[1],
  248. };
  249. }
  250. public int GetAvgExceptMaxMinValue(string field, string tbName, string? condition)
  251. {
  252. List<int> data = new();
  253. var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  254. var aggregate= GetAggregateValue(field, tbName, condition);
  255. var sqlAvg = $"SELECT AVG({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition} AND {field} < {aggregate.Max} and {field} > {aggregate.Min}";
  256. var conn = Connection();
  257. try
  258. {
  259. IntPtr res = TDengine.Query(conn, sqlAvg);
  260. // Check if query success
  261. if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0))
  262. {
  263. Console.Write(sqlAvg + " failure, ");
  264. // Get error message while Res is a not null pointer.
  265. if (res != IntPtr.Zero)
  266. {
  267. Console.Write("reason:" + TDengine.Error(res));
  268. }
  269. }
  270. else
  271. {
  272. Console.Write(sqlAvg + " success, {0} rows affected", TDengine.AffectRows(res));
  273. //... do something with res ...
  274. List<TDengineMeta> resMeta = LibTaos.GetMeta(res);
  275. List<object> resData = LibTaos.GetData(res);
  276. foreach (var meta in resMeta)
  277. {
  278. Console.Write($"\t|{meta.name} {meta.TypeName()} ({meta.size})\t|");
  279. }
  280. resData.ForEach(x => data.Add(SafeType.SafeInt(x)));
  281. // Important: need to free result to avoid memory leak.
  282. TDengine.FreeResult(res);
  283. }
  284. }
  285. finally
  286. {
  287. TDengine.Close(conn);
  288. }
  289. return data.Count.Equals(0) ? 0 : data[0];
  290. }
  291. #endregion
  292. #region RestAPI
  293. public async Task<bool> GernalRestSql(string sql)
  294. {
  295. //"http://{server}:{port}/rest/sql/{db}"
  296. var url = $"http://{_configTDengineService.Host}:{_configTDengineService.RestPort}/rest/sql/{_configTDengineService.DB}";
  297. List<KeyValuePair<string, string>> headers = new()
  298. {
  299. new KeyValuePair<string, string>("Authorization", "Basic " + _configTDengineService.Token)
  300. };
  301. var result = await _httpHelper.HttpToPostAsync(url, sql, headers).ConfigureAwait(false);
  302. var res = JsonConvert.DeserializeObject<TDengineRestResBase>(result!);
  303. if (result != null)
  304. {
  305. if (res?.Code == 0)
  306. {
  307. _logger.LogInformation($"{nameof(GernalRestSql)},SQL 语句执行成功|{sql}");
  308. return true;
  309. }
  310. else
  311. {
  312. _logger.LogWarning($"{nameof(GernalRestSql)},SQL 语句执行失败||{sql}");
  313. return false;
  314. }
  315. }
  316. else
  317. {
  318. _logger.LogError($"{nameof(GernalRestSql)},TDengine 服务器IP:{_configTDengineService.Host} 错误,请联系运维人员");
  319. return false;
  320. }
  321. //return res.Code==0;
  322. }
  323. public async Task<string?> GernalRestSqlResTextAsync(string sql)
  324. {
  325. var url = $"http://{_configTDengineService.Host}:{_configTDengineService.RestPort}/rest/sql/{_configTDengineService.DB}";
  326. List<KeyValuePair<string, string>> headers = new()
  327. {
  328. new KeyValuePair<string, string>("Authorization", "Basic " + _configTDengineService.Token)
  329. };
  330. var result = await _httpHelper.HttpToPostAsync(url, sql, headers).ConfigureAwait(false);
  331. return result;
  332. }
  333. public async Task<Aggregate> GetAggregateValueAsync(string field,string tbName,string? condition)
  334. {
  335. var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  336. var result = await GernalRestSqlResTextAsync(sql);
  337. var res = JsonConvert.DeserializeObject<Aggregate>(result!);
  338. List<dynamic> data = res?.Data!;
  339. return new Aggregate
  340. {
  341. Max = data.Count.Equals(0) ? 0 : data[0][0],
  342. Min = data.Count.Equals(0) ? 0 : data[0][1],
  343. };
  344. }
  345. public async Task<int> GetAvgExceptMaxMinValueAsync(string field, string tbName, string? condition)
  346. {
  347. var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  348. var result = await GernalRestSqlResTextAsync(sql);
  349. var res = JsonConvert.DeserializeObject<TDengineRestResBase>(result!);
  350. List<dynamic> data = res?.Data!;
  351. var sqlAvg = $"SELECT AVG({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {condition} AND {field} < { (data.Count.Equals(0)? 0: data[0][0]) } and {field} > {(data.Count.Equals(0) ? 0 : data[0][1])}";
  352. result = await GernalRestSqlResTextAsync(sqlAvg);
  353. res = JsonConvert.DeserializeObject<TDengineRestResBase>(result!);
  354. data = res?.Data!;
  355. return data.Count.Equals(0)?0:(int)data[0][0];
  356. }
  357. public async Task<JArray?> GetLastAsync(string tbName, string? condition)
  358. {
  359. var sql = $"SELECT last_row(*) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
  360. var result = await GernalRestSqlResTextAsync(sql);
  361. var res = JsonConvert.DeserializeObject<TDengineRestResBase>(result!);
  362. List<dynamic> data = res?.Data!;
  363. return data[0] as JArray;
  364. }
  365. #endregion
  366. }
  367. }