using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NearCardAttendance.Model;
using Newtonsoft.Json;
using Serilog.Context;
using Serilog;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NearCardAttendance.Common.helper;
using Newtonsoft.Json.Linq;
using NearCardAttendance.Service.MessageQueue.Model;

namespace NearCardAttendance.TcpServer
{
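    /// <summary>
    /// Background service that consumes near-card attendance events from Kafka
    /// and forwards them to the XinHuaLeYu API in batches, committing offsets
    /// only after a batch has been accepted downstream.
    /// </summary>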
    public class Worker : BackgroundService
    {
        private readonly ILogger<Worker> _logger;
        private readonly ServiceConfig _configService;
        private readonly HttpHelper _httpHelper;
        private int _messageCount = 0;

        public Worker(ILogger<Worker> logger, IOptions<ServiceConfig> optConfigService, HttpHelper httpHelper)
        {
            _logger = logger;
            _configService = optConfigService.Value;
            _httpHelper = httpHelper;
        }
        /**
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var kafkaConsumer = CreateKafkaConsumer();
            kafkaConsumer.Subscribe("topics.storage.near_card_attendance");

            // Process in a batch once per minute or every 30 messages.
            var tasks = new List<Task>();

            #region Flush once per minute
            var messageCounter = 0; // message counter
            var timer = new System.Timers.Timer(TimeSpan.FromMinutes(1).TotalMilliseconds); // timer that fires once per minute
            timer.AutoReset = true;
            // Timer handler that flushes pending messages on a schedule.
            timer.Elapsed += async (sender, e) =>
            {
                // If there are pending messages, process them.
                if (messageCounter > 0)
                {
                    await ProcessMessagesAsync(tasks);
                    messageCounter = 0; // reset the counter after processing
                }
            };
            timer.Start(); // start the timer
            #endregion

            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    var consumeResult = kafkaConsumer.Consume(stoppingToken);
                    if (consumeResult != null)
                    {
                        // Process the message.
                        tasks.Add(ProcessMessageAsync(consumeResult, kafkaConsumer));
                        messageCounter++; // increment the message counter

                        // Once 30 messages have accumulated, process them and reset the counter.
                        if (messageCounter >= 30)
                        {
                            await ProcessMessagesAsync(tasks);
                            messageCounter = 0; // reset the counter after processing
                        }
                    }
                }
            }
            catch (OperationCanceledException)
            {
                _logger.LogWarning("Worker exit");
            }
            catch (Exception ex)
            {
                _logger.LogError($"An error occurred: {ex.Message}");
            }

            await Task.WhenAll(tasks);
        }

        // Awaits all in-flight message tasks and clears the task list.
        private async Task ProcessMessagesAsync(List<Task> tasks)
        {
            await Task.WhenAll(tasks); // wait for all tasks to complete
            tasks.Clear(); // clear the task list
        }

        private async Task ProcessMessageAsync(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
        {
            // Message-handling logic.
            if (consumeResult != null)
            {
                try
                {
                    var url = _configService.XinHuaLeYuUrl;
                    var data = new { };
                    var res = await _httpHelper.HttpToPostAsync(url, data);
                }
                catch (Exception ex)
                {
                    //kafkaConsumer.Commit(consumeResult);
                    _logger.LogError($"Error consuming message: {consumeResult.Message.Value},\n{ex.Message}\n{ex.StackTrace}");
                }
            }
        }
        */
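        /// <summary>
        /// Consumes attendance messages from Kafka and forwards them to the downstream
        /// API in batches of 30. Offsets are committed only after a batch is accepted;
        /// when a batch fails, the worker backs off for five minutes before consuming again.
        /// </summary>
        /// <param name="stoppingToken">Token that signals host shutdown.</param>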
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var kafkaConsumer = CreateKafkaConsumer();
            kafkaConsumer.Subscribe("topics.storage.near_card_attendance");
            var consumeBatchResult = new List<ConsumeResult<Ignore, string>>();
            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    var consumeResult = kafkaConsumer.Consume(stoppingToken);
                    if (consumeResult != null)
                    {
                        _messageCount++;
                        consumeBatchResult.Add(consumeResult);

                        // Process the accumulated messages in batches of 30.
                        if (_messageCount % 30 == 0)
                        {
                            if (await ProcessBatchMessageAsync(consumeBatchResult, kafkaConsumer))
                            {
                                consumeBatchResult.Clear(); // offsets committed, start a new batch
                            }
                            else
                            {
                                // The downstream call failed: keep the batch for retry and back off for 5 minutes.
                                await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken);
                            }
                        }
                    }
                }
            }
            catch (OperationCanceledException)
            {
                _logger.LogWarning("Worker exit");
            }
            catch (Exception ex)
            {
                _logger.LogError($"An error occurred: {ex.Message}");
            }
        }
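        /// <summary>
        /// Handles a single consumed message: posts to the configured endpoint, commits
        /// the offset when the response reports success, and returns whether the message
        /// was accepted. Not used by the current batching flow in ExecuteAsync.
        /// </summary>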
        private async Task<bool> ProcessMessageAsync(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
        {
            try
            {
                var url = _configService.XinHuaLeYuUrl;
                var data = new { };
                var res = await _httpHelper.HttpToPostAsync(url, data);
                if (!string.IsNullOrEmpty(res))
                {
                    JObject resObj = (JObject)JsonConvert.DeserializeObject(res!)!;
                    if ((bool)resObj["success"]!)
                    {
                        kafkaConsumer.Commit(consumeResult);
                        return true;
                    }
                }
            }
            catch (Exception ex)
            {
                //kafkaConsumer.Commit(consumeResult);
                _logger.LogError($"Error consuming message: {consumeResult.Message.Value},\n{ex.Message}\n{ex.StackTrace}");
            }
            return false;
        }
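        /// <summary>
        /// Sends a batch of consumed messages to the attendance-exception endpoint and
        /// commits every offset in the batch when the response reports success.
        /// Each message value is expected to be an EventData envelope whose Content is a
        /// JSON object containing attendanceStatus, attendanceTime and imei, e.g.
        /// (a sketch, assuming this payload shape and illustrative values):
        ///   { "Content": "{\"attendanceStatus\":1,\"attendanceTime\":\"2024-01-01 08:00:00\",\"imei\":\"860000000000000\"}" }
        /// </summary>
        /// <returns>True when the downstream API accepted the batch; otherwise false.</returns>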
        private async Task<bool> ProcessBatchMessageAsync(List<ConsumeResult<Ignore, string>> consumeBatchResult, IConsumer<Ignore, string> kafkaConsumer)
        {
            try
            {
                var url = $"{_configService.XinHuaLeYuUrl}/user/electronicCardAttendance/receiveTbAttendanceRecordException";
                var list = new List<object>();
                //consumeBatchResult.ForEach(x => {
                //    JObject msg = (JObject)JsonConvert.DeserializeObject(x.Message.Value)!;
                //    list.Add(new
                //    {
                //        attendanceStatus = int.TryParse(msg["data"]!["attendanceStatus"]!.ToString(), out int status) ? status : 0,
                //        attendanceTime = msg["data"]!["attendanceTime"]!.ToString(),
                //        imei = msg["data"]!["imei"]!.ToString()
                //    });
                //});
                consumeBatchResult.ForEach(x =>
                {
                    EventData msg = JsonConvert.DeserializeObject<EventData>(x.Message.Value)!;
                    JObject content = (JObject)JsonConvert.DeserializeObject(msg.Content)!;
                    list.Add(new
                    {
                        attendanceStatus = int.TryParse(content["attendanceStatus"]!.ToString(), out int status) ? status : 0,
                        attendanceTime = content["attendanceTime"]!.ToString(),
                        imei = content["imei"]!.ToString()
                    });
                });
                var data = new
                {
                    data = list
                };
                var res = await _httpHelper.HttpToPostAsync(url, data);
                if (!string.IsNullOrEmpty(res))
                {
                    JObject resObj = (JObject)JsonConvert.DeserializeObject(res!)!;
                    if ((bool)resObj["success"]!)
                    {
                        consumeBatchResult.ForEach(x =>
                        {
                            kafkaConsumer.Commit(x);
                        });
                        return true;
                    }
                }
            }
            catch (Exception ex)
            {
                _logger.LogError($"Error processing message batch\n{ex.Message}\n{ex.StackTrace}");
            }
            return false;
        }
        /// <summary>
        /// Creates the Kafka consumer.
        /// </summary>
        /// <returns>A configured consumer with auto-commit disabled.</returns>
        private IConsumer<Ignore, string> CreateKafkaConsumer()
        {
            var consumerConfig = new ConsumerConfig
            {
                GroupId = "near_card_attendance",
                BootstrapServers = _configService.KafkaServerAddress,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoCommit = false, // disable auto-commit of offsets
                CancellationDelayMaxMs = 1 // react quickly to cancellation
            };
            return new ConsumerBuilder<Ignore, string>(consumerConfig)
                .SetErrorHandler((_, e) =>
                {
                    _logger.LogError($"Kafka consumer error, code: {e.Code} | reason: {e.Reason}");
                })
                .SetPartitionsAssignedHandler((c, partitions) =>
                {
                    //// Manually assign the partitions to consume here, if needed:
                    //var partitionsToConsume = new List<TopicPartitionOffset>
                    //{
                    //    new TopicPartitionOffset("topics.storage.near_card_attendance", partitionIndex, Offset.Unset)
                    //};
                    ////c.Assign(partitionsToConsume);
                    //Console.WriteLine($"Assigned partitions: {string.Join(", ", partitionsToConsume)}");
                    //return partitionsToConsume;
                })
                .SetPartitionsRevokedHandler((c, partitions) =>
                {
                })
                .Build();
        }
    }
}