You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

410 satır
16KB

  1. using HealthMonitor.Common;
  2. using HealthMonitor.Common.helper;
  3. using HealthMonitor.Model.Config;
  4. using HealthMonitor.Service.Biz.db.Dto;
  5. using Microsoft.EntityFrameworkCore.Metadata.Internal;
  6. using Microsoft.Extensions.Logging;
  7. using Microsoft.Extensions.Options;
  8. using Newtonsoft.Json;
  9. using System;
  10. using System.Collections.Generic;
  11. using System.Linq;
  12. using System.Text;
  13. using System.Threading.Tasks;
  14. using TDengineDriver;
  15. using TDengineDriver.Impl;
  16. namespace HealthMonitor.Service.Biz.db
  17. {
  18. public class TDengineService
  19. {
  // Logger used for connection and query diagnostics.
  private readonly ILogger<TDengineService> _logger;

  // HTTP helper used by the REST API methods to post SQL statements.
  private readonly HttpHelper _httpHelper;

  // TDengine settings (host, ports, credentials, database, REST token) read from options.
  private readonly TDengineServiceConfig _configTDengineService;

  /// <summary>
  /// Creates the service from its injected dependencies.
  /// </summary>
  /// <param name="logger">Logger for this service.</param>
  /// <param name="configTDengineService">Options wrapper whose <c>Value</c> holds the TDengine settings.</param>
  /// <param name="httpHelper">HTTP helper used for the REST endpoints.</param>
  /// <exception cref="ArgumentNullException">A dependency (or the options value) is null.</exception>
  public TDengineService(ILogger<TDengineService> logger,
      IOptions<TDengineServiceConfig> configTDengineService,
      HttpHelper httpHelper)
  {
      // Fail at construction time instead of with a NullReferenceException on first use.
      _logger = logger ?? throw new ArgumentNullException(nameof(logger));
      _configTDengineService = configTDengineService?.Value
          ?? throw new ArgumentNullException(nameof(configTDengineService));
      _httpHelper = httpHelper ?? throw new ArgumentNullException(nameof(httpHelper));
  }
  /// <summary>
  /// Opens a native TDengine connection with the configured host, user, password,
  /// database and port, and logs whether the attempt succeeded.
  /// </summary>
  /// <returns>
  /// The handle returned by <c>TDengine.Connect</c>. A zero handle is logged as a
  /// failure and returned unchanged, so callers have to check for it themselves.
  /// </returns>
  public IntPtr Connection()
  {
      var settings = _configTDengineService;
      string server = settings.Host;
      string account = settings.UserName;
      string database = settings.DB;
      short serverPort = settings.Port;
      string secret = settings.Password;

      IntPtr handle = TDengine.Connect(server, account, secret, database, serverPort);

      bool connected = handle != IntPtr.Zero;
      if (connected)
      {
          _logger.LogInformation("连接 TDengine 成功....");
      }
      else
      {
          _logger.LogError("连接 TDengine 失败....");
      }

      return handle;
  }
  /// <summary>
  /// Runs a statement on an already opened connection and logs the outcome.
  /// </summary>
  /// <param name="conn">Connection handle, e.g. obtained from <see cref="Connection"/>.</param>
  /// <param name="sql">Statement text passed to <c>TDengine.Query</c>.</param>
  public void ExecuteSQL(IntPtr conn, string sql)
  {
      IntPtr res = TDengine.Query(conn, sql);
      try
      {
          if (res == IntPtr.Zero || TDengine.ErrorNo(res) != 0)
          {
              // The error text is only read from a non-null result handle.
              string reason = res != IntPtr.Zero ? TDengine.Error(res) : string.Empty;
              _logger.LogError("{Sql} failure, reason:{Reason}", sql, reason);
              return;
          }

          // The SQL text is passed as an argument, never as part of the format string,
          // so braces inside the statement cannot break the formatting.
          _logger.LogInformation("{Sql} success, {AffectedRows} rows affected", sql, TDengine.AffectRows(res));
      }
      finally
      {
          // Free the native result on every path (including the error path) to avoid a leak.
          if (res != IntPtr.Zero)
          {
              TDengine.FreeResult(res);
          }
      }
  }
  /// <summary>
  /// Runs a query on an already opened connection and logs the column metadata
  /// followed by the returned rows, one log entry per row.
  /// </summary>
  /// <param name="conn">Connection handle, e.g. obtained from <see cref="Connection"/>.</param>
  /// <param name="sql">Query text passed to <c>TDengine.Query</c>.</param>
  public void ExecuteQuerySQL(IntPtr conn, string sql)
  {
      IntPtr res = TDengine.Query(conn, sql);
      try
      {
          if (res == IntPtr.Zero || TDengine.ErrorNo(res) != 0)
          {
              // The error text is only read from a non-null result handle.
              string reason = res != IntPtr.Zero ? TDengine.Error(res) : string.Empty;
              _logger.LogError("{Sql} failure, reason:{Reason}", sql, reason);
              return;
          }

          // The SQL text is a logging argument, not part of the format string,
          // so braces inside the statement cannot break the formatting.
          _logger.LogInformation("{Sql} success, {AffectedRows} rows affected", sql, TDengine.AffectRows(res));

          List<TDengineDriver.TDengineMeta> resMeta = LibTaos.GetMeta(res);
          List<object> resData = LibTaos.GetData(res);

          foreach (var meta in resMeta)
          {
              _logger.LogInformation("\t|{Name} {TypeName} ({Size})\t|", meta.name, meta.TypeName(), meta.size);
          }

          // resData is treated as a flat list with resMeta.Count cells per row
          // (the original row-break logic relied on the same layout).
          int columnCount = resMeta.Count;
          if (columnCount > 0)
          {
              for (int start = 0; start < resData.Count; start += columnCount)
              {
                  int width = Math.Min(columnCount, resData.Count - start);
                  // string.Join renders null cells as empty text instead of throwing.
                  string row = string.Join(" \t|", resData.GetRange(start, width));
                  _logger.LogInformation("|{Row} \t", row);
              }
          }
      }
      finally
      {
          // Free the native result on every path (including the error path) to avoid a leak.
          if (res != IntPtr.Zero)
          {
              TDengine.FreeResult(res);
          }
      }
  }
  /// <summary>
  /// Throws when the given result handle reports a non-zero TDengine error number.
  /// </summary>
  /// <param name="conn">Connection handle; not used by this check.</param>
  /// <param name="res">Result handle whose error number is inspected.</param>
  /// <param name="errorMsg">Prefix for the exception message.</param>
  /// <exception cref="Exception">The error number of <paramref name="res"/> is not zero.</exception>
  public void CheckRes(IntPtr conn, IntPtr res, String errorMsg)
  {
      int errorCode = TDengine.ErrorNo(res);
      if (errorCode == 0)
      {
          return;
      }

      string reason = TDengine.Error(res);
      throw new Exception($"{errorMsg} since: {reason}");
  }
  /// <summary>
  /// Opens a connection, executes an INSERT statement, logs the number of affected
  /// rows and closes the connection again.
  /// </summary>
  /// <param name="sql">Complete INSERT statement to execute.</param>
  /// <exception cref="InvalidOperationException">No connection could be opened.</exception>
  /// <exception cref="Exception">TDengine reported an error for the statement (raised by <see cref="CheckRes"/>).</exception>
  public void ExecuteInsertSQL(string sql)
  {
      var conn = Connection();
      if (conn == IntPtr.Zero)
      {
          // Do not hand a null connection handle to the native client.
          throw new InvalidOperationException("failed to insert data since: TDengine connection could not be opened");
      }

      try
      {
          IntPtr res = TDengine.Query(conn, sql);
          try
          {
              CheckRes(conn, res, "failed to insert data");
              int affectedRows = TDengine.AffectRows(res);
              _logger.LogInformation("affectedRows {affectedRows}", affectedRows);
          }
          finally
          {
              // Free the result even when CheckRes throws, otherwise it leaks.
              if (res != IntPtr.Zero)
              {
                  TDengine.FreeResult(res);
              }
          }
      }
      finally
      {
          TDengine.Close(conn);
      }
  }
  143. #region TDengine.Connector async query
  /// <summary>
  /// Completion callback for <c>TDengine.QueryAsync</c>. On success it starts fetching
  /// raw blocks via <see cref="FetchRawBlockCallback"/>; on failure it logs the code
  /// and releases the result handle.
  /// </summary>
  /// <param name="param">Opaque value forwarded unchanged to the fetch callback.</param>
  /// <param name="taosRes">Result handle of the asynchronous query.</param>
  /// <param name="code">Status code of the query; zero is treated as success.</param>
  public void QueryCallback(IntPtr param, IntPtr taosRes, int code)
  {
      if (code == 0 && taosRes != IntPtr.Zero)
      {
          FetchRawBlockAsyncCallback fetchRowAsyncCallback = new FetchRawBlockAsyncCallback(FetchRawBlockCallback);
          TDengine.FetchRawBlockAsync(taosRes, fetchRowAsyncCallback, param);
          return;
      }

      _logger.LogError("async query data failed, failed code {code}", code);

      // Nothing else will free the result of a failed query, so release it here.
      if (taosRes != IntPtr.Zero)
      {
          TDengine.FreeResult(taosRes);
      }
  }
  /// <summary>
  /// Callback for <c>TDengine.FetchRawBlockAsync</c>. Logs the fetched block and
  /// requests the next one; it is invoked repeatedly until
  /// <paramref name="numOfRows"/> is no greater than 0, at which point the result is freed.
  /// </summary>
  /// <param name="param">Opaque value forwarded unchanged to the next fetch.</param>
  /// <param name="taosRes">Result handle being iterated.</param>
  /// <param name="numOfRows">Rows in the current block; 0 means complete, negative is logged as an error code.</param>
  public void FetchRawBlockCallback(IntPtr param, IntPtr taosRes, int numOfRows)
  {
      if (numOfRows > 0)
      {
          _logger.LogInformation("{numOfRows} rows async retrieved", numOfRows);

          IntPtr pdata = TDengine.GetRawBlock(taosRes);
          List<TDengineMeta> metaList = TDengine.FetchFields(taosRes);
          List<object> dataList = LibTaos.ReadRawBlock(pdata, metaList, numOfRows);

          foreach (var meta in metaList)
          {
              _logger.LogInformation("{Name} {Type}({Size}) \t|", meta.name, meta.type, meta.size);
          }

          // dataList is treated as a flat list with metaList.Count cells per row
          // (the original row-break check relied on the same layout). Each row is
          // logged exactly once; the previous loop logged the first cell of every
          // row after the first one twice.
          int columnCount = metaList.Count;
          if (columnCount > 0)
          {
              for (int start = 0; start < dataList.Count; start += columnCount)
              {
                  int width = Math.Min(columnCount, dataList.Count - start);
                  string row = string.Join("\t|", dataList.GetRange(start, width));
                  _logger.LogInformation("{Row}\t|", row);
              }
          }

          // Ask for the next block; this callback runs again with its row count.
          TDengine.FetchRawBlockAsync(taosRes, FetchRawBlockCallback, param);
          return;
      }

      if (numOfRows == 0)
      {
          _logger.LogInformation("async retrieve complete.");
      }
      else
      {
          _logger.LogError("FetchRawBlockCallback callback error, error code {numOfRows}", numOfRows);
      }

      // Iteration is over (finished or failed): release the native result.
      TDengine.FreeResult(taosRes);
  }
  /// <summary>
  /// Opens a connection and starts an asynchronous query; results are delivered to
  /// <see cref="QueryCallback"/> and from there to the raw-block fetch callback.
  /// </summary>
  /// <param name="sql">Query text passed to <c>TDengine.QueryAsync</c>.</param>
  public void ExecuteQueryAsync(string sql)
  {
      var conn = Connection();
      if (conn == IntPtr.Zero)
      {
          // Connection() has already logged the failure; do not hand a null handle to the native client.
          _logger.LogError("async query skipped because no TDengine connection is available");
          return;
      }

      // NOTE(review): the connection opened here is never closed by this method or by
      // the callbacks in this class - confirm who owns its lifetime.
      // NOTE(review): the delegate is only referenced by this local while native code
      // may still invoke it later - confirm the driver keeps it alive.
      QueryAsyncCallback queryAsyncCallback = new QueryAsyncCallback(QueryCallback);
      TDengine.QueryAsync(conn, sql, queryAsyncCallback, IntPtr.Zero);
  }
  199. //public void ExecuteQuery(string sql)
  200. //{
  201. // var conn = Connection();
  202. // QueryAsyncCallback queryAsyncCallback = new QueryAsyncCallback(QueryCallback);
  203. // TDengine.QueryAsync(conn, sql, queryAsyncCallback, IntPtr.Zero);
  204. //}
  /// <summary>
  /// Queries <c>MAX(field)</c> and <c>MIN(field)</c> of a table over the native connection.
  /// </summary>
  /// <param name="field">Column name to aggregate.</param>
  /// <param name="tbName">Table name inside the configured database.</param>
  /// <param name="condition">Optional filter expression; when null or blank no WHERE clause is emitted.</param>
  /// <returns>
  /// An <c>Aggregate</c> with the max/min converted through <c>SafeType.SafeInt</c>;
  /// both are 0 when the connection or query fails or no values are returned.
  /// </returns>
  public Aggregate GetAggregateValue(string field, string tbName, string? condition)
  {
      // NOTE(review): field, tbName and condition are spliced into the SQL text verbatim;
      // only trusted values may be passed in.
      var whereClause = string.IsNullOrWhiteSpace(condition) ? string.Empty : $" WHERE {condition}";
      var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName}{whereClause}";

      List<int> data = new();
      var conn = Connection();
      if (conn == IntPtr.Zero)
      {
          // Connection() has already logged the failure.
          return new Aggregate { Max = 0, Min = 0 };
      }

      try
      {
          IntPtr res = TDengine.Query(conn, sql);
          try
          {
              if (res == IntPtr.Zero || TDengine.ErrorNo(res) != 0)
              {
                  // The error text is only read from a non-null result handle.
                  string reason = res != IntPtr.Zero ? TDengine.Error(res) : string.Empty;
                  _logger.LogError("{Sql} failure, reason:{Reason}", sql, reason);
              }
              else
              {
                  _logger.LogInformation("{Sql} success, {AffectedRows} rows affected", sql, TDengine.AffectRows(res));

                  foreach (var meta in LibTaos.GetMeta(res))
                  {
                      _logger.LogDebug("\t|{Name} {TypeName} ({Size})\t|", meta.name, meta.TypeName(), meta.size);
                  }

                  foreach (object value in LibTaos.GetData(res))
                  {
                      data.Add(SafeType.SafeInt(value));
                  }
              }
          }
          finally
          {
              // Free the native result on every path (including the error path) to avoid a leak.
              if (res != IntPtr.Zero)
              {
                  TDengine.FreeResult(res);
              }
          }
      }
      finally
      {
          TDengine.Close(conn);
      }

      // Index defensively: a short result must not raise an out-of-range exception.
      return new Aggregate
      {
          Max = data.Count > 0 ? data[0] : 0,
          Min = data.Count > 1 ? data[1] : 0,
      };
  }
  /// <summary>
  /// Computes <c>AVG(field)</c> over the rows whose value lies strictly between the
  /// maximum and minimum returned by <see cref="GetAggregateValue"/>.
  /// </summary>
  /// <param name="field">Column name to average.</param>
  /// <param name="tbName">Table name inside the configured database.</param>
  /// <param name="condition">Optional filter expression; when null or blank only the max/min bounds are applied.</param>
  /// <returns>
  /// The average converted through <c>SafeType.SafeInt</c>, or 0 when the connection
  /// or query fails or no value is returned.
  /// </returns>
  public int GetAvgExceptMaxMinValue(string field, string tbName, string? condition)
  {
      var aggregate = GetAggregateValue(field, tbName, condition);

      // NOTE(review): field, tbName and condition are spliced into the SQL text verbatim;
      // only trusted values may be passed in.
      var conditionPrefix = string.IsNullOrWhiteSpace(condition) ? string.Empty : $"{condition} AND ";
      var sqlAvg = $"SELECT AVG({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {conditionPrefix}{field} < {aggregate.Max} and {field} > {aggregate.Min}";

      List<int> data = new();
      var conn = Connection();
      if (conn == IntPtr.Zero)
      {
          // Connection() has already logged the failure.
          return 0;
      }

      try
      {
          IntPtr res = TDengine.Query(conn, sqlAvg);
          try
          {
              if (res == IntPtr.Zero || TDengine.ErrorNo(res) != 0)
              {
                  // The error text is only read from a non-null result handle.
                  string reason = res != IntPtr.Zero ? TDengine.Error(res) : string.Empty;
                  _logger.LogError("{Sql} failure, reason:{Reason}", sqlAvg, reason);
              }
              else
              {
                  _logger.LogInformation("{Sql} success, {AffectedRows} rows affected", sqlAvg, TDengine.AffectRows(res));

                  foreach (var meta in LibTaos.GetMeta(res))
                  {
                      _logger.LogDebug("\t|{Name} {TypeName} ({Size})\t|", meta.name, meta.TypeName(), meta.size);
                  }

                  foreach (object value in LibTaos.GetData(res))
                  {
                      data.Add(SafeType.SafeInt(value));
                  }
              }
          }
          finally
          {
              // Free the native result on every path (including the error path) to avoid a leak.
              if (res != IntPtr.Zero)
              {
                  TDengine.FreeResult(res);
              }
          }
      }
      finally
      {
          TDengine.Close(conn);
      }

      return data.Count == 0 ? 0 : data[0];
  }
  289. #endregion
  290. #region RestAPI
  /// <summary>
  /// Posts a SQL statement to the TDengine REST endpoint
  /// (<c>http://{host}:{restPort}/rest/sql/{db}</c>) and reports whether it succeeded.
  /// </summary>
  /// <param name="sql">Statement sent as the request body.</param>
  /// <returns>
  /// <c>true</c> when the response deserializes to an object whose <c>Code</c> is 0;
  /// <c>false</c> when there is no response, the response is not valid JSON, or the code is non-zero.
  /// </returns>
  public async Task<bool> GernalRestSql(string sql)
  {
      var url = $"http://{_configTDengineService.Host}:{_configTDengineService.RestPort}/rest/sql/{_configTDengineService.DB}";
      List<KeyValuePair<string, string>> headers = new()
      {
          new KeyValuePair<string, string>("Authorization", "Basic " + _configTDengineService.Token)
      };

      var result = await _httpHelper.HttpToPostAsync(url, sql, headers).ConfigureAwait(false);

      // Check for a missing response BEFORE deserializing: DeserializeObject throws on
      // null input, which previously made this branch unreachable.
      if (result == null)
      {
          _logger.LogError("{Method},TDengine 服务器IP:{Host} 错误,请联系运维人员", nameof(GernalRestSql), _configTDengineService.Host);
          return false;
      }

      try
      {
          var res = JsonConvert.DeserializeObject<TDengineRestResBase>(result);
          if (res?.Code == 0)
          {
              _logger.LogInformation("{Method},SQL 语句执行成功|{Sql}", nameof(GernalRestSql), sql);
              return true;
          }
      }
      catch (JsonException ex)
      {
          // A body that is not valid JSON counts as a failed statement, not a crash.
          _logger.LogWarning(ex, "{Method},SQL 语句执行失败||{Sql}", nameof(GernalRestSql), sql);
          return false;
      }

      _logger.LogWarning("{Method},SQL 语句执行失败||{Sql}", nameof(GernalRestSql), sql);
      return false;
  }
  /// <summary>
  /// Posts a SQL statement to the TDengine REST endpoint and hands back the raw
  /// response text without interpreting it.
  /// </summary>
  /// <param name="sql">Statement sent as the request body.</param>
  /// <returns>Whatever the HTTP helper returns for the request; may be null.</returns>
  public async Task<string?> GernalRestSqlResTextAsync(string sql)
  {
      var settings = _configTDengineService;
      var endpoint = $"http://{settings.Host}:{settings.RestPort}/rest/sql/{settings.DB}";

      var authorization = new KeyValuePair<string, string>("Authorization", "Basic " + settings.Token);
      var requestHeaders = new List<KeyValuePair<string, string>> { authorization };

      return await _httpHelper.HttpToPostAsync(endpoint, sql, requestHeaders).ConfigureAwait(false);
  }
  /// <summary>
  /// Queries <c>MAX(field)</c> and <c>MIN(field)</c> of a table through the REST endpoint.
  /// </summary>
  /// <param name="field">Column name to aggregate.</param>
  /// <param name="tbName">Table name inside the configured database.</param>
  /// <param name="condition">Optional filter expression; when null or blank no WHERE clause is emitted.</param>
  /// <returns>
  /// An <c>Aggregate</c> whose Max/Min come from the first data row of the response;
  /// both are 0 when there is no response or no data rows.
  /// </returns>
  public async Task<Aggregate> GetAggregateValueAsync(string field, string tbName, string? condition)
  {
      // NOTE(review): field, tbName and condition are spliced into the SQL text verbatim;
      // only trusted values may be passed in.
      var whereClause = string.IsNullOrWhiteSpace(condition) ? string.Empty : $" WHERE {condition}";
      var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName}{whereClause}";

      var result = await GernalRestSqlResTextAsync(sql);
      if (string.IsNullOrEmpty(result))
      {
          // No response body: report "no data" instead of throwing from the deserializer.
          _logger.LogWarning("{Method},no response for {Sql}", nameof(GetAggregateValueAsync), sql);
          return new Aggregate { Max = 0, Min = 0 };
      }

      var res = JsonConvert.DeserializeObject<Aggregate>(result);
      List<dynamic>? data = res?.Data;
      if (data is null || data.Count == 0)
      {
          return new Aggregate { Max = 0, Min = 0 };
      }

      // The first row carries [MAX, MIN] in column order of the SELECT list.
      return new Aggregate
      {
          Max = data[0][0],
          Min = data[0][1],
      };
  }
  /// <summary>
  /// Computes, through the REST endpoint, <c>AVG(field)</c> over the rows whose value
  /// lies strictly between the table's MAX and MIN for that field.
  /// </summary>
  /// <param name="field">Column name to average.</param>
  /// <param name="tbName">Table name inside the configured database.</param>
  /// <param name="condition">Optional filter expression; when null or blank it is left out of both queries.</param>
  /// <returns>
  /// The average cast to <c>int</c>, or 0 when either request yields no response or no data rows.
  /// </returns>
  public async Task<int> GetAvgExceptMaxMinValueAsync(string field, string tbName, string? condition)
  {
      // Reads the Data rows of a REST response; null when the body is missing or has no Data.
      static List<dynamic>? ReadRows(string? json)
      {
          if (string.IsNullOrEmpty(json))
          {
              return null;
          }

          var parsed = JsonConvert.DeserializeObject<TDengineRestResBase>(json);
          return parsed?.Data;
      }

      // NOTE(review): field, tbName and condition are spliced into the SQL text verbatim;
      // only trusted values may be passed in.
      bool hasCondition = !string.IsNullOrWhiteSpace(condition);
      var whereClause = hasCondition ? $" WHERE {condition}" : string.Empty;
      var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName}{whereClause}";

      var result = await GernalRestSqlResTextAsync(sql);
      List<dynamic>? bounds = ReadRows(result);
      if (bounds is null || bounds.Count == 0)
      {
          // Without bounds the old code filtered on "< 0 and > 0", which matches nothing;
          // return the same 0 without a second round trip.
          return 0;
      }

      // Bind the dynamic cells to object so the SQL text is built as a plain string.
      object upper = bounds[0][0];
      object lower = bounds[0][1];
      var conditionPrefix = hasCondition ? $"{condition} AND " : string.Empty;
      string sqlAvg = $"SELECT AVG({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {conditionPrefix}{field} < {upper} and {field} > {lower}";

      result = await GernalRestSqlResTextAsync(sqlAvg);
      List<dynamic>? rows = ReadRows(result);
      if (rows is null || rows.Count == 0)
      {
          return 0;
      }

      // NOTE(review): assumes the AVG cell is numeric and non-null - confirm what the
      // REST API returns when no row matches the filter.
      return (int)rows[0][0];
  }
  355. #endregion
  356. }
  357. }