Browse Source

胎心胎动数据推送到第三方

datasub12_fetal_heart_rate_0
H Vs 3 months ago
parent
commit
0b07626cd0
4 changed files with 83 additions and 143 deletions
  1. +30
    -0
      HealthMonitor.Service/MessageQueue/Kafka/KafkaService.cs
  2. +6
    -0
      HealthMonitor.Service/MessageQueue/MqProcessLogic.cs
  3. +25
    -139
      HealthMonitor.Service/Resolver/PregnancyHeartRateResolver.cs
  4. +22
    -4
      HealthMonitor.WebApi/Worker.cs

+ 30
- 0
HealthMonitor.Service/MessageQueue/Kafka/KafkaService.cs View File

@@ -52,6 +52,36 @@ namespace HealthMonitor.Service.MessageQueue.Kafka
}
}

public async Task PublishAsync<T>(string topicName, int dataType, T message) where T : class
{
try
{
Type messageType = typeof(T);
var config = new ProducerConfig
{
BootstrapServers = _configService.KafkaServerAddress,
EnableIdempotence = true,
Acks = Acks.All,
MessageSendMaxRetries = 3
};
Headers headers = new()
{
{ "DataType", BitConverter.GetBytes(dataType) }
};
using var producer = new ProducerBuilder<string, string>(config).Build();
await producer.ProduceAsync(topicName, new Message<string, string>
{
Headers= headers,
Key = Guid.NewGuid().ToString(),
Value = JsonConvert.SerializeObject(message)
});
}
catch (ProduceException<Null, string> ex)
{
_logger.LogError($"推送到kafka失败,topic: {topicName},\n message:{JsonConvert.SerializeObject(message)}: \n{ex.Error.Reason}");
}
}

public async Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class
{
var config = new ConsumerConfig


+ 6
- 0
HealthMonitor.Service/MessageQueue/MqProcessLogic.cs View File

@@ -25,6 +25,12 @@ namespace HealthMonitor.Service.MessageQueue
await _serviceKafka.PublishAsync(topicName, eventData);
_logger.LogInformation($"推送消息 {messagesId} 内容:{JsonConvert.SerializeObject(eventData)}");
}

public async Task ProcessIMEIEventMessageAsync(string messagesId, string topicName, int dataTpe, object eventData)
{
await _serviceKafka.PublishAsync(topicName, dataTpe, eventData);
_logger.LogInformation($"推送消息 {messagesId},数据类型:{dataTpe}, 内容:{JsonConvert.SerializeObject(eventData)}");
}
}

}

+ 25
- 139
HealthMonitor.Service/Resolver/PregnancyHeartRateResolver.cs View File

@@ -1,4 +1,5 @@
using Etcdserverpb;
using Google.Protobuf.WellKnownTypes;
using HealthMonitor.Common;
using HealthMonitor.Common.helper;
using HealthMonitor.Model.Service.Mapper;
@@ -693,17 +694,36 @@ namespace HealthMonitor.Service.Resolver
// 推送到api/v1/open/OpenIot/SetFetalHeartRateConfig
await _serviceIotApi.SetFetalHeartRateConfig(heartRate.Serialno, fetalHeartRate, sampleTime, isAbnormal);

// 推送到微信

var device = await _deviceCacheMgr.GetDeviceBySerialNoAsync(heartRate.Serialno).ConfigureAwait(false);
var fhrMsgId = $"{heartRate.Serialno}-{sampleTime}-{Guid.NewGuid().ToString("D")[^3..]}";
var fhrMsgTime = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(long.Parse(sampleTime.Length < 13 ? sampleTime.PadRight(13, '0') : sampleTime)).ToString("yyyy-MM-dd HH:mm:ss");
// 胎心数据推送到第三方
var topic = "topic.push.third";
var fhrThridMsg = new
{
messageId = fhrMsgId,
topic = topic,
time = fhrMsgTime,
data=new {
imei = heartRate.Serialno,
value= fetalHeartRate,
isAbnormal,
type= "fetalHeart"
}
};
await _serviceMqProcess.ProcessIMEIEventMessageAsync(fhrMsgId, topic, 31,fhrThridMsg).ConfigureAwait(false);

// 胎心数据推送到微信
if (isAbnormal != 0)
{
var device = await _deviceCacheMgr.GetDeviceBySerialNoAsync(heartRate.Serialno).ConfigureAwait(false);
var fhrMsgId = $"{heartRate.Serialno}-{sampleTime}-{Guid.NewGuid().ToString("D")[^3..]}";
var topic = "topic.push.wx";

topic = "topic.push.wx";
var fhrMsg = new
{
messageId = fhrMsgId,
topic = topic,
time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(long.Parse(sampleTime.Length < 13 ? sampleTime.PadRight(13, '0') : sampleTime)).ToString("yyyy-MM-dd HH:mm:ss"),
time = fhrMsgTime,
data = new
{
deviceId = device?.DeviceId,
@@ -719,140 +739,6 @@ namespace HealthMonitor.Service.Resolver

}
}

private async Task SaveAndPushFreqFetalHeartRateAsync(HisGpsHeartRate heartRate, int upperAlarmThreshold, int lowerAlarmThreshold, double avgPhr, string sampleTime)
{
var commonPHR = await _serviceTDengine.InitPregnancyCommonHeartRateModeAsync(heartRate.Serialno);
if (commonPHR != null)
{
// 保存到TDengine数据库
await _serviceTDengine.InsertAsync<PregnancyCommonHeartRateModel>("hm_pchr", commonPHR);
// 计算胎心=孕妇心率*系数
var fetalHeartRate = SafeType.SafeInt(avgPhr * commonPHR?.StatModeAvgFprCoefficient!);
var isAbnormal = fetalHeartRate > upperAlarmThreshold ? 1 : (fetalHeartRate < lowerAlarmThreshold ? 2 : 0);

// 保存到 数据服务 MySQL 数据库
HisGpsFetalHeartRate gpsFetalHeartRate = new()
{
FetalHeartRateId = Guid.NewGuid().ToString("D"),
PersonId = commonPHR!.PersonId,
Serialno = heartRate.Serialno,
HeartRate = fetalHeartRate,
SampleTime = sampleTime.Length > 10 ? sampleTime.Substring(0, 10) : sampleTime,
IsAbnormal = isAbnormal,
StatStartTime = commonPHR.StatStartTime,
StatEndTime = commonPHR.StatEndTime,
CreateTime = DateTime.Now,
Method = 1,
IsDisplay = 1,
DeviceKey = commonPHR!.DeviceKey
};
await _hisFetalHeartApiClient.AddAsync(gpsFetalHeartRate).ConfigureAwait(false);

// 推送到api/v1/open/OpenIot/SetFetalHeartRateConfig
await _serviceIotApi.SetFetalHeartRateConfig(heartRate.Serialno, fetalHeartRate, sampleTime, isAbnormal);

// 推送到微信
if (isAbnormal != 0)
{
var device = await _deviceCacheMgr.GetDeviceBySerialNoAsync(heartRate.Serialno).ConfigureAwait(false);
var fhrMsgId = $"{heartRate.Serialno}-{sampleTime}-{Guid.NewGuid().ToString("D")[^3..]}";
var topic = "topic.push.wx";
var fhrMsg = new
{
messageId = fhrMsgId,
topic = topic,
time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(long.Parse(sampleTime.Length < 13 ? sampleTime.PadRight(13, '0') : sampleTime)).ToString("yyyy-MM-dd HH:mm:ss"),
data = new
{
deviceId = device?.DeviceId,
imei = heartRate.Serialno,
alarmTypeId = 12,
alarmDeviceName = heartRate.Serialno,
alarmRemarks = JsonConvert.SerializeObject(new { fetalHeartValue = fetalHeartRate, isAbnormal = isAbnormal }),
address = string.Empty,
deviceKey = device?.DeviceId
}
};
await _serviceMqProcess.ProcessIMEIEventMessageAsync(fhrMsgId, topic, fhrMsg).ConfigureAwait(false);

}


}
}

/// <summary>
///
/// </summary>
/// <param name="heartRate"></param>
/// <param name="upperAlarmThreshold"></param>
/// <param name="lowerAlarmThreshold"></param>
/// <param name="avgPhr"></param>
/// <param name="sampleTime"></param>
/// <returns></returns>
private async Task SaveAndPushFreqFetalHeartRate2Async(HisGpsHeartRate heartRate, int upperAlarmThreshold, int lowerAlarmThreshold, double avgPhr, string sampleTime)
{
var commonPHR = await _serviceTDengine.GetLastAsync<PregnancyCommonHeartRateModel>(heartRate.Serialno);
if (commonPHR != null)
{
// 保存到TDengine数据库
//await _serviceTDengine.InsertAsync<PregnancyCommonHeartRateModel>("hm_pchr", commonPHR);
// 计算胎心=孕妇心率*系数
var fetalHeartRate = SafeType.SafeInt(avgPhr * commonPHR?.StatModeAvgFprCoefficient!);
var isAbnormal = fetalHeartRate > upperAlarmThreshold ? 1 : (fetalHeartRate < lowerAlarmThreshold ? 2 : 0);

// 保存到 数据服务 MySQL 数据库
HisGpsFetalHeartRate gpsFetalHeartRate = new()
{
FetalHeartRateId = Guid.NewGuid().ToString("D"),
PersonId = commonPHR!.PersonId,
Serialno = heartRate.Serialno,
HeartRate = fetalHeartRate,
SampleTime = sampleTime.Length > 10 ? sampleTime.Substring(0, 10) : sampleTime,
IsAbnormal = isAbnormal,
StatStartTime = commonPHR.StatStartTime,
StatEndTime = commonPHR.StatEndTime,
CreateTime = DateTime.Now,
Method = 1,
IsDisplay = 1,
DeviceKey = commonPHR!.DeviceKey
};
await _hisFetalHeartApiClient.AddAsync(gpsFetalHeartRate).ConfigureAwait(false);

// 推送到api/v1/open/OpenIot/SetFetalHeartRateConfig
await _serviceIotApi.SetFetalHeartRateConfig(heartRate.Serialno, fetalHeartRate, sampleTime, isAbnormal);

// 推送到微信
if (isAbnormal != 0)
{
var device = await _deviceCacheMgr.GetDeviceBySerialNoAsync(heartRate.Serialno).ConfigureAwait(false);
var fhrMsgId = $"{heartRate.Serialno}-{sampleTime}-{Guid.NewGuid().ToString("D")[^3..]}";
var topic = "topic.push.wx";
var fhrMsg = new
{
messageId = fhrMsgId,
topic = topic,
time = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(long.Parse(sampleTime.Length < 13 ? sampleTime.PadRight(13, '0') : sampleTime)).ToString("yyyy-MM-dd HH:mm:ss"),
data = new
{
deviceId = device?.DeviceId,
imei = heartRate.Serialno,
alarmTypeId = 12,
alarmDeviceName = heartRate.Serialno,
alarmRemarks = JsonConvert.SerializeObject(new { fetalHeartValue = fetalHeartRate, isAbnormal = isAbnormal }),
address = string.Empty,
deviceKey = device?.DeviceId
}
};
await _serviceMqProcess.ProcessIMEIEventMessageAsync(fhrMsgId, topic, fhrMsg).ConfigureAwait(false);

}


}
}

private async Task SetIntervalTriggerAsync(string key,string imei, long interval)
{
// var key = $"health_monitor/schedule_push/{type}/imei/{imei}";


+ 22
- 4
HealthMonitor.WebApi/Worker.cs View File

@@ -561,12 +561,30 @@ namespace HealthMonitor.WebApi
};
await _hisFetalMovementApiClient.AddAsync(fm).ConfigureAwait(false);

// 发送到微信
var device = await _deviceCacheMgr.GetDeviceBySerialNoAsync(imeiDel).ConfigureAwait(false);
var fmMsgId = $"{imeiDel}-{fetalMovementSampleTime}-{Guid.NewGuid().ToString("D")[^3..]}";
var fmMsgTime = DateTimeUtil.GetDateTimeFromUnixTimeMilliseconds(long.Parse(fetalMovementSampleTime.Length < 13 ? fetalMovementSampleTime.PadRight(13, '0') : fetalMovementSampleTime)).ToString("yyyy-MM-dd HH:mm:ss");
// 胎动数据推送到第三方
var topic = "topic.push.third";
var fmThridMsg = new
{
messageId = fmMsgId,
topic = topic,
time = fmMsgTime,
data = new
{
imei = imeiDel,
value = fetalMovement,
isAbnormal= feltalMovementIsAbnormal,
type = "fetalMovement"
}
};
await _serviceMqProcess.ProcessIMEIEventMessageAsync(fmMsgId, topic, 31, fmThridMsg).ConfigureAwait(false);

// 胎动数据推送到微信
if (feltalMovementIsAbnormal != 0)
{
var device = await _deviceCacheMgr.GetDeviceBySerialNoAsync(imeiDel).ConfigureAwait(false);
var fmMsgId = $"{imeiDel}-{fetalMovementSampleTime}-{Guid.NewGuid().ToString("D")[^3..]}";
var topic = "topic.push.wx";
topic = "topic.push.wx";
var fmMsg = new
{
messageId = Guid.NewGuid().ToString("D"),


Loading…
Cancel
Save