From f5a00a30f0ffc97ff33bc6e2f35e4e488f0d82ee Mon Sep 17 00:00:00 2001 From: H Vs Date: Sat, 2 Mar 2024 17:51:10 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8C=81=E4=B9=85=E5=8C=96=E5=88=B0kafka?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- NearCardAttendance.Model/Attendance.cs | 17 ++ NearCardAttendance.Model/ServiceConfig.cs | 2 + .../MessageQueue/Kafka/IKafkaService.cs | 15 + .../MessageQueue/Kafka/KafkaService.cs | 141 +++++++++ .../MessageQueue/Model/EventData.cs | 14 + .../MessageQueue/Model/IMEIMessage.cs | 13 + .../MessageQueue/MqProcessLogic.cs | 24 ++ .../NearCardAttendance.Service.csproj | 1 + .../TcpServer/Handler/RegisterHandler.cs | 81 ++++- .../NearCardAttendance.TcpServer.csproj | 1 + NearCardAttendance.TcpServer/Program.cs | 5 +- NearCardAttendance.TcpServer/Server.cs | 164 +++++++++- NearCardAttendance.TcpServer/Worker.cs | 288 ++++++++++++++++++ .../appsettings.debug.json | 5 +- .../appsettings.test.json | 5 +- 15 files changed, 761 insertions(+), 15 deletions(-) create mode 100644 NearCardAttendance.Model/Attendance.cs create mode 100644 NearCardAttendance.Service/MessageQueue/Kafka/IKafkaService.cs create mode 100644 NearCardAttendance.Service/MessageQueue/Kafka/KafkaService.cs create mode 100644 NearCardAttendance.Service/MessageQueue/Model/EventData.cs create mode 100644 NearCardAttendance.Service/MessageQueue/Model/IMEIMessage.cs create mode 100644 NearCardAttendance.Service/MessageQueue/MqProcessLogic.cs create mode 100644 NearCardAttendance.TcpServer/Worker.cs diff --git a/NearCardAttendance.Model/Attendance.cs b/NearCardAttendance.Model/Attendance.cs new file mode 100644 index 0000000..46fb086 --- /dev/null +++ b/NearCardAttendance.Model/Attendance.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace NearCardAttendance.Model +{ + public class Attendance + { + public int AttendanceStatus { get; set; } + + public string AttendanceTime { get; set; } =default!; + + public string Imei { get; set; } = default!; + } +} diff --git a/NearCardAttendance.Model/ServiceConfig.cs b/NearCardAttendance.Model/ServiceConfig.cs index 3a9c794..8a9a50b 100644 --- a/NearCardAttendance.Model/ServiceConfig.cs +++ b/NearCardAttendance.Model/ServiceConfig.cs @@ -9,5 +9,7 @@ namespace NearCardAttendance.Model public class ServiceConfig { public string XinHuaLeYuUrl { get; set; } = default!; + + public string KafkaServerAddress { get; set; } = default!; } } diff --git a/NearCardAttendance.Service/MessageQueue/Kafka/IKafkaService.cs b/NearCardAttendance.Service/MessageQueue/Kafka/IKafkaService.cs new file mode 100644 index 0000000..b6fe672 --- /dev/null +++ b/NearCardAttendance.Service/MessageQueue/Kafka/IKafkaService.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace NearCardAttendance.Service.MessageQueue.Kafka +{ + public interface IKafkaService + { + Task PublishAsync(string topicName, T message) where T : class; + + Task SubscribeAsync(IEnumerable topics, Action messageFunc, CancellationToken cancellationToken = default) where T : class; + } +} diff --git a/NearCardAttendance.Service/MessageQueue/Kafka/KafkaService.cs b/NearCardAttendance.Service/MessageQueue/Kafka/KafkaService.cs new file mode 100644 index 0000000..b702dda --- /dev/null +++ b/NearCardAttendance.Service/MessageQueue/Kafka/KafkaService.cs @@ -0,0 +1,141 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; +using NearCardAttendance.Common; +using NearCardAttendance.Model; +using NearCardAttendance.Service.MessageQueue.Model; + +namespace NearCardAttendance.Service.MessageQueue.Kafka +{ + public class KafkaService : IKafkaService + { + + private readonly ILogger _logger; + private readonly ServiceConfig _configService; + public KafkaService(IOptions _optConfigService, ILogger logger) + { + _configService = _optConfigService.Value; + _logger = logger; + } + + public async Task PublishAsync(string topicName, T message) where T : class + { + try + { + Type messageType = typeof(T); + var config = new ProducerConfig + { + BootstrapServers = _configService.KafkaServerAddress, + EnableIdempotence = true, + Acks = Acks.All, + MessageSendMaxRetries = 3 + }; + + if (message.GetType().Equals(typeof(EventData))) + { + using var producer = new ProducerBuilder(config).Build(); + string imei = messageType.GetProperty("IMEI")!.GetValue(message)!.ToString()!; + //var tailNo = long.Parse(messageType.GetProperty("IMEI")!.GetValue(message)!.ToString()!) % 100; + + //int tailNo = SafeType.SafeInt(imei.Substring(imei.Length - 2)); + var messageId = messageType.GetProperty("MessageId")!.GetValue(message)!.ToString()!; + //await producer.ProduceAsync(new TopicPartition(topicName, new Partition(tailNo)), new Message + //{ + // Key = messageId, + // Value = JsonConvert.SerializeObject(message), + + //}); + await producer.ProduceAsync(topicName, new Message + { + Key = messageId, + Value = JsonConvert.SerializeObject(message), + }); + + // TopicPartition topicPartition = new TopicPartition(topicName, new Partition(tailNo)); + + } + else + { + using var producer = new ProducerBuilder(config).Build(); + await producer.ProduceAsync(topicName, new Message + { + Key = Guid.NewGuid().ToString(), + Value = JsonConvert.SerializeObject(message) + }); + } + + } + catch (ProduceException ex) + { + _logger.LogError($"推送到kafka失败,topic: {topicName},\n message:{JsonConvert.SerializeObject(message)}: \n{ex.Error.Reason}"); + } + } + + public async Task SubscribeAsync(IEnumerable topics, Action messageFunc, CancellationToken cancellationToken = default) where T : class + { + var config = new ConsumerConfig + { + BootstrapServers = _configService.KafkaServerAddress, + GroupId = "Consumer", + EnableAutoCommit = false, // 禁止AutoCommit + Acks = Acks.Leader, // 假设只需要Leader响应即可 + AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起 + }; + using (var consumer = new ConsumerBuilder(config).Build()) + { + consumer.Subscribe(topics); + try + { + while (true) + { + try + { + var consumeResult = consumer.Consume(cancellationToken); + Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'."); + if (consumeResult!.IsPartitionEOF) + { + Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}."); + continue; + } + T? messageResult = null; + try + { + messageResult = JsonConvert.DeserializeObject(consumeResult.Message!.Value)!; + } + catch (Exception ex) + { + var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message!.Value}】 :{ex.StackTrace?.ToString()}"; + Console.WriteLine(errorMessage); + messageResult = null; + } + if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/) + { + messageFunc(messageResult); + try + { + consumer.Commit(consumeResult); + } + catch (KafkaException e) + { + Console.WriteLine(e.Message); + } + } + } + catch (ConsumeException e) + { + Console.WriteLine($"Consume error: {e.Error.Reason}"); + } + } + } + catch (OperationCanceledException) + { + Console.WriteLine("Closing consumer."); + consumer.Close(); + } + } + + await Task.CompletedTask; + } + } +} diff --git a/NearCardAttendance.Service/MessageQueue/Model/EventData.cs b/NearCardAttendance.Service/MessageQueue/Model/EventData.cs new file mode 100644 index 0000000..e0cd487 --- /dev/null +++ b/NearCardAttendance.Service/MessageQueue/Model/EventData.cs @@ -0,0 +1,14 @@ +namespace NearCardAttendance.Service.MessageQueue.Model +{ + public class EventData + { + public string EventTime { get; set; } = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"); + public string TopicName { get; set; } = default!; + public string MessageId { get; set; } = default!; + + public string IMEI{ get; set; } = default!; + + public string Content { get; set; } = default!; + //public T Message { get; set; } = default!; + } +} diff --git a/NearCardAttendance.Service/MessageQueue/Model/IMEIMessage.cs b/NearCardAttendance.Service/MessageQueue/Model/IMEIMessage.cs new file mode 100644 index 0000000..9d0bfd1 --- /dev/null +++ b/NearCardAttendance.Service/MessageQueue/Model/IMEIMessage.cs @@ -0,0 +1,13 @@ +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace NearCardAttendance.Service.MessageQueue.Model +{ + public class IMEIMessage + { + [JsonProperty("imei")] + public string IMEI { get; set; } = default!; + [JsonProperty("content")] + public JObject Content { get; set; } = default!; + } +} diff --git a/NearCardAttendance.Service/MessageQueue/MqProcessLogic.cs b/NearCardAttendance.Service/MessageQueue/MqProcessLogic.cs new file mode 100644 index 0000000..962a077 --- /dev/null +++ b/NearCardAttendance.Service/MessageQueue/MqProcessLogic.cs @@ -0,0 +1,24 @@ +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; +using NearCardAttendance.Service.MessageQueue.Kafka; +using NearCardAttendance.Service.MessageQueue.Model; + +namespace NearCardAttendance.Service.MessageQueue +{ + public class MqProcessLogic + { + private readonly ILogger _logger; + + private readonly KafkaService _serviceKafka; + public MqProcessLogic(ILogger logger, KafkaService serviceKafka) + { + _logger = logger; + _serviceKafka = serviceKafka; + } + public async Task ProcessIMEIEventMessageAsync(EventData eventData) + { + await _serviceKafka.PublishAsync(eventData.TopicName, eventData); + _logger.LogInformation($"推送消息 {eventData.MessageId} 内容:{JsonConvert.SerializeObject(eventData)}"); + } + } +} diff --git a/NearCardAttendance.Service/NearCardAttendance.Service.csproj b/NearCardAttendance.Service/NearCardAttendance.Service.csproj index 957e400..3823a78 100644 --- a/NearCardAttendance.Service/NearCardAttendance.Service.csproj +++ b/NearCardAttendance.Service/NearCardAttendance.Service.csproj @@ -9,6 +9,7 @@ + diff --git a/NearCardAttendance.Service/TcpServer/Handler/RegisterHandler.cs b/NearCardAttendance.Service/TcpServer/Handler/RegisterHandler.cs index 40c31f0..1cbcd96 100644 --- a/NearCardAttendance.Service/TcpServer/Handler/RegisterHandler.cs +++ b/NearCardAttendance.Service/TcpServer/Handler/RegisterHandler.cs @@ -18,6 +18,9 @@ using TelpoDataService.Util; using TelpoDataService.Util.Clients; using TelpoDataService.Util.Models; using TelpoDataService.Util.QueryObjects; +using NearCardAttendance.Service.MessageQueue; +using NearCardAttendance.Service.MessageQueue.Model; +using Newtonsoft.Json.Linq; namespace NearCardAttendance.Service.TcpServer.Handler { @@ -25,6 +28,7 @@ namespace NearCardAttendance.Service.TcpServer.Handler { private readonly ILogger _logger; private readonly HttpHelper _httpHelper = default!; + private readonly MqProcessLogic _serviceMqProcess; //private readonly IDisposable _loggerScope = default!; private readonly ServiceConfig _configService; private readonly GpsCardAccessorClient _deviceConfigApiClient; @@ -32,12 +36,13 @@ namespace NearCardAttendance.Service.TcpServer.Handler //private readonly TcpClientsManager _managerTcpClients; //private readonly ScheduleResendManager _managerScheduleResend; - public RegisterHandler(ILogger logger, GpsCardAccessorClient deviceConfigApiClient,HttpHelper httpHelper, IOptions configService) + public RegisterHandler(ILogger logger, GpsCardAccessorClient deviceConfigApiClient,HttpHelper httpHelper, IOptions configService, MqProcessLogic serviceMqProcess) { _logger = logger; _httpHelper = httpHelper; _configService = configService.Value; _deviceConfigApiClient= deviceConfigApiClient; + _serviceMqProcess = serviceMqProcess; } public override void ChannelActive(IChannelHandlerContext context) @@ -267,15 +272,43 @@ namespace NearCardAttendance.Service.TcpServer.Handler if (config!=null) { - var url = _configService.XinHuaLeYuUrl; + //var url = _configService.XinHuaLeYuUrl; + var url = $"{_configService.XinHuaLeYuUrl}/user/electronicCardAttendance/receiveTbAttendanceRecord"; var data = new { attendanceStatus = 2, //考勤状态: 0.进 1.出 2.未知 attendanceTime = DateTime.ParseExact(startTime, "yyyyMMddHHmmss", null).ToString("yyyy-MM-dd HH:mm:ss"), imei = config.Imei }; + var eventData = new EventData + { + TopicName = "topics.storage.near_card_attendance", + MessageId = $"{config.Imei}-{parser.SeqNo}-{Guid.NewGuid().ToString("N")[^4..]}", + IMEI = config.Imei, + Content = JsonConvert.SerializeObject(data) + }; var res = await _httpHelper.HttpToPostAsync(url, data); - _logger.LogInformation($"{nameof(HandleSignRecsAsync)} 推送 {JsonConvert.SerializeObject(data)} 结果,{res}"); + + if (!string.IsNullOrEmpty(res)) + { + JObject resObj = (JObject)JsonConvert.DeserializeObject(res!)!; + if ((bool)resObj["success"]!) + { + _logger.LogInformation($"{nameof(HandleSignRecsAsync)} 推送 {JsonConvert.SerializeObject(data)} 结果,{res}"); + } + else + { + await _serviceMqProcess.ProcessIMEIEventMessageAsync(eventData); + _logger.LogInformation($"HTTP 响应业务失败 {res},{nameof(HandleSignRecsAsync)} 推送 {JsonConvert.SerializeObject(eventData)}"); + + } + } + else + { + await _serviceMqProcess.ProcessIMEIEventMessageAsync(eventData); + _logger.LogInformation($"HTTP 响应超时,{nameof(HandleSignRecsAsync)} 推送 {JsonConvert.SerializeObject(eventData)}"); + + } } else { @@ -328,15 +361,53 @@ namespace NearCardAttendance.Service.TcpServer.Handler if (config != null) { //var url = "https://midplat.xinhualeyu.com/dev-api/user/electronicCardAttendance/receiveTbAttendanceRecord"; - var url = _configService.XinHuaLeYuUrl; +#if DEBUG + var url = $"{_configService.XinHuaLeYuUrl}/user/electronicCardAttendance/receiveTbAttendanceRecord1"; + +#else + var url = $"{_configService.XinHuaLeYuUrl}/user/electronicCardAttendance/receiveTbAttendanceRecord"; + +#endif var data = new { attendanceStatus = int.TryParse(optType, out int type) ? type : 0, attendanceTime = DateTime.ParseExact(startTime, "yyyyMMddHHmmss", null).ToString("yyyy-MM-dd HH:mm:ss"), imei = config.Imei }; + var eventData = new EventData + { + TopicName = "topics.storage.near_card_attendance", + MessageId = $"{config.Imei}-{parser.SeqNo}-{Guid.NewGuid().ToString("N")[^4..]}", + IMEI = config.Imei, + Content = JsonConvert.SerializeObject(data) + }; + var res = await _httpHelper.HttpToPostAsync(url, data); - _logger.LogInformation($"{nameof(HandleStdtSchoolRecsAsync)} 推送 {JsonConvert.SerializeObject(data)} 结果,{res}"); + + if (!string.IsNullOrEmpty(res)) + { + JObject resObj = (JObject)JsonConvert.DeserializeObject(res!)!; + if ((bool)resObj["success"]!) + { + _logger.LogInformation($"{nameof(HandleStdtSchoolRecsAsync)} 推送 {JsonConvert.SerializeObject(data)} 结果,{res}"); + } + else + { + await _serviceMqProcess.ProcessIMEIEventMessageAsync(eventData); + _logger.LogInformation($"HTTP 响应业务失败 {res},{nameof(HandleStdtSchoolRecsAsync)} 推送 {JsonConvert.SerializeObject(eventData)}"); + + } + } + else + { + + await _serviceMqProcess.ProcessIMEIEventMessageAsync(eventData); + _logger.LogInformation($"HTTP 响应超时,{nameof(HandleStdtSchoolRecsAsync)} 推送 {JsonConvert.SerializeObject(eventData)}"); + + } + + + } else { diff --git a/NearCardAttendance.TcpServer/NearCardAttendance.TcpServer.csproj b/NearCardAttendance.TcpServer/NearCardAttendance.TcpServer.csproj index 9c82ad7..56dcac3 100644 --- a/NearCardAttendance.TcpServer/NearCardAttendance.TcpServer.csproj +++ b/NearCardAttendance.TcpServer/NearCardAttendance.TcpServer.csproj @@ -30,6 +30,7 @@ + diff --git a/NearCardAttendance.TcpServer/Program.cs b/NearCardAttendance.TcpServer/Program.cs index 1bde2a4..3a0f974 100644 --- a/NearCardAttendance.TcpServer/Program.cs +++ b/NearCardAttendance.TcpServer/Program.cs @@ -14,6 +14,8 @@ using Serilog; using NearCardAttendance.Model; using TelpoDataService.Util.Clients; using System.Text; +using NearCardAttendance.Service.MessageQueue.Kafka; +using NearCardAttendance.Service.MessageQueue; namespace NearCardAttendance.TcpServer { @@ -110,8 +112,9 @@ namespace NearCardAttendance.TcpServer .AddHostedService() ; #endregion + services.AddSingleton(); + services.AddSingleton(); - }); } } diff --git a/NearCardAttendance.TcpServer/Server.cs b/NearCardAttendance.TcpServer/Server.cs index 3e316b7..38efd67 100644 --- a/NearCardAttendance.TcpServer/Server.cs +++ b/NearCardAttendance.TcpServer/Server.cs @@ -1,13 +1,20 @@ -using DotNetty.Transport.Bootstrapping; +using Confluent.Kafka; +using DotNetty.Transport.Bootstrapping; using DotNetty.Transport.Channels; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using NearCardAttendance.Common.helper; +using NearCardAttendance.Model; +using Newtonsoft.Json.Linq; +using Newtonsoft.Json; using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Text; using System.Threading.Tasks; +using NearCardAttendance.Service.MessageQueue.Model; namespace NearCardAttendance.TcpServer { @@ -19,16 +26,23 @@ namespace NearCardAttendance.TcpServer // private IChannel _serverChannel = default!; private CancellationTokenSource _tokenSource = null!; - + + private readonly ServiceConfig _configService; + private readonly HttpHelper _httpHelper = default!; + private int _messageCount = 0; + + public Server( ILogger logger, ServerBootstrap serverBootstrap, - IServiceProvider serviceProvider) + IServiceProvider serviceProvider, HttpHelper httpHelper, IOptions _optConfigService) { _logger = logger; //_serviceProvider = serviceProvider; + _configService = _optConfigService.Value; _serverBootstrap = serverBootstrap; - + _httpHelper = httpHelper; + } public override Task StartAsync(CancellationToken cancellationToken) { @@ -60,7 +74,51 @@ namespace NearCardAttendance.TcpServer //_logger.LogInformation("DotNetty server started on {0}.", address); _logger.LogInformation("DotNetty server started on {0}.", endPoint); - + + #region kafka + var kafkaConsumer = CreateKafkaConsumer(); + kafkaConsumer.Subscribe("topics.storage.near_card_attendance"); + var tasks = new List(); + List> consumeBatchResult = new List>(); + try + { + while (!stoppingToken.IsCancellationRequested) + { + var consumeResult = kafkaConsumer.Consume(stoppingToken); + if (consumeResult != null) + { + _messageCount++; + consumeBatchResult.Add(consumeResult); + //// 30条消息为一批 +#if DEBUG + if (!await ProcessBatchMessageAsync(consumeBatchResult, kafkaConsumer)) + { // 返回结果错误暂停5分钟 + await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken); + } +#else + if (_messageCount % 30 == 0) + { + if (!await ProcessBatchMessageAsync(consumeBatchResult, kafkaConsumer)) + { // 返回结果错误暂停5分钟 + await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken); + } + } +#endif + + + } + } + } + catch (OperationCanceledException) + { + _logger.LogWarning("Worker exit"); + } + catch (Exception ex) + { + _logger.LogError($"An error occurred: {ex.Message}"); + } + +#endregion // Wait until the service is stopped stoppingToken.WaitHandle.WaitOne(); @@ -71,6 +129,102 @@ namespace NearCardAttendance.TcpServer await _serverChannel.CloseAsync(); _logger.LogInformation("DotNetty server stopped."); } + private async Task ProcessBatchMessageAsync(List> consumeBatchResult, IConsumer kafkaConsumer) + { + try + { + + var url = $"{_configService.XinHuaLeYuUrl}/user/electronicCardAttendance/receiveTbAttendanceRecordException"; + var list = new List(); + //consumeBatchResult.ForEach(x => { + // JObject msg = (JObject)JsonConvert.DeserializeObject(x.Message.Value)!; + // list.Add(new + // { + // attendanceStatus = int.TryParse(msg["data"]!["attendanceStatus"]!.ToString(), out int status) ? status : 0, + // attendanceTime = msg["data"]!["attendanceTime"]!.ToString(), + // imei = msg["data"]!["imei"]!.ToString() + // }) ; + //}); + + consumeBatchResult.ForEach(x => { + EventData msg = JsonConvert.DeserializeObject(x.Message.Value)!; + JObject content = (JObject)JsonConvert.DeserializeObject(msg.Content)!; + list.Add(new + { + attendanceStatus = int.TryParse(content!["attendanceStatus"]!.ToString(), out int status) ? status : 0, + attendanceTime = content!["attendanceTime"]!.ToString(), + imei = content!["imei"]!.ToString() + }); + }); + + + var data = new + { + data = list + }; + var res = await _httpHelper.HttpToPostAsync(url, data); + if (!string.IsNullOrEmpty(res)) + { + JObject resObj = (JObject)JsonConvert.DeserializeObject(res!)!; + if ((bool)resObj["success"]!) + { + _logger.LogInformation($"{nameof(ProcessBatchMessageAsync)} 推送 {JsonConvert.SerializeObject(data)} 结果,{res}"); + consumeBatchResult.ForEach(x => + { + kafkaConsumer.Commit(x); + _logger.LogInformation($"完成消费:{JsonConvert.SerializeObject(x.Message.Value)}"); + }); + return true; + } + } + + } + catch (Exception ex) + { + _logger.LogError($"处理消息出错 \n{ex.Message}\n{ex.StackTrace}"); + + } + return false; + + } + + private IConsumer CreateKafkaConsumer() + { + + var consumerConfig = new ConsumerConfig + { + GroupId = "near_card_attendance", + BootstrapServers = _configService.KafkaServerAddress, + AutoOffsetReset = AutoOffsetReset.Earliest, + EnableAutoCommit = false, // 关闭自动提交偏移量 + CancellationDelayMaxMs = 1//set CancellationDelayMaxMs + }; + + return new ConsumerBuilder(consumerConfig) + .SetErrorHandler((_, e) => + { + + //Console.WriteLine($"消费者创建出错,代码:{e.Code} |原因: {e.Reason}"); + _logger.LogInformation($"消费者创建出错,代码:{e.Code} |原因: {e.Reason}"); + }) + .SetPartitionsAssignedHandler((c, partitions) => + { + //// 在这里手动指定要消费的分区 + //var partitionsToConsume = new List + //{ + // new TopicPartitionOffset("topics.storage.near_card_attendance", partitionIndex, Offset.Unset) + //}; + + ////c.Assign(partitionsToConsume); + //Console.WriteLine($"Assigned partitions: {string.Join(", ", partitionsToConsume)}"); + //return partitionsToConsume; + }) + .SetPartitionsRevokedHandler((c, partitions) => + { + + }) + .Build(); + } } diff --git a/NearCardAttendance.TcpServer/Worker.cs b/NearCardAttendance.TcpServer/Worker.cs new file mode 100644 index 0000000..27cc358 --- /dev/null +++ b/NearCardAttendance.TcpServer/Worker.cs @@ -0,0 +1,288 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using NearCardAttendance.Model; +using Newtonsoft.Json; +using Serilog.Context; +using Serilog; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using NearCardAttendance.Common.helper; +using Newtonsoft.Json.Linq; +using NearCardAttendance.Service.MessageQueue.Model; + + +namespace NearCardAttendance.TcpServer +{ + public class Worker : BackgroundService + { + private readonly ILogger _logger; + private readonly ServiceConfig _configService; + private readonly HttpHelper _httpHelper = default!; + private int _messageCount = 0; + + public Worker(ILogger logger, IOptions _optConfigService,HttpHelper httpHelper) + { + _logger = logger; + _configService = _optConfigService.Value; + _httpHelper = httpHelper; + } + /** + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + var kafkaConsumer = CreateKafkaConsumer(); + kafkaConsumer.Subscribe("topics.storage.near_card_attendance"); + + // process messages every 50 messages or every minute + // 每隔1分钟或没50条消息就批量处理一次 + var tasks = new List(); + + #region 每分钟触发一次 + var messageCounter = 0; // 消息计数器 + var timer = new System.Timers.Timer(TimeSpan.FromMinutes(1).TotalMilliseconds); // 定时器,每分钟触发一次 + timer.AutoReset = true; + + // 定时器事件处理程序,用于定时处理消息 + timer.Elapsed += async (sender, e) => + { + // 如果计数器大于 0,则处理消息 + if (messageCounter > 0) + { + await ProcessMessagesAsync(tasks); + messageCounter = 0; // 处理完成后重置计数器 + } + }; + + timer.Start(); // 启动定时器 + #endregion + + try + { + while (!stoppingToken.IsCancellationRequested) + { + var consumeResult = kafkaConsumer.Consume(stoppingToken); + if (consumeResult != null) + { + //tasks.Add(ProcessMessageAsync(consumeResult, kafkaConsumer)); + // 处理消息 + tasks.Add(ProcessMessageAsync(consumeResult, kafkaConsumer)); + messageCounter++; // 增加消息计数器 + + // 如果消息计数达到 30 条,则处理消息并重置计数器 + if (messageCounter >= 30) + { + await ProcessMessagesAsync(tasks); + messageCounter = 0; // 处理完成后重置计数器 + } + } + } + } + catch (OperationCanceledException) + { + _logger.LogWarning("Worker exit"); + } + catch (Exception ex) + { + _logger.LogError($"An error occurred: {ex.Message}"); + } + + await Task.WhenAll(tasks); + } + + // 异步处理消息的方法 + private async Task ProcessMessagesAsync(List tasks) + { + await Task.WhenAll(tasks); // 等待所有任务完成 + tasks.Clear(); // 清空任务列表 + } + + private async Task ProcessMessageAsync(ConsumeResult consumeResult, IConsumer kafkaConsumer) + { + // 处理消息的逻辑 + if (consumeResult != null) + { + + try + { + var url = _configService.XinHuaLeYuUrl; + var data = new { }; + var res = await _httpHelper.HttpToPostAsync(url, data); + + } + catch (Exception ex) + { + //kafkaConsumer.Commit(consumeResult); + _logger.LogError($"消费出错:{consumeResult.Message.Value},\n{ex.Message}\n{ex.StackTrace}"); + } + } + } + */ + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + var kafkaConsumer = CreateKafkaConsumer(); + kafkaConsumer.Subscribe("topics.storage.near_card_attendance"); + var tasks = new List(); + List> consumeBatchResult = new List>(); + try + { + while (!stoppingToken.IsCancellationRequested) + { + var consumeResult = kafkaConsumer.Consume(stoppingToken); + if (consumeResult != null) + { + _messageCount++; + consumeBatchResult.Add(consumeResult); + //// 30条消息为一批 + if (_messageCount % 30 == 0) + { + if (!await ProcessBatchMessageAsync(consumeBatchResult, kafkaConsumer)) + { // 返回结果错误暂停5分钟 + await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken); + } + } + + } + } + } + catch (OperationCanceledException) + { + _logger.LogWarning("Worker exit"); + } + catch (Exception ex) + { + _logger.LogError($"An error occurred: {ex.Message}"); + } + } + private async Task ProcessMessageAsync(ConsumeResult consumeResult, IConsumer kafkaConsumer) + { + try + { + var url = _configService.XinHuaLeYuUrl; + var data = new { }; + var res = await _httpHelper.HttpToPostAsync(url, data); + if (!string.IsNullOrEmpty(res)) + { + JObject resObj = (JObject)JsonConvert.DeserializeObject(res!)!; + if ((bool)resObj["success"]!) + { + kafkaConsumer.Commit(consumeResult); + return true; + } + } + + } + catch (Exception ex) + { + //kafkaConsumer.Commit(consumeResult); + _logger.LogError($"消费出错:{consumeResult.Message.Value},\n{ex.Message}\n{ex.StackTrace}"); + + } + return false; + + } + + private async Task ProcessBatchMessageAsync(List> consumeBatchResult, IConsumer kafkaConsumer) + { + try + { + + var url =$"{_configService.XinHuaLeYuUrl}/user/electronicCardAttendance/receiveTbAttendanceRecordException"; + var list = new List(); + //consumeBatchResult.ForEach(x => { + // JObject msg = (JObject)JsonConvert.DeserializeObject(x.Message.Value)!; + // list.Add(new + // { + // attendanceStatus = int.TryParse(msg["data"]!["attendanceStatus"]!.ToString(), out int status) ? status : 0, + // attendanceTime = msg["data"]!["attendanceTime"]!.ToString(), + // imei = msg["data"]!["imei"]!.ToString() + // }) ; + //}); + + consumeBatchResult.ForEach(x => { + EventData msg = JsonConvert.DeserializeObject(x.Message.Value)!; + JObject content = (JObject)JsonConvert.DeserializeObject(msg.Content)!; + list.Add(new + { + attendanceStatus = int.TryParse(content!["attendanceStatus"]!.ToString(), out int status) ? status : 0, + attendanceTime = content!["attendanceTime"]!.ToString(), + imei = content!["imei"]!.ToString() + }); + }); + + + var data = new { + data= list + }; + var res = await _httpHelper.HttpToPostAsync(url, data); + if (!string.IsNullOrEmpty(res)) + { + JObject resObj = (JObject)JsonConvert.DeserializeObject(res!)!; + if ((bool)resObj["success"]!) + { + consumeBatchResult.ForEach(x => + { + kafkaConsumer.Commit(x); + }); + return true; + } + } + + } + catch (Exception ex) + { + _logger.LogError($"处理消息出错 \n{ex.Message}\n{ex.StackTrace}"); + + } + return false; + + } + + + /// + /// 创建消费者 + /// + /// + private IConsumer CreateKafkaConsumer() + { + + var consumerConfig = new ConsumerConfig + { + GroupId = "near_card_attendance", + BootstrapServers = _configService.KafkaServerAddress, + AutoOffsetReset = AutoOffsetReset.Earliest, + EnableAutoCommit = false, // 关闭自动提交偏移量 + CancellationDelayMaxMs = 1//set CancellationDelayMaxMs + }; + + return new ConsumerBuilder(consumerConfig) + .SetErrorHandler((_, e) => + { + + //Console.WriteLine($"消费者创建出错,代码:{e.Code} |原因: {e.Reason}"); + _logger.LogInformation($"消费者创建出错,代码:{e.Code} |原因: {e.Reason}"); + }) + .SetPartitionsAssignedHandler((c, partitions) => + { + //// 在这里手动指定要消费的分区 + //var partitionsToConsume = new List + //{ + // new TopicPartitionOffset("topics.storage.near_card_attendance", partitionIndex, Offset.Unset) + //}; + + ////c.Assign(partitionsToConsume); + //Console.WriteLine($"Assigned partitions: {string.Join(", ", partitionsToConsume)}"); + //return partitionsToConsume; + }) + .SetPartitionsRevokedHandler((c, partitions) => + { + + }) + .Build(); + } + } +} diff --git a/NearCardAttendance.TcpServer/appsettings.debug.json b/NearCardAttendance.TcpServer/appsettings.debug.json index f313f1b..4e970c5 100644 --- a/NearCardAttendance.TcpServer/appsettings.debug.json +++ b/NearCardAttendance.TcpServer/appsettings.debug.json @@ -1,7 +1,8 @@ { "AllowedHosts": "*", "ServiceConfig": { - "XinHuaLeYuUrl": "https://midplat.xinhualeyu.com/dev-api/user/electronicCardAttendance/receiveTbAttendanceRecord", - "TelpoDataUrl": "https://ai.ssjlai.com/data" + "XinHuaLeYuUrl": "https://midplat.xinhualeyu.com/dev-api", + "TelpoDataUrl": "https://ai.ssjlai.com/data", + "KafkaServerAddress": "192.168.2.121:9092" } } diff --git a/NearCardAttendance.TcpServer/appsettings.test.json b/NearCardAttendance.TcpServer/appsettings.test.json index f313f1b..52bd254 100644 --- a/NearCardAttendance.TcpServer/appsettings.test.json +++ b/NearCardAttendance.TcpServer/appsettings.test.json @@ -1,7 +1,8 @@ { "AllowedHosts": "*", "ServiceConfig": { - "XinHuaLeYuUrl": "https://midplat.xinhualeyu.com/dev-api/user/electronicCardAttendance/receiveTbAttendanceRecord", - "TelpoDataUrl": "https://ai.ssjlai.com/data" + "XinHuaLeYuUrl": "https://midplat.xinhualeyu.com/dev-api", + "TelpoDataUrl": "https://ai.ssjlai.com/data", + "KafkaServerAddress": "172.19.42.53:9092" } }