using HealthMonitor.Common;
using HealthMonitor.Common.helper;
using HealthMonitor.Model.Config;
using HealthMonitor.Service.Biz.db.Dto;
using HealthMonitor.Util.Models;
using Microsoft.EntityFrameworkCore.Metadata.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Xml.Linq;
using TDengineDriver;
using TDengineDriver.Impl;

namespace HealthMonitor.Service.Biz.db
{
    /// <summary>
    /// Access layer for TDengine. Offers two transports: the native connector
    /// (<c>TDengine.*</c> / <c>LibTaos.*</c> calls on an <see cref="IntPtr"/> connection handle)
    /// and the HTTP REST endpoint (<c>/rest/sql/{db}</c>, posted through <see cref="HttpHelper"/>).
    /// </summary>
    /// <remarks>
    /// SECURITY NOTE(review): every query in this class is assembled by string interpolation from
    /// <c>field</c>, <c>tbName</c> and <c>condition</c>. Callers must only pass trusted values;
    /// nothing here escapes or validates them.
    /// </remarks>
    public class TDengineService
    {
        private readonly ILogger<TDengineService> _logger;
        private readonly HttpHelper _httpHelper;
        private readonly TDengineServiceConfig _configTDengineService;

        /// <summary>Creates the service from its injected logger, configuration and HTTP helper.</summary>
        /// <param name="logger">Logger used for all diagnostics emitted by this class.</param>
        /// <param name="configTDengineService">Options wrapper; its <c>Value</c> supplies host, ports, database, credentials and REST token.</param>
        /// <param name="httpHelper">Helper used to POST SQL text to the REST endpoint.</param>
        public TDengineService(
            ILogger<TDengineService> logger,
            IOptions<TDengineServiceConfig> configTDengineService,
            HttpHelper httpHelper)
        {
            _logger = logger;
            _configTDengineService = configTDengineService.Value;
            _httpHelper = httpHelper;
        }

        #region TDengine.Connector synchronous access

        /// <summary>Opens a native connection using the configured host, user, password, database and port.</summary>
        /// <returns>
        /// The connection handle, or <see cref="IntPtr.Zero"/> when connecting failed (the failure is logged).
        /// The caller owns the handle and must release it with <c>TDengine.Close</c>.
        /// </returns>
        public IntPtr Connection()
        {
            string host = _configTDengineService.Host;
            string user = _configTDengineService.UserName;
            string db = _configTDengineService.DB;
            short port = _configTDengineService.Port;
            string password = _configTDengineService.Password;

            IntPtr conn = TDengine.Connect(host, user, password, db, port);
            if (conn == IntPtr.Zero)
            {
                _logger.LogError("连接 TDengine 失败....");
            }
            else
            {
                _logger.LogInformation("连接 TDengine 成功....");
            }

            return conn;
        }

        /// <summary>Runs a statement on an existing connection and logs the outcome.</summary>
        /// <param name="conn">Open connection handle; it is not closed by this method.</param>
        /// <param name="sql">Statement text to execute.</param>
        public void ExecuteSQL(IntPtr conn, string sql)
        {
            IntPtr res = TDengine.Query(conn, sql);
            try
            {
                if (res == IntPtr.Zero || TDengine.ErrorNo(res) != 0)
                {
                    LogQueryFailure(sql, res);
                    return;
                }

                // The SQL text is passed as an argument, never as part of the template:
                // braces inside the statement must not be interpreted as format items.
                _logger.LogInformation("{Sql} success, {AffectedRows} rows affected", sql, TDengine.AffectRows(res));
            }
            finally
            {
                // A non-zero result handle is released on the failure path as well.
                FreeResult(res);
            }
        }

        /// <summary>Runs a query on an existing connection and logs its column metadata and rows.</summary>
        /// <param name="conn">Open connection handle; it is not closed by this method.</param>
        /// <param name="sql">Query text to execute.</param>
        public void ExecuteQuerySQL(IntPtr conn, string sql)
        {
            IntPtr res = TDengine.Query(conn, sql);
            try
            {
                if (res == IntPtr.Zero || TDengine.ErrorNo(res) != 0)
                {
                    LogQueryFailure(sql, res);
                    return;
                }

                _logger.LogInformation("{Sql} success, {AffectedRows} rows affected", sql, TDengine.AffectRows(res));

                var resMeta = LibTaos.GetMeta(res);
                var resData = LibTaos.GetData(res);

                foreach (var meta in resMeta)
                {
                    _logger.LogInformation("\t|{Name} {TypeName} ({Size})\t|", meta.name, meta.TypeName(), meta.size);
                }

                // resData is a flat cell list; the column count splits it into rows.
                LogRows(resData, resMeta.Count);
            }
            finally
            {
                FreeResult(res);
            }
        }

        /// <summary>Throws when a query result is missing or carries an error code.</summary>
        /// <param name="conn">Connection the query ran on (unused; kept for interface compatibility).</param>
        /// <param name="res">Result handle returned by <c>TDengine.Query</c>. It is not freed here.</param>
        /// <param name="errorMsg">Prefix for the exception message.</param>
        /// <exception cref="InvalidOperationException">
        /// <paramref name="res"/> is <see cref="IntPtr.Zero"/> or <c>TDengine.ErrorNo</c> reports a non-zero code.
        /// </exception>
        public void CheckRes(IntPtr conn, IntPtr res, String errorMsg)
        {
            if (res == IntPtr.Zero)
            {
                throw new InvalidOperationException($"{errorMsg} since: no result was returned");
            }

            if (TDengine.ErrorNo(res) != 0)
            {
                throw new InvalidOperationException($"{errorMsg} since: {TDengine.Error(res)}");
            }
        }

        /// <summary>Opens a dedicated connection, executes an INSERT statement and logs the affected row count.</summary>
        /// <param name="sql">INSERT statement text.</param>
        /// <exception cref="InvalidOperationException">The connection could not be opened or the statement failed.</exception>
        public void ExecuteInsertSQL(string sql)
        {
            var conn = Connection();
            if (conn == IntPtr.Zero)
            {
                throw new InvalidOperationException("failed to insert data since: TDengine connection could not be established");
            }

            IntPtr res = IntPtr.Zero;
            try
            {
                _logger.LogInformation("Insert SQL: {Sql}", sql);
                res = TDengine.Query(conn, sql);
                CheckRes(conn, res, "failed to insert data");

                int affectedRows = TDengine.AffectRows(res);
                _logger.LogInformation("affectedRows {affectedRows}", affectedRows);
            }
            finally
            {
                // Release the result even when CheckRes throws, then the connection.
                FreeResult(res);
                TDengine.Close(conn);
            }
        }

        #endregion

        #region TDengine.Connector async query

        /// <summary>Callback for <c>TDengine.QueryAsync</c>: starts fetching raw blocks when the query succeeded.</summary>
        /// <param name="param">Opaque value forwarded unchanged to the fetch callback.</param>
        /// <param name="taosRes">Result handle of the asynchronous query.</param>
        /// <param name="code">Query status; 0 means success.</param>
        public void QueryCallback(IntPtr param, IntPtr taosRes, int code)
        {
            if (code == 0 && taosRes != IntPtr.Zero)
            {
                FetchRawBlockAsyncCallback fetchRowAsyncCallback = new FetchRawBlockAsyncCallback(FetchRawBlockCallback);
                TDengine.FetchRawBlockAsync(taosRes, fetchRowAsyncCallback, param);
            }
            else
            {
                _logger.LogInformation("async query data failed, failed code {code}", code);
            }
        }

        /// <summary>
        /// Callback for <c>TDengine.FetchRawBlockAsync</c>. Logs the received block and requests the next one;
        /// it keeps re-arming itself until <paramref name="numOfRows"/> is no greater than 0, then frees the result.
        /// </summary>
        /// <param name="param">Opaque value forwarded unchanged to the next fetch.</param>
        /// <param name="taosRes">Result handle being drained.</param>
        /// <param name="numOfRows">Rows in this block; 0 means complete, a negative value is an error code.</param>
        public void FetchRawBlockCallback(IntPtr param, IntPtr taosRes, int numOfRows)
        {
            if (numOfRows > 0)
            {
                _logger.LogInformation("{numOfRows} rows async retrieved", numOfRows);

                IntPtr pdata = TDengine.GetRawBlock(taosRes);
                var metaList = TDengine.FetchFields(taosRes);
                var dataList = LibTaos.ReadRawBlock(pdata, metaList, numOfRows);

                for (int i = 0; i < metaList.Count; i++)
                {
                    _logger.LogInformation("{Name} {Type}({Size}) \t|", metaList[i].name, metaList[i].type, metaList[i].size);
                }

                // One log entry per row; each cell appears exactly once.
                LogRows(dataList, metaList.Count);

                TDengine.FetchRawBlockAsync(taosRes, FetchRawBlockCallback, param);
            }
            else
            {
                if (numOfRows == 0)
                {
                    _logger.LogInformation("async retrieve complete.");
                }
                else
                {
                    _logger.LogInformation("FetchRawBlockCallback callback error, error code {numOfRows}", numOfRows);
                }

                TDengine.FreeResult(taosRes);
            }
        }

        /// <summary>Opens a connection and starts an asynchronous query whose rows are logged by the callbacks above.</summary>
        /// <param name="sql">Query text to execute.</param>
        public void ExecuteQueryAsync(string sql)
        {
            var conn = Connection();
            if (conn == IntPtr.Zero)
            {
                // Connection() has already logged the failure; there is nothing to query against.
                return;
            }

            // NOTE(review): the connection opened here is never closed, and the callback delegate is
            // only referenced by this local. Closing the connection once the fetch completes (and
            // keeping the delegates rooted until then) needs the driver's callback contract — confirm
            // against the connector documentation before changing.
            QueryAsyncCallback queryAsyncCallback = new QueryAsyncCallback(QueryCallback);
            TDengine.QueryAsync(conn, sql, queryAsyncCallback, IntPtr.Zero);
        }

        /// <summary>Queries MAX and MIN of a column through the native connector.</summary>
        /// <param name="field">Column to aggregate.</param>
        /// <param name="tbName">Table name, qualified with the configured database by this method.</param>
        /// <param name="condition">Raw WHERE clause body; null or blank means no filter.</param>
        /// <returns>The maximum and minimum; a missing value (failed query, no rows) is reported as 0.</returns>
        public Aggregate GetAggregateValue(string field, string tbName, string? condition)
        {
            var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName}{WhereClause(condition)}";
            List<int> data = QueryIntValues(sql);

            return new Aggregate
            {
                Max = data.Count > 0 ? data[0] : 0,
                Min = data.Count > 1 ? data[1] : 0,
            };
        }

        /// <summary>
        /// Queries the average of a column through the native connector, restricted to values strictly
        /// between the column's minimum and maximum (as returned by <see cref="GetAggregateValue"/>).
        /// </summary>
        /// <param name="field">Column to average.</param>
        /// <param name="tbName">Table name, qualified with the configured database by this method.</param>
        /// <param name="condition">Raw WHERE clause body; null or blank means no additional filter.</param>
        /// <returns>The average converted by <c>SafeType.SafeInt</c>, or 0 when the query fails or yields nothing.</returns>
        public int GetAvgExceptMaxMinValue(string field, string tbName, string? condition)
        {
            var aggregate = GetAggregateValue(field, tbName, condition);
            var sqlAvg = $"SELECT AVG({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {AndPrefix(condition)}{field} < {aggregate.Max} and {field} > {aggregate.Min}";

            List<int> data = QueryIntValues(sqlAvg);
            return data.Count > 0 ? data[0] : 0;
        }

        #endregion

        #region RestAPI

        /// <summary>Posts a SQL statement to the REST endpoint and returns the raw response text.</summary>
        /// <param name="sql">Statement text.</param>
        /// <returns>Whatever <c>HttpHelper.HttpToPostAsync</c> returns for the request.</returns>
        public async Task<string?> ExecuteQuerySQLRestResponse(string sql)
        {
            return await PostSqlAsync(sql).ConfigureAwait(false);
        }

        /// <summary>Builds <c>SELECT {field} FROM {db}.{tbName} WHERE {condition}</c> and posts it to the REST endpoint.</summary>
        /// <param name="tbName">Table name, qualified with the configured database by this method.</param>
        /// <param name="condition">Raw WHERE clause body; defaults to <c>1</c>.</param>
        /// <param name="field">Select list; defaults to <c>*</c>.</param>
        /// <returns>The raw response text.</returns>
        public async Task<string?> ExecuteSelectRestResponseAsync(string tbName, string condition = "1", string field = "*")
        {
            var sql = $"SELECT {field} FROM {_configTDengineService.DB}.{tbName} WHERE {condition}";
            return await PostSqlAsync(sql).ConfigureAwait(false);
        }

        /// <summary>Posts a SQL statement to the REST endpoint and reports whether the server answered with code 0.</summary>
        /// <param name="sql">Statement text.</param>
        /// <returns>
        /// <c>true</c> when the response carries <c>code == 0</c>; <c>false</c> when there is no response,
        /// the response is not a JSON object, or the code is missing or non-zero.
        /// </returns>
        public async Task<bool> GernalRestSql(string sql)
        {
            var result = await PostSqlAsync(sql).ConfigureAwait(false);

            // Check for "no response" before parsing; parsing null would throw instead of reporting it.
            if (result is null)
            {
                _logger.LogError("{Method},TDengine 服务器IP:{Host} 错误,请联系运维人员", nameof(GernalRestSql), _configTDengineService.Host);
                return false;
            }

            JToken? code = ParseResponse(result)?.GetValue("code", StringComparison.OrdinalIgnoreCase);
            if (code is not null && code.Type == JTokenType.Integer && (long)code == 0)
            {
                _logger.LogInformation("{Method},SQL 语句执行成功|{Sql}", nameof(GernalRestSql), sql);
                return true;
            }

            _logger.LogWarning("{Method},SQL 语句执行失败||{Sql}", nameof(GernalRestSql), sql);
            return false;
        }

        /// <summary>Logs a SQL statement, posts it to the REST endpoint and returns the raw response text.</summary>
        /// <param name="sql">Statement text.</param>
        /// <returns>The raw response text.</returns>
        public async Task<string?> GernalRestSqlResTextAsync(string sql)
        {
            _logger.LogInformation("SQL: {Method}--{Sql}", nameof(GernalRestSqlResTextAsync), sql);
            return await PostSqlAsync(sql).ConfigureAwait(false);
        }

        /// <summary>
        /// Maximum / minimum aggregation over the REST endpoint.
        /// </summary>
        /// <param name="field">Column to aggregate.</param>
        /// <param name="tbName">Table name, qualified with the configured database by this method.</param>
        /// <param name="condition">Raw WHERE clause body; null or blank means no filter.</param>
        /// <returns>The maximum and minimum; a missing or null value is reported as 0.</returns>
        public async Task<Aggregate> GetAggregateValueAsync(string field, string tbName, string? condition)
        {
            var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName}{WhereClause(condition)}";
            var result = await GernalRestSqlResTextAsync(sql).ConfigureAwait(false);
            JArray? row = FirstRow(result);

            // The cells stay dynamic so they are converted to the property type at run time.
            return new Aggregate
            {
                Max = CellOrZero(row, 0),
                Min = CellOrZero(row, 1),
            };
        }

        /// <summary>
        /// Average over the REST endpoint after excluding the maximum and minimum values.
        /// </summary>
        /// <param name="field">Column to average.</param>
        /// <param name="tbName">Table name, qualified with the configured database by this method.</param>
        /// <param name="condition">Raw WHERE clause body; null or blank means no additional filter.</param>
        /// <returns>The average converted to <see cref="int"/>, or 0 when no value is returned.</returns>
        public async Task<int> GetAvgExceptMaxMinValueAsync(string field, string tbName, string? condition)
        {
            var sql = $"SELECT MAX({field}), MIN({field}) FROM {_configTDengineService.DB}.{tbName}{WhereClause(condition)}";
            var result = await GernalRestSqlResTextAsync(sql).ConfigureAwait(false);
            _logger.LogInformation("最大小值:{Sql}", sql);
            JArray? bounds = FirstRow(result);

            var sqlAvg = $"SELECT AVG({field}) FROM {_configTDengineService.DB}.{tbName} WHERE {AndPrefix(condition)}{field} < {CellOrZero(bounds, 0)} and {field} > {CellOrZero(bounds, 1)}";
            result = await GernalRestSqlResTextAsync(sqlAvg).ConfigureAwait(false);
            _logger.LogInformation("sqlAvg:{SqlAvg}", sqlAvg);

            JToken? average = Cell(FirstRow(result), 0);
            return average is null ? 0 : (int)average;
        }

        /// <summary>
        /// Gets the last record of a table over the REST endpoint.
        /// </summary>
        /// <param name="tbName">Table name, qualified with the configured database by this method.</param>
        /// <param name="condition">Raw WHERE clause body; null or blank means no filter.</param>
        /// <returns>The first row of the response's <c>data</c> array, or <c>null</c> when there is none.</returns>
        public async Task<JArray?> GetLastAsync(string tbName, string? condition)
        {
            var sql = $"SELECT last_row(*) FROM {_configTDengineService.DB}.{tbName}{WhereClause(condition)}";
            var result = await GernalRestSqlResTextAsync(sql).ConfigureAwait(false);
            return FirstRow(result);
        }

        /// <summary>Counts rows (<c>count(ts)</c>) over the REST endpoint.</summary>
        /// <param name="tbName">Table name, qualified with the configured database by this method.</param>
        /// <param name="condition">Raw WHERE clause body; null or blank means no filter.</param>
        /// <returns>The count converted by <c>SafeType.SafeInt</c>, or 0 when the response carries no value.</returns>
        public async Task<int> GetCount(string tbName, string? condition)
        {
            var sql = $"SELECT count(ts) FROM {_configTDengineService.DB}.{tbName}{WhereClause(condition)}";
            var result = await GernalRestSqlResTextAsync(sql).ConfigureAwait(false);

            JToken? count = Cell(FirstRow(result), 0);
            return count is null ? 0 : SafeType.SafeInt(count);
        }

        #endregion

        /// <summary>
        /// Averaging algorithm: removes one occurrence of the maximum, one occurrence of the minimum and
        /// every value greater than the reference value, then averages what is left.
        /// </summary>
        /// <param name="collection">Values to average. The list is modified in place.</param>
        /// <param name="max">Maximum value; its first occurrence is removed.</param>
        /// <param name="min">Minimum value; its first occurrence is removed.</param>
        /// <param name="refValue">Reference (calibration) value; every element greater than it is removed.</param>
        /// <returns>The arithmetic mean of the remaining elements.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="collection"/> is null.</exception>
        /// <exception cref="ArgumentException">Fewer than two elements remain after the removals.</exception>
        public static decimal AverageAfterRemovingOneMinMaxRef(List<int> collection, int max, int min, int refValue)
        {
            if (collection is null)
            {
                throw new ArgumentNullException(nameof(collection));
            }

            collection.Remove(max);
            collection.Remove(min);
            collection.RemoveAll(value => value > refValue);

            if (collection.Count < 2)
            {
                // Print the remaining values themselves, not the array's type name.
                throw new ArgumentException($"数据集[{string.Join(", ", collection)}],去掉一个最大值 {max}和一个最小值{min},异常值(大于标定值{refValue}),后数据值不足");
            }

            return (decimal)collection.Average();
        }

        #region Private helpers

        /// <summary>Releases a native result handle when there is one.</summary>
        private static void FreeResult(IntPtr res)
        {
            if (res != IntPtr.Zero)
            {
                TDengine.FreeResult(res);
            }
        }

        /// <summary>Logs a failed native query, including the driver's reason when a result handle exists.</summary>
        private void LogQueryFailure(string sql, IntPtr res)
        {
            if (res == IntPtr.Zero)
            {
                _logger.LogError("{Sql} failure, ", sql);
            }
            else
            {
                _logger.LogError("{Sql} failure, reason:{Reason}", sql, TDengine.Error(res));
            }
        }

        /// <summary>Logs a flat cell list as rows of <paramref name="columnCount"/> cells, one log entry per row.</summary>
        private void LogRows<T>(IReadOnlyList<T> cells, int columnCount)
        {
            if (columnCount <= 0)
            {
                return;
            }

            for (int start = 0; start < cells.Count; start += columnCount)
            {
                _logger.LogInformation("|{Row}", string.Join(" \t|", cells.Skip(start).Take(columnCount)));
            }
        }

        /// <summary>
        /// Runs a query on a dedicated native connection and returns every cell converted with
        /// <c>SafeType.SafeInt</c>; returns an empty list when connecting or querying fails.
        /// </summary>
        private List<int> QueryIntValues(string sql)
        {
            var values = new List<int>();

            var conn = Connection();
            if (conn == IntPtr.Zero)
            {
                return values;
            }

            IntPtr res = IntPtr.Zero;
            try
            {
                res = TDengine.Query(conn, sql);
                if (res == IntPtr.Zero || TDengine.ErrorNo(res) != 0)
                {
                    LogQueryFailure(sql, res);
                    return values;
                }

                foreach (var cell in LibTaos.GetData(res))
                {
                    values.Add(SafeType.SafeInt(cell));
                }
            }
            finally
            {
                FreeResult(res);
                TDengine.Close(conn);
            }

            return values;
        }

        /// <summary>Returns <c> WHERE {condition}</c>, or an empty string when the condition is null or blank.</summary>
        private static string WhereClause(string? condition)
        {
            return string.IsNullOrWhiteSpace(condition) ? string.Empty : $" WHERE {condition}";
        }

        /// <summary>Returns <c>{condition} AND </c>, or an empty string when the condition is null or blank.</summary>
        private static string AndPrefix(string? condition)
        {
            return string.IsNullOrWhiteSpace(condition) ? string.Empty : $"{condition} AND ";
        }

        /// <summary>Posts SQL text to <c>http://{host}:{restPort}/rest/sql/{db}</c> with the configured Basic token.</summary>
        private async Task<string?> PostSqlAsync(string sql)
        {
            var url = $"http://{_configTDengineService.Host}:{_configTDengineService.RestPort}/rest/sql/{_configTDengineService.DB}";
            List<KeyValuePair<string, string>> headers = new()
            {
                new KeyValuePair<string, string>("Authorization", "Basic " + _configTDengineService.Token)
            };

            return await _httpHelper.HttpToPostAsync(url, sql, headers).ConfigureAwait(false);
        }

        /// <summary>Parses a REST response; returns null (after logging) when it is empty, malformed or not a JSON object.</summary>
        private JObject? ParseResponse(string? json)
        {
            if (string.IsNullOrWhiteSpace(json))
            {
                return null;
            }

            try
            {
                return JToken.Parse(json) as JObject;
            }
            catch (JsonException ex)
            {
                _logger.LogWarning(ex, "TDengine REST response is not valid JSON");
                return null;
            }
        }

        /// <summary>Returns the first row of the response's <c>data</c> array, or null when there is none.</summary>
        private JArray? FirstRow(string? json)
        {
            var rows = ParseResponse(json)?.GetValue("data", StringComparison.OrdinalIgnoreCase) as JArray;
            return rows is not null && rows.Count > 0 ? rows[0] as JArray : null;
        }

        /// <summary>Returns the cell at <paramref name="index"/>, or null when the row, the cell or its value is missing.</summary>
        private static JToken? Cell(JArray? row, int index)
        {
            if (row is null || index >= row.Count)
            {
                return null;
            }

            JToken cell = row[index];
            return cell.Type == JTokenType.Null ? null : cell;
        }

        /// <summary>Returns the cell at <paramref name="index"/> as a dynamic value, or 0 when it is missing.</summary>
        private static dynamic CellOrZero(JArray? row, int index)
        {
            JToken? cell = Cell(row, index);
            if (cell is null)
            {
                return 0;
            }

            return cell;
        }

        #endregion
    }
}