using Confluent.Kafka;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection.PortableExecutable;
using System.Text;
using System.Threading.Tasks;
using TelpoPush.Models.MqTemplates;
using TelpoPush.Service.Biz;

namespace TelpoPush.Worker.ThirdSsl.Handlers
{
    /// <summary>
    /// Receives raw Kafka message text, deserializes it into a <see cref="BaseModel"/>
    /// envelope and forwards the payload to the business service that matches the
    /// envelope's topic name.
    /// </summary>
    public class ThirdSslProcess
    {
        /// <summary>Envelope topic name handled by <see cref="DataServiceZkRealHRMonitor"/>.</summary>
        private const string ZkHeartRateTopic = "topic.push.telpo.zkheartrate";

        private readonly IHostEnvironment _env;
        private readonly ILogger _logger;
        private readonly IZkRealHRMonitorService _zkRealHRMonitorService;

        /// <summary>
        /// Creates the processor with the collaborators it stores for later use.
        /// </summary>
        /// <param name="env">Host environment (stored; not read by the methods in this class).</param>
        /// <param name="logger">Logger used to trace every incoming message.</param>
        /// <param name="zkRealHRMonitorService">Service that persists heart-rate payloads.</param>
        public ThirdSslProcess(
            IHostEnvironment env,
            ILogger logger,
            IZkRealHRMonitorService zkRealHRMonitorService)
        {
            _env = env;
            _logger = logger;
            _zkRealHRMonitorService = zkRealHRMonitorService;
        }

        /// <summary>
        /// Logs the incoming message, deserializes it and dispatches it by the
        /// <c>TopicName</c> carried inside the envelope.
        /// </summary>
        /// <param name="message">Raw JSON text of the Kafka message; null or empty input is ignored.</param>
        /// <param name="topic">Kafka topic of the message. Not used for dispatching; the envelope's own topic name is.</param>
        /// <param name="headers">Kafka headers of the message. Currently unused.</param>
        /// <returns>A task that completes once the matching handler (if any) has finished.</returns>
        public async Task SendSslThird(string? message, string topic, Headers headers)
        {
            _logger.LogInformation($"获取kafka数据Message:<{message}>");

            if (string.IsNullOrEmpty(message))
            {
                return;
            }

            // DeserializeObject returns null for a JSON "null" document, and the envelope
            // may arrive without a Content value; both cases are skipped instead of
            // failing with a NullReferenceException.
            var model = JsonConvert.DeserializeObject<BaseModel>(message);
            if (model is null)
            {
                return;
            }

            var payload = model.Content?.ToString();
            if (string.IsNullOrEmpty(payload))
            {
                return;
            }

            switch (model.TopicName)
            {
                case ZkHeartRateTopic:
                    await DataServiceZkRealHRMonitor(model);
                    break;
                default:
                    // Envelopes for any other topic are ignored.
                    break;
            }
        }

        /// <summary>
        /// Handles a heart-rate envelope: depending on which marker the payload text
        /// contains, saves either the heart-rate data or the anomaly-cancel data,
        /// together with the envelope's message id.
        /// </summary>
        /// <param name="model">Envelope whose <c>Content</c> holds the JSON payload.</param>
        /// <returns>A task that completes once the service call (if any) has finished.</returns>
        public async Task DataServiceZkRealHRMonitor(BaseModel model)
        {
            var raw = model.Content?.ToString();
            if (string.IsNullOrEmpty(raw))
            {
                return;
            }

            // Invariant lower-casing keeps the marker checks below independent of the
            // host's current culture.
            // NOTE(review): the lower-cased text is also what gets deserialized, so every
            // string value inside the payload is lower-cased too - confirm this is intended.
            string content = raw.ToLowerInvariant();

            // NOTE(review): both DeserializeObject calls below carry no type argument in this
            // copy of the file, so `data` is typed as object and the member accesses cannot
            // bind. The payload types are not visible here - restore the intended generic
            // type arguments before building.
            if (content.Contains("heartrates"))
            {
                var data = JsonConvert.DeserializeObject(content);
                await _zkRealHRMonitorService.Save(data.heartRates, model.MessageId);
            }
            else if (content.Contains("anomalycancel"))
            {
                var data = JsonConvert.DeserializeObject(content);
                await _zkRealHRMonitorService.SaveAnomalyCancel(data.anomalyCancel, model.MessageId);
            }
        }
    }
}