@@ -1,7 +0,0 @@ | |||
namespace HealthMonitor.Common | |||
{ | |||
public class Class1 | |||
{ | |||
} | |||
} |
@@ -11,4 +11,8 @@ | |||
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<ProjectReference Include="..\HealthMonitor.Model\HealthMonitor.Model.csproj" /> | |||
</ItemGroup> | |||
</Project> |
@@ -0,0 +1,19 @@ | |||
using HealthMonitor.Model.Enum; | |||
namespace HealthMonitor.Common | |||
{ | |||
public class MessageTypeUtils | |||
{ | |||
public static string TryToDescription(int type) | |||
{ | |||
//if (type >= (int)TopicType.Property) | |||
//{ | |||
// return ((MessageType)type).ToDescription(); | |||
//} | |||
//else return "Unknown"; | |||
return "Unknown"; | |||
} | |||
} | |||
} |
@@ -1,7 +0,0 @@ | |||
namespace HealthMonitor.Model | |||
{ | |||
public class Class1 | |||
{ | |||
} | |||
} |
@@ -0,0 +1,15 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.ComponentModel; | |||
using System.Linq; | |||
using System.Text; | |||
using System.Threading.Tasks; | |||
namespace HealthMonitor.Model.Enum | |||
{ | |||
public enum MessageType | |||
{ | |||
[Description(nameof(BloodPressInfo))] | |||
BloodPressInfo, | |||
} | |||
} |
@@ -0,0 +1,13 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Linq; | |||
using System.Text; | |||
using System.Threading.Tasks; | |||
namespace HealthMonitor.Model.Enum | |||
{ | |||
public enum TopicType | |||
{ | |||
Property = 0x200, | |||
} | |||
} |
@@ -76,14 +76,14 @@ namespace HealthMonitor.Service.Resolver | |||
//验证这个信息是否存在 | |||
if (person == null || person?.Person.BornDate == null) | |||
{ | |||
_logger.LogWarning("验证这个信息是否存在"); | |||
_logger.LogWarning("验证个人信息,找不到个人信息,跳过此消息"); | |||
return; | |||
} | |||
// 验证年龄是否在范围 (2 - 120) | |||
var age = SafeType.SafeInt(DateTime.Today.Year - person?.Person.BornDate!.Value.Year!); | |||
if (age < 1 || age > 120) | |||
{ | |||
_logger.LogWarning("验证年龄是否在范围 (2 - 120)"); | |||
_logger.LogWarning("验证年龄,不在范围 (2 - 120)岁,跳过此消息"); | |||
return; | |||
} | |||
#endregion | |||
@@ -20,9 +20,21 @@ namespace HealthMonitor.Service.Sub | |||
_resolver = resolver; | |||
} | |||
public IResolver? GetMsgResolver() | |||
{ | |||
TryDequeue(out var msg); | |||
//public IResolver? GetMsgResolver() | |||
//{ | |||
// TryDequeue(out var msg); | |||
// if (msg == null) | |||
// { | |||
// return null; | |||
// } | |||
// _resolver.SetResolveInfo(msg); | |||
// return _resolver; | |||
//} | |||
public IResolver? GetMsgResolver(PackageMsgModel msg) | |||
{ | |||
//TryDequeue(out var msg); | |||
if (msg == null) | |||
{ | |||
return null; | |||
@@ -38,7 +38,7 @@ namespace HealthMonitor.Service.Sub | |||
public TDengineDataSubcribe( | |||
TDengineService serviceDengine, | |||
TDengineService serviceTDengine, | |||
PersonCacheManager personCacheMgr, | |||
BloodPressReferenceValueCacheManager bpRefValCacheManager, | |||
IResolverFactory resolverFactory, | |||
@@ -47,7 +47,7 @@ namespace HealthMonitor.Service.Sub | |||
ILogger<TDengineDataSubcribe> logger | |||
) | |||
{ | |||
_serviceTDengine = serviceDengine; | |||
_serviceTDengine = serviceTDengine; | |||
_personCacheMgr = personCacheMgr; | |||
_bpRefValCacheManager = bpRefValCacheManager; | |||
_logger = logger; | |||
@@ -80,28 +80,30 @@ namespace HealthMonitor.Service.Sub | |||
public void DoTDengineConnect() | |||
{ | |||
string host = _configTDengineService.Host; | |||
short port = 6030; | |||
string username = _configTDengineService.UserName; | |||
string password = _configTDengineService.Password; | |||
string dbname = _configTDengineService.DB; | |||
//#if DEBUG | |||
// //string configDir = "C:/TDengine/cfg"; | |||
// //TDengine.Options((int)TDengineInitOption.TSDB_OPTION_CONFIGDIR, configDir); | |||
// TDengine.Options((int)TDengineInitOption.TSDB_OPTION_TIMEZONE, "Asia/Beijing"); | |||
//#endif | |||
var conn = TDengine.Connect(host, username, password, dbname, port); | |||
if (conn == IntPtr.Zero) | |||
{ | |||
_logger.LogError("reason:{TDengine.Error(conn)}", TDengine.Error(conn)); | |||
} | |||
else | |||
{ | |||
_logger.LogInformation($"连接 TDengine 成功...."); | |||
} | |||
// string host = _configTDengineService.Host; | |||
// short port = 6030; | |||
// string username = _configTDengineService.UserName; | |||
// string password = _configTDengineService.Password; | |||
// string dbname = _configTDengineService.DB; | |||
////#if DEBUG | |||
//// //string configDir = "C:/TDengine/cfg"; | |||
//// //TDengine.Options((int)TDengineInitOption.TSDB_OPTION_CONFIGDIR, configDir); | |||
//// TDengine.Options((int)TDengineInitOption.TSDB_OPTION_TIMEZONE, "Asia/Beijing"); | |||
////#endif | |||
// var conn = TDengine.Connect(host, username, password, dbname, port); | |||
// if (conn == IntPtr.Zero) | |||
// { | |||
// _logger.LogError("reason:{TDengine.Error(conn)}", TDengine.Error(conn)); | |||
// } | |||
// else | |||
// { | |||
// _logger.LogInformation($"连接 TDengine 成功...."); | |||
// } | |||
var conn = _serviceTDengine.Connection(); | |||
DoReceive(conn); | |||
} | |||
@@ -137,6 +139,7 @@ namespace HealthMonitor.Service.Sub | |||
if (TDengine.ErrorNo(res) != 0) | |||
{ | |||
_logger.LogError($"create topic failed, reason:{TDengine.Error(res)}"); | |||
throw new Exception($"create topic failed, reason:{TDengine.Error(res)}"); | |||
} | |||
@@ -187,7 +190,8 @@ namespace HealthMonitor.Service.Sub | |||
} | |||
consumer.Commit(consumeRes); | |||
_logger.LogInformation("监听中...."); | |||
//_logger.LogInformation("监听中...."); | |||
Console.WriteLine("监听中...."); | |||
} | |||
// close consumer after use.Otherwise will lead memory leak. | |||
@@ -1,4 +1,5 @@ | |||
using HealthMonitor.Service.Resolver.Interface; | |||
using HealthMonitor.Common; | |||
using HealthMonitor.Service.Resolver.Interface; | |||
using HealthMonitor.Service.Sub; | |||
using HealthMonitor.Service.Sub.Interface; | |||
using System.Collections.Concurrent; | |||
@@ -20,22 +21,29 @@ namespace HealthMonitor.WebApi | |||
public async Task<bool> ResolveAsync() | |||
{ | |||
var resolver = _msgQueueManager.GetMsgResolver(); | |||
try | |||
_msgQueueManager.TryDequeue(out var msg); | |||
if (msg == null) return false; | |||
var resolver = _msgQueueManager.GetMsgResolver(msg!); | |||
using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = msg!.MessageId })) | |||
using (new CustomizeStopWatch(msg!.Topic, _logger)) | |||
{ | |||
if (resolver != null) | |||
try | |||
{ | |||
await resolver.ExecuteMessageAsync().ConfigureAwait(false); | |||
if (resolver != null) | |||
{ | |||
await resolver.ExecuteMessageAsync().ConfigureAwait(false); | |||
} | |||
} | |||
catch (Exception ex) | |||
{ | |||
_logger.LogError($"未处理异常 message: {ex.Message}\n {ex.StackTrace}"); | |||
} | |||
} | |||
catch (Exception ex) | |||
{ | |||
_logger.LogError($"未处理异常 message: {ex.Message}\n {ex.StackTrace}"); | |||
} | |||
return true; | |||
} | |||