using Confluent.Kafka; using GpsCardGatewayPosition.Model.Config; using GpsCardGatewayPosition.Service.MqProducer.Model; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Newtonsoft.Json; using System; using System.Collections.Generic; using System.Linq; using System.Reflection.PortableExecutable; using System.Text; using System.Threading.Tasks; namespace GpsCardGatewayPosition.Service.MqProducer { /// /// 消息生产者 /// public class MessageProducer { private IProducer _producer; private readonly ServiceConfig _configService; private readonly ILogger _logger; public MessageProducer(IOptions _optConfigService, ILogger logger) { _configService = _optConfigService.Value; _logger = logger; var config = new ProducerConfig { BootstrapServers = _configService.MqServerAddress, EnableIdempotence = true, Acks = Acks.All, //LingerMs=5000, //BatchNumMessages =1000, MessageSendMaxRetries = 3 }; _producer = new ProducerBuilder(config).Build(); } public Headers CreateHeader(Dictionary pair = null) { if (pair == null) { return null; } else { Headers headers = new Headers(); foreach (var item in pair) { headers.Add(item.Key, BitConverter.GetBytes(item.Value)); } return headers; } } // public async Task ProduceAsync(string topic, object message, Headers header = null) public async Task ProduceAsync(List topic, object message) { try { foreach (var item in topic) { // producer = new ProducerBuilder(config).Build(); await _producer.ProduceAsync(item.Topic, new Message { Headers = item.Headers, Value = JsonConvert.SerializeObject(message) }).ConfigureAwait(false); } } catch (ProduceException ex) { _logger.LogError($"推送到kafka失败,topic: {topic},\n message:{JsonConvert.SerializeObject(message)}: \n{ex.Error.Reason}"); } } } }