From 9fecd30ba5483e0e88abaef0eeb8f1b176a78dff Mon Sep 17 00:00:00 2001 From: H Vs Date: Sat, 27 Jul 2024 10:41:08 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8E=A8=E9=80=81=E5=88=B0kafka?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- HealthMonitor.Model/Config/ServiceConfig.cs | 2 + .../Cache/DeviceCacheManager.cs | 6 +- .../HealthMonitor.Service.csproj | 1 + .../MessageQueue/Kafka/IKafkaService.cs | 15 +++ .../MessageQueue/Kafka/KafkaService.cs | 122 ++++++++++++++++++ .../MessageQueue/MqProcessLogic.cs | 30 +++++ .../Resolver/PregnancyHeartRateResolver.cs | 122 +++++++++++++++++- HealthMonitor.WebApi/Program.cs | 6 + .../appsettings.Development.json | 1 + .../appsettings.production.json | 2 + HealthMonitor.WebApi/appsettings.test.json | 1 + 11 files changed, 302 insertions(+), 6 deletions(-) create mode 100644 HealthMonitor.Service/MessageQueue/Kafka/IKafkaService.cs create mode 100644 HealthMonitor.Service/MessageQueue/Kafka/KafkaService.cs create mode 100644 HealthMonitor.Service/MessageQueue/MqProcessLogic.cs diff --git a/HealthMonitor.Model/Config/ServiceConfig.cs b/HealthMonitor.Model/Config/ServiceConfig.cs index ac40e72..a528d80 100644 --- a/HealthMonitor.Model/Config/ServiceConfig.cs +++ b/HealthMonitor.Model/Config/ServiceConfig.cs @@ -14,5 +14,7 @@ public string IotAuth { get; set; } = default!; public string IotCore { get; set; } = default!; + + public string KafkaServerAddress { get; set; } = default!; } } diff --git a/HealthMonitor.Service/Cache/DeviceCacheManager.cs b/HealthMonitor.Service/Cache/DeviceCacheManager.cs index d7291d5..abf17cc 100644 --- a/HealthMonitor.Service/Cache/DeviceCacheManager.cs +++ b/HealthMonitor.Service/Cache/DeviceCacheManager.cs @@ -34,7 +34,11 @@ namespace HealthMonitor.Service.Cache _hisFetalHeartRateApiClient = hisFetalHeartRateApiClient; } - + /// + /// 获取device_id + /// + /// + /// public async Task GetDeviceBySerialNoAsync(string sn) { string key = CACHE_KEY_DEVICE + sn; diff --git a/HealthMonitor.Service/HealthMonitor.Service.csproj b/HealthMonitor.Service/HealthMonitor.Service.csproj index c41ac2e..94bca01 100644 --- a/HealthMonitor.Service/HealthMonitor.Service.csproj +++ b/HealthMonitor.Service/HealthMonitor.Service.csproj @@ -8,6 +8,7 @@ + diff --git a/HealthMonitor.Service/MessageQueue/Kafka/IKafkaService.cs b/HealthMonitor.Service/MessageQueue/Kafka/IKafkaService.cs new file mode 100644 index 0000000..7087f36 --- /dev/null +++ b/HealthMonitor.Service/MessageQueue/Kafka/IKafkaService.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace HealthMonitor.Service.MessageQueue.Kafka +{ + public interface IKafkaService + { + Task PublishAsync(string topicName, T message) where T : class; + + Task SubscribeAsync(IEnumerable topics, Action messageFunc, CancellationToken cancellationToken = default) where T : class; + } +} diff --git a/HealthMonitor.Service/MessageQueue/Kafka/KafkaService.cs b/HealthMonitor.Service/MessageQueue/Kafka/KafkaService.cs new file mode 100644 index 0000000..2d1ef40 --- /dev/null +++ b/HealthMonitor.Service/MessageQueue/Kafka/KafkaService.cs @@ -0,0 +1,122 @@ +using HealthMonitor.Common; +using HealthMonitor.Model.Config; +using Microsoft.EntityFrameworkCore.Diagnostics; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; +using Confluent.Kafka; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.DirectoryServices.Protocols; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace HealthMonitor.Service.MessageQueue.Kafka +{ + public class KafkaService : IKafkaService + { + + private readonly ILogger _logger; + private readonly ServiceConfig _configService; + public KafkaService(IOptions _optConfigService, ILogger logger) + { + _configService = _optConfigService.Value; + _logger = logger; + } + + public async Task PublishAsync(string topicName, T message) where T : class + { + try + { + Type messageType = typeof(T); + var config = new ProducerConfig + { + BootstrapServers = _configService.KafkaServerAddress, + EnableIdempotence = true, + Acks = Acks.All, + MessageSendMaxRetries = 3 + }; + + using var producer = new ProducerBuilder(config).Build(); + await producer.ProduceAsync(topicName, new Message + { + Key = Guid.NewGuid().ToString(), + Value = JsonConvert.SerializeObject(message) + }); + } + catch (ProduceException ex) + { + _logger.LogError($"推送到kafka失败,topic: {topicName},\n message:{JsonConvert.SerializeObject(message)}: \n{ex.Error.Reason}"); + } + } + + public async Task SubscribeAsync(IEnumerable topics, Action messageFunc, CancellationToken cancellationToken = default) where T : class + { + var config = new ConsumerConfig + { + BootstrapServers = _configService.KafkaServerAddress, + GroupId = "Consumer", + EnableAutoCommit = false, // 禁止AutoCommit + Acks = Acks.Leader, // 假设只需要Leader响应即可 + AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起 + }; + using (var consumer = new ConsumerBuilder(config).Build()) + { + consumer.Subscribe(topics); + try + { + while (true) + { + try + { + var consumeResult = consumer.Consume(cancellationToken); + Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'."); + if (consumeResult!.IsPartitionEOF) + { + Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}."); + continue; + } + T? messageResult = null; + try + { + messageResult = JsonConvert.DeserializeObject(consumeResult.Message!.Value)!; + } + catch (Exception ex) + { + var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message!.Value}】 :{ex.StackTrace?.ToString()}"; + Console.WriteLine(errorMessage); + messageResult = null; + } + if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/) + { + messageFunc(messageResult); + try + { + consumer.Commit(consumeResult); + } + catch (KafkaException e) + { + Console.WriteLine(e.Message); + } + } + } + catch (ConsumeException e) + { + Console.WriteLine($"Consume error: {e.Error.Reason}"); + } + } + } + catch (OperationCanceledException) + { + Console.WriteLine("Closing consumer."); + consumer.Close(); + } + } + + await Task.CompletedTask; + } + } +} + diff --git a/HealthMonitor.Service/MessageQueue/MqProcessLogic.cs b/HealthMonitor.Service/MessageQueue/MqProcessLogic.cs new file mode 100644 index 0000000..8a5940b --- /dev/null +++ b/HealthMonitor.Service/MessageQueue/MqProcessLogic.cs @@ -0,0 +1,30 @@ +using HealthMonitor.Service.MessageQueue.Kafka; +using Microsoft.EntityFrameworkCore.Diagnostics; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace HealthMonitor.Service.MessageQueue +{ + public class MqProcessLogic + { + private readonly ILogger _logger; + + private readonly KafkaService _serviceKafka; + public MqProcessLogic(ILogger logger, KafkaService serviceKafka) + { + _logger = logger; + _serviceKafka = serviceKafka; + } + public async Task ProcessIMEIEventMessageAsync(string messagesId,string topicName,object eventData) + { + await _serviceKafka.PublishAsync(topicName, eventData); + _logger.LogInformation($"推送消息 {messagesId} 内容:{JsonConvert.SerializeObject(eventData)}"); + } + } + +} diff --git a/HealthMonitor.Service/Resolver/PregnancyHeartRateResolver.cs b/HealthMonitor.Service/Resolver/PregnancyHeartRateResolver.cs index 9d0e85b..7a9e61e 100644 --- a/HealthMonitor.Service/Resolver/PregnancyHeartRateResolver.cs +++ b/HealthMonitor.Service/Resolver/PregnancyHeartRateResolver.cs @@ -1,20 +1,24 @@ -using HealthMonitor.Common; +using Etcdserverpb; +using HealthMonitor.Common; using HealthMonitor.Common.helper; using HealthMonitor.Model.Service.Mapper; using HealthMonitor.Service.Biz; using HealthMonitor.Service.Biz.db; using HealthMonitor.Service.Cache; using HealthMonitor.Service.Etcd; +using HealthMonitor.Service.MessageQueue; using HealthMonitor.Service.Resolver.Interface; using HealthMonitor.Service.Sub; using HealthMonitor.Service.Sub.Topic.Model; using Microsoft.EntityFrameworkCore.Metadata; using Microsoft.Extensions.Logging; using Newtonsoft.Json; +using Newtonsoft.Json.Linq; using SqlSugar; using System; using System.Collections.Generic; using System.Linq; +using System.Net; using System.Text; using System.Threading.Tasks; using TelpoDataService.Util.Clients; @@ -44,11 +48,14 @@ namespace HealthMonitor.Service.Resolver private readonly FetalMovementNormalValueRangeCacheManager _mgrFetalMovementNormalValueRangeCache; + private readonly MqProcessLogic _serviceMqProcess; + public PregnancyHeartRateResolver(ILogger logger, - HttpHelper httpHelper, EtcdService serviceEtcd, DeviceCacheManager deviceCacheMgr, + HttpHelper httpHelper, EtcdService serviceEtcd, DeviceCacheManager deviceCacheMgr, + MqProcessLogic serviceMqProcess, IotApiService iotApiService, TDengineService serviceDengine, FetalMovementNormalValueRangeCacheManager fetalMovementNormalValueRangeCacheMgr, GpsLocationHistoryAccessorClient hisFetalHeartApiClient, GpsLocationHistoryAccessorClient hisFetalMovementApiClient @@ -60,6 +67,7 @@ namespace HealthMonitor.Service.Resolver _serviceTDengine = serviceDengine; _deviceCacheMgr = deviceCacheMgr; _serviceIotApi = iotApiService; + _serviceMqProcess= serviceMqProcess; _hisFetalHeartApiClient = hisFetalHeartApiClient; _hisFetalMovementApiClient = hisFetalMovementApiClient; _mgrFetalMovementNormalValueRangeCache = fetalMovementNormalValueRangeCacheMgr; @@ -452,7 +460,7 @@ namespace HealthMonitor.Service.Resolver { var avgPhr = lastPhr.Select(i => i.PregnancyHeartRate).Average(); // 计算一般心率得到胎心系数 - await SaveAndPushFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr); + await SaveAndPushFreqFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr, DateTimeUtil.ConvertToTimeStamp(DateTime.Now).ToString()); } #endregion @@ -474,7 +482,8 @@ namespace HealthMonitor.Service.Resolver .Select(i => i.PregnancyHeartRate).Average(); // 推送胎心数据到 api/v1/open/OpenIot/SetFetalHeartRateConfig // 计算一般心率得到胎心系数 - await SaveAndPushFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr); + //await SaveAndPushFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr); + await SaveAndPushFreqFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr, DateTimeUtil.ConvertToTimeStamp(DateTime.Now).ToString()); } @@ -490,7 +499,9 @@ namespace HealthMonitor.Service.Resolver .Select(i => i.PregnancyHeartRate).Average(); // 推送胎心数据到 api/v1/open/OpenIot/SetFetalHeartRateConfig // 计算一般心率得到胎心系数 - await SaveAndPushFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr); + //await SaveAndPushFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr); + await SaveAndPushFreqFetalHeartRateAsync(heartRate, upperAlarmThreshold, lowerAlarmThreshold, avgPhr, DateTimeUtil.ConvertToTimeStamp(DateTime.Now).ToString()); + } // 删除高频状态的首条记录 @@ -566,6 +577,28 @@ namespace HealthMonitor.Service.Resolver // 推送到api/v1/open/OpenIot/SetFetalHeartRateConfig await _serviceIotApi.SetFetalHeartRateConfig(heartRate.Serialno, fetalHeartRate, sampleTimeFHR, fetalHeartRateIsAbnormal); + // 推送送微信 + var device = await _deviceCacheMgr.GetDeviceBySerialNoAsync(heartRate.Serialno).ConfigureAwait(false); + var fhrMsgId = $"{heartRate.Serialno}-{sampleTimeFHR}-{Guid.NewGuid().ToString("D")[^3..]}"; + var topic = "topic.push.wx"; + var fhrMsg = new + { + messageId = fhrMsgId, + topic = topic, + time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(long.Parse(sampleTimeFHR.Length < 13 ? sampleTimeFHR.PadRight(13, '0') : sampleTimeFHR)).ToString("yyyy-MM-dd HH:mm:ss"), + data = new + { + deviceId = device?.DeviceId, + imei = heartRate.Serialno, + alarmTypeId = 12, + alarmDeviceName = heartRate.Serialno, + alarmRemarks = JsonConvert.SerializeObject(new { fetalHeartValue = fetalHeartRate, isAbnormal = fetalHeartRateIsAbnormal }), + address = string.Empty, + deviceKey = device?.DeviceId + } + }; + await _serviceMqProcess.ProcessIMEIEventMessageAsync(fhrMsgId, topic,fhrMsg).ConfigureAwait(false); + } } @@ -660,8 +693,31 @@ namespace HealthMonitor.Service.Resolver }; await _hisFetalMovementApiClient.AddAsync(fm).ConfigureAwait(false); + // 发送到微信 + var device = await _deviceCacheMgr.GetDeviceBySerialNoAsync(heartRate.Serialno).ConfigureAwait(false); + var fmMsgId = $"{heartRate.Serialno}-{fetalMovementSampleTime}-{Guid.NewGuid().ToString("D")[^3..]}"; + var topic = "topic.push.wx"; + var fmMsg = new + { + messageId=Guid.NewGuid().ToString("D"), + topic= topic, + time= DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(long.Parse(fetalMovementSampleTime.Length < 13 ? fetalMovementSampleTime.PadRight(13, '0') : fetalMovementSampleTime)).ToString("yyyy-MM-dd HH:mm:ss"), + data=new { + deviceId= device?.DeviceId, + imei = heartRate.Serialno, + alarmTypeId=12, + alarmDeviceName = heartRate.Serialno, + alarmRemarks=JsonConvert.SerializeObject(new { fetalMovementValue= fetalMovement, isAbnormal= feltalMovementIsAbnormal }), + address=string.Empty, + deviceKey=device?.DeviceId + } + }; + await _serviceMqProcess.ProcessIMEIEventMessageAsync(fmMsgId, topic, fmMsg).ConfigureAwait(false); + + // 设置入库缓存记录 await _deviceCacheMgr.SetFetalMovementAsync(heartRate.Serialno, fetalMovementSampleTime,fm); + } else { @@ -855,6 +911,62 @@ namespace HealthMonitor.Service.Resolver } } + private async Task SaveAndPushFreqFetalHeartRateAsync(HisGpsHeartRate heartRate, int upperAlarmThreshold, int lowerAlarmThreshold, double avgPhr,string sampleTime) + { + var commonPHR = await _serviceTDengine.InitPregnancyCommonHeartRateModeAsync(heartRate.Serialno); + if (commonPHR != null) + { + // 保存到TDengine数据库 + await _serviceTDengine.InsertAsync("hm_pchr", commonPHR); + // 计算胎心=孕妇心率*系数 + var fetalHeartRate = SafeType.SafeInt(avgPhr * commonPHR?.StatModeAvgFprCoefficient!); + var isAbnormal = fetalHeartRate > upperAlarmThreshold ? 1 : (fetalHeartRate < lowerAlarmThreshold ? 2 : 0); + + // 保存到 数据服务 MySQL 数据库 + HisGpsFetalHeartRate gpsFetalHeartRate = new() + { + FetalHeartRateId = Guid.NewGuid().ToString("D"), + PersonId = commonPHR!.PersonId, + Serialno = heartRate.Serialno, + HeartRate = fetalHeartRate, + SampleTime = sampleTime.Length > 10 ? sampleTime.Substring(0, 10) : sampleTime, + IsAbnormal = isAbnormal, + StatStartTime = commonPHR.StatStartTime, + StatEndTime = commonPHR.StatEndTime, + CreateTime = DateTime.Now, + Method = 1, + IsDisplay = 1, + DeviceKey = commonPHR!.DeviceKey + }; + await _hisFetalHeartApiClient.AddAsync(gpsFetalHeartRate).ConfigureAwait(false); + + // 推送到api/v1/open/OpenIot/SetFetalHeartRateConfig + await _serviceIotApi.SetFetalHeartRateConfig(heartRate.Serialno, fetalHeartRate, sampleTime, isAbnormal); + + // 推送到微信 + var device = await _deviceCacheMgr.GetDeviceBySerialNoAsync(heartRate.Serialno).ConfigureAwait(false); + var fhrMsgId = $"{heartRate.Serialno}-{sampleTime}-{Guid.NewGuid().ToString("D")[^3..]}"; + var topic = "topic.push.wx"; + var fhrMsg = new + { + messageId = fhrMsgId, + topic = topic, + time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(long.Parse(sampleTime.Length < 13 ? sampleTime.PadRight(13, '0') : sampleTime)).ToString("yyyy-MM-dd HH:mm:ss"), + data = new + { + deviceId = device?.DeviceId, + imei = heartRate.Serialno, + alarmTypeId = 12, + alarmDeviceName = heartRate.Serialno, + alarmRemarks = JsonConvert.SerializeObject(new { fetalHeartValue = fetalHeartRate, isAbnormal = isAbnormal }), + address = string.Empty, + deviceKey = device?.DeviceId + } + }; + await _serviceMqProcess.ProcessIMEIEventMessageAsync(fhrMsgId, topic, fhrMsg).ConfigureAwait(false); + + } + } private async Task SetIntervalTriggerAsync(string key,string imei, long interval) { // var key = $"health_monitor/schedule_push/{type}/imei/{imei}"; diff --git a/HealthMonitor.WebApi/Program.cs b/HealthMonitor.WebApi/Program.cs index 31f20f3..e4f36b3 100644 --- a/HealthMonitor.WebApi/Program.cs +++ b/HealthMonitor.WebApi/Program.cs @@ -34,6 +34,8 @@ using Microsoft.Extensions.DependencyInjection.Extensions; using HealthMonitor.Service.Etcd; using HealthMonitor.WebApi.Middleware; using HealthMonitor.Service.Biz; +using HealthMonitor.Service.MessageQueue.Kafka; +using HealthMonitor.Service.MessageQueue; namespace HealthMonitor.WebApi { @@ -202,6 +204,10 @@ namespace HealthMonitor.WebApi .AddHostedService(); #endregion + #region kafka + builder.Services.AddSingleton(); + builder.Services.AddSingleton(); + #endregion builder.Host.UseSerilog(); diff --git a/HealthMonitor.WebApi/appsettings.Development.json b/HealthMonitor.WebApi/appsettings.Development.json index 9bc2665..626681c 100644 --- a/HealthMonitor.WebApi/appsettings.Development.json +++ b/HealthMonitor.WebApi/appsettings.Development.json @@ -25,6 +25,7 @@ "IdleTimeout": 20000 }, "ServiceConfig": { + "KafkaServerAddress": "47.116.67.214:9092", "TelpoDataUrl": "https://id.gdssjl.com/data/", "EtcdServerAddress": "http://192.168.2.121:2379", "IotWebApiUrl": "http://id.gdssjl.com/webapi/api/", diff --git a/HealthMonitor.WebApi/appsettings.production.json b/HealthMonitor.WebApi/appsettings.production.json index 88c3dac..d1ccc40 100644 --- a/HealthMonitor.WebApi/appsettings.production.json +++ b/HealthMonitor.WebApi/appsettings.production.json @@ -22,11 +22,13 @@ "IdleTimeout": 20000 }, "ServiceConfig": { + "KafkaServerAddress": "172.19.42.40:9092,172.19.42.41:9092,172.19.42.48:9092", "TelpoDataUrl": "https://ai.gdssjl.com/data/", "EtcdServerAddress": "http://172.19.42.40:2379", "IotWebApiUrl": "http://ai.gdssjl.com/webapi/api/", "IotAuth": "http://ai.ssjlai.com/auth/identityController", "IotCore": "https://ai.ssjlai.com/gateway/core/api/v1/open/OpenIot" + }, "BoodPressResolverConfig": { "EnableBPRefPush": true diff --git a/HealthMonitor.WebApi/appsettings.test.json b/HealthMonitor.WebApi/appsettings.test.json index 06be6f4..bf9b4ab 100644 --- a/HealthMonitor.WebApi/appsettings.test.json +++ b/HealthMonitor.WebApi/appsettings.test.json @@ -30,6 +30,7 @@ "IdleTimeout": 20000 }, "ServiceConfig": { + "KafkaServerAddress": "172.19.42.53:9092", "TelpoDataUrl": "https://id.gdssjl.com/data/", "EtcdServerAddress": "http://172.19.42.44:2379", "IotWebApiUrl": "http://id.gdssjl.com/webapi/api/",