diff --git a/NearCardAttendance.TcpServer/NearCardAttendance.TcpServer.csproj b/NearCardAttendance.TcpServer/NearCardAttendance.TcpServer.csproj index 56dcac3..0664919 100644 --- a/NearCardAttendance.TcpServer/NearCardAttendance.TcpServer.csproj +++ b/NearCardAttendance.TcpServer/NearCardAttendance.TcpServer.csproj @@ -11,10 +11,14 @@ + + + PreserveNewest + PreserveNewest diff --git a/NearCardAttendance.TcpServer/Worker.cs b/NearCardAttendance.TcpServer/Worker.cs deleted file mode 100644 index 27cc358..0000000 --- a/NearCardAttendance.TcpServer/Worker.cs +++ /dev/null @@ -1,288 +0,0 @@ -using Confluent.Kafka; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using NearCardAttendance.Model; -using Newtonsoft.Json; -using Serilog.Context; -using Serilog; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using NearCardAttendance.Common.helper; -using Newtonsoft.Json.Linq; -using NearCardAttendance.Service.MessageQueue.Model; - - -namespace NearCardAttendance.TcpServer -{ - public class Worker : BackgroundService - { - private readonly ILogger _logger; - private readonly ServiceConfig _configService; - private readonly HttpHelper _httpHelper = default!; - private int _messageCount = 0; - - public Worker(ILogger logger, IOptions _optConfigService,HttpHelper httpHelper) - { - _logger = logger; - _configService = _optConfigService.Value; - _httpHelper = httpHelper; - } - /** - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - var kafkaConsumer = CreateKafkaConsumer(); - kafkaConsumer.Subscribe("topics.storage.near_card_attendance"); - - // process messages every 50 messages or every minute - // 每隔1分钟或没50条消息就批量处理一次 - var tasks = new List(); - - #region 每分钟触发一次 - var messageCounter = 0; // 消息计数器 - var timer = new System.Timers.Timer(TimeSpan.FromMinutes(1).TotalMilliseconds); // 定时器,每分钟触发一次 - timer.AutoReset = true; - - // 定时器事件处理程序,用于定时处理消息 - timer.Elapsed += async (sender, e) => - { - // 如果计数器大于 0,则处理消息 - if (messageCounter > 0) - { - await ProcessMessagesAsync(tasks); - messageCounter = 0; // 处理完成后重置计数器 - } - }; - - timer.Start(); // 启动定时器 - #endregion - - try - { - while (!stoppingToken.IsCancellationRequested) - { - var consumeResult = kafkaConsumer.Consume(stoppingToken); - if (consumeResult != null) - { - //tasks.Add(ProcessMessageAsync(consumeResult, kafkaConsumer)); - // 处理消息 - tasks.Add(ProcessMessageAsync(consumeResult, kafkaConsumer)); - messageCounter++; // 增加消息计数器 - - // 如果消息计数达到 30 条,则处理消息并重置计数器 - if (messageCounter >= 30) - { - await ProcessMessagesAsync(tasks); - messageCounter = 0; // 处理完成后重置计数器 - } - } - } - } - catch (OperationCanceledException) - { - _logger.LogWarning("Worker exit"); - } - catch (Exception ex) - { - _logger.LogError($"An error occurred: {ex.Message}"); - } - - await Task.WhenAll(tasks); - } - - // 异步处理消息的方法 - private async Task ProcessMessagesAsync(List tasks) - { - await Task.WhenAll(tasks); // 等待所有任务完成 - tasks.Clear(); // 清空任务列表 - } - - private async Task ProcessMessageAsync(ConsumeResult consumeResult, IConsumer kafkaConsumer) - { - // 处理消息的逻辑 - if (consumeResult != null) - { - - try - { - var url = _configService.XinHuaLeYuUrl; - var data = new { }; - var res = await _httpHelper.HttpToPostAsync(url, data); - - } - catch (Exception ex) - { - //kafkaConsumer.Commit(consumeResult); - _logger.LogError($"消费出错:{consumeResult.Message.Value},\n{ex.Message}\n{ex.StackTrace}"); - } - } - } - */ - - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - var kafkaConsumer = CreateKafkaConsumer(); - kafkaConsumer.Subscribe("topics.storage.near_card_attendance"); - var tasks = new List(); - List> consumeBatchResult = new List>(); - try - { - while (!stoppingToken.IsCancellationRequested) - { - var consumeResult = kafkaConsumer.Consume(stoppingToken); - if (consumeResult != null) - { - _messageCount++; - consumeBatchResult.Add(consumeResult); - //// 30条消息为一批 - if (_messageCount % 30 == 0) - { - if (!await ProcessBatchMessageAsync(consumeBatchResult, kafkaConsumer)) - { // 返回结果错误暂停5分钟 - await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken); - } - } - - } - } - } - catch (OperationCanceledException) - { - _logger.LogWarning("Worker exit"); - } - catch (Exception ex) - { - _logger.LogError($"An error occurred: {ex.Message}"); - } - } - private async Task ProcessMessageAsync(ConsumeResult consumeResult, IConsumer kafkaConsumer) - { - try - { - var url = _configService.XinHuaLeYuUrl; - var data = new { }; - var res = await _httpHelper.HttpToPostAsync(url, data); - if (!string.IsNullOrEmpty(res)) - { - JObject resObj = (JObject)JsonConvert.DeserializeObject(res!)!; - if ((bool)resObj["success"]!) - { - kafkaConsumer.Commit(consumeResult); - return true; - } - } - - } - catch (Exception ex) - { - //kafkaConsumer.Commit(consumeResult); - _logger.LogError($"消费出错:{consumeResult.Message.Value},\n{ex.Message}\n{ex.StackTrace}"); - - } - return false; - - } - - private async Task ProcessBatchMessageAsync(List> consumeBatchResult, IConsumer kafkaConsumer) - { - try - { - - var url =$"{_configService.XinHuaLeYuUrl}/user/electronicCardAttendance/receiveTbAttendanceRecordException"; - var list = new List(); - //consumeBatchResult.ForEach(x => { - // JObject msg = (JObject)JsonConvert.DeserializeObject(x.Message.Value)!; - // list.Add(new - // { - // attendanceStatus = int.TryParse(msg["data"]!["attendanceStatus"]!.ToString(), out int status) ? status : 0, - // attendanceTime = msg["data"]!["attendanceTime"]!.ToString(), - // imei = msg["data"]!["imei"]!.ToString() - // }) ; - //}); - - consumeBatchResult.ForEach(x => { - EventData msg = JsonConvert.DeserializeObject(x.Message.Value)!; - JObject content = (JObject)JsonConvert.DeserializeObject(msg.Content)!; - list.Add(new - { - attendanceStatus = int.TryParse(content!["attendanceStatus"]!.ToString(), out int status) ? status : 0, - attendanceTime = content!["attendanceTime"]!.ToString(), - imei = content!["imei"]!.ToString() - }); - }); - - - var data = new { - data= list - }; - var res = await _httpHelper.HttpToPostAsync(url, data); - if (!string.IsNullOrEmpty(res)) - { - JObject resObj = (JObject)JsonConvert.DeserializeObject(res!)!; - if ((bool)resObj["success"]!) - { - consumeBatchResult.ForEach(x => - { - kafkaConsumer.Commit(x); - }); - return true; - } - } - - } - catch (Exception ex) - { - _logger.LogError($"处理消息出错 \n{ex.Message}\n{ex.StackTrace}"); - - } - return false; - - } - - - /// - /// 创建消费者 - /// - /// - private IConsumer CreateKafkaConsumer() - { - - var consumerConfig = new ConsumerConfig - { - GroupId = "near_card_attendance", - BootstrapServers = _configService.KafkaServerAddress, - AutoOffsetReset = AutoOffsetReset.Earliest, - EnableAutoCommit = false, // 关闭自动提交偏移量 - CancellationDelayMaxMs = 1//set CancellationDelayMaxMs - }; - - return new ConsumerBuilder(consumerConfig) - .SetErrorHandler((_, e) => - { - - //Console.WriteLine($"消费者创建出错,代码:{e.Code} |原因: {e.Reason}"); - _logger.LogInformation($"消费者创建出错,代码:{e.Code} |原因: {e.Reason}"); - }) - .SetPartitionsAssignedHandler((c, partitions) => - { - //// 在这里手动指定要消费的分区 - //var partitionsToConsume = new List - //{ - // new TopicPartitionOffset("topics.storage.near_card_attendance", partitionIndex, Offset.Unset) - //}; - - ////c.Assign(partitionsToConsume); - //Console.WriteLine($"Assigned partitions: {string.Join(", ", partitionsToConsume)}"); - //return partitionsToConsume; - }) - .SetPartitionsRevokedHandler((c, partitions) => - { - - }) - .Build(); - } - } -} diff --git a/NearCardAttendance.TcpServer/appsettings.production.json b/NearCardAttendance.TcpServer/appsettings.production.json new file mode 100644 index 0000000..4a411c7 --- /dev/null +++ b/NearCardAttendance.TcpServer/appsettings.production.json @@ -0,0 +1,8 @@ +{ + "AllowedHosts": "*", + "ServiceConfig": { + "XinHuaLeYuUrl": "https://midplat.xinhualeyu.com/dev-api", + "TelpoDataUrl": "https://ai.ssjlai.com/data", + "KafkaServerAddress": "172.19.42.40:9092,172.19.42.41:9092,172.19.42.48:9092" + } +}