using HealthMonitor.Common;
using HealthMonitor.Core.Dal;
using HealthMonitor.Model.Config;
using HealthMonitor.Service.Biz.db;
using HealthMonitor.Service.Cache;
using HealthMonitor.Service.Resolver.Factory;
using HealthMonitor.Service.Resolver.Interface;
using HealthMonitor.Service.Sub.Topic.Model;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using TDengineDriver;
using TDengineTMQ;
using TelpoDataService.Util.Entities.GpsLocationHistory;
using static Microsoft.EntityFrameworkCore.DbLoggerCategory.Database;

namespace HealthMonitor.Service.Sub
{
    /// <summary>
    /// Creates the blood-pressure and pregnancy heart-rate topics in TDengine, subscribes a
    /// consumer to them, and forwards every consumed row to the message queue.
    /// </summary>
    public class TDengineDataSubcribe
    {
        /// <summary>Consumer group id used when building the consumer.</summary>
        private const string ConsumerGroupId = "group_1";

        /// <summary>Value passed to <c>Consume</c> on every poll (presumably milliseconds; confirm against the connector).</summary>
        private const int ConsumePollTimeout = 300;

        /// <summary>Name of the column whose value is used as the message id.</summary>
        private const string MessageIdColumn = "message_id";

        private readonly ILogger<TDengineDataSubcribe> _logger;
        private readonly MsgQueueManager _msgQueueManager;
        private readonly TDengineService _serviceTDengine;
        private readonly PersonCacheManager _personCacheMgr;
        private readonly TDengineServiceConfig _configTDengineService;
        private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager;
        private readonly IResolverFactory _resolverFactory;
        private CancellationTokenSource? _tokenSource = null;

        /// <summary>
        /// Stores the injected collaborators and unwraps the TDengine service options.
        /// </summary>
        public TDengineDataSubcribe(
            TDengineService serviceTDengine,
            PersonCacheManager personCacheMgr,
            BloodPressReferenceValueCacheManager bpRefValCacheManager,
            IResolverFactory resolverFactory,
            IOptions<TDengineServiceConfig> configTDengineService,
            MsgQueueManager msgQueueManager,
            ILogger<TDengineDataSubcribe> logger
            )
        {
            _serviceTDengine = serviceTDengine;
            _personCacheMgr = personCacheMgr;
            _bpRefValCacheManager = bpRefValCacheManager;
            _logger = logger;
            _resolverFactory = resolverFactory;
            _msgQueueManager = msgQueueManager;
            _configTDengineService = configTDengineService.Value;
        }

        /// <summary>
        /// Cancels any previous listening session, links a new cancellation source to
        /// <paramref name="stoppingToken"/> and starts consuming. Blocks until the consume loop ends.
        /// </summary>
        /// <param name="stoppingToken">Token whose cancellation stops the consume loop.</param>
        public void BeginListen(CancellationToken stoppingToken)
        {
            CancellationTokenSource? previous = _tokenSource;
            previous?.Cancel();
            _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);

            // A linked source stays registered with its parent token until it is disposed.
            // Tokens already handed out keep reporting the cancelled state after disposal.
            previous?.Dispose();

            DoTDengineConnect();
        }

        /// <summary>
        /// Obtains a connection from the TDengine service and runs the consume loop on it.
        /// </summary>
        public void DoTDengineConnect()
        {
            DoReceive(_serviceTDengine.Connection());
        }

        /// <summary>
        /// Creates the topics, subscribes a consumer and dispatches consumed rows until the current
        /// cancellation source is cancelled. The consumer and <paramref name="connection"/> are
        /// closed on every exit path, including failures.
        /// </summary>
        /// <param name="connection">Native TDengine connection handle; closed before this method returns.</param>
        /// <exception cref="InvalidOperationException">
        /// No cancellation source has been set up via <see cref="BeginListen"/>, or creating a topic failed.
        /// </exception>
        public void DoReceive(IntPtr connection)
        {
            try
            {
                CancellationTokenSource source = _tokenSource
                    ?? throw new InvalidOperationException(
                        $"{nameof(BeginListen)} must be called before {nameof(DoReceive)}.");

                // Capture the token once: re-reading the field on every iteration would make this
                // loop follow a newer session's source after BeginListen is called again.
                CancellationToken stopToken = source.Token;

                // Blood-pressure topic.
                string bloodPressTopic = nameof(TopicHmBloodPress).ToLowerInvariant();
                CreateTopic(
                    connection,
                    bloodPressTopic,
                    GetAttributes(typeof(TopicHmBloodPress)),
                    $"{_configTDengineService.DB}.stb_hm_bloodpress");

                // Pregnancy heart-rate topic.
                string pregnancyHeartRateTopic = nameof(TopicHmPregnancyHeartRate).ToLowerInvariant();
                CreateTopic(
                    connection,
                    pregnancyHeartRateTopic,
                    GetAttributes(typeof(TopicHmPregnancyHeartRate)),
                    $"{_configTDengineService.DB}.stb_hm_pregnancy_heart_rate");

                var cfg = new ConsumerConfig
                {
                    GourpId = ConsumerGroupId,
                    TDConnectUser = _configTDengineService.UserName,
                    TDConnectPasswd = _configTDengineService.Password,
                    MsgWithTableName = "true",
                    TDConnectIp = _configTDengineService.Host,
                };

                var consumer = new ConsumerBuilder(cfg).Build();
                try
                {
                    consumer.Subscribe(new string[] { bloodPressTopic, pregnancyHeartRateTopic });

                    while (!stopToken.IsCancellationRequested)
                    {
                        var consumeRes = consumer.Consume(ConsumePollTimeout);
                        foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumeRes.Message)
                        {
                            DispatchRows(kv.Key, kv.Value);
                        }

                        consumer.Commit(consumeRes);
                    }
                }
                finally
                {
                    // Close the consumer after use, otherwise it leaks memory.
                    consumer.Close();
                }
            }
            finally
            {
                if (connection != IntPtr.Zero)
                {
                    TDengine.Close(connection);
                }
            }
        }

        /// <summary>
        /// Resolves <paramref name="model"/> through the resolver factory and enqueues the result.
        /// Nothing is enqueued when the factory returns <c>null</c>.
        /// </summary>
        /// <param name="model">Raw message built from one consumed row.</param>
        public void ParsePackage(ReceiveMessageModel model)
        {
            var msg = _resolverFactory.ParseAndWrap(model);
            if (msg is null)
            {
                return;
            }

            _msgQueueManager.Enqueue(msg);
        }

        /// <summary>
        /// Builds the comma-separated column list for a topic from the
        /// <see cref="JsonPropertyAttribute"/> names declared on the properties of <paramref name="type"/>.
        /// Properties without the attribute, or with an empty name, are skipped.
        /// </summary>
        private static string GetAttributes(Type type)
        {
            var columnNames = type.GetProperties()
                .Select(prop => prop.GetCustomAttribute<JsonPropertyAttribute>()?.PropertyName)
                .Where(name => !string.IsNullOrEmpty(name));

            return string.Join(",", columnNames);
        }

        /// <summary>
        /// Issues <c>create topic if not exists</c> for <paramref name="topicName"/> and releases the
        /// native result handle afterwards.
        /// </summary>
        /// <exception cref="InvalidOperationException">TDengine reported a non-zero error code.</exception>
        private void CreateTopic(IntPtr conn, string topicName, string attributes, string tableName)
        {
            string query = $"create topic if not exists {topicName} as select {attributes} from {tableName}";
            IntPtr res = TDengine.Query(conn, query);
            try
            {
                if (TDengine.ErrorNo(res) != 0)
                {
                    // Read the error text before the result handle is freed.
                    string error = TDengine.Error(res);
                    _logger.LogError("Create topic {TopicName} failed, reason: {Error}", topicName, error);
                    throw new InvalidOperationException($"Create topic {topicName} failed, reason: {error}");
                }
            }
            finally
            {
                if (res != IntPtr.Zero)
                {
                    TDengine.FreeResult(res);
                }
            }
        }

        /// <summary>
        /// Splits the flat value list of <paramref name="result"/> into rows of <c>Metas.Count</c>
        /// values each and hands every complete row to <see cref="ParsePackage"/>. A failure while
        /// handling one row is logged and does not stop the remaining rows.
        /// </summary>
        private void DispatchRows(TopicPartition partition, TaosResult result)
        {
            var metas = result.Metas;
            var datas = result.Datas;

            int columnCount = metas.Count;
            if (columnCount == 0)
            {
                // Without column metadata there is no way to slice the values into rows.
                return;
            }

            // Values are laid out row after row; a trailing incomplete row is ignored.
            for (int rowStart = 0; rowStart + columnCount <= datas.Count; rowStart += columnCount)
            {
                try
                {
                    IDictionary<string, object> row = new Dictionary<string, object>(columnCount);
                    for (int column = 0; column < columnCount; column++)
                    {
                        row.Add(metas[column].name, datas[rowStart + column]);
                    }

                    var body = JsonConvert.SerializeObject(row);

                    // A missing or null message id throws here and is reported by the catch below.
                    ReceiveMessageModel msg = new(
                        partition.db,
                        partition.table,
                        partition.topic,
                        row[MessageIdColumn].ToString()!,
                        body);
                    ParsePackage(msg);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "解析包发生异常:{ex.Message}, {ex.StackTrace}", ex.Message, ex.StackTrace);
                }
            }
        }

    }
}