@@ -6,17 +6,19 @@ | |||||
/// 数据服务Host Url | /// 数据服务Host Url | ||||
/// </summary> | /// </summary> | ||||
public string TelpoDataUrl { get; set; } = default!; | public string TelpoDataUrl { get; set; } = default!; | ||||
///// <summary> | |||||
///// Kafka服务地址 | |||||
///// </summary> | |||||
//public string MqServerAddress { get; set; } | |||||
///// <summary> | |||||
///// 服务守护消息kafka服务地址 | |||||
///// </summary> | |||||
//public string ServiceGuardMqAddress { get; set; } | |||||
///// <summary> | |||||
///// 服务守护消息主题 | |||||
///// </summary> | |||||
//public string ServiceGuardMqTopic { get; set; } | |||||
} | |||||
public string EtcdServerAddress { get; set; } = default!; | |||||
///// <summary> | |||||
///// Kafka服务地址 | |||||
///// </summary> | |||||
//public string MqServerAddress { get; set; } | |||||
///// <summary> | |||||
///// 服务守护消息kafka服务地址 | |||||
///// </summary> | |||||
//public string ServiceGuardMqAddress { get; set; } | |||||
///// <summary> | |||||
///// 服务守护消息主题 | |||||
///// </summary> | |||||
//public string ServiceGuardMqTopic { get; set; } | |||||
} | |||||
} | } |
@@ -6,11 +6,13 @@ using Microsoft.EntityFrameworkCore.Metadata.Internal; | |||||
using Microsoft.Extensions.Logging; | using Microsoft.Extensions.Logging; | ||||
using Microsoft.Extensions.Options; | using Microsoft.Extensions.Options; | ||||
using Newtonsoft.Json; | using Newtonsoft.Json; | ||||
using Newtonsoft.Json.Linq; | |||||
using System; | using System; | ||||
using System.Collections.Generic; | using System.Collections.Generic; | ||||
using System.Linq; | using System.Linq; | ||||
using System.Text; | using System.Text; | ||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using System.Xml.Linq; | |||||
using TDengineDriver; | using TDengineDriver; | ||||
using TDengineDriver.Impl; | using TDengineDriver.Impl; | ||||
@@ -403,6 +405,16 @@ namespace HealthMonitor.Service.Biz.db | |||||
return data.Count.Equals(0)?0:(int)data[0][0]; | return data.Count.Equals(0)?0:(int)data[0][0]; | ||||
} | } | ||||
public async Task<JArray?> GetLastAsync(string tbName, string? condition) | |||||
{ | |||||
var sql = $"SELECT last_row(*) FROM {_configTDengineService.DB}.{tbName} WHERE {condition}"; | |||||
var result = await GernalRestSqlResTextAsync(sql); | |||||
var res = JsonConvert.DeserializeObject<TDengineRestResBase>(result!); | |||||
List<dynamic> data = res?.Data!; | |||||
return data[0] as JArray; | |||||
} | |||||
#endregion | #endregion | ||||
} | } | ||||
@@ -0,0 +1,127 @@ | |||||
using dotnet_etcd; | |||||
using Etcdserverpb; | |||||
using Google.Protobuf; | |||||
using HealthMonitor.Model.Config; | |||||
using Microsoft.Extensions.Logging; | |||||
using Microsoft.Extensions.Options; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Linq; | |||||
using System.Net.Sockets; | |||||
using System.Text; | |||||
using System.Threading.Tasks; | |||||
namespace HealthMonitor.Service.Etcd | |||||
{ | |||||
public class EtcdService | |||||
{ | |||||
private readonly ServiceConfig _configService; | |||||
private readonly ILogger<EtcdService> _logger; | |||||
private readonly EtcdClient _etcdClient; | |||||
public EtcdService(IOptions<ServiceConfig> _optConfigService, ILogger<EtcdService> logger) | |||||
{ | |||||
_configService = _optConfigService.Value; | |||||
_logger = logger; | |||||
_etcdClient = new EtcdClient(_configService.EtcdServerAddress); | |||||
} | |||||
public async Task<string> GetValAsync(string key) | |||||
{ | |||||
return await _etcdClient.GetValAsync(key); | |||||
} | |||||
public async Task<RangeResponse> GetResponseAsync(string key) | |||||
{ | |||||
return await _etcdClient.GetAsync(key); | |||||
} | |||||
public async Task<RangeResponse> GetValWithPrefixAsync(string keyWithPREFIX) | |||||
{ | |||||
return await _etcdClient.GetRangeAsync(keyWithPREFIX); | |||||
} | |||||
// | |||||
public async Task PutValAsync(string key, string value, long? ttl = null, bool? keepalived = false) | |||||
{ | |||||
try | |||||
{ | |||||
PutRequest request = new() | |||||
{ | |||||
Key = ByteString.CopyFromUtf8(key), | |||||
Value = ByteString.CopyFromUtf8(value), | |||||
}; | |||||
if (ttl != null) | |||||
{ | |||||
long leaseId = _etcdClient.LeaseGrant(new LeaseGrantRequest { TTL = (long)ttl }).ID; | |||||
request = new PutRequest | |||||
{ | |||||
Key = ByteString.CopyFromUtf8(key), | |||||
Value = ByteString.CopyFromUtf8(value), | |||||
Lease = leaseId | |||||
}; | |||||
if ((bool)keepalived!) | |||||
{ | |||||
//await _etcdClient.PutAsync(request); | |||||
_ = _etcdClient.LeaseKeepAlive(leaseId, CancellationToken.None); | |||||
//await _etcdClient.LeaseKeepAlive(leaseId, CancellationToken.None); | |||||
request = new PutRequest | |||||
{ | |||||
Key = ByteString.CopyFromUtf8(key), | |||||
Value = ByteString.CopyFromUtf8(value), | |||||
Lease = leaseId | |||||
}; | |||||
//await _etcdClient.PutAsync(request); | |||||
//return; | |||||
} | |||||
} | |||||
await _etcdClient.PutAsync(request); | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
_logger.LogInformation($"{nameof(EtcdService)} --- {nameof(PutValAsync)} 出错\n{ex.Message}\n{ex.StackTrace}"); | |||||
} | |||||
// await _etcdClient.PutAsync(key, value,null); | |||||
} | |||||
public async Task PutValAsync(string key, string value, DateTime deadline) | |||||
{ | |||||
PutRequest request = new() | |||||
{ | |||||
Key = ByteString.CopyFromUtf8(key), | |||||
Value = ByteString.CopyFromUtf8(value), | |||||
}; | |||||
await _etcdClient.PutAsync(request, null, deadline); | |||||
// await _etcdClient.PutAsync(key, value,null); | |||||
} | |||||
public async Task DeleteKeyValAsync(string key) | |||||
{ | |||||
await _etcdClient.DeleteAsync(key); | |||||
} | |||||
public async Task DeleteKeyValWithPrefixAsync(string keyWithPREFIX) | |||||
{ | |||||
await _etcdClient.DeleteRangeAsync(keyWithPREFIX); | |||||
} | |||||
public async Task WacthKeysWithPrefixAsync(string keyWithPREFIX, Action<WatchEvent[]> action) | |||||
{ | |||||
await _etcdClient.WatchRangeAsync(keyWithPREFIX, action); | |||||
} | |||||
public async Task WacthKeysWithPrefixResponseAsync(string keyWithPREFIX, Action<WatchResponse> action) | |||||
{ | |||||
await _etcdClient.WatchRangeAsync(keyWithPREFIX, action); | |||||
} | |||||
} | |||||
} |
@@ -7,6 +7,7 @@ | |||||
</PropertyGroup> | </PropertyGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<PackageReference Include="dotnet-etcd" Version="6.0.1" /> | |||||
<PackageReference Include="CSRedisCore" Version="3.8.3" /> | <PackageReference Include="CSRedisCore" Version="3.8.3" /> | ||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="7.0.0" /> | <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="7.0.0" /> | ||||
<PackageReference Include="Microsoft.Extensions.Options" Version="7.0.0" /> | <PackageReference Include="Microsoft.Extensions.Options" Version="7.0.0" /> | ||||
@@ -3,6 +3,7 @@ using HealthMonitor.Common; | |||||
using HealthMonitor.Common.helper; | using HealthMonitor.Common.helper; | ||||
using HealthMonitor.Service.Biz.db; | using HealthMonitor.Service.Biz.db; | ||||
using HealthMonitor.Service.Cache; | using HealthMonitor.Service.Cache; | ||||
using HealthMonitor.Service.Etcd; | |||||
using HealthMonitor.Service.Resolver.Interface; | using HealthMonitor.Service.Resolver.Interface; | ||||
using HealthMonitor.Service.Sub; | using HealthMonitor.Service.Sub; | ||||
using HealthMonitor.Service.Sub.Topic.Model; | using HealthMonitor.Service.Sub.Topic.Model; | ||||
@@ -33,10 +34,13 @@ namespace HealthMonitor.Service.Resolver | |||||
private readonly AsyncLocal<string> _messageId = new(); | private readonly AsyncLocal<string> _messageId = new(); | ||||
private readonly AsyncLocal<HisGpsBloodPress> _msgData = new(); | private readonly AsyncLocal<HisGpsBloodPress> _msgData = new(); | ||||
private readonly EtcdService _serviceEtcd; | |||||
public BloodpressResolver( | public BloodpressResolver( | ||||
TDengineService serviceDengine, | TDengineService serviceDengine, | ||||
BloodPressReferenceValueCacheManager bpRefValCacheManager, | BloodPressReferenceValueCacheManager bpRefValCacheManager, | ||||
PersonCacheManager personCacheMgr, HttpHelper httpHelper, | PersonCacheManager personCacheMgr, HttpHelper httpHelper, | ||||
EtcdService serviceEtcd, | |||||
ILogger<BloodpressResolver> logger) | ILogger<BloodpressResolver> logger) | ||||
{ | { | ||||
_httpHelper = httpHelper; | _httpHelper = httpHelper; | ||||
@@ -44,6 +48,7 @@ namespace HealthMonitor.Service.Resolver | |||||
_bpRefValCacheManager = bpRefValCacheManager; | _bpRefValCacheManager = bpRefValCacheManager; | ||||
_logger = logger; | _logger = logger; | ||||
_personCacheMgr = personCacheMgr; | _personCacheMgr = personCacheMgr; | ||||
_serviceEtcd = serviceEtcd; | |||||
} | } | ||||
public void SetResolveInfo(PackageMsgModel msg) | public void SetResolveInfo(PackageMsgModel msg) | ||||
@@ -102,12 +107,16 @@ namespace HealthMonitor.Service.Resolver | |||||
var systolicRefValue = bpRef?.Systolic;//? | var systolicRefValue = bpRef?.Systolic;//? | ||||
var diastolicRefValue = bpRef?.Diastolic;//? | var diastolicRefValue = bpRef?.Diastolic;//? | ||||
int duration = 2; | |||||
int duration = 7; | |||||
// 获取历史数据 | // 获取历史数据 | ||||
//DateTime now = DateTime.Now; | |||||
DateTime now = (DateTime)bp.LastUpdate!; //测试 | |||||
DateTime startTime = now.AddDays(-duration); | |||||
DateTime endTime = now; | |||||
////DateTime now = DateTime.Now; | |||||
//DateTime now = (DateTime)bp.LastUpdate!; //测试 | |||||
//DateTime startTime = now.AddDays(-duration); | |||||
//DateTime endTime = now; | |||||
DateTime endTime = (DateTime)bp.LastUpdate!; //测试 | |||||
DateTime startTime = endTime.AddDays(-duration); | |||||
// | // | ||||
var systolicAggregate = await _serviceTDengine.GetAggregateValueAsync("systolic_value", "stb_hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}'"); | var systolicAggregate = await _serviceTDengine.GetAggregateValueAsync("systolic_value", "stb_hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}'"); | ||||
@@ -194,43 +203,120 @@ namespace HealthMonitor.Service.Resolver | |||||
_serviceTDengine.ExecuteInsertSQL(sql); | _serviceTDengine.ExecuteInsertSQL(sql); | ||||
// 发送到 设置设备血压标定参数 | // 发送到 设置设备血压标定参数 | ||||
var url = $"http://id.ssjlai.com/webapi/api/Command/SetBloodPressCalibrationConfig"; | |||||
List<KeyValuePair<string, string>> headers = new() | |||||
#endregion | |||||
// 注册定时下发事件 | |||||
// 获取当前时间 | |||||
//DateTime sNow = DateTime.Now; | |||||
//// 计算距离明天12点的时间间隔 | |||||
//TimeSpan timeUntil = new DateTime(sNow.Year, sNow.Month, sNow.Day, 12, 0, 0) - sNow; | |||||
//// 如果当前时间已经超过了12点,将等待到明天 | |||||
//if (timeUntil < TimeSpan.Zero) | |||||
//{ | |||||
// timeUntil = timeUntil.Add(TimeSpan.FromHours(24)); | |||||
//} | |||||
//var data = new | |||||
//{ | |||||
// imei = bp.Serialno, | |||||
// systolicCalibrationValue = systolicRefValue, //收缩压标定值,值为0 表示不生效 | |||||
// diastolicCalibrationValue = diastolicRefValue, //舒张压标定值,值为0表示不生效 | |||||
// systolicIncValue = systolicInc, //收缩压显示增量,值为0 表示不生效 | |||||
// diastolicIncValue = diastolicInc //舒张压显示增量,值为0 表示不生效 | |||||
//}; | |||||
//var url = $"http://id.ssjlai.com/webapi/api/Command/SetBloodPressCalibrationConfig"; | |||||
//List<KeyValuePair<string, string>> headers = new() | |||||
//{ | |||||
// new KeyValuePair<string, string>("AuthKey", "key1") | |||||
//}; | |||||
//var data = new | |||||
//{ | |||||
// imei = bp.Serialno, | |||||
// systolicCalibrationValue = systolicRefValue, //收缩压标定值,值为0 表示不生效 | |||||
// diastolicCalibrationValue = diastolicRefValue, //舒张压标定值,值为0表示不生效 | |||||
// systolicIncValue = systolicInc, //收缩压显示增量,值为0 表示不生效 | |||||
// diastolicIncValue = diastolicInc //舒张压显示增量,值为0 表示不生效 | |||||
//}; | |||||
//var result = JsonConvert.SerializeObject(data); | |||||
var result = bp.Serialno; | |||||
var key = $"health_moniter/schedule_push/imei/{bp.Serialno}"; | |||||
var schedule_push = await _serviceEtcd.GetValAsync(key).ConfigureAwait(false); | |||||
if (string.IsNullOrWhiteSpace(schedule_push)) | |||||
{ | { | ||||
new KeyValuePair<string, string>("AuthKey", "key1") | |||||
}; | |||||
// 注册首次下推 | |||||
#if DEBUG | |||||
await _serviceEtcd.PutValAsync(key, result, 60*1, false).ConfigureAwait(false); | |||||
#else | |||||
//DateTime sNow = DateTime.Now; | |||||
//// 计算距离19:59:55点的时间间隔 | |||||
//TimeSpan timeUntil = new DateTime(sNow.Year, sNow.Month, sNow.Day, 19, 59, 55) - sNow; | |||||
// 生效 | |||||
if (DateTime.Now.Hour == 1) | |||||
{ | |||||
var data = new | |||||
{ | |||||
imei = bp.Serialno, | |||||
systolicCalibrationValue = systolicRefValue, //收缩压标定值,值为0 表示不生效 | |||||
diastolicCalibrationValue = diastolicRefValue, //舒张压标定值,值为0表示不生效 | |||||
systolicIncValue = systolicInc, //收缩压显示增量,值为0 表示不生效 | |||||
diastolicIncValue = diastolicInc //舒张压显示增量,值为0 表示不生效 | |||||
}; | |||||
var result = await _httpHelper.HttpToPostAsync(url, data, headers).ConfigureAwait(false); | |||||
_logger.LogInformation($"将 {JsonConvert.SerializeObject(data)}发送到 {url} 并且返回 {JsonConvert.SerializeObject(result)}"); | |||||
} | |||||
// 不生效 | |||||
else if (DateTime.Now.Hour == 3) | |||||
{ | |||||
var data = new | |||||
//// 如果当前时间已经超过了12点,将等待到明天 | |||||
//if (timeUntil < TimeSpan.Zero) | |||||
//{ | |||||
// timeUntil = timeUntil.Add(TimeSpan.FromHours(24)); | |||||
//} | |||||
//var ttl = (long)timeUntil.TotalSeconds; | |||||
var interval = 0; | |||||
// 获取当前时间 | |||||
DateTime now = DateTime.Now; | |||||
// 计算距离下一个$interval天后的8点的时间间隔 | |||||
DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, 19, 59, 58).AddDays(interval); | |||||
TimeSpan timeUntilNextRun = nextRunTime - now; | |||||
// 如果当前时间已经超过了8点,将等待到明天后的8点 | |||||
if (timeUntilNextRun < TimeSpan.Zero) | |||||
{ | { | ||||
imei = bp.Serialno, | |||||
systolicCalibrationValue = 0, //收缩压标定值,值为0 表示不生效 | |||||
diastolicCalibrationValue = 0, //舒张压标定值,值为0表示不生效 | |||||
systolicIncValue = 0, //收缩压显示增量,值为0 表示不生效 | |||||
diastolicIncValue = 0 //舒张压显示增量,值为0 表示不生效 | |||||
}; | |||||
var result = await _httpHelper.HttpToPostAsync(url, data, headers).ConfigureAwait(false); | |||||
_logger.LogInformation($"将 {JsonConvert.SerializeObject(data)}发送到 {url} 并且返回 {JsonConvert.SerializeObject(result)}"); | |||||
timeUntilNextRun = timeUntilNextRun.Add(TimeSpan.FromDays(1)); | |||||
} | |||||
var ttl = timeUntilNextRun.Seconds; | |||||
await _serviceEtcd.PutValAsync(key, result,ttl, false).ConfigureAwait(false); | |||||
#endif | |||||
} | } | ||||
#endregion | |||||
//// 生效 | |||||
//if (DateTime.Now.Hour == 1) | |||||
//{ | |||||
// var data = new | |||||
// { | |||||
// imei = bp.Serialno, | |||||
// systolicCalibrationValue = systolicRefValue, //收缩压标定值,值为0 表示不生效 | |||||
// diastolicCalibrationValue = diastolicRefValue, //舒张压标定值,值为0表示不生效 | |||||
// systolicIncValue = systolicInc, //收缩压显示增量,值为0 表示不生效 | |||||
// diastolicIncValue = diastolicInc //舒张压显示增量,值为0 表示不生效 | |||||
// }; | |||||
// var result = await _httpHelper.HttpToPostAsync(url, data, headers).ConfigureAwait(false); | |||||
// _logger.LogInformation($"将 {JsonConvert.SerializeObject(data)}发送到 {url} 并且返回 {JsonConvert.SerializeObject(result)}"); | |||||
//} | |||||
//// 不生效 | |||||
//else if (DateTime.Now.Hour == 3) | |||||
//{ | |||||
// var data = new | |||||
// { | |||||
// imei = bp.Serialno, | |||||
// systolicCalibrationValue = 0, //收缩压标定值,值为0 表示不生效 | |||||
// diastolicCalibrationValue = 0, //舒张压标定值,值为0表示不生效 | |||||
// systolicIncValue = 0, //收缩压显示增量,值为0 表示不生效 | |||||
// diastolicIncValue = 0 //舒张压显示增量,值为0 表示不生效 | |||||
// }; | |||||
// var result = await _httpHelper.HttpToPostAsync(url, data, headers).ConfigureAwait(false); | |||||
// _logger.LogInformation($"将 {JsonConvert.SerializeObject(data)}发送到 {url} 并且返回 {JsonConvert.SerializeObject(result)}"); | |||||
//} | |||||
} | } | ||||
@@ -189,7 +189,7 @@ namespace HealthMonitor.Service.Sub | |||||
consumer.Commit(consumeRes); | consumer.Commit(consumeRes); | ||||
//_logger.LogInformation("监听中...."); | //_logger.LogInformation("监听中...."); | ||||
Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff},监听中...."); | |||||
// Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff},监听中...."); | |||||
} | } | ||||
// close consumer after use.Otherwise will lead memory leak. | // close consumer after use.Otherwise will lead memory leak. | ||||
@@ -0,0 +1,85 @@ | |||||
using HealthMonitor.Common; | |||||
using Newtonsoft.Json; | |||||
namespace HealthMonitor.WebApi.Middleware | |||||
{ | |||||
public class LoggingMiddleware | |||||
{ | |||||
private readonly RequestDelegate _next; | |||||
private readonly ILogger<LoggingMiddleware> _logger; | |||||
public LoggingMiddleware(RequestDelegate next, ILogger<LoggingMiddleware> logger) | |||||
{ | |||||
_next = next; | |||||
_logger = logger; | |||||
} | |||||
public async Task InvokeAsync(HttpContext context) | |||||
{ | |||||
//// 在请求处理之前记录日志 | |||||
//using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = "" })) | |||||
using (new CustomizeStopWatch(nameof(LoggingMiddleware), _logger)) | |||||
{ | |||||
var request = await FormatRequest(context.Request); | |||||
_logger.LogInformation(request); | |||||
var originalBodyStream = context.Response.Body; | |||||
using var responseBody = new MemoryStream(); | |||||
context.Response.Body = responseBody; | |||||
await _next(context); | |||||
var response = await FormatResponse(context.Response); | |||||
_logger.LogInformation(response); | |||||
await responseBody.CopyToAsync(originalBodyStream); | |||||
} | |||||
} | |||||
private async Task<string> FormatRequest(HttpRequest request) | |||||
{ | |||||
request.EnableBuffering(); | |||||
var body = await new StreamReader(request.Body).ReadToEndAsync(); | |||||
var formattedBody = FormatJson(body); | |||||
request.Body.Position = 0; | |||||
return $"请求: {request.Scheme} {request.Host}{request.Path} {request.QueryString} {formattedBody}"; | |||||
} | |||||
private async Task<string> FormatResponse(HttpResponse response) | |||||
{ | |||||
response.Body.Seek(0, SeekOrigin.Begin); | |||||
var body = await new StreamReader(response.Body).ReadToEndAsync(); | |||||
var formattedBody = FormatJson(body); | |||||
response.Body.Seek(0, SeekOrigin.Begin); | |||||
return $"响应: {response.StatusCode}: {formattedBody}"; | |||||
} | |||||
private static string FormatJson(string json) | |||||
{ | |||||
if (string.IsNullOrEmpty(json)) | |||||
{ | |||||
return string.Empty; | |||||
} | |||||
try | |||||
{ | |||||
var obj = JsonConvert.DeserializeObject(json); | |||||
// return JsonConvert.SerializeObject(obj, Formatting.Indented); | |||||
return JsonConvert.SerializeObject(obj); | |||||
} | |||||
catch | |||||
{ | |||||
return json; | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -31,6 +31,8 @@ using Serilog.Core; | |||||
using HealthMonitor.WebApi.HttpLog; | using HealthMonitor.WebApi.HttpLog; | ||||
using Microsoft.Extensions.Http; | using Microsoft.Extensions.Http; | ||||
using Microsoft.Extensions.DependencyInjection.Extensions; | using Microsoft.Extensions.DependencyInjection.Extensions; | ||||
using HealthMonitor.Service.Etcd; | |||||
using HealthMonitor.WebApi.Middleware; | |||||
namespace HealthMonitor.WebApi | namespace HealthMonitor.WebApi | ||||
{ | { | ||||
@@ -189,6 +191,7 @@ namespace HealthMonitor.WebApi | |||||
builder.Services | builder.Services | ||||
.AddSingleton<TDengineDataSubcribe>() | .AddSingleton<TDengineDataSubcribe>() | ||||
.AddSingleton<EtcdService>() | |||||
.AddHostedService<Worker>(); | .AddHostedService<Worker>(); | ||||
#endregion | #endregion | ||||
@@ -278,6 +281,7 @@ namespace HealthMonitor.WebApi | |||||
app.UseAuthorization(); | app.UseAuthorization(); | ||||
app.UseMiddleware<LoggingMiddleware>(); | |||||
app.MapControllers(); | app.MapControllers(); | ||||
@@ -1,6 +1,15 @@ | |||||
using HealthMonitor.Common; | |||||
using dotnet_etcd; | |||||
using Etcdserverpb; | |||||
using HealthMonitor.Common; | |||||
using HealthMonitor.Common.helper; | |||||
using HealthMonitor.Core.Common.Extensions; | using HealthMonitor.Core.Common.Extensions; | ||||
using HealthMonitor.Service.Biz.db; | |||||
using HealthMonitor.Service.Etcd; | |||||
using HealthMonitor.Service.Sub; | using HealthMonitor.Service.Sub; | ||||
using Microsoft.EntityFrameworkCore.Metadata.Internal; | |||||
using Newtonsoft.Json; | |||||
using Newtonsoft.Json.Linq; | |||||
using System.Threading.Channels; | |||||
using TDengineDriver; | using TDengineDriver; | ||||
using TDengineTMQ; | using TDengineTMQ; | ||||
@@ -12,15 +21,21 @@ namespace HealthMonitor.WebApi | |||||
private readonly IServiceProvider _services; | private readonly IServiceProvider _services; | ||||
private readonly TDengineDataSubcribe _tdEngineDataSubcribe; | private readonly TDengineDataSubcribe _tdEngineDataSubcribe; | ||||
private readonly PackageProcess _processor; | private readonly PackageProcess _processor; | ||||
private readonly TDengineService _serviceTDengine; | |||||
private readonly EtcdService _serviceEtcd; | |||||
private readonly HttpHelper _httpHelper = default!; | |||||
private CancellationTokenSource _tokenSource=default!; | private CancellationTokenSource _tokenSource=default!; | ||||
public Worker(ILogger<Worker> logger, IServiceProvider services, PackageProcess processor,TDengineDataSubcribe tdEngineDataSubcribe) | |||||
public Worker(ILogger<Worker> logger, IServiceProvider services, PackageProcess processor,TDengineDataSubcribe tdEngineDataSubcribe, TDengineService serviceDengine, HttpHelper httpHelper, EtcdService serviceEtcd) | |||||
{ | { | ||||
_logger = logger; | _logger = logger; | ||||
_tdEngineDataSubcribe = tdEngineDataSubcribe; | _tdEngineDataSubcribe = tdEngineDataSubcribe; | ||||
_services = services; | _services = services; | ||||
_processor = processor; | _processor = processor; | ||||
_serviceEtcd = serviceEtcd; | |||||
_serviceTDengine = serviceDengine; | |||||
_httpHelper = httpHelper; | |||||
} | } | ||||
public override Task StartAsync(CancellationToken cancellationToken) | public override Task StartAsync(CancellationToken cancellationToken) | ||||
@@ -53,7 +68,7 @@ namespace HealthMonitor.WebApi | |||||
_logger.LogInformation("------ResolveAsync"); | _logger.LogInformation("------ResolveAsync"); | ||||
while (!_tokenSource.IsCancellationRequested) | while (!_tokenSource.IsCancellationRequested) | ||||
{ | { | ||||
// | |||||
// | |||||
await _processor.ResolveAsync().ConfigureAwait(false); | await _processor.ResolveAsync().ConfigureAwait(false); | ||||
// await _tdEngineDataSubcribe.ProcessMsg(); | // await _tdEngineDataSubcribe.ProcessMsg(); | ||||
} | } | ||||
@@ -70,9 +85,169 @@ namespace HealthMonitor.WebApi | |||||
} | } | ||||
}, TaskCreationOptions.LongRunning); | }, TaskCreationOptions.LongRunning); | ||||
Task.Run(() => | |||||
_serviceEtcd.WacthKeysWithPrefixResponseAsync($"health_moniter/schedule_push", WatchEvents) | |||||
, stoppingToken); | |||||
// watch | |||||
//factory.StartNew(() => | |||||
//{ | |||||
// while (!_tokenSource.IsCancellationRequested) | |||||
// { | |||||
// //_serviceEtcd.WacthKeysWithPrefixAsync($"health_moniter/schedule_push", watchEvents => WatchEvents(watchEvents)); | |||||
// _serviceEtcd.WacthKeysWithPrefixResponseAsync($"health_moniter/schedule_push", WatchEvents); | |||||
// } | |||||
//}, TaskCreationOptions.LongRunning); | |||||
// _serviceEtcd.WacthKeysWithPrefixResponseAsync($"health_moniter/schedule_push", WatchEvents); | |||||
return Task.Delay(1000, _tokenSource.Token); | return Task.Delay(1000, _tokenSource.Token); | ||||
} | } | ||||
private void WatchEvents(WatchEvent[] response) | |||||
{ | |||||
foreach (WatchEvent e1 in response) | |||||
{ | |||||
// Console.WriteLine($"{nameof(WatchEventsAsync)} --- {e1.Key}:{e1.Value}:{e1.Type}"); | |||||
switch (e1.Type.ToString()) | |||||
{ | |||||
//case "Put": | |||||
// // 获取时间点计算TTL | |||||
// break; | |||||
case "Delete": | |||||
// TTL到了重新计算TTL,下发 | |||||
Console.WriteLine($"--- {e1.Key}:{e1.Value}:{e1.Type}"); | |||||
break; | |||||
} | |||||
} | |||||
} | |||||
private void WatchEvents(WatchResponse response) | |||||
{ | |||||
response.Events.ToList().ForEach(async e => | |||||
{ | |||||
switch (e.Type.ToString()) | |||||
{ | |||||
case "Put": | |||||
// 获取时间点计算TTL | |||||
Console.BackgroundColor = ConsoleColor.Blue; | |||||
Console.WriteLine($"--- {e.Type}--{e.Kv.Key.ToStringUtf8()}--{e.Kv.Value.ToStringUtf8()}---{DateTime.Now}"); | |||||
Console.BackgroundColor = ConsoleColor.Black; | |||||
break; | |||||
case "Delete": | |||||
// TTL到了重新计算TTL,下发 | |||||
Console.BackgroundColor = ConsoleColor.Green; | |||||
Console.WriteLine($"--- {e.Type}--{e.Kv.Key.ToStringUtf8()}--{e.Kv.Value.ToStringUtf8()}---{DateTime.Now}"); | |||||
// var key = $"health_moniter/schedule_push/imei/{bp.Serialno}"; | |||||
var key = e.Kv.Key.ToStringUtf8(); | |||||
var imeiDel = key.Split('/')[3]; | |||||
var schedule_push = await _serviceEtcd.GetValAsync(key).ConfigureAwait(false); | |||||
if (string.IsNullOrWhiteSpace(schedule_push)) | |||||
{ | |||||
var startTime=DateTime.Now; | |||||
// 下发增量值 | |||||
#region 定时下发增量值 | |||||
var last= await _serviceTDengine.GetLastAsync("stb_hm_bloodpress_stats_inc", $"serialno='{imeiDel}' order by last_update desc"); | |||||
var ts = last?[0]; | |||||
if (DateTime.TryParse(ts?.ToString(),out DateTime newTs)) | |||||
{ | |||||
// 7 天有效数据 | |||||
if (newTs.AddDays(7) > DateTime.Now) | |||||
{ | |||||
Console.WriteLine(ts); | |||||
var data = new | |||||
{ | |||||
imei = imeiDel, | |||||
systolicCalibrationValue = last?[5], //收缩压标定值,值为0 表示不生效 | |||||
diastolicCalibrationValue = last?[12], //舒张压标定值,值为0表示不生效 | |||||
systolicIncValue = last?[10], //收缩压显示增量,值为0 表示不生效 | |||||
diastolicIncValue = last?[17] //舒张压显示增量,值为0 表示不生效 | |||||
}; | |||||
var str = JsonConvert.SerializeObject(data); | |||||
var url = $"http://id.ssjlai.com/webapi/api/Command/SetBloodPressCalibrationConfig"; | |||||
List<KeyValuePair<string, string>> headers = new() | |||||
{ | |||||
new KeyValuePair<string, string>("AuthKey", "key1") | |||||
}; | |||||
var result = await _httpHelper.HttpToPostAsync(url, data, headers).ConfigureAwait(false); | |||||
_logger.LogInformation($"向{imeiDel}下发增量值数据:{str},响应:{result}"); | |||||
var res=JsonConvert.DeserializeObject(result!) as JToken; | |||||
if (res!["message"]!.ToString().Equals("ok")) | |||||
{ | |||||
// 注册下次下推 | |||||
var endTime = DateTime.Now; | |||||
#if DEBUG1 | |||||
long ttl = (long)((60 * 1000-(endTime-startTime).TotalMilliseconds)/1000); | |||||
await _serviceEtcd.PutValAsync(key, imeiDel,ttl, false).ConfigureAwait(false); | |||||
#else | |||||
// 每$interval天,晚上8点 | |||||
var interval = 1; | |||||
// 获取当前时间 | |||||
DateTime now = DateTime.Now; | |||||
// 计算距离下一个$interval天后的8点的时间间隔 | |||||
DateTime nextRunTime = new DateTime(now.Year, now.Month, now.Day, 20, 0, 0).AddDays(interval); | |||||
TimeSpan timeUntilNextRun = nextRunTime - now; | |||||
// 如果当前时间已经超过了8点,将等待到明天后的8点 | |||||
if (timeUntilNextRun < TimeSpan.Zero) | |||||
{ | |||||
timeUntilNextRun = timeUntilNextRun.Add(TimeSpan.FromDays(1)); | |||||
} | |||||
// var ttl = timeUntilNextRun.TotalMilliseconds; | |||||
long ttl = (long)((timeUntilNextRun.TotalMilliseconds-(endTime-startTime).TotalMilliseconds)/1000); | |||||
await _serviceEtcd.PutValAsync(key, imeiDel, ttl, false).ConfigureAwait(false); | |||||
#endif | |||||
} | |||||
else | |||||
{ | |||||
_logger.LogInformation($"错误响应,没有下推数据"); | |||||
} | |||||
Console.WriteLine(str); | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
_logger.LogInformation($"向{imeiDel}准备下推的数据已经失效ts,{ts}"); | |||||
} | |||||
#endregion | |||||
//if (imeiDel.Equals("861281060086216")) | |||||
//{ | |||||
// var result = await _httpHelper.HttpToPostAsync(url, data, headers).ConfigureAwait(false); | |||||
// _logger.LogInformation($"向{imeiDel}下发增量值数据:{str},响应:{result}"); | |||||
// Console.WriteLine(str); | |||||
//} | |||||
Console.BackgroundColor = ConsoleColor.Black; | |||||
} | |||||
break; | |||||
} | |||||
}); | |||||
//if (response.Events.Count == 0) | |||||
//{ | |||||
// Console.WriteLine(response); | |||||
//} | |||||
//else | |||||
//{ | |||||
// Console.WriteLine($"{response.Events[0].Kv.Key.ToStringUtf8()}:{response.Events.Kv.Value.ToStringUtf8()}"); | |||||
//} | |||||
} | |||||
} | } | ||||
} | } |
@@ -1,7 +1,7 @@ | |||||
{ | { | ||||
"AllowedHosts": "*", | "AllowedHosts": "*", | ||||
"TDengineServiceConfig": { | "TDengineServiceConfig": { | ||||
"Host": "172.16.255.180", | |||||
"Host": "192.168.2.121", | |||||
"UserName": "root", | "UserName": "root", | ||||
"Password": "taosdata", | "Password": "taosdata", | ||||
"Token": "cm9vdDp0YW9zZGF0YQ==", | "Token": "cm9vdDp0YW9zZGF0YQ==", | ||||
@@ -11,8 +11,8 @@ | |||||
}, | }, | ||||
"Redis": { | "Redis": { | ||||
//"Server": "172.16.192.26:8090", | //"Server": "172.16.192.26:8090", | ||||
//"Server": "47.116.142.20:8090", // 测试环境 | |||||
"Server": "139.224.254.18:8090", // 正式环境 | |||||
"Server": "47.116.142.20:8090", // 测试环境 | |||||
//"Server": "139.224.254.18:8090", // 正式环境 | |||||
"Password": "telpo#1234", | "Password": "telpo#1234", | ||||
//"Server": "127.0.0.1:6379", | //"Server": "127.0.0.1:6379", | ||||
"DefaultDatabase": 2, | "DefaultDatabase": 2, | ||||
@@ -25,7 +25,8 @@ | |||||
"IdleTimeout": 20000 | "IdleTimeout": 20000 | ||||
}, | }, | ||||
"ServiceConfig": { | "ServiceConfig": { | ||||
"TelpoDataUrl": "https://id.ssjlai.com/data/" | |||||
"TelpoDataUrl": "https://id.ssjlai.com/data/", | |||||
"EtcdServerAddress": "http://192.168.2.121:2379" | |||||
}, | }, | ||||
"ConnectionStrings": { | "ConnectionStrings": { | ||||
//"GpsCard_Connection_String": "server=172.16.192.26;port=3304;database=user_operation_platform;uid=root;pwd=telpo#1234;CharSet=utf8;MinimumPoolSize=10;MaximumPoolSize=1000;SslMode=none", | //"GpsCard_Connection_String": "server=172.16.192.26;port=3304;database=user_operation_platform;uid=root;pwd=telpo#1234;CharSet=utf8;MinimumPoolSize=10;MaximumPoolSize=1000;SslMode=none", | ||||