using Confluent.Kafka;
using GpsCardGatewayPosition.Model.Config;
using GpsCardGatewayPosition.Service.MqProducer.Model;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection.PortableExecutable;
using System.Text;
using System.Threading.Tasks;

namespace GpsCardGatewayPosition.Service.MqProducer
{
    /// <summary>
	/// 消息生产者
	/// </summary>
	public class MessageProducer
    {
        private IProducer<Null, string> _producer;
        private readonly ServiceConfig _configService;
        private readonly ILogger<MessageProducer> _logger;

        public MessageProducer(IOptions<ServiceConfig> _optConfigService,  ILogger<MessageProducer> logger)
        {
            _configService = _optConfigService.Value;
            _logger = logger;

            var config = new ProducerConfig
            {
                BootstrapServers = _configService.MqServerAddress,
                EnableIdempotence = true,
                Acks = Acks.All,
                //LingerMs=5000,
                //BatchNumMessages =1000,
                MessageSendMaxRetries = 3
            };
            _producer = new ProducerBuilder<Null, string>(config).Build();
        }

        public Headers CreateHeader(Dictionary<string, int> pair = null)
        {
            if (pair == null)
            {
                return null;
            }
            else
            {
                Headers headers = new Headers();

                foreach (var item in pair)
                {
                    headers.Add(item.Key, BitConverter.GetBytes(item.Value));
                }
                return headers;
            }
        }

        // public async Task ProduceAsync(string topic, object message, Headers header = null)
        public async Task ProduceAsync(List<TopicModel> topic, object message)
        {
     
            try
            {
                foreach (var item in topic)
                {
                    //  producer = new ProducerBuilder<Null, string>(config).Build();
                    await _producer.ProduceAsync(item.Topic, new Message<Null, string>
                    {
                        Headers = item.Headers,
                        Value = JsonConvert.SerializeObject(message)
                    }).ConfigureAwait(false);
                }
            }
            catch (ProduceException<Null, string> ex)
            {
                _logger.LogError($"推送到kafka失败,topic: {topic},\n message:{JsonConvert.SerializeObject(message)}: \n{ex.Error.Reason}");
            }
        }
    }
}