diff --git a/HealthMonitor.Model/Config/ServiceConfig.cs b/HealthMonitor.Model/Config/ServiceConfig.cs
index ac40e72..a528d80 100644
--- a/HealthMonitor.Model/Config/ServiceConfig.cs
+++ b/HealthMonitor.Model/Config/ServiceConfig.cs
@@ -14,5 +14,7 @@
public string IotAuth { get; set; } = default!;
public string IotCore { get; set; } = default!;
+
+ public string KafkaServerAddress { get; set; } = default!;
}
}
diff --git a/HealthMonitor.Service/Cache/DeviceCacheManager.cs b/HealthMonitor.Service/Cache/DeviceCacheManager.cs
index d7291d5..abf17cc 100644
--- a/HealthMonitor.Service/Cache/DeviceCacheManager.cs
+++ b/HealthMonitor.Service/Cache/DeviceCacheManager.cs
@@ -34,7 +34,11 @@ namespace HealthMonitor.Service.Cache
_hisFetalHeartRateApiClient = hisFetalHeartRateApiClient;
}
-
+ ///
+ /// 获取device_id
+ ///
+ ///
+ ///
public async Task GetDeviceBySerialNoAsync(string sn)
{
string key = CACHE_KEY_DEVICE + sn;
diff --git a/HealthMonitor.Service/HealthMonitor.Service.csproj b/HealthMonitor.Service/HealthMonitor.Service.csproj
index c41ac2e..94bca01 100644
--- a/HealthMonitor.Service/HealthMonitor.Service.csproj
+++ b/HealthMonitor.Service/HealthMonitor.Service.csproj
@@ -8,6 +8,7 @@
+
diff --git a/HealthMonitor.Service/MessageQueue/Kafka/IKafkaService.cs b/HealthMonitor.Service/MessageQueue/Kafka/IKafkaService.cs
new file mode 100644
index 0000000..7087f36
--- /dev/null
+++ b/HealthMonitor.Service/MessageQueue/Kafka/IKafkaService.cs
@@ -0,0 +1,15 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace HealthMonitor.Service.MessageQueue.Kafka
+{
+ public interface IKafkaService
+ {
+ Task PublishAsync(string topicName, T message) where T : class;
+
+ Task SubscribeAsync(IEnumerable topics, Action messageFunc, CancellationToken cancellationToken = default) where T : class;
+ }
+}
diff --git a/HealthMonitor.Service/MessageQueue/Kafka/KafkaService.cs b/HealthMonitor.Service/MessageQueue/Kafka/KafkaService.cs
new file mode 100644
index 0000000..2d1ef40
--- /dev/null
+++ b/HealthMonitor.Service/MessageQueue/Kafka/KafkaService.cs
@@ -0,0 +1,122 @@
+using HealthMonitor.Common;
+using HealthMonitor.Model.Config;
+using Microsoft.EntityFrameworkCore.Diagnostics;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using Newtonsoft.Json;
+using Confluent.Kafka;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.DirectoryServices.Protocols;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace HealthMonitor.Service.MessageQueue.Kafka
+{
+ public class KafkaService : IKafkaService
+ {
+
+ private readonly ILogger _logger;
+ private readonly ServiceConfig _configService;
+ public KafkaService(IOptions _optConfigService, ILogger logger)
+ {
+ _configService = _optConfigService.Value;
+ _logger = logger;
+ }
+
+ public async Task PublishAsync(string topicName, T message) where T : class
+ {
+ try
+ {
+ Type messageType = typeof(T);
+ var config = new ProducerConfig
+ {
+ BootstrapServers = _configService.KafkaServerAddress,
+ EnableIdempotence = true,
+ Acks = Acks.All,
+ MessageSendMaxRetries = 3
+ };
+
+ using var producer = new ProducerBuilder(config).Build();
+ await producer.ProduceAsync(topicName, new Message
+ {
+ Key = Guid.NewGuid().ToString(),
+ Value = JsonConvert.SerializeObject(message)
+ });
+ }
+ catch (ProduceException ex)
+ {
+ _logger.LogError($"推送到kafka失败,topic: {topicName},\n message:{JsonConvert.SerializeObject(message)}: \n{ex.Error.Reason}");
+ }
+ }
+
+ public async Task SubscribeAsync(IEnumerable topics, Action messageFunc, CancellationToken cancellationToken = default) where T : class
+ {
+ var config = new ConsumerConfig
+ {
+ BootstrapServers = _configService.KafkaServerAddress,
+ GroupId = "Consumer",
+ EnableAutoCommit = false, // 禁止AutoCommit
+ Acks = Acks.Leader, // 假设只需要Leader响应即可
+ AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起
+ };
+ using (var consumer = new ConsumerBuilder(config).Build())
+ {
+ consumer.Subscribe(topics);
+ try
+ {
+ while (true)
+ {
+ try
+ {
+ var consumeResult = consumer.Consume(cancellationToken);
+ Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'.");
+ if (consumeResult!.IsPartitionEOF)
+ {
+ Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
+ continue;
+ }
+ T? messageResult = null;
+ try
+ {
+ messageResult = JsonConvert.DeserializeObject(consumeResult.Message!.Value)!;
+ }
+ catch (Exception ex)
+ {
+ var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message!.Value}】 :{ex.StackTrace?.ToString()}";
+ Console.WriteLine(errorMessage);
+ messageResult = null;
+ }
+ if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/)
+ {
+ messageFunc(messageResult);
+ try
+ {
+ consumer.Commit(consumeResult);
+ }
+ catch (KafkaException e)
+ {
+ Console.WriteLine(e.Message);
+ }
+ }
+ }
+ catch (ConsumeException e)
+ {
+ Console.WriteLine($"Consume error: {e.Error.Reason}");
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ Console.WriteLine("Closing consumer.");
+ consumer.Close();
+ }
+ }
+
+ await Task.CompletedTask;
+ }
+ }
+}
+
diff --git a/HealthMonitor.Service/MessageQueue/MqProcessLogic.cs b/HealthMonitor.Service/MessageQueue/MqProcessLogic.cs
new file mode 100644
index 0000000..8a5940b
--- /dev/null
+++ b/HealthMonitor.Service/MessageQueue/MqProcessLogic.cs
@@ -0,0 +1,30 @@
+using HealthMonitor.Service.MessageQueue.Kafka;
+using Microsoft.EntityFrameworkCore.Diagnostics;
+using Microsoft.Extensions.Logging;
+using Newtonsoft.Json;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace HealthMonitor.Service.MessageQueue
+{
+ public class MqProcessLogic
+ {
+ private readonly ILogger _logger;
+
+ private readonly KafkaService _serviceKafka;
+ public MqProcessLogic(ILogger logger, KafkaService serviceKafka)
+ {
+ _logger = logger;
+ _serviceKafka = serviceKafka;
+ }
+ public async Task ProcessIMEIEventMessageAsync(string messagesId,string topicName,object eventData)
+ {
+ await _serviceKafka.PublishAsync(topicName, eventData);
+ _logger.LogInformation($"推送消息 {messagesId} 内容:{JsonConvert.SerializeObject(eventData)}");
+ }
+ }
+
+}
diff --git a/HealthMonitor.Service/Resolver/PregnancyHeartRateResolver.cs b/HealthMonitor.Service/Resolver/PregnancyHeartRateResolver.cs
index 9d0e85b..7a9e61e 100644
--- a/HealthMonitor.Service/Resolver/PregnancyHeartRateResolver.cs
+++ b/HealthMonitor.Service/Resolver/PregnancyHeartRateResolver.cs
@@ -1,20 +1,24 @@
-using HealthMonitor.Common;
+using Etcdserverpb;
+using HealthMonitor.Common;
using HealthMonitor.Common.helper;
using HealthMonitor.Model.Service.Mapper;
using HealthMonitor.Service.Biz;
using HealthMonitor.Service.Biz.db;
using HealthMonitor.Service.Cache;
using HealthMonitor.Service.Etcd;
+using HealthMonitor.Service.MessageQueue;
using HealthMonitor.Service.Resolver.Interface;
using HealthMonitor.Service.Sub;
using HealthMonitor.Service.Sub.Topic.Model;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
+using Newtonsoft.Json.Linq;
using SqlSugar;
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Net;
using System.Text;
using System.Threading.Tasks;
using TelpoDataService.Util.Clients;
@@ -44,11 +48,14 @@ namespace HealthMonitor.Service.Resolver
private readonly FetalMovementNormalValueRangeCacheManager _mgrFetalMovementNormalValueRangeCache;
+ private readonly MqProcessLogic _serviceMqProcess;
+
public PregnancyHeartRateResolver(ILogger logger,
- HttpHelper httpHelper, EtcdService serviceEtcd, DeviceCacheManager deviceCacheMgr,
+ HttpHelper httpHelper, EtcdService serviceEtcd, DeviceCacheManager deviceCacheMgr,
+ MqProcessLogic serviceMqProcess,
IotApiService iotApiService, TDengineService serviceDengine, FetalMovementNormalValueRangeCacheManager fetalMovementNormalValueRangeCacheMgr,
GpsLocationHistoryAccessorClient hisFetalHeartApiClient,
GpsLocationHistoryAccessorClient hisFetalMovementApiClient
@@ -60,6 +67,7 @@ namespace HealthMonitor.Service.Resolver
_serviceTDengine = serviceDengine;
_deviceCacheMgr = deviceCacheMgr;
_serviceIotApi = iotApiService;
+ _serviceMqProcess= serviceMqProcess;
_hisFetalHeartApiClient = hisFetalHeartApiClient;
_hisFetalMovementApiClient = hisFetalMovementApiClient;
_mgrFetalMovementNormalValueRangeCache = fetalMovementNormalValueRangeCacheMgr;
@@ -452,7 +460,7 @@ namespace HealthMonitor.Service.Resolver
{
var avgPhr = lastPhr.Select(i => i.PregnancyHeartRate).Average();
// 计算一般心率得到胎心系数
- await SaveAndPushFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr);
+ await SaveAndPushFreqFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr, DateTimeUtil.ConvertToTimeStamp(DateTime.Now).ToString());
}
#endregion
@@ -474,7 +482,8 @@ namespace HealthMonitor.Service.Resolver
.Select(i => i.PregnancyHeartRate).Average();
// 推送胎心数据到 api/v1/open/OpenIot/SetFetalHeartRateConfig
// 计算一般心率得到胎心系数
- await SaveAndPushFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr);
+ //await SaveAndPushFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr);
+ await SaveAndPushFreqFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr, DateTimeUtil.ConvertToTimeStamp(DateTime.Now).ToString());
}
@@ -490,7 +499,9 @@ namespace HealthMonitor.Service.Resolver
.Select(i => i.PregnancyHeartRate).Average();
// 推送胎心数据到 api/v1/open/OpenIot/SetFetalHeartRateConfig
// 计算一般心率得到胎心系数
- await SaveAndPushFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr);
+ //await SaveAndPushFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr);
+ await SaveAndPushFreqFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr, DateTimeUtil.ConvertToTimeStamp(DateTime.Now).ToString());
+
}
// 删除高频状态的首条记录
@@ -566,6 +577,28 @@ namespace HealthMonitor.Service.Resolver
// 推送到api/v1/open/OpenIot/SetFetalHeartRateConfig
await _serviceIotApi.SetFetalHeartRateConfig(heartRate.Serialno, fetalHeartRate, sampleTimeFHR, fetalHeartRateIsAbnormal);
+ // 推送送微信
+ var device = await _deviceCacheMgr.GetDeviceBySerialNoAsync(heartRate.Serialno).ConfigureAwait(false);
+ var fhrMsgId = $"{heartRate.Serialno}-{sampleTimeFHR}-{Guid.NewGuid().ToString("D")[^3..]}";
+ var topic = "topic.push.wx";
+ var fhrMsg = new
+ {
+ messageId = fhrMsgId,
+ topic = topic,
+ time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(long.Parse(sampleTimeFHR.Length < 13 ? sampleTimeFHR.PadRight(13, '0') : sampleTimeFHR)).ToString("yyyy-MM-dd HH:mm:ss"),
+ data = new
+ {
+ deviceId = device?.DeviceId,
+ imei = heartRate.Serialno,
+ alarmTypeId = 12,
+ alarmDeviceName = heartRate.Serialno,
+ alarmRemarks = JsonConvert.SerializeObject(new { fetalHeartValue = fetalHeartRate, isAbnormal = fetalHeartRateIsAbnormal }),
+ address = string.Empty,
+ deviceKey = device?.DeviceId
+ }
+ };
+ await _serviceMqProcess.ProcessIMEIEventMessageAsync(fhrMsgId, topic,fhrMsg).ConfigureAwait(false);
+
}
}
@@ -660,8 +693,31 @@ namespace HealthMonitor.Service.Resolver
};
await _hisFetalMovementApiClient.AddAsync(fm).ConfigureAwait(false);
+ // 发送到微信
+ var device = await _deviceCacheMgr.GetDeviceBySerialNoAsync(heartRate.Serialno).ConfigureAwait(false);
+ var fmMsgId = $"{heartRate.Serialno}-{fetalMovementSampleTime}-{Guid.NewGuid().ToString("D")[^3..]}";
+ var topic = "topic.push.wx";
+ var fmMsg = new
+ {
+ messageId=Guid.NewGuid().ToString("D"),
+ topic= topic,
+ time= DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(long.Parse(fetalMovementSampleTime.Length < 13 ? fetalMovementSampleTime.PadRight(13, '0') : fetalMovementSampleTime)).ToString("yyyy-MM-dd HH:mm:ss"),
+ data=new {
+ deviceId= device?.DeviceId,
+ imei = heartRate.Serialno,
+ alarmTypeId=12,
+ alarmDeviceName = heartRate.Serialno,
+ alarmRemarks=JsonConvert.SerializeObject(new { fetalMovementValue= fetalMovement, isAbnormal= feltalMovementIsAbnormal }),
+ address=string.Empty,
+ deviceKey=device?.DeviceId
+ }
+ };
+ await _serviceMqProcess.ProcessIMEIEventMessageAsync(fmMsgId, topic, fmMsg).ConfigureAwait(false);
+
+
// 设置入库缓存记录
await _deviceCacheMgr.SetFetalMovementAsync(heartRate.Serialno, fetalMovementSampleTime,fm);
+
}
else
{
@@ -855,6 +911,62 @@ namespace HealthMonitor.Service.Resolver
}
}
+ private async Task SaveAndPushFreqFetalHeartRateAsync(HisGpsHeartRate heartRate, int upperAlarmThreshold, int lowerAlarmThreshold, double avgPhr,string sampleTime)
+ {
+ var commonPHR = await _serviceTDengine.InitPregnancyCommonHeartRateModeAsync(heartRate.Serialno);
+ if (commonPHR != null)
+ {
+ // 保存到TDengine数据库
+ await _serviceTDengine.InsertAsync("hm_pchr", commonPHR);
+ // 计算胎心=孕妇心率*系数
+ var fetalHeartRate = SafeType.SafeInt(avgPhr * commonPHR?.StatModeAvgFprCoefficient!);
+ var isAbnormal = fetalHeartRate > upperAlarmThreshold ? 1 : (fetalHeartRate < lowerAlarmThreshold ? 2 : 0);
+
+ // 保存到 数据服务 MySQL 数据库
+ HisGpsFetalHeartRate gpsFetalHeartRate = new()
+ {
+ FetalHeartRateId = Guid.NewGuid().ToString("D"),
+ PersonId = commonPHR!.PersonId,
+ Serialno = heartRate.Serialno,
+ HeartRate = fetalHeartRate,
+ SampleTime = sampleTime.Length > 10 ? sampleTime.Substring(0, 10) : sampleTime,
+ IsAbnormal = isAbnormal,
+ StatStartTime = commonPHR.StatStartTime,
+ StatEndTime = commonPHR.StatEndTime,
+ CreateTime = DateTime.Now,
+ Method = 1,
+ IsDisplay = 1,
+ DeviceKey = commonPHR!.DeviceKey
+ };
+ await _hisFetalHeartApiClient.AddAsync(gpsFetalHeartRate).ConfigureAwait(false);
+
+ // 推送到api/v1/open/OpenIot/SetFetalHeartRateConfig
+ await _serviceIotApi.SetFetalHeartRateConfig(heartRate.Serialno, fetalHeartRate, sampleTime, isAbnormal);
+
+ // 推送到微信
+ var device = await _deviceCacheMgr.GetDeviceBySerialNoAsync(heartRate.Serialno).ConfigureAwait(false);
+ var fhrMsgId = $"{heartRate.Serialno}-{sampleTime}-{Guid.NewGuid().ToString("D")[^3..]}";
+ var topic = "topic.push.wx";
+ var fhrMsg = new
+ {
+ messageId = fhrMsgId,
+ topic = topic,
+ time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(long.Parse(sampleTime.Length < 13 ? sampleTime.PadRight(13, '0') : sampleTime)).ToString("yyyy-MM-dd HH:mm:ss"),
+ data = new
+ {
+ deviceId = device?.DeviceId,
+ imei = heartRate.Serialno,
+ alarmTypeId = 12,
+ alarmDeviceName = heartRate.Serialno,
+ alarmRemarks = JsonConvert.SerializeObject(new { fetalHeartValue = fetalHeartRate, isAbnormal = isAbnormal }),
+ address = string.Empty,
+ deviceKey = device?.DeviceId
+ }
+ };
+ await _serviceMqProcess.ProcessIMEIEventMessageAsync(fhrMsgId, topic, fhrMsg).ConfigureAwait(false);
+
+ }
+ }
private async Task SetIntervalTriggerAsync(string key,string imei, long interval)
{
// var key = $"health_monitor/schedule_push/{type}/imei/{imei}";
diff --git a/HealthMonitor.WebApi/Program.cs b/HealthMonitor.WebApi/Program.cs
index 31f20f3..e4f36b3 100644
--- a/HealthMonitor.WebApi/Program.cs
+++ b/HealthMonitor.WebApi/Program.cs
@@ -34,6 +34,8 @@ using Microsoft.Extensions.DependencyInjection.Extensions;
using HealthMonitor.Service.Etcd;
using HealthMonitor.WebApi.Middleware;
using HealthMonitor.Service.Biz;
+using HealthMonitor.Service.MessageQueue.Kafka;
+using HealthMonitor.Service.MessageQueue;
namespace HealthMonitor.WebApi
{
@@ -202,6 +204,10 @@ namespace HealthMonitor.WebApi
.AddHostedService();
#endregion
+ #region kafka
+ builder.Services.AddSingleton();
+ builder.Services.AddSingleton();
+ #endregion
builder.Host.UseSerilog();
diff --git a/HealthMonitor.WebApi/appsettings.Development.json b/HealthMonitor.WebApi/appsettings.Development.json
index 9bc2665..626681c 100644
--- a/HealthMonitor.WebApi/appsettings.Development.json
+++ b/HealthMonitor.WebApi/appsettings.Development.json
@@ -25,6 +25,7 @@
"IdleTimeout": 20000
},
"ServiceConfig": {
+ "KafkaServerAddress": "47.116.67.214:9092",
"TelpoDataUrl": "https://id.gdssjl.com/data/",
"EtcdServerAddress": "http://192.168.2.121:2379",
"IotWebApiUrl": "http://id.gdssjl.com/webapi/api/",
diff --git a/HealthMonitor.WebApi/appsettings.production.json b/HealthMonitor.WebApi/appsettings.production.json
index 88c3dac..d1ccc40 100644
--- a/HealthMonitor.WebApi/appsettings.production.json
+++ b/HealthMonitor.WebApi/appsettings.production.json
@@ -22,11 +22,13 @@
"IdleTimeout": 20000
},
"ServiceConfig": {
+ "KafkaServerAddress": "172.19.42.40:9092,172.19.42.41:9092,172.19.42.48:9092",
"TelpoDataUrl": "https://ai.gdssjl.com/data/",
"EtcdServerAddress": "http://172.19.42.40:2379",
"IotWebApiUrl": "http://ai.gdssjl.com/webapi/api/",
"IotAuth": "http://ai.ssjlai.com/auth/identityController",
"IotCore": "https://ai.ssjlai.com/gateway/core/api/v1/open/OpenIot"
+
},
"BoodPressResolverConfig": {
"EnableBPRefPush": true
diff --git a/HealthMonitor.WebApi/appsettings.test.json b/HealthMonitor.WebApi/appsettings.test.json
index 06be6f4..bf9b4ab 100644
--- a/HealthMonitor.WebApi/appsettings.test.json
+++ b/HealthMonitor.WebApi/appsettings.test.json
@@ -30,6 +30,7 @@
"IdleTimeout": 20000
},
"ServiceConfig": {
+ "KafkaServerAddress": "172.19.42.53:9092",
"TelpoDataUrl": "https://id.gdssjl.com/data/",
"EtcdServerAddress": "http://172.19.42.44:2379",
"IotWebApiUrl": "http://id.gdssjl.com/webapi/api/",