using Confluent.Kafka; using TelpoPush.Fence.Worker.Common; using TelpoPush.Fence.Worker.Service.Mq; namespace TelpoPush.Fence.Worker.Handlers { public class KafkaSubscribe { private readonly ILogger _logger; private readonly IHostEnvironment _env; private readonly IKafkaService _kafkaService; private readonly FenceProcess _feneProcess; public KafkaSubscribe( ILogger logger, IHostEnvironment env, IKafkaService kafkaService, FenceProcess feneProcess) { _logger = logger; _env = env; _kafkaService = kafkaService; _feneProcess = feneProcess; } public async Task SubscribeAsync() { #if DEBUG _logger.LogInformation("11312"); //string message = "{\"messageId\":\"1798637996003269632\",\"topic\":\"topic.push.position\",\"time\":\"2024-06-06 16:43:21\",\"data\":{\"deviceId\":\"f9f395b1-a6c3-4919-bf24-5ed963d79fad\",\"imei\":\"861281060086083\",\"wifiInfo\":\"88:86:03:df:a3:6c,-59,|88:86:03:e0:09:64,-63,|e8:84:c6:ef:cc:14,-68,|34:da:b7:69:d5:00,-68,|88:86:03:e0:09:4c,-68,|88:86:03:e0:0a:5c,-74,|88:86:03:df:b7:54,-83,|88:86:03:df:dc:18,-87,\",\"address\":\"通钦街道合作市第三小学\",\"baiduLatitude\":34.9966307394798,\"baiduLongitude\":102.918411172862,\"gaodeLatitude\":34.9908855355319,\"gaodeLongitude\":102.911835939226,\"originalLatitude\":34.9920144084504,\"originalLongitude\":102.910114012956,\"locationType\":3,\"lastUpdate\":\"2024-06-06 16:43:21\",\"utcDate\":\"2024-06-06 08:43:21\",\"radius\":50}}"; //await _feneProcess.SendFence(message); //// await _kafkaService.SubscribeAsync(DoReceive, CancellationToken.None); #else LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(5); TaskFactory factory = new TaskFactory(lcts); try { await factory.StartNew(async () => { await _kafkaService.SubscribeAsync(DoReceive, CancellationToken.None); }); } catch (Exception ex) { _logger.LogError($"Subscribe 处理Kafka数据发生异常 {ex.Message}|{ex.Source}|{ex.StackTrace}"); } #endif } async void DoReceive(string message) { await _feneProcess.SendFence(message); } } }