using Confluent.Kafka;
using GpsCardGatewayPosition.Model.Config;
using GpsCardGatewayPosition.Service.MqProducer.Model;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection.PortableExecutable;
using System.Text;
using System.Threading.Tasks;
namespace GpsCardGatewayPosition.Service.MqProducer
{
///
/// 消息生产者
///
public class MessageProducer
{
private IProducer _producer;
private readonly ServiceConfig _configService;
private readonly ILogger _logger;
public MessageProducer(IOptions _optConfigService, ILogger logger)
{
_configService = _optConfigService.Value;
_logger = logger;
var config = new ProducerConfig
{
BootstrapServers = _configService.MqServerAddress,
EnableIdempotence = true,
Acks = Acks.All,
//LingerMs=5000,
//BatchNumMessages =1000,
MessageSendMaxRetries = 3
};
_producer = new ProducerBuilder(config).Build();
}
public Headers CreateHeader(Dictionary pair = null)
{
if (pair == null)
{
return null;
}
else
{
Headers headers = new Headers();
foreach (var item in pair)
{
headers.Add(item.Key, BitConverter.GetBytes(item.Value));
}
return headers;
}
}
// public async Task ProduceAsync(string topic, object message, Headers header = null)
public async Task ProduceAsync(List topic, object message)
{
try
{
foreach (var item in topic)
{
// producer = new ProducerBuilder(config).Build();
await _producer.ProduceAsync(item.Topic, new Message
{
Headers = item.Headers,
Value = JsonConvert.SerializeObject(message)
}).ConfigureAwait(false);
}
}
catch (ProduceException ex)
{
_logger.LogError($"推送到kafka失败,topic: {topic},\n message:{JsonConvert.SerializeObject(message)}: \n{ex.Error.Reason}");
}
}
}
}