using HealthMonitor.Common;
using HealthMonitor.Model.Config;
using Microsoft.EntityFrameworkCore.Diagnostics;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Confluent.Kafka;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.DirectoryServices.Protocols;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HealthMonitor.Service.MessageQueue.Kafka
{
    /// <summary>
    /// <see cref="IKafkaService"/> implementation that publishes JSON-serialized messages to Kafka
    /// and consumes them back, using the broker address from <see cref="ServiceConfig"/>.
    /// </summary>
    public class KafkaService : IKafkaService
    {
        private readonly ILogger<KafkaService> _logger;
        private readonly ServiceConfig _configService;

        /// <summary>
        /// Creates the service from the bound configuration and a logger.
        /// </summary>
        /// <param name="_optConfigService">Options wrapper whose value supplies the Kafka server address.</param>
        /// <param name="logger">Logger used for publish and consume diagnostics.</param>
        public KafkaService(IOptions<ServiceConfig> _optConfigService, ILogger<KafkaService> logger)
        {
            _configService = _optConfigService.Value;
            _logger = logger;
        }

        /// <summary>
        /// Serializes <paramref name="message"/> to JSON and produces it to <paramref name="topicName"/>
        /// with a freshly generated GUID as the message key.
        /// </summary>
        /// <typeparam name="T">Type of the message payload.</typeparam>
        /// <param name="topicName">Topic to produce to.</param>
        /// <param name="message">Payload to serialize and send.</param>
        /// <exception cref="ProduceException{TKey, TValue}">
        /// The produce request failed; the failure is logged before being rethrown.
        /// </exception>
        public async Task PublishAsync<T>(string topicName, T message) where T : class
        {
            // Serialize once so the exact payload that was sent is also what gets logged on failure.
            var payload = JsonConvert.SerializeObject(message);

            var config = new ProducerConfig
            {
                BootstrapServers = _configService.KafkaServerAddress,
                EnableIdempotence = true,
                Acks = Acks.All,
                MessageSendMaxRetries = 3
            };

            try
            {
                // NOTE(review): a producer is built and disposed per call, as before; consider
                // sharing a single long-lived producer if publish volume is significant.
                using var producer = new ProducerBuilder<string, string>(config).Build();
                await producer.ProduceAsync(topicName, new Message<string, string>
                {
                    Key = Guid.NewGuid().ToString(),
                    Value = payload
                });
            }
            // The type arguments must match the producer's <string, string>; the previous
            // <Null, string> filter could never match, so failures were never logged.
            catch (ProduceException<string, string> ex)
            {
                _logger.LogError(ex, "推送到kafka失败,topic: {Topic},\n message:{Payload}: \n{Reason}", topicName, payload, ex.Error.Reason);

                // Rethrow so callers keep observing the failure exactly as they did before
                // (the old handler never ran, so the exception always propagated).
                throw;
            }
        }

        /// <summary>
        /// Subscribes to <paramref name="topics"/> and invokes <paramref name="messageFunc"/> for every
        /// message whose JSON value deserializes to <typeparamref name="T"/>, committing the offset
        /// after each handled message. The returned task completes when
        /// <paramref name="cancellationToken"/> is cancelled.
        /// </summary>
        /// <typeparam name="T">Type each message value is deserialized into.</typeparam>
        /// <param name="topics">Topics to subscribe to.</param>
        /// <param name="messageFunc">Handler invoked for each successfully deserialized message.</param>
        /// <param name="cancellationToken">Token that stops the consume loop.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="topics"/> or <paramref name="messageFunc"/> is null.
        /// </exception>
        public async Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class
        {
            if (topics is null)
            {
                throw new ArgumentNullException(nameof(topics));
            }

            if (messageFunc is null)
            {
                throw new ArgumentNullException(nameof(messageFunc));
            }

            // Consume() is a blocking call. Run the loop on a dedicated long-running task so that
            // awaiting this method does not block the caller's thread. CancellationToken.None is
            // passed to StartNew on purpose: cancellation is handled inside the loop, which then
            // closes the consumer and lets the task complete normally.
            await Task.Factory.StartNew(
                () => ConsumeLoop(topics, messageFunc, cancellationToken),
                CancellationToken.None,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);
        }

        /// <summary>
        /// Blocking consume loop: reads messages, deserializes them, hands them to the handler and
        /// commits the offset, until the token is cancelled or a fatal error occurs.
        /// </summary>
        private void ConsumeLoop<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken) where T : class
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = _configService.KafkaServerAddress,
                GroupId = "Consumer",
                EnableAutoCommit = false, // Offsets are committed manually after each handled message.
                AutoOffsetReset = AutoOffsetReset.Earliest // Start from the earliest offset when none is committed.
                // Acks is a producer-only setting and is intentionally not set on the consumer.
            };

            using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
            consumer.Subscribe(topics);

            try
            {
                while (true)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(cancellationToken);
                        if (consumeResult is null)
                        {
                            continue;
                        }

                        _logger.LogInformation("Consumed message '{Value}' at: '{TopicPartitionOffset}'.", consumeResult.Message?.Value, consumeResult.TopicPartitionOffset);

                        if (consumeResult.IsPartitionEOF)
                        {
                            _logger.LogInformation("已经到底了:{Topic}, partition {Partition}, offset {Offset}.", consumeResult.Topic, consumeResult.Partition, consumeResult.Offset);
                            continue;
                        }

                        var messageResult = TryDeserialize<T>(consumeResult.Message!.Value);
                        if (messageResult == null)
                        {
                            // Undeserializable messages are skipped without a commit of their own.
                            continue;
                        }

                        messageFunc(messageResult);

                        try
                        {
                            consumer.Commit(consumeResult);
                        }
                        catch (KafkaException e)
                        {
                            _logger.LogError(e, "Commit failed at '{TopicPartitionOffset}': {Reason}", consumeResult.TopicPartitionOffset, e.Error.Reason);
                        }
                    }
                    catch (ConsumeException e)
                    {
                        _logger.LogError(e, "Consume error: {Reason}", e.Error.Reason);

                        // A fatal error will not go away on retry; stop instead of spinning forever.
                        if (e.Error.IsFatal)
                        {
                            throw;
                        }
                    }
                }
            }
            catch (OperationCanceledException)
            {
                _logger.LogInformation("Closing consumer.");
            }
            finally
            {
                // Close on every exit path so the consumer leaves the group promptly instead of
                // waiting for the session timeout; guarded so it cannot mask an in-flight exception.
                try
                {
                    consumer.Close();
                }
                catch (KafkaException e)
                {
                    _logger.LogError(e, "Closing consumer failed: {Reason}", e.Error.Reason);
                }
            }
        }

        /// <summary>
        /// Deserializes a message value from JSON, returning null (and logging the failure)
        /// when the value cannot be converted to <typeparamref name="T"/>.
        /// </summary>
        private T? TryDeserialize<T>(string value) where T : class
        {
            try
            {
                return JsonConvert.DeserializeObject<T>(value);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "【Exception 消息反序列化失败,Value:{Value}】", value);
                return null;
            }
        }
    }
}