using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using TelpoPush.Models.Config;

namespace TelpoPush.Service.Mq.Kafka
{
    /// <summary>
    /// Kafka consumer service. Builds a SASL_SSL / SCRAM-SHA-256 consumer configuration from
    /// <see cref="ServiceConfig"/> and runs a consume loop that hands every non-empty message
    /// to a caller-supplied callback, committing the offset after the callback returns.
    /// </summary>
    public class KafkaService : IKafkaService
    {
        private readonly ConsumerConfig _consumerConfig;
        private readonly IHostEnvironment env;
        private readonly ILogger<KafkaService> logger;
        private readonly ServiceConfig _configService;

        /// <summary>
        /// Captures the injected services and prepares the consumer configuration.
        /// </summary>
        /// <param name="_logger">Logger used for consumer diagnostics.</param>
        /// <param name="_env">Host environment; its name is included in the per-message log line.</param>
        /// <param name="optConfigService">Options wrapper holding the Kafka connection settings.</param>
        public KafkaService(ILogger<KafkaService> _logger, IHostEnvironment _env, IOptions<ServiceConfig> optConfigService)
        {
            _configService = optConfigService.Value;
            env = _env;
            logger = _logger;
            _consumerConfig = new ConsumerConfig
            {
                BootstrapServers = _configService.KafkaBootstrapServers,
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SaslMechanism = SaslMechanism.ScramSha256,
                GroupId = _configService.KafkaGroupId,
                SaslUsername = _configService.KafkaSaslUsername,
                SaslPassword = _configService.KafkaSaslPassword,
                SslCaLocation = _configService.KafkaSslCaLocation,
                // Auto-commit is disabled; offsets are committed manually after the callback has run.
                EnableAutoCommit = false,
                // Start from the earliest available offset when the group has no committed offset.
                AutoOffsetReset = AutoOffsetReset.Earliest,
                CancellationDelayMaxMs = 1
            };
        }

        /// <summary>
        /// Subscribes to the configured topics and consumes messages until
        /// <paramref name="cancellationToken"/> is cancelled.
        /// </summary>
        /// <remarks>
        /// The consume loop uses the blocking <c>Consume</c> call and contains no awaits, so it runs
        /// synchronously on the calling thread; the returned task completes only after the loop ends.
        /// Exceptions thrown by <paramref name="messageFunc"/> are not caught here: they end the loop
        /// and propagate to the caller after the consumer has been closed.
        /// </remarks>
        /// <param name="messageFunc">Callback invoked with (topic, message value, message headers) for every non-empty message.</param>
        /// <param name="cancellationToken">Token that stops the consume loop.</param>
        /// <exception cref="ArgumentNullException"><paramref name="messageFunc"/> is null.</exception>
        public async Task SubscribeAsync(Action<string, string, Headers> messageFunc, CancellationToken cancellationToken)
        {
            if (messageFunc == null)
            {
                throw new ArgumentNullException(nameof(messageFunc));
            }

            List<string> topics = _configService.KafkaTopics;

            // NOTE(review): the key type is assumed to be Ignore because the key is never read here — confirm.
            using (var consumer = new ConsumerBuilder<Ignore, string>(_consumerConfig)
                .SetErrorHandler((_, e) =>
                {
                    logger.LogError($"Error: {e.Reason}");
                    Console.WriteLine($"Error: {e.Reason}");
                })
                .SetStatisticsHandler((_, json) =>
                {
                    Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
                })
                .SetPartitionsAssignedHandler((c, partitions) =>
                {
                    string partitionsStr = string.Join(", ", partitions);
                    logger.LogInformation($" - 分配的 kafka 分区: {partitionsStr}");
                    Console.WriteLine($" - 分配的 kafka 分区: {partitionsStr}");
                })
                .SetPartitionsRevokedHandler((c, partitions) =>
                {
                    string partitionsStr = string.Join(", ", partitions);
                    Console.WriteLine($" - 回收了 kafka 的分区: {partitionsStr}");
                })
                .Build())
            {
                consumer.Subscribe(topics);
                try
                {
                    while (true)
                    {
                        try
                        {
                            var consumeResult = consumer.Consume(cancellationToken);
                            if (consumeResult == null)
                            {
                                continue;
                            }

                            // Check for end-of-partition before touching Message: such a result carries no message.
                            if (consumeResult.IsPartitionEOF)
                            {
                                Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                                continue;
                            }

                            var message = consumeResult.Message;
                            Headers headers = message?.Headers;

                            // -1 means "header not present (or unreadable)".
                            int dataType = -1, alarmType = -1, operType = -1;
                            if (headers != null)
                            {
                                foreach (var item in headers)
                                {
                                    if (item.Key == KafkaHeader.DataType)
                                    {
                                        dataType = ReadInt32Header(item, dataType);
                                    }
                                    else if (item.Key == KafkaHeader.AlarmType)
                                    {
                                        alarmType = ReadInt32Header(item, alarmType);
                                    }
                                    else if (item.Key == KafkaHeader.OperType)
                                    {
                                        operType = ReadInt32Header(item, operType);
                                    }
                                }
                            }

                            // Property names are kept so the serialized JSON in the log line is unchanged.
                            var headerSummary = new { DataType = dataType, AlarmType = alarmType, OperType = operType };
                            logger.LogInformation($"[{env.EnvironmentName}]Consumed topic '{consumeResult.Topic}', message '{message?.Value}' , headers '{JsonConvert.SerializeObject(headerSummary)}', at: '{consumeResult.TopicPartitionOffset}'.");

                            string messageResult = message?.Value;
                            if (string.IsNullOrEmpty(messageResult))
                            {
                                // Empty payloads are neither dispatched nor committed.
                                continue;
                            }

                            messageFunc(consumeResult.Topic, messageResult, headers);

                            try
                            {
                                consumer.Commit(consumeResult);
                            }
                            catch (KafkaException e)
                            {
                                Console.WriteLine(e.Message);
                            }
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Consume error: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    Console.WriteLine("Closing consumer.");
                }
                finally
                {
                    // Close on every exit path (cancellation or failure) so the consumer leaves the group
                    // explicitly. A failure while closing is logged rather than thrown so that it cannot
                    // mask an exception that is already propagating.
                    try
                    {
                        consumer.Close();
                    }
                    catch (KafkaException e)
                    {
                        logger.LogError(e, "Failed to close the Kafka consumer cleanly.");
                    }
                }
            }

            await Task.CompletedTask;
        }

        /// <summary>
        /// Reads a header value as a 32-bit integer using <see cref="BitConverter"/>.
        /// </summary>
        /// <param name="header">Header whose value bytes are decoded.</param>
        /// <param name="fallback">Value returned when the header has no value or fewer than four bytes.</param>
        /// <returns>The decoded integer, or <paramref name="fallback"/> when the value cannot be decoded.</returns>
        private int ReadInt32Header(IHeader header, int fallback)
        {
            byte[] raw = header.GetValueBytes();
            if (raw == null || raw.Length < sizeof(int))
            {
                logger.LogWarning("Kafka header '{HeaderKey}' has no value or fewer than 4 bytes; keeping {Fallback}.", header.Key, fallback);
                return fallback;
            }

            return BitConverter.ToInt32(raw, 0);
        }
    }
}