using Confluent.Kafka;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Reflection.PortableExecutable;
using System.Text;
using System.Threading.Tasks;
using TelpoPush.Common;
using TelpoPush.Models.Config;
using TelpoPush.Service.Mq.Kafka;

namespace TelpoPush.Worker.ThirdSsl.Handlers
{
    /// <summary>
    /// Starts the Kafka subscription and hands every received message to
    /// <see cref="ThirdSslProcess"/>.
    /// </summary>
    public class KafkaSubscribe
    {
        /// <summary>
        /// Concurrency level passed to the <see cref="LimitedConcurrencyLevelTaskScheduler"/>
        /// on which the subscription is started.
        /// </summary>
        private const int MaxSubscribeConcurrency = 5;

        private readonly ILogger _logger;
        private readonly IHostEnvironment _env;
        private readonly IKafkaService _kafkaService;
        private readonly ThirdSslProcess _thirdSslProcess;

        /// <summary>
        /// Creates the subscriber from its collaborators.
        /// </summary>
        /// <param name="logger">Logger used to report subscription and message-handling failures.</param>
        /// <param name="env">Host environment; stored but not otherwise used by this class.</param>
        /// <param name="kafkaService">Kafka service the subscription is started on.</param>
        /// <param name="thirdSslProcess">Processor each received message is forwarded to.</param>
        public KafkaSubscribe(
            ILogger logger,
            IHostEnvironment env,
            IKafkaService kafkaService,
            ThirdSslProcess thirdSslProcess)
        {
            _logger = logger;
            _env = env;
            _kafkaService = kafkaService;
            _thirdSslProcess = thirdSslProcess;
        }

        /// <summary>
        /// Starts the Kafka subscription without a cancellation token.
        /// </summary>
        /// <returns>A task that completes once the subscription has been started.</returns>
        public Task SubscribeAsync() => SubscribeAsync(CancellationToken.None);

        /// <summary>
        /// Starts the Kafka subscription on a limited-concurrency scheduler.
        /// </summary>
        /// <param name="cancellationToken">
        /// Token forwarded to the Kafka service's subscribe call.
        /// NOTE(review): whether the service honors it is not visible from this file.
        /// </param>
        /// <returns>
        /// A task that completes once the subscription has been started; it does not wait
        /// for the subscription itself to finish.
        /// </returns>
        public async Task SubscribeAsync(CancellationToken cancellationToken)
        {
            var scheduler = new LimitedConcurrencyLevelTaskScheduler(MaxSubscribeConcurrency);
            var factory = new TaskFactory(scheduler);

            try
            {
                // The subscription is started fire-and-forget, but through a helper that
                // observes its outcome, so a failure that happens after the first await is
                // logged instead of disappearing with a discarded task.
                await factory.StartNew(() => { _ = RunSubscriptionAsync(cancellationToken); });
            }
            catch (Exception ex)
            {
                LogSubscribeFailure(ex);
            }
        }

        /// <summary>
        /// Runs the Kafka subscription and logs any failure it ends with. Never throws.
        /// </summary>
        /// <param name="cancellationToken">Token forwarded to the Kafka service.</param>
        private async Task RunSubscriptionAsync(CancellationToken cancellationToken)
        {
            try
            {
                await _kafkaService.SubscribeAsync(DoReceive, cancellationToken);
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Cancellation was requested by the caller; this is not a failure.
            }
            catch (Exception ex)
            {
                LogSubscribeFailure(ex);
            }
        }

        /// <summary>
        /// Logs a subscription failure.
        /// </summary>
        /// <param name="ex">The exception that ended or prevented the subscription.</param>
        private void LogSubscribeFailure(Exception ex)
        {
            _logger.LogError($"Subscribe 处理Kafka数据发生异常 {ex.Message}|{ex.Source}|{ex.StackTrace}");
        }

        /// <summary>
        /// Callback invoked for each received Kafka message; forwards it to the processor.
        /// </summary>
        /// <param name="topic">Topic the message was received from.</param>
        /// <param name="message">Message payload.</param>
        /// <param name="headers">Kafka headers that accompanied the message.</param>
        /// <remarks>
        /// The method returns <c>void</c> because it is passed as the subscription callback.
        /// An exception escaping an <c>async void</c> method cannot be caught by the caller
        /// and can bring the process down, so every failure is caught and logged here.
        /// </remarks>
        private async void DoReceive(string topic, string message, Headers headers)
        {
            try
            {
                await _thirdSslProcess.SendSslThird(message, topic, headers);
            }
            catch (Exception ex)
            {
                _logger.LogError($"DoReceive 处理Kafka消息发生异常 {topic}|{ex.Message}|{ex.Source}|{ex.StackTrace}");
            }
        }
    }
}