using Confluent.Kafka; using TelpoPush.WanJiaAn.Worker.Common; using TelpoPush.WanJiaAn.Worker.Service.Mq; namespace TelpoPush.WanJiaAn.Worker.Handlers { public class KafkaSubscribe { private readonly ILogger _logger; private readonly IHostEnvironment _env; private readonly IKafkaService _kafkaService; private readonly WanJiaAnProcess _WanJiaAnProcess; public KafkaSubscribe( ILogger logger, IHostEnvironment env, IKafkaService kafkaService, WanJiaAnProcess WanJiaAnProcess) { _logger = logger; _env = env; _kafkaService = kafkaService; _WanJiaAnProcess = WanJiaAnProcess; } public async Task SubscribeAsync() { #if DEBUG //_logger.LogInformation("11312"); //var temp = new Headers(); //string topic = "topic.push.WanJiaAn"; //temp.Add(new Header("DataType", new byte[] { 0, 0, 0, 0 })); //temp.Add(new Header("AlarmType", new byte[] { 2, 0, 0, 0 })); //string psych = "{\"messageId\":\"1790941606816612864\",\"topic\":\"topic.push.third\",\"time\":\"2024-05-16 11:05:27\",\"data\":{\"imei\":\"861281060093147\",\"atteryLowId\":\"861281060093147664577f9\",\"info\":\"设备电量低于15%\"}}"; //await _WanJiaAnProcess.SendWanJiaAn(psych, topic, temp); // await _kafkaService.SubscribeAsync(DoReceive, CancellationToken.None); #else LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(5); TaskFactory factory = new TaskFactory(lcts); try { await factory.StartNew(async () => { await _kafkaService.SubscribeAsync(DoReceive, CancellationToken.None); }); } catch (Exception ex) { _logger.LogError($"Subscribe 处理Kafka数据发生异常 {ex.Message}|{ex.Source}|{ex.StackTrace}"); } #endif } async void DoReceive(string topic, string message, Headers headers) { await _WanJiaAnProcess.SendWanJiaAn(message, topic, headers); } } }