using HealthMonitor.Common;
using HealthMonitor.Core.Dal;
using HealthMonitor.Model.Config;
using HealthMonitor.Service.Biz.db;
using HealthMonitor.Service.Cache;
using HealthMonitor.Service.Resolver.Factory;
using HealthMonitor.Service.Resolver.Interface;
using HealthMonitor.Service.Sub.Topic.Model;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using TDengineDriver;
using TDengineTMQ;
using TelpoDataService.Util.Entities.GpsLocationHistory;

namespace HealthMonitor.Service.Sub
{
    /// <summary>
    /// Subscribes to the TDengine topic derived from <see cref="TopicHmBloodPress"/>,
    /// rebuilds every received row as a JSON body and hands it to the resolver factory;
    /// messages the factory accepts are enqueued on the <see cref="MsgQueueManager"/>.
    /// </summary>
    public class TDengineDataSubcribe
    {
        private readonly ILogger<TDengineDataSubcribe> _logger;
        private readonly MsgQueueManager _msgQueueManager;
        private readonly TDengineService _serviceTDengine;
        private readonly PersonCacheManager _personCacheMgr;
        private readonly TDengineServiceConfig _configTDengineService;
        private readonly BloodPressReferenceValueCacheManager _bpRefValCacheManager;
        private readonly IResolverFactory _resolverFactory;

        // Cancellation source of the most recent BeginListen call; null until the first call.
        private CancellationTokenSource? _tokenSource = null;

        /// <summary>
        /// Stores the injected collaborators and unwraps the TDengine configuration.
        /// </summary>
        public TDengineDataSubcribe(
            TDengineService serviceTDengine,
            PersonCacheManager personCacheMgr,
            BloodPressReferenceValueCacheManager bpRefValCacheManager,
            IResolverFactory resolverFactory,
            IOptions<TDengineServiceConfig> configTDengineService,
            MsgQueueManager msgQueueManager,
            ILogger<TDengineDataSubcribe> logger
            )
        {
            _serviceTDengine = serviceTDengine;
            _personCacheMgr = personCacheMgr;
            _bpRefValCacheManager = bpRefValCacheManager;
            _logger = logger;
            _resolverFactory = resolverFactory;
            _msgQueueManager = msgQueueManager;
            _configTDengineService = configTDengineService.Value;
        }

        /// <summary>
        /// Starts listening and blocks the calling thread until <paramref name="stoppingToken"/>
        /// is cancelled or a later <see cref="BeginListen"/> call supersedes this one.
        /// </summary>
        /// <param name="stoppingToken">Token whose cancellation ends the receive loop.</param>
        public void BeginListen(CancellationToken stoppingToken)
        {
            // Publish the new source first, then retire the previous one so that a loop
            // started by an earlier call stops instead of running alongside the new one.
            CancellationTokenSource current = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
            CancellationTokenSource? previous = Interlocked.Exchange(ref _tokenSource, current);

            if (previous is not null)
            {
                previous.Cancel();
                // A linked source keeps a registration on its parent token until disposed.
                previous.Dispose();
            }

            DoTDengineConnect();
        }

        /// <summary>
        /// Obtains a connection from <see cref="TDengineService"/> and runs the receive loop on it.
        /// </summary>
        public void DoTDengineConnect()
        {
            var conn = _serviceTDengine.Connection();
            DoReceive(conn);
        }

        /// <summary>
        /// Creates the topic if needed, subscribes a consumer to it and dispatches received rows
        /// until cancellation is requested. The consumer and <paramref name="Connection"/> are
        /// closed on every exit path, including failures.
        /// </summary>
        /// <param name="Connection">Native TDengine connection handle; closed by this method.</param>
        /// <exception cref="InvalidOperationException">
        /// <see cref="BeginListen"/> has not been called, so there is nothing to stop the loop.
        /// </exception>
        /// <exception cref="Exception">The topic could not be created.</exception>
        public void DoReceive(IntPtr Connection)
        {
            try
            {
                // Pin the source that is current when this loop starts. Re-reading the field on
                // every pass would pick up the replacement installed by a later BeginListen call
                // and the superseded loop would never observe its own cancellation.
                CancellationTokenSource source = _tokenSource
                    ?? throw new InvalidOperationException(
                        $"{nameof(BeginListen)} must be called before {nameof(DoReceive)}.");

                string topic = nameof(TopicHmBloodPress).ToLowerInvariant();
                EnsureTopic(Connection, topic);

                var cfg = new ConsumerConfig
                {
                    GourpId = "group_1",
                    TDConnectUser = _configTDengineService.UserName,
                    TDConnectPasswd = _configTDengineService.Password,
                    MsgWithTableName = "true",
                    TDConnectIp = _configTDengineService.Host,
                };

                var consumer = new ConsumerBuilder(cfg)
                    .Build();
                try
                {
                    consumer.Subscribe(topic);

                    while (!source.IsCancellationRequested)
                    {
                        // NOTE(review): 300 is presumably the poll timeout in milliseconds - confirm.
                        var consumeRes = consumer.Consume(300);
                        foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumeRes.Message)
                        {
                            DispatchRows(kv.Key, kv.Value);
                        }
                        consumer.Commit(consumeRes);
                    }
                }
                finally
                {
                    // Close the consumer after use, otherwise it leaks memory.
                    consumer.Close();
                }
            }
            finally
            {
                TDengine.Close(Connection);
            }
        }

        /// <summary>
        /// Wraps a received message through the resolver factory and enqueues the result;
        /// nothing is enqueued when the factory returns null.
        /// </summary>
        /// <param name="model">The raw message received from the topic.</param>
        public void ParsePackage(ReceiveMessageModel model)
        {
            var msg = _resolverFactory.ParseAndWrap(model);
            if (msg == null) return;
            _msgQueueManager.Enqueue(msg);
        }

        // Builds the comma-separated column list from the JsonProperty names declared on TopicHmBloodPress.
        private static string BuildTopicColumns()
        {
            IEnumerable<string> names = typeof(TopicHmBloodPress)
                .GetProperties()
                .Select(prop => prop.GetCustomAttribute<JsonPropertyAttribute>()?.PropertyName)
                // Properties without the attribute, or without an explicit name, contribute no column.
                .Where(name => !string.IsNullOrEmpty(name))
                .Select(name => name!);

            return string.Join(",", names);
        }

        // Issues "create topic if not exists" for the given topic and throws when TDengine reports an error.
        private void EnsureTopic(IntPtr connection, string topic)
        {
            string sql = $"create topic if not exists {topic} as select {BuildTopicColumns()} from health_monitor.stb_hm_bloodpress";
            IntPtr res = TDengine.Query(connection, sql);
            try
            {
                if (TDengine.ErrorNo(res) != 0)
                {
                    // Read the error text before the result handle is released.
                    string reason = TDengine.Error(res);
                    _logger.LogError("create topic failed, reason:{Reason}", reason);
                    throw new Exception($"create topic failed, reason:{reason}");
                }
            }
            finally
            {
                if (res != IntPtr.Zero)
                {
                    TDengine.FreeResult(res);
                }
            }
        }

        // Splits the flat Datas list into rows of Metas.Count cells and forwards each complete row to ParsePackage.
        private void DispatchRows(TopicPartition partition, TaosResult result)
        {
            var metas = result.Metas;
            var datas = result.Datas;
            int columnCount = metas.Count;
            if (columnCount == 0)
            {
                return;
            }

            // Datas is laid out row after row; a trailing incomplete row is ignored.
            for (int rowStart = 0; rowStart + columnCount <= datas.Count; rowStart += columnCount)
            {
                try
                {
                    IDictionary<string, object> row = new Dictionary<string, object>(columnCount);
                    for (int column = 0; column < columnCount; column++)
                    {
                        row.Add(metas[column].name, datas[rowStart + column]);
                    }

                    var body = JsonConvert.SerializeObject(row);

                    ReceiveMessageModel msg = new(partition.db, partition.table, partition.topic, row["message_id"].ToString()!, body);
                    ParsePackage(msg);
                }
                catch (Exception ex)
                {
                    // A bad row is logged and skipped so the remaining rows are still processed.
                    _logger.LogError("解析包发生异常:{ex.Message}, {ex.StackTrace}", ex.Message, ex.StackTrace);
                }
            }
        }

    }
}