using HealthMonitor.Common;
using HealthMonitor.Model.Config;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Confluent.Kafka;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace HealthMonitor.Service.MessageQueue.Kafka
{
    public class KafkaService : IKafkaService
    {
        private readonly ILogger<KafkaService> _logger;
        private readonly ServiceConfig _configService;

        public KafkaService(IOptions<ServiceConfig> optConfigService, ILogger<KafkaService> logger)
        {
            _configService = optConfigService.Value;
            _logger = logger;
        }

        public async Task PublishAsync<T>(string topicName, T message) where T : class
        {
            try
            {
                var config = new ProducerConfig
                {
                    BootstrapServers = _configService.KafkaServerAddress,
                    EnableIdempotence = true,       // exactly-once semantics on the broker side
                    Acks = Acks.All,                // wait for all in-sync replicas
                    MessageSendMaxRetries = 3
                };

                using var producer = new ProducerBuilder<string, string>(config).Build();
                await producer.ProduceAsync(topicName, new Message<string, string>
                {
                    Key = Guid.NewGuid().ToString(),
                    Value = JsonConvert.SerializeObject(message)
                });
            }
            catch (ProduceException<string, string> ex)
            {
                _logger.LogError($"Failed to publish to Kafka, topic: {topicName},\n message: {JsonConvert.SerializeObject(message)}: \n{ex.Error.Reason}");
            }
        }

        public async Task PublishAsync<T>(string topicName, int dataType, T message) where T : class
        {
            try
            {
                var config = new ProducerConfig
                {
                    BootstrapServers = _configService.KafkaServerAddress,
                    EnableIdempotence = true,
                    Acks = Acks.All,
                    MessageSendMaxRetries = 3
                };

                // Carry the data type as a message header so consumers can route
                // without deserializing the payload first.
                Headers headers = new()
                {
                    { "DataType", BitConverter.GetBytes(dataType) }
                };

                using var producer = new ProducerBuilder<string, string>(config).Build();
                await producer.ProduceAsync(topicName, new Message<string, string>
                {
                    Headers = headers,
                    Key = Guid.NewGuid().ToString(),
                    Value = JsonConvert.SerializeObject(message)
                });
            }
            catch (ProduceException<string, string> ex)
            {
                _logger.LogError($"Failed to publish to Kafka, topic: {topicName},\n message: {JsonConvert.SerializeObject(message)}: \n{ex.Error.Reason}");
            }
        }

        public async Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = _configService.KafkaServerAddress,
                GroupId = "Consumer",
                EnableAutoCommit = false,                  // disable auto-commit; offsets are committed manually after the handler succeeds
                Acks = Acks.Leader,                        // assume a leader acknowledgment is sufficient
                AutoOffsetReset = AutoOffsetReset.Earliest // start from the earliest offset when no committed offset exists
            };

            using (var consumer = new ConsumerBuilder<string, string>(config).Build())
            {
                consumer.Subscribe(topics);
                try
                {
                    while (true)
                    {
                        try
                        {
                            var consumeResult = consumer.Consume(cancellationToken);
                            Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'.");
                            if (consumeResult!.IsPartitionEOF)
                            {
                                Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} Reached end of partition: {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                                continue;
                            }

                            T? messageResult = null;
                            try
                            {
                                messageResult = JsonConvert.DeserializeObject<T>(consumeResult.Message!.Value)!;
                            }
                            catch (Exception ex)
                            {
                                var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} [Exception: message deserialization failed, Value: {consumeResult.Message!.Value}]: {ex.StackTrace}";
                                Console.WriteLine(errorMessage);
                                messageResult = null;
                            }

                            if (messageResult != null /* && consumeResult.Offset % commitPeriod == 0 */)
                            {
                                messageFunc(messageResult);
                                try
                                {
                                    // Commit only after the handler has run: at-least-once delivery.
                                    consumer.Commit(consumeResult);
                                }
                                catch (KafkaException e)
                                {
                                    Console.WriteLine(e.Message);
                                }
                            }
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Consume error: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Cancellation requested via the token: leave the group cleanly.
                    Console.WriteLine("Closing consumer.");
                    consumer.Close();
                }
            }

            await Task.CompletedTask;
        }
    }
}
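
// --- Usage sketch (not part of the original file) ---
// A minimal sketch, assuming KafkaService is registered with the generic host's DI
// container and ServiceConfig is bound from configuration. HeartbeatMessage, the
// "health-data" topic name, and the configuration section name are hypothetical
// placeholders for illustration.
//
//   services.Configure<ServiceConfig>(configuration.GetSection("ServiceConfig"));
//   services.AddSingleton<IKafkaService, KafkaService>();
//
// Publishing serializes the DTO to JSON and sends it with a random GUID key;
// the int-typed overload additionally attaches a "DataType" header:
//
//   await kafkaService.PublishAsync("health-data", new HeartbeatMessage { DeviceId = 1 });
//   await kafkaService.PublishAsync("health-data", dataType: 2, new HeartbeatMessage { DeviceId = 1 });
//
// Subscribing blocks on the consume loop until the token is cancelled, invoking
// the callback for each successfully deserialized message:
//
//   await kafkaService.SubscribeAsync<HeartbeatMessage>(
//       new[] { "health-data" },
//       msg => Console.WriteLine(msg.DeviceId),
//       cancellationToken);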