Explorar el Código

推送到kafka

datasub12_fetal_heart_rate_0
H Vs hace 4 meses
padre
commit
9fecd30ba5
Se han modificado 11 ficheros con 302 adiciones y 6 borrados
  1. +2
    -0
      HealthMonitor.Model/Config/ServiceConfig.cs
  2. +5
    -1
      HealthMonitor.Service/Cache/DeviceCacheManager.cs
  3. +1
    -0
      HealthMonitor.Service/HealthMonitor.Service.csproj
  4. +15
    -0
      HealthMonitor.Service/MessageQueue/Kafka/IKafkaService.cs
  5. +122
    -0
      HealthMonitor.Service/MessageQueue/Kafka/KafkaService.cs
  6. +30
    -0
      HealthMonitor.Service/MessageQueue/MqProcessLogic.cs
  7. +117
    -5
      HealthMonitor.Service/Resolver/PregnancyHeartRateResolver.cs
  8. +6
    -0
      HealthMonitor.WebApi/Program.cs
  9. +1
    -0
      HealthMonitor.WebApi/appsettings.Development.json
  10. +2
    -0
      HealthMonitor.WebApi/appsettings.production.json
  11. +1
    -0
      HealthMonitor.WebApi/appsettings.test.json

+ 2
- 0
HealthMonitor.Model/Config/ServiceConfig.cs Ver fichero

@@ -14,5 +14,7 @@
public string IotAuth { get; set; } = default!;

public string IotCore { get; set; } = default!;

public string KafkaServerAddress { get; set; } = default!;
}
}

+ 5
- 1
HealthMonitor.Service/Cache/DeviceCacheManager.cs Ver fichero

@@ -34,7 +34,11 @@ namespace HealthMonitor.Service.Cache
_hisFetalHeartRateApiClient = hisFetalHeartRateApiClient;
}


/// <summary>
/// 获取device_id
/// </summary>
/// <param name="sn"></param>
/// <returns></returns>
public async Task<GpsDevice?> GetDeviceBySerialNoAsync(string sn)
{
string key = CACHE_KEY_DEVICE + sn;


+ 1
- 0
HealthMonitor.Service/HealthMonitor.Service.csproj Ver fichero

@@ -8,6 +8,7 @@

<ItemGroup>
<PackageReference Include="dotnet-etcd" Version="6.0.1" />
<PackageReference Include="Confluent.Kafka" Version="1.9.2" />
<PackageReference Include="CSRedisCore" Version="3.8.3" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="7.0.0" />


+ 15
- 0
HealthMonitor.Service/MessageQueue/Kafka/IKafkaService.cs Ver fichero

@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HealthMonitor.Service.MessageQueue.Kafka
{
public interface IKafkaService
{
Task PublishAsync<T>(string topicName, T message) where T : class;

Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class;
}
}

+ 122
- 0
HealthMonitor.Service/MessageQueue/Kafka/KafkaService.cs Ver fichero

@@ -0,0 +1,122 @@
using HealthMonitor.Common;
using HealthMonitor.Model.Config;
using Microsoft.EntityFrameworkCore.Diagnostics;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Confluent.Kafka;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.DirectoryServices.Protocols;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HealthMonitor.Service.MessageQueue.Kafka
{
public class KafkaService : IKafkaService
{

private readonly ILogger<KafkaService> _logger;
private readonly ServiceConfig _configService;
public KafkaService(IOptions<ServiceConfig> _optConfigService, ILogger<KafkaService> logger)
{
_configService = _optConfigService.Value;
_logger = logger;
}

public async Task PublishAsync<T>(string topicName, T message) where T : class
{
try
{
Type messageType = typeof(T);
var config = new ProducerConfig
{
BootstrapServers = _configService.KafkaServerAddress,
EnableIdempotence = true,
Acks = Acks.All,
MessageSendMaxRetries = 3
};

using var producer = new ProducerBuilder<string, string>(config).Build();
await producer.ProduceAsync(topicName, new Message<string, string>
{
Key = Guid.NewGuid().ToString(),
Value = JsonConvert.SerializeObject(message)
});
}
catch (ProduceException<Null, string> ex)
{
_logger.LogError($"推送到kafka失败,topic: {topicName},\n message:{JsonConvert.SerializeObject(message)}: \n{ex.Error.Reason}");
}
}

public async Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class
{
var config = new ConsumerConfig
{
BootstrapServers = _configService.KafkaServerAddress,
GroupId = "Consumer",
EnableAutoCommit = false, // 禁止AutoCommit
Acks = Acks.Leader, // 假设只需要Leader响应即可
AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起
};
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
consumer.Subscribe(topics);
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume(cancellationToken);
Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'.");
if (consumeResult!.IsPartitionEOF)
{
Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
continue;
}
T? messageResult = null;
try
{
messageResult = JsonConvert.DeserializeObject<T>(consumeResult.Message!.Value)!;
}
catch (Exception ex)
{
var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message!.Value}】 :{ex.StackTrace?.ToString()}";
Console.WriteLine(errorMessage);
messageResult = null;
}
if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/)
{
messageFunc(messageResult);
try
{
consumer.Commit(consumeResult);
}
catch (KafkaException e)
{
Console.WriteLine(e.Message);
}
}
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Closing consumer.");
consumer.Close();
}
}

await Task.CompletedTask;
}
}
}


+ 30
- 0
HealthMonitor.Service/MessageQueue/MqProcessLogic.cs Ver fichero

@@ -0,0 +1,30 @@
using HealthMonitor.Service.MessageQueue.Kafka;
using Microsoft.EntityFrameworkCore.Diagnostics;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HealthMonitor.Service.MessageQueue
{
public class MqProcessLogic
{
private readonly ILogger<MqProcessLogic> _logger;

private readonly KafkaService _serviceKafka;
public MqProcessLogic(ILogger<MqProcessLogic> logger, KafkaService serviceKafka)
{
_logger = logger;
_serviceKafka = serviceKafka;
}
public async Task ProcessIMEIEventMessageAsync(string messagesId,string topicName,object eventData)
{
await _serviceKafka.PublishAsync(topicName, eventData);
_logger.LogInformation($"推送消息 {messagesId} 内容:{JsonConvert.SerializeObject(eventData)}");
}
}

}

+ 117
- 5
HealthMonitor.Service/Resolver/PregnancyHeartRateResolver.cs Ver fichero

@@ -1,20 +1,24 @@
using HealthMonitor.Common;
using Etcdserverpb;
using HealthMonitor.Common;
using HealthMonitor.Common.helper;
using HealthMonitor.Model.Service.Mapper;
using HealthMonitor.Service.Biz;
using HealthMonitor.Service.Biz.db;
using HealthMonitor.Service.Cache;
using HealthMonitor.Service.Etcd;
using HealthMonitor.Service.MessageQueue;
using HealthMonitor.Service.Resolver.Interface;
using HealthMonitor.Service.Sub;
using HealthMonitor.Service.Sub.Topic.Model;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using SqlSugar;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using TelpoDataService.Util.Clients;
@@ -44,11 +48,14 @@ namespace HealthMonitor.Service.Resolver

private readonly FetalMovementNormalValueRangeCacheManager _mgrFetalMovementNormalValueRangeCache;

private readonly MqProcessLogic _serviceMqProcess;




public PregnancyHeartRateResolver(ILogger<PregnancyHeartRateResolver> logger,
HttpHelper httpHelper, EtcdService serviceEtcd, DeviceCacheManager deviceCacheMgr,
HttpHelper httpHelper, EtcdService serviceEtcd, DeviceCacheManager deviceCacheMgr,
MqProcessLogic serviceMqProcess,
IotApiService iotApiService, TDengineService serviceDengine, FetalMovementNormalValueRangeCacheManager fetalMovementNormalValueRangeCacheMgr,
GpsLocationHistoryAccessorClient<HisGpsFetalHeartRate> hisFetalHeartApiClient,
GpsLocationHistoryAccessorClient<HisGpsFetalMovement> hisFetalMovementApiClient
@@ -60,6 +67,7 @@ namespace HealthMonitor.Service.Resolver
_serviceTDengine = serviceDengine;
_deviceCacheMgr = deviceCacheMgr;
_serviceIotApi = iotApiService;
_serviceMqProcess= serviceMqProcess;
_hisFetalHeartApiClient = hisFetalHeartApiClient;
_hisFetalMovementApiClient = hisFetalMovementApiClient;
_mgrFetalMovementNormalValueRangeCache = fetalMovementNormalValueRangeCacheMgr;
@@ -452,7 +460,7 @@ namespace HealthMonitor.Service.Resolver
{
var avgPhr = lastPhr.Select(i => i.PregnancyHeartRate).Average();
// 计算一般心率得到胎心系数
await SaveAndPushFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr);
await SaveAndPushFreqFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr, DateTimeUtil.ConvertToTimeStamp(DateTime.Now).ToString());

}
#endregion
@@ -474,7 +482,8 @@ namespace HealthMonitor.Service.Resolver
.Select(i => i.PregnancyHeartRate).Average();
// 推送胎心数据到 api/v1/open/OpenIot/SetFetalHeartRateConfig
// 计算一般心率得到胎心系数
await SaveAndPushFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr);
//await SaveAndPushFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr);
await SaveAndPushFreqFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr, DateTimeUtil.ConvertToTimeStamp(DateTime.Now).ToString());

}

@@ -490,7 +499,9 @@ namespace HealthMonitor.Service.Resolver
.Select(i => i.PregnancyHeartRate).Average();
// 推送胎心数据到 api/v1/open/OpenIot/SetFetalHeartRateConfig
// 计算一般心率得到胎心系数
await SaveAndPushFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr);
//await SaveAndPushFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr);
await SaveAndPushFreqFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr, DateTimeUtil.ConvertToTimeStamp(DateTime.Now).ToString());

}

// 删除高频状态的首条记录
@@ -566,6 +577,28 @@ namespace HealthMonitor.Service.Resolver
// 推送到api/v1/open/OpenIot/SetFetalHeartRateConfig
await _serviceIotApi.SetFetalHeartRateConfig(heartRate.Serialno, fetalHeartRate, sampleTimeFHR, fetalHeartRateIsAbnormal);

// 推送送微信
var device = await _deviceCacheMgr.GetDeviceBySerialNoAsync(heartRate.Serialno).ConfigureAwait(false);
var fhrMsgId = $"{heartRate.Serialno}-{sampleTimeFHR}-{Guid.NewGuid().ToString("D")[^3..]}";
var topic = "topic.push.wx";
var fhrMsg = new
{
messageId = fhrMsgId,
topic = topic,
time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(long.Parse(sampleTimeFHR.Length < 13 ? sampleTimeFHR.PadRight(13, '0') : sampleTimeFHR)).ToString("yyyy-MM-dd HH:mm:ss"),
data = new
{
deviceId = device?.DeviceId,
imei = heartRate.Serialno,
alarmTypeId = 12,
alarmDeviceName = heartRate.Serialno,
alarmRemarks = JsonConvert.SerializeObject(new { fetalHeartValue = fetalHeartRate, isAbnormal = fetalHeartRateIsAbnormal }),
address = string.Empty,
deviceKey = device?.DeviceId
}
};
await _serviceMqProcess.ProcessIMEIEventMessageAsync(fhrMsgId, topic,fhrMsg).ConfigureAwait(false);

}
}

@@ -660,8 +693,31 @@ namespace HealthMonitor.Service.Resolver
};
await _hisFetalMovementApiClient.AddAsync(fm).ConfigureAwait(false);

// 发送到微信
var device = await _deviceCacheMgr.GetDeviceBySerialNoAsync(heartRate.Serialno).ConfigureAwait(false);
var fmMsgId = $"{heartRate.Serialno}-{fetalMovementSampleTime}-{Guid.NewGuid().ToString("D")[^3..]}";
var topic = "topic.push.wx";
var fmMsg = new
{
messageId=Guid.NewGuid().ToString("D"),
topic= topic,
time= DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(long.Parse(fetalMovementSampleTime.Length < 13 ? fetalMovementSampleTime.PadRight(13, '0') : fetalMovementSampleTime)).ToString("yyyy-MM-dd HH:mm:ss"),
data=new {
deviceId= device?.DeviceId,
imei = heartRate.Serialno,
alarmTypeId=12,
alarmDeviceName = heartRate.Serialno,
alarmRemarks=JsonConvert.SerializeObject(new { fetalMovementValue= fetalMovement, isAbnormal= feltalMovementIsAbnormal }),
address=string.Empty,
deviceKey=device?.DeviceId
}
};
await _serviceMqProcess.ProcessIMEIEventMessageAsync(fmMsgId, topic, fmMsg).ConfigureAwait(false);


// 设置入库缓存记录
await _deviceCacheMgr.SetFetalMovementAsync(heartRate.Serialno, fetalMovementSampleTime,fm);
}
else
{
@@ -855,6 +911,62 @@ namespace HealthMonitor.Service.Resolver
}
}

private async Task SaveAndPushFreqFetalHeartRateAsync(HisGpsHeartRate heartRate, int upperAlarmThreshold, int lowerAlarmThreshold, double avgPhr,string sampleTime)
{
var commonPHR = await _serviceTDengine.InitPregnancyCommonHeartRateModeAsync(heartRate.Serialno);
if (commonPHR != null)
{
// 保存到TDengine数据库
await _serviceTDengine.InsertAsync<PregnancyCommonHeartRateModel>("hm_pchr", commonPHR);
// 计算胎心=孕妇心率*系数
var fetalHeartRate = SafeType.SafeInt(avgPhr * commonPHR?.StatModeAvgFprCoefficient!);
var isAbnormal = fetalHeartRate > upperAlarmThreshold ? 1 : (fetalHeartRate < lowerAlarmThreshold ? 2 : 0);

// 保存到 数据服务 MySQL 数据库
HisGpsFetalHeartRate gpsFetalHeartRate = new()
{
FetalHeartRateId = Guid.NewGuid().ToString("D"),
PersonId = commonPHR!.PersonId,
Serialno = heartRate.Serialno,
HeartRate = fetalHeartRate,
SampleTime = sampleTime.Length > 10 ? sampleTime.Substring(0, 10) : sampleTime,
IsAbnormal = isAbnormal,
StatStartTime = commonPHR.StatStartTime,
StatEndTime = commonPHR.StatEndTime,
CreateTime = DateTime.Now,
Method = 1,
IsDisplay = 1,
DeviceKey = commonPHR!.DeviceKey
};
await _hisFetalHeartApiClient.AddAsync(gpsFetalHeartRate).ConfigureAwait(false);

// 推送到api/v1/open/OpenIot/SetFetalHeartRateConfig
await _serviceIotApi.SetFetalHeartRateConfig(heartRate.Serialno, fetalHeartRate, sampleTime, isAbnormal);

// 推送到微信
var device = await _deviceCacheMgr.GetDeviceBySerialNoAsync(heartRate.Serialno).ConfigureAwait(false);
var fhrMsgId = $"{heartRate.Serialno}-{sampleTime}-{Guid.NewGuid().ToString("D")[^3..]}";
var topic = "topic.push.wx";
var fhrMsg = new
{
messageId = fhrMsgId,
topic = topic,
time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(long.Parse(sampleTime.Length < 13 ? sampleTime.PadRight(13, '0') : sampleTime)).ToString("yyyy-MM-dd HH:mm:ss"),
data = new
{
deviceId = device?.DeviceId,
imei = heartRate.Serialno,
alarmTypeId = 12,
alarmDeviceName = heartRate.Serialno,
alarmRemarks = JsonConvert.SerializeObject(new { fetalHeartValue = fetalHeartRate, isAbnormal = isAbnormal }),
address = string.Empty,
deviceKey = device?.DeviceId
}
};
await _serviceMqProcess.ProcessIMEIEventMessageAsync(fhrMsgId, topic, fhrMsg).ConfigureAwait(false);

}
}
private async Task SetIntervalTriggerAsync(string key,string imei, long interval)
{
// var key = $"health_monitor/schedule_push/{type}/imei/{imei}";


+ 6
- 0
HealthMonitor.WebApi/Program.cs Ver fichero

@@ -34,6 +34,8 @@ using Microsoft.Extensions.DependencyInjection.Extensions;
using HealthMonitor.Service.Etcd;
using HealthMonitor.WebApi.Middleware;
using HealthMonitor.Service.Biz;
using HealthMonitor.Service.MessageQueue.Kafka;
using HealthMonitor.Service.MessageQueue;

namespace HealthMonitor.WebApi
{
@@ -202,6 +204,10 @@ namespace HealthMonitor.WebApi
.AddHostedService<Worker>();
#endregion

#region kafka
builder.Services.AddSingleton<MqProcessLogic>();
builder.Services.AddSingleton<KafkaService>();
#endregion

builder.Host.UseSerilog();



+ 1
- 0
HealthMonitor.WebApi/appsettings.Development.json Ver fichero

@@ -25,6 +25,7 @@
"IdleTimeout": 20000
},
"ServiceConfig": {
"KafkaServerAddress": "47.116.67.214:9092",
"TelpoDataUrl": "https://id.gdssjl.com/data/",
"EtcdServerAddress": "http://192.168.2.121:2379",
"IotWebApiUrl": "http://id.gdssjl.com/webapi/api/",


+ 2
- 0
HealthMonitor.WebApi/appsettings.production.json Ver fichero

@@ -22,11 +22,13 @@
"IdleTimeout": 20000
},
"ServiceConfig": {
"KafkaServerAddress": "172.19.42.40:9092,172.19.42.41:9092,172.19.42.48:9092",
"TelpoDataUrl": "https://ai.gdssjl.com/data/",
"EtcdServerAddress": "http://172.19.42.40:2379",
"IotWebApiUrl": "http://ai.gdssjl.com/webapi/api/",
"IotAuth": "http://ai.ssjlai.com/auth/identityController",
"IotCore": "https://ai.ssjlai.com/gateway/core/api/v1/open/OpenIot"

},
"BoodPressResolverConfig": {
"EnableBPRefPush": true


+ 1
- 0
HealthMonitor.WebApi/appsettings.test.json Ver fichero

@@ -30,6 +30,7 @@
"IdleTimeout": 20000
},
"ServiceConfig": {
"KafkaServerAddress": "172.19.42.53:9092",
"TelpoDataUrl": "https://id.gdssjl.com/data/",
"EtcdServerAddress": "http://172.19.42.44:2379",
"IotWebApiUrl": "http://id.gdssjl.com/webapi/api/",


Cargando…
Cancelar
Guardar