|
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- using Confluent.Kafka;
- using GpsCardGatewayPosition.Model.Config;
- using GpsCardGatewayPosition.Service.MqProducer.Model;
- using Microsoft.Extensions.Logging;
- using Microsoft.Extensions.Options;
- using Newtonsoft.Json;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Reflection.PortableExecutable;
- using System.Text;
- using System.Threading.Tasks;
-
- namespace GpsCardGatewayPosition.Service.MqProducer
- {
/// <summary>
/// Kafka message producer: serializes a payload to JSON and publishes it to one or more topics.
/// </summary>
/// <remarks>
/// A single <see cref="IProducer{TKey, TValue}"/> is built in the constructor and reused for
/// every send. Dispose this instance on shutdown so the underlying client is flushed and released.
/// </remarks>
public class MessageProducer : IDisposable
{
    // Upper bound on how long Dispose waits for still-queued messages to be delivered.
    private static readonly TimeSpan FlushTimeout = TimeSpan.FromSeconds(10);

    private readonly IProducer<Null, string> _producer;
    private readonly ServiceConfig _configService;
    private readonly ILogger<MessageProducer> _logger;
    private bool _disposed;

    /// <summary>
    /// Builds the Kafka producer from the configured broker address.
    /// </summary>
    /// <param name="_optConfigService">Service options; <c>MqServerAddress</c> is used as the bootstrap server list.</param>
    /// <param name="logger">Logger used to report delivery failures.</param>
    /// <exception cref="ArgumentNullException"><paramref name="_optConfigService"/> is null.</exception>
    public MessageProducer(IOptions<ServiceConfig> _optConfigService, ILogger<MessageProducer> logger)
    {
        if (_optConfigService == null)
        {
            throw new ArgumentNullException(nameof(_optConfigService));
        }

        _configService = _optConfigService.Value;
        _logger = logger;

        // Idempotence + Acks.All + bounded retries. Batching options
        // (LingerMs, BatchNumMessages) are not set here, so client defaults apply.
        var config = new ProducerConfig
        {
            BootstrapServers = _configService.MqServerAddress,
            EnableIdempotence = true,
            Acks = Acks.All,
            MessageSendMaxRetries = 3
        };
        _producer = new ProducerBuilder<Null, string>(config).Build();
    }

    /// <summary>
    /// Converts a name/value map into Kafka message headers.
    /// </summary>
    /// <param name="pair">Header names mapped to integer values; may be null.</param>
    /// <returns>
    /// A <see cref="Headers"/> collection with one entry per pair, or null when
    /// <paramref name="pair"/> is null.
    /// </returns>
    public Headers CreateHeader(Dictionary<string, int> pair = null)
    {
        if (pair == null)
        {
            return null;
        }

        var headers = new Headers();
        foreach (var item in pair)
        {
            // BitConverter emits the int as 4 bytes in the machine's native byte order.
            headers.Add(item.Key, BitConverter.GetBytes(item.Value));
        }
        return headers;
    }

    /// <summary>
    /// Serializes <paramref name="message"/> to JSON and sends it to each topic in order,
    /// awaiting every delivery before moving on to the next one.
    /// </summary>
    /// <param name="topic">Target topics (with optional per-topic headers).</param>
    /// <param name="message">Payload object; serialized with Newtonsoft.Json.</param>
    /// <remarks>
    /// A <see cref="ProduceException{TKey, TValue}"/> is logged and swallowed; the remaining
    /// topics are not attempted after the first failure. Other exception types propagate.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="topic"/> is null.</exception>
    public async Task ProduceAsync(List<TopicModel> topic, object message)
    {
        if (topic == null)
        {
            throw new ArgumentNullException(nameof(topic));
        }

        if (topic.Count == 0)
        {
            return;
        }

        // The payload is identical for every topic, so serialize it once up front.
        string payload = JsonConvert.SerializeObject(message);

        // Remember which topic is in flight so a failure can name it in the log.
        string currentTopic = null;

        try
        {
            foreach (var item in topic)
            {
                currentTopic = item.Topic;
                await _producer.ProduceAsync(currentTopic, new Message<Null, string>
                {
                    Headers = item.Headers,
                    Value = payload
                }).ConfigureAwait(false);
            }
        }
        catch (ProduceException<Null, string> ex)
        {
            // Log the topic that actually failed (not the list object) and keep the exception.
            _logger.LogError(
                ex,
                "推送到kafka失败,topic: {Topic},\n message:{Message}: \n{Reason}",
                currentTopic,
                payload,
                ex.Error.Reason);
        }
    }

    /// <summary>
    /// Flushes any queued messages (bounded by a timeout) and releases the Kafka producer.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases the underlying producer. Safe to call more than once.
    /// </summary>
    /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        if (disposing)
        {
            try
            {
                _producer.Flush(FlushTimeout);
            }
            finally
            {
                // Always release the client handle, even if the flush fails.
                _producer.Dispose();
            }
        }
    }
}
- }
|