using Confluent.Kafka; using DotNetty.Transport.Bootstrapping; using DotNetty.Transport.Channels; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using NearCardAttendance.Common.helper; using NearCardAttendance.Model; using Newtonsoft.Json.Linq; using Newtonsoft.Json; using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Text; using System.Threading.Tasks; using NearCardAttendance.Service.MessageQueue.Model; namespace NearCardAttendance.TcpServer { public class Server : BackgroundService { private readonly ILogger _logger; //private readonly IServiceProvider _serviceProvider; private readonly ServerBootstrap _serverBootstrap; // private IChannel _serverChannel = default!; private CancellationTokenSource _tokenSource = null!; private readonly ServiceConfig _configService; private readonly HttpHelper _httpHelper = default!; //private int _messageCount = 0; public Server( ILogger logger, ServerBootstrap serverBootstrap, IServiceProvider serviceProvider, HttpHelper httpHelper, IOptions _optConfigService) { _logger = logger; //_serviceProvider = serviceProvider; _configService = _optConfigService.Value; _serverBootstrap = serverBootstrap; _httpHelper = httpHelper; } public override Task StartAsync(CancellationToken cancellationToken) { _logger.LogInformation("------StartAsync"); _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); return base.StartAsync(cancellationToken); } public override Task StopAsync(CancellationToken cancellationToken) { _logger.LogInformation("------StopAsync"); _tokenSource.Cancel(); //停止工作线程 return base.StopAsync(cancellationToken); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("DotNetty server starting..."); //var address = new IPEndPoint(IPAddress.Any, 12345); //IChannel _serverChannel = await _serverBootstrap.BindAsync(address); string ipAddress = "0.0.0.0"; int port = 16662; var endPoint = new IPEndPoint(IPAddress.Parse(ipAddress), port); IChannel _serverChannel = await _serverBootstrap.BindAsync(endPoint); // _serverChannel.GetAttribute(MessageIdAttribute.Key).Set("SomeRequestId"); //_logger.LogInformation("DotNetty server started on {0}.", address); _logger.LogInformation("DotNetty server started on {0}.", endPoint); #region kafka var kafkaConsumer = CreateKafkaConsumer(); kafkaConsumer.Subscribe("topics.storage.near_card_attendance"); var tasks = new List(); List> consumeBatchResult = new List>(); try { while (!stoppingToken.IsCancellationRequested) { var consumeResult = kafkaConsumer.Consume(stoppingToken); if (consumeResult != null) { //_messageCount++; consumeBatchResult.Add(consumeResult); #if DEBUG if (consumeBatchResult.Count % 2 == 0) { if (!await ProcessBatchMessageAsync(consumeBatchResult, kafkaConsumer)) { // 返回结果错误暂停5分钟 await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken); } } //// 30条消息为一批 #else if (consumeBatchResult.Count % 30 == 0) { if (!await ProcessBatchMessageAsync(consumeBatchResult, kafkaConsumer)) { // 返回结果错误暂停5分钟 await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken); } } #endif } } } catch (OperationCanceledException) { _logger.LogWarning("Worker exit"); } catch (Exception ex) { _logger.LogError($"An error occurred: {ex.Message}"); } #endregion // Wait until the service is stopped stoppingToken.WaitHandle.WaitOne(); _logger.LogInformation("DotNetty server stopping..."); // Close the server channel and release resources await _serverChannel.CloseAsync(); _logger.LogInformation("DotNetty server stopped."); } private async Task ProcessBatchMessageAsync(List> consumeBatchResult, IConsumer kafkaConsumer) { try { var url = $"{_configService.XinHuaLeYuUrl}/user/electronicCardAttendance/receiveTbAttendanceRecordException"; var list = new List(); //consumeBatchResult.ForEach(x => { // JObject msg = (JObject)JsonConvert.DeserializeObject(x.Message.Value)!; // list.Add(new // { // attendanceStatus = int.TryParse(msg["data"]!["attendanceStatus"]!.ToString(), out int status) ? status : 0, // attendanceTime = msg["data"]!["attendanceTime"]!.ToString(), // imei = msg["data"]!["imei"]!.ToString() // }) ; //}); consumeBatchResult.ForEach(x => { EventData msg = JsonConvert.DeserializeObject(x.Message.Value)!; JObject content = (JObject)JsonConvert.DeserializeObject(msg.Content)!; list.Add(new { attendanceStatus = int.TryParse(content!["attendanceStatus"]!.ToString(), out int status) ? status : 0, attendanceTime = content!["attendanceTime"]!.ToString(), imei = content!["imei"]!.ToString() }); }); var data = new { data = list }; var res = await _httpHelper.HttpToPostAsync(url, data); if (!string.IsNullOrEmpty(res)) { JObject resObj = (JObject)JsonConvert.DeserializeObject(res!)!; if ((bool)resObj["success"]!) { _logger.LogInformation($"{nameof(ProcessBatchMessageAsync)} 推送 {JsonConvert.SerializeObject(data)} 结果,{res}"); consumeBatchResult.ForEach(x => { kafkaConsumer.Commit(x); _logger.LogInformation($"完成消费:{JsonConvert.SerializeObject(x.Message.Value)}"); }); consumeBatchResult.Clear(); return true; } } } catch (Exception ex) { _logger.LogError($"处理消息出错 \n{ex.Message}\n{ex.StackTrace}"); } return false; } private IConsumer CreateKafkaConsumer() { var consumerConfig = new ConsumerConfig { GroupId = "near_card_attendance", BootstrapServers = _configService.KafkaServerAddress, AutoOffsetReset = AutoOffsetReset.Earliest, EnableAutoCommit = false, // 关闭自动提交偏移量 //CancellationDelayMaxMs = 1//set CancellationDelayMaxMs }; return new ConsumerBuilder(consumerConfig) .SetErrorHandler((_, e) => { //Console.WriteLine($"消费者创建出错,代码:{e.Code} |原因: {e.Reason}"); _logger.LogInformation($"消费者创建出错,代码:{e.Code} |原因: {e.Reason}"); }) .SetPartitionsAssignedHandler((c, partitions) => { //// 在这里手动指定要消费的分区 //var partitionsToConsume = new List //{ // new TopicPartitionOffset("topics.storage.near_card_attendance", partitionIndex, Offset.Unset) //}; ////c.Assign(partitionsToConsume); //Console.WriteLine($"Assigned partitions: {string.Join(", ", partitionsToConsume)}"); //return partitionsToConsume; }) .SetPartitionsRevokedHandler((c, partitions) => { }) .Build(); } } }