Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

237 lines
9.3KB

  1. using Confluent.Kafka;
  2. using DotNetty.Transport.Bootstrapping;
  3. using DotNetty.Transport.Channels;
  4. using Microsoft.Extensions.Hosting;
  5. using Microsoft.Extensions.Logging;
  6. using Microsoft.Extensions.Options;
  7. using NearCardAttendance.Common.helper;
  8. using NearCardAttendance.Model;
  9. using Newtonsoft.Json.Linq;
  10. using Newtonsoft.Json;
  11. using System;
  12. using System.Collections.Generic;
  13. using System.Linq;
  14. using System.Net;
  15. using System.Text;
  16. using System.Threading.Tasks;
  17. using NearCardAttendance.Service.MessageQueue.Model;
  18. namespace NearCardAttendance.TcpServer
  19. {
  20. public class Server : BackgroundService
  21. {
  22. private readonly ILogger<Server> _logger;
  23. //private readonly IServiceProvider _serviceProvider;
  24. private readonly ServerBootstrap _serverBootstrap;
  25. // private IChannel _serverChannel = default!;
  26. private CancellationTokenSource _tokenSource = null!;
  27. private readonly ServiceConfig _configService;
  28. private readonly HttpHelper _httpHelper = default!;
  29. //private int _messageCount = 0;
  30. public Server(
  31. ILogger<Server> logger,
  32. ServerBootstrap serverBootstrap,
  33. IServiceProvider serviceProvider, HttpHelper httpHelper, IOptions<ServiceConfig> _optConfigService)
  34. {
  35. _logger = logger;
  36. //_serviceProvider = serviceProvider;
  37. _configService = _optConfigService.Value;
  38. _serverBootstrap = serverBootstrap;
  39. _httpHelper = httpHelper;
  40. }
  41. public override Task StartAsync(CancellationToken cancellationToken)
  42. {
  43. _logger.LogInformation("------StartAsync");
  44. _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
  45. return base.StartAsync(cancellationToken);
  46. }
  47. public override Task StopAsync(CancellationToken cancellationToken)
  48. {
  49. _logger.LogInformation("------StopAsync");
  50. _tokenSource.Cancel(); //停止工作线程
  51. return base.StopAsync(cancellationToken);
  52. }
  53. protected override async Task ExecuteAsync(CancellationToken stoppingToken)
  54. {
  55. _logger.LogInformation("DotNetty server starting...");
  56. //var address = new IPEndPoint(IPAddress.Any, 12345);
  57. //IChannel _serverChannel = await _serverBootstrap.BindAsync(address);
  58. string ipAddress = "0.0.0.0";
  59. int port = 16662;
  60. var endPoint = new IPEndPoint(IPAddress.Parse(ipAddress), port);
  61. IChannel _serverChannel = await _serverBootstrap.BindAsync(endPoint);
  62. // _serverChannel.GetAttribute(MessageIdAttribute.Key).Set("SomeRequestId");
  63. //_logger.LogInformation("DotNetty server started on {0}.", address);
  64. _logger.LogInformation("DotNetty server started on {0}.", endPoint);
  65. #region kafka
  66. var kafkaConsumer = CreateKafkaConsumer();
  67. kafkaConsumer.Subscribe("topics.storage.near_card_attendance");
  68. var tasks = new List<Task>();
  69. List<ConsumeResult<Ignore, string>> consumeBatchResult = new List<ConsumeResult<Ignore, string>>();
  70. try
  71. {
  72. while (!stoppingToken.IsCancellationRequested)
  73. {
  74. var consumeResult = kafkaConsumer.Consume(stoppingToken);
  75. if (consumeResult != null)
  76. {
  77. //_messageCount++;
  78. consumeBatchResult.Add(consumeResult);
  79. #if DEBUG
  80. if (consumeBatchResult.Count % 2 == 0)
  81. {
  82. if (!await ProcessBatchMessageAsync(consumeBatchResult, kafkaConsumer))
  83. { // 返回结果错误暂停5分钟
  84. await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken);
  85. }
  86. }
  87. //// 30条消息为一批
  88. #else
  89. if (consumeBatchResult.Count % 30 == 0)
  90. {
  91. if (!await ProcessBatchMessageAsync(consumeBatchResult, kafkaConsumer))
  92. { // 返回结果错误暂停5分钟
  93. await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken);
  94. }
  95. }
  96. #endif
  97. }
  98. }
  99. }
  100. catch (OperationCanceledException)
  101. {
  102. _logger.LogWarning("Worker exit");
  103. }
  104. catch (Exception ex)
  105. {
  106. _logger.LogError($"An error occurred: {ex.Message}");
  107. }
  108. #endregion
  109. // Wait until the service is stopped
  110. stoppingToken.WaitHandle.WaitOne();
  111. _logger.LogInformation("DotNetty server stopping...");
  112. // Close the server channel and release resources
  113. await _serverChannel.CloseAsync();
  114. _logger.LogInformation("DotNetty server stopped.");
  115. }
  116. private async Task<bool> ProcessBatchMessageAsync(List<ConsumeResult<Ignore, string>> consumeBatchResult, IConsumer<Ignore, string> kafkaConsumer)
  117. {
  118. try
  119. {
  120. var url = $"{_configService.XinHuaLeYuUrl}/user/electronicCardAttendance/receiveTbAttendanceRecordException";
  121. var list = new List<object>();
  122. //consumeBatchResult.ForEach(x => {
  123. // JObject msg = (JObject)JsonConvert.DeserializeObject(x.Message.Value)!;
  124. // list.Add(new
  125. // {
  126. // attendanceStatus = int.TryParse(msg["data"]!["attendanceStatus"]!.ToString(), out int status) ? status : 0,
  127. // attendanceTime = msg["data"]!["attendanceTime"]!.ToString(),
  128. // imei = msg["data"]!["imei"]!.ToString()
  129. // }) ;
  130. //});
  131. consumeBatchResult.ForEach(x => {
  132. EventData msg = JsonConvert.DeserializeObject<EventData>(x.Message.Value)!;
  133. JObject content = (JObject)JsonConvert.DeserializeObject(msg.Content)!;
  134. list.Add(new
  135. {
  136. attendanceStatus = int.TryParse(content!["attendanceStatus"]!.ToString(), out int status) ? status : 0,
  137. attendanceTime = content!["attendanceTime"]!.ToString(),
  138. imei = content!["imei"]!.ToString()
  139. });
  140. });
  141. var data = new
  142. {
  143. data = list
  144. };
  145. var res = await _httpHelper.HttpToPostAsync(url, data);
  146. if (!string.IsNullOrEmpty(res))
  147. {
  148. JObject resObj = (JObject)JsonConvert.DeserializeObject(res!)!;
  149. if ((bool)resObj["success"]!)
  150. {
  151. _logger.LogInformation($"{nameof(ProcessBatchMessageAsync)} 推送 {JsonConvert.SerializeObject(data)} 结果,{res}");
  152. consumeBatchResult.ForEach(x =>
  153. {
  154. kafkaConsumer.Commit(x);
  155. _logger.LogInformation($"完成消费:{JsonConvert.SerializeObject(x.Message.Value)}");
  156. });
  157. consumeBatchResult.Clear();
  158. return true;
  159. }
  160. }
  161. }
  162. catch (Exception ex)
  163. {
  164. _logger.LogError($"处理消息出错 \n{ex.Message}\n{ex.StackTrace}");
  165. }
  166. return false;
  167. }
  168. private IConsumer<Ignore, string> CreateKafkaConsumer()
  169. {
  170. var consumerConfig = new ConsumerConfig
  171. {
  172. GroupId = "near_card_attendance",
  173. BootstrapServers = _configService.KafkaServerAddress,
  174. AutoOffsetReset = AutoOffsetReset.Earliest,
  175. EnableAutoCommit = false, // 关闭自动提交偏移量
  176. //CancellationDelayMaxMs = 1//set CancellationDelayMaxMs
  177. };
  178. return new ConsumerBuilder<Ignore, string>(consumerConfig)
  179. .SetErrorHandler((_, e) =>
  180. {
  181. //Console.WriteLine($"消费者创建出错,代码:{e.Code} |原因: {e.Reason}");
  182. _logger.LogInformation($"消费者创建出错,代码:{e.Code} |原因: {e.Reason}");
  183. })
  184. .SetPartitionsAssignedHandler((c, partitions) =>
  185. {
  186. //// 在这里手动指定要消费的分区
  187. //var partitionsToConsume = new List<TopicPartitionOffset>
  188. //{
  189. // new TopicPartitionOffset("topics.storage.near_card_attendance", partitionIndex, Offset.Unset)
  190. //};
  191. ////c.Assign(partitionsToConsume);
  192. //Console.WriteLine($"Assigned partitions: {string.Join(", ", partitionsToConsume)}");
  193. //return partitionsToConsume;
  194. })
  195. .SetPartitionsRevokedHandler((c, partitions) =>
  196. {
  197. })
  198. .Build();
  199. }
  200. }
  201. }