Переглянути джерело

根据模型创建topic

td_orm
H Vs 1 рік тому
джерело
коміт
2c01f55acc
2 змінених файлів з 55 додано та 400 видалено
  1. +30
    -18
      HealthMonitor.Service/Resolver/Factory/ResolverFactory.cs
  2. +25
    -382
      HealthMonitor.Service/Sub/TDengineDataSubcribe.cs

+ 30
- 18
HealthMonitor.Service/Resolver/Factory/ResolverFactory.cs Переглянути файл

@@ -23,28 +23,40 @@ namespace HealthMonitor.Service.Resolver.Factory

public PackageMsgModel? ParseAndWrap(ReceiveMessageModel msg)
{
var topic=msg.Topic;
switch (topic)
{
case "topic_hm_bp_stats":
//var topic=msg.Topic;


//switch (topic)
//{
// // case "topic_hm_bp_stats":
// case "topichmbloodpress":

// //TopicHmBloodPress bloodPress = JsonConvert.DeserializeObject<TopicHmBloodPress>(msg.Body)!;

//TopicHmBloodPress bloodPress = JsonConvert.DeserializeObject<TopicHmBloodPress>(msg.Body)!;
// return new PackageMsgModel()
// {
// DB= msg.DB,
// MessageId=msg.MessageId,
// Topic= topic,
// DetailData= msg.Body,

return new PackageMsgModel()
{
DB= msg.DB,
MessageId=msg.MessageId,
Topic= topic,
DetailData= msg.Body,
};
// };

// return bloodPress!;
// // return bloodPress!;

// default:
// break;
//}
//return null;

return new PackageMsgModel()
{
DB = msg.DB,
MessageId = msg.MessageId,
Topic = msg.Topic,
DetailData = msg.Body,

default:
break;
}
return null;
};
}
}
}

+ 25
- 382
HealthMonitor.Service/Sub/TDengineDataSubcribe.cs Переглянути файл

@@ -11,6 +11,7 @@ using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using TDengineDriver;
@@ -22,16 +23,14 @@ namespace HealthMonitor.Service.Sub
public class TDengineDataSubcribe
{
private readonly ILogger<TDengineDataSubcribe> _logger;
private IConsumer _consumer = default!;
private IntPtr _conn = default!;
private readonly MsgQueueManager _msgQueueManager;
private readonly TDengineService _serviceTDengine;
private readonly PersonCacheManager _personCacheMgr;
private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager;
private readonly IResolverFactory _resolverFactory;
private CancellationTokenSource _tokenSource = null;
private CancellationTokenSource? _tokenSource = null;

private int cnt = 0;
//private int cnt = 0;

public TDengineDataSubcribe(

@@ -50,7 +49,7 @@ namespace HealthMonitor.Service.Sub
_logger = logger;
_resolverFactory = resolverFactory;
_msgQueueManager = msgQueueManager;
_conn = GetConnection();
}
public void BeginListen(CancellationToken stoppingToken)
{
@@ -103,10 +102,25 @@ namespace HealthMonitor.Service.Sub
MsgWithTableName = "true",
TDConnectIp = "47.116.142.20",
};
string topic = "topic_hm_bp_stats";
//nameof(TopicHmBloodPress)
// string topic = "topic_hm_bp_stats";
string topic = nameof(TopicHmBloodPress).ToLower();
TopicHmBloodPress fields = new();
PropertyInfo[] props = fields.GetType().GetProperties();

// 获取 fields
string attributes = "";
foreach (PropertyInfo prop in props)
{
JsonPropertyAttribute attr = prop.GetCustomAttribute<JsonPropertyAttribute>();
if (attr != null)
{
attributes += attr.PropertyName + ",";
}
}
attributes = attributes.TrimEnd(',');

//create topic
IntPtr res = TDengine.Query(Connection, $"create topic if not exists {topic} as select * from health_monitor.hm_bloodpress");
IntPtr res = TDengine.Query(Connection, $"create topic if not exists {topic} as select {attributes} from health_monitor.hm_bloodpress");

if (TDengine.ErrorNo(res) != 0)
{
@@ -120,7 +134,7 @@ namespace HealthMonitor.Service.Sub
// subscribe
consumer.Subscribe(topic);

while (!_tokenSource.IsCancellationRequested)
while (!_tokenSource!.IsCancellationRequested)
{
var consumeRes = consumer.Consume(300);
foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumeRes.Message)
@@ -163,12 +177,10 @@ namespace HealthMonitor.Service.Sub
}

// close consumer after use.Otherwise will lead memory leak.
_consumer.Close();
TDengine.Close(_conn);
consumer.Close();
TDengine.Close(Connection);
}



public void ParsePackage(ReceiveMessageModel model)
{
var msg = _resolverFactory.ParseAndWrap(model);
@@ -183,374 +195,5 @@ namespace HealthMonitor.Service.Sub

}

public void CreateConnection()
{
var cfg = new ConsumerConfig
{
GourpId = "group_1",
TDConnectUser = "root",
TDConnectPasswd = "taosdata",
MsgWithTableName = "true",
TDConnectIp = "47.116.142.20",
};
var conn = GetConnection();


}


// 创建消费者
public void CreateConsumer()
{
var cfg = new ConsumerConfig
{
GourpId = "group_1",
TDConnectUser = "root",
TDConnectPasswd = "taosdata",
MsgWithTableName = "true",
TDConnectIp = "47.116.142.20",
};
//IntPtr conn = GetConnection();
string topic = "topic_hm_bp_stats";
//create topic
IntPtr res = TDengine.Query(_conn, $"create topic if not exists {topic} as select * from health_monitor.hm_bloodpress");

if (TDengine.ErrorNo(res) != 0)
{
throw new Exception($"create topic failed, reason:{TDengine.Error(res)}");
}

// create consumer
var consumer = new ConsumerBuilder(cfg)
.Build();

// subscribe
consumer.Subscribe(topic);

_consumer = consumer;
}

//public IConsumer CreateConsumer()
//{
// var cfg = new ConsumerConfig
// {
// GourpId = "group_1",
// TDConnectUser = "root",
// TDConnectPasswd = "taosdata",
// MsgWithTableName = "true",
// TDConnectIp = "47.116.142.20",
// };
// var conn = GetConnection();
// //IntPtr conn = GetConnection();
// string topic = "topic_name";
// //create topic
// IntPtr res = TDengine.Query(conn, $"create topic if not exists {topic} as select * from ctb1");

// if (TDengine.ErrorNo(res) != 0)
// {
// throw new Exception($"create topic failed, reason:{TDengine.Error(res)}");
// }

// // create consumer
// var consumer = new ConsumerBuilder(cfg)
// .Build();

// // subscribe
// consumer.Subscribe(topic);

// return consumer;
//}

//public void ProcessMsg()
//{
// var consumerRes = _consumer.Consume(300);
// // process ConsumeResult
// foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumerRes.Message)
// {
// Console.WriteLine("topic partitions:\n{0}", kv.Key.ToString());

// kv.Value.Metas.ForEach(meta =>
// {
// Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size);
// });
// Console.WriteLine("");
// kv.Value.Datas.ForEach(data =>
// {
// Console.WriteLine(data.ToString());
// });
// }

// _consumer.Commit(consumerRes);
// // Console.WriteLine("\n================ {0} done ");
//}

/**
public async Task ProcessMsg()
{
var consumerRes = _consumer.Consume(300);

Console.WriteLine(consumerRes.Message.Count);
// process ConsumeResult
foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumerRes.Message)
{
//Console.WriteLine("topic partitions:\n{0}", kv.Key.ToString());

//kv.Value.Metas.ForEach(meta =>
//{
// Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size);
//});
//Console.WriteLine("----------");
//kv.Value.Datas.ForEach(data =>
//{
// Console.WriteLine(data.ToString());
//});
//Console.WriteLine("----------");
for (int i = 0; i < kv.Value.Datas.Count; i++)
{
// Console.Write($"|{kv.Value.Datas[i].ToString()} \t");
//Console.WriteLine("{0},{1},{2}", i, resMeta.Count, (i) % resMeta.Count);
if (((i + 1) % kv.Value.Metas.Count == 0))
{

try
{
cnt++;
string bloodpress_id = SafeType.SafeString(kv.Value.Datas[i - 8]);
string message_id = SafeType.SafeString(kv.Value.Datas[i - 7]);
string serialno = SafeType.SafeString(kv.Value.Datas[i - 6]);
int systolic_value = SafeType.SafeInt(kv.Value.Datas[i - 5]);
int diastolic_value = SafeType.SafeInt(kv.Value.Datas[i - 4]);
DateTime create_time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 3]) / 1000000);
DateTime last_update = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(SafeType.SafeInt64(kv.Value.Datas[i - 2]) / 1000000);
int method = SafeType.SafeInt(kv.Value.Datas[i - 1]);
bool is_display = SafeType.SafeBool(kv.Value.Datas[i]);


// Console.WriteLine("----------");
HisGpsBloodPress bp = new()
{
//BloodPressId = (string)kv.Value.Datas[i -8],
//MessageId = (string)kv.Value.Datas[i -7],
//Serialno = (string)kv.Value.Datas[i -6],
//SystolicValue = (int)kv.Value.Datas[i -5],
//DiastolicValue = (int)kv.Value.Datas[i -4],
//CreateTime = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds((long)kv.Value.Datas[i -3]/1000000),
//LastUpdate = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds((long)kv.Value.Datas[i -2]/1000000),
//Method = (int)kv.Value.Datas[i -1],
//IsDisplay = (bool)kv.Value.Datas[i] ? 1 : 0,

BloodPressId = bloodpress_id,
MessageId = message_id,
Serialno = serialno,
SystolicValue = systolic_value,
DiastolicValue = diastolic_value,
CreateTime = create_time,
LastUpdate = last_update,
Method = method,
IsDisplay = is_display ? 1 : 0,
};


#region 获取个人信息

var person = await _personCacheMgr.GetDeviceGpsPersonCacheBySerialNoAsync(bp.MessageId, bp.Serialno).ConfigureAwait(false);
// 验证这个信息是否存在
if (person == null || person?.Person.BornDate == null)
{
Console.WriteLine("验证这个信息是否存在");
}
else
{
// 验证年龄是否在范围 (2 - 120)


var age = SafeType.SafeInt(DateTime.Today.Year - person?.Person.BornDate!.Value.Year!);
#endregion
if (age < 1 || age > 120)
{
Console.WriteLine("验证年龄是否在范围 (2 - 120)");
}
else
{
var gender = person?.Person.Gender == true ? 1 : 2;
var isHypertension = SafeType.SafeBool(person?.Person.Ishypertension!);
var height = SafeType.SafeDouble(person?.Person.Height!);
var weight = SafeType.SafeDouble(person?.Person.Weight!);


#region 插入当次BP数据
// 保存到TDengine

//var bpSql = $"INSERT INTO health_monitor.hm_bloodpress VALUES(" +
// $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
// $"'{bp.BloodPressId}'," +
// $"'{bp.MessageId}'," +
// $"'{bp.Serialno}'," +
// $"{bp.SystolicValue}," +
// $"{bp.DiastolicValue}," +
// $"'{bp.CreateTime:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
// $"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
// $"{bp.Method}," +
// $"{bp.IsDisplay == 1})";

//await _serviceTDengine.GernalRestSql(bpSql);

#endregion

#region 计算增量值
var bpRef = await _bpRefValCacheManager.GetBloodPressReferenceValueAsync(age, gender, isHypertension);

var systolicRefValue = bpRef?.Systolic;//?
var diastolicRefValue = bpRef?.Diastolic;//?
int duration = 30;
// 获取历史数据
//DateTime now = DateTime.Now;
DateTime now = (DateTime)bp.LastUpdate; //测试
DateTime startTime = now.AddDays(-duration);
DateTime endTime = now;

//
var systolicAggregate = await _serviceTDengine.GetAggregateValue("systolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}'");
var diastolicAggregate = await _serviceTDengine.GetAggregateValue("diastolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}'");

// 最大值
var systolicMax = systolicAggregate.Max;
var diastolicMax = diastolicAggregate.Max;
// 最小值
var systolicMin = systolicAggregate.Min;
var diastolicMin = diastolicAggregate.Min;


// 计算去除最大值和最小值和异常值的平均值
var systolicAvg = await _serviceTDengine.GetAvgExceptMaxMinValue("systolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}' and systolic_value < {systolicRefValue} ");
var diastolicAvg = await _serviceTDengine.GetAvgExceptMaxMinValue("diastolic_value", "hm_bloodpress", $"ts>='{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and ts <='{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}' and serialno='{bp.Serialno}' and diastolic_value < {diastolicRefValue}");

// 偏移参数
var avgOffset = 0.25M;

var systolicAvgOffset = avgOffset;
var diastolicAvgOffset = avgOffset;


// 增量值=(标定值-平均值)* 0.25
var systolicInc = systolicAvg.Equals(0M) ? 0 : (int)((systolicRefValue - systolicAvg) * systolicAvgOffset)!;
var diastolicInc = diastolicAvg.Equals(0M) ? 0 : (int)((diastolicRefValue - diastolicAvg) * diastolicAvgOffset)!;

#endregion

#region 插入BP增量值
var sql = $"INSERT INTO health_monitor.hm_bloodpress_stats_inc VALUES(" +
$"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
$"'{bp.BloodPressId}'," +
$"'{bp.MessageId}'," +
$"'{bp.Serialno}'," +
$"{bp.SystolicValue}," +
$"{systolicRefValue}," +
$"{systolicAvg}," +
$"{systolicMax}," +
$"{systolicMin}," +
$"{systolicAvgOffset}," +
$"{systolicInc}," +
$"{bp.DiastolicValue}," +
$"{diastolicRefValue}," +
$"{diastolicAvg}," +
$"{diastolicMax}," +
$"{diastolicMin}," +
$"{diastolicAvgOffset}," +
$"{diastolicInc}," +
$"{gender}," +
$"{age}," +
$"{height}," +
$"{weight}," +
$"'{bp.LastUpdate:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
$"{duration}," +
$"'{startTime:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
$"'{endTime:yyyy-MM-ddTHH:mm:ss.fffZ}'," +
$"'{string.Empty}')";
var res = await _serviceTDengine.GernalRestSql(sql);
#endregion

};
// Console.WriteLine("----------");
}


}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}

}
Console.WriteLine("++++++++++++++++++++++");

Console.Write($"总共增加{cnt}");
}

// Console.WriteLine("");



}

_consumer.Commit(consumerRes);
// Console.WriteLine("\n================ {0} done ");
}
*/



public async Task ProcessMsg()
{

}


public IntPtr GetConnection()
{
string host = "47.116.142.20";
short port = 6030;
string username = "root";
string password = "taosdata";
string dbname = "tmqdb";
var conn = TDengine.Connect(host, username, password, dbname, port);
if (conn == IntPtr.Zero)
{
throw new Exception("Connect to TDengine failed");
}
else
{
Console.WriteLine("Connect to TDengine success");
}
return conn;
}
// 关闭消费者
//public void CloseConsumer(IConsumer consumer, IntPtr conn)
//{
// List<string> topics = consumer.Subscription();
// topics.ForEach(t => Console.WriteLine("topic name:{0}", t));

// // unsubscribe
// consumer.Unsubscribe();

// // close consumer after use.Otherwise will lead memory leak.
// consumer.Close();
// TDengine.Close(conn);
//}

public void CloseConsumer()
{
List<string> topics = _consumer.Subscription();
topics.ForEach(t => Console.WriteLine("topic name:{0}", t));

// unsubscribe
_consumer.Unsubscribe();

// close consumer after use.Otherwise will lead memory leak.
_consumer.Close();
TDengine.Close(_conn);
}
}
}

Завантаження…
Відмінити
Зберегти