You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

142 lines
6.2KB

  1. using Confluent.Kafka;
  2. using Microsoft.Extensions.Logging;
  3. using Microsoft.Extensions.Options;
  4. using Newtonsoft.Json;
  5. using NearCardAttendance.Common;
  6. using NearCardAttendance.Model;
  7. using NearCardAttendance.Service.MessageQueue.Model;
  8. namespace NearCardAttendance.Service.MessageQueue.Kafka
  9. {
  10. public class KafkaService : IKafkaService
  11. {
  12. private readonly ILogger<KafkaService> _logger;
  13. private readonly ServiceConfig _configService;
  14. public KafkaService(IOptions<ServiceConfig> _optConfigService, ILogger<KafkaService> logger)
  15. {
  16. _configService = _optConfigService.Value;
  17. _logger = logger;
  18. }
  19. public async Task PublishAsync<T>(string topicName, T message) where T : class
  20. {
  21. try
  22. {
  23. Type messageType = typeof(T);
  24. var config = new ProducerConfig
  25. {
  26. BootstrapServers = _configService.KafkaServerAddress,
  27. EnableIdempotence = true,
  28. Acks = Acks.All,
  29. MessageSendMaxRetries = 3
  30. };
  31. if (message.GetType().Equals(typeof(EventData)))
  32. {
  33. using var producer = new ProducerBuilder<string, string>(config).Build();
  34. string imei = messageType.GetProperty("IMEI")!.GetValue(message)!.ToString()!;
  35. //var tailNo = long.Parse(messageType.GetProperty("IMEI")!.GetValue(message)!.ToString()!) % 100;
  36. //int tailNo = SafeType.SafeInt(imei.Substring(imei.Length - 2));
  37. var messageId = messageType.GetProperty("MessageId")!.GetValue(message)!.ToString()!;
  38. //await producer.ProduceAsync(new TopicPartition(topicName, new Partition(tailNo)), new Message<string, string>
  39. //{
  40. // Key = messageId,
  41. // Value = JsonConvert.SerializeObject(message),
  42. //});
  43. await producer.ProduceAsync(topicName, new Message<string, string>
  44. {
  45. Key = messageId,
  46. Value = JsonConvert.SerializeObject(message),
  47. });
  48. // TopicPartition topicPartition = new TopicPartition(topicName, new Partition(tailNo));
  49. }
  50. else
  51. {
  52. using var producer = new ProducerBuilder<string, string>(config).Build();
  53. await producer.ProduceAsync(topicName, new Message<string, string>
  54. {
  55. Key = Guid.NewGuid().ToString(),
  56. Value = JsonConvert.SerializeObject(message)
  57. });
  58. }
  59. }
  60. catch (ProduceException<Null, string> ex)
  61. {
  62. _logger.LogError($"推送到kafka失败,topic: {topicName},\n message:{JsonConvert.SerializeObject(message)}: \n{ex.Error.Reason}");
  63. }
  64. }
  65. public async Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class
  66. {
  67. var config = new ConsumerConfig
  68. {
  69. BootstrapServers = _configService.KafkaServerAddress,
  70. GroupId = "Consumer",
  71. EnableAutoCommit = false, // 禁止AutoCommit
  72. Acks = Acks.Leader, // 假设只需要Leader响应即可
  73. AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起
  74. };
  75. using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
  76. {
  77. consumer.Subscribe(topics);
  78. try
  79. {
  80. while (true)
  81. {
  82. try
  83. {
  84. var consumeResult = consumer.Consume(cancellationToken);
  85. Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'.");
  86. if (consumeResult!.IsPartitionEOF)
  87. {
  88. Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
  89. continue;
  90. }
  91. T? messageResult = null;
  92. try
  93. {
  94. messageResult = JsonConvert.DeserializeObject<T>(consumeResult.Message!.Value)!;
  95. }
  96. catch (Exception ex)
  97. {
  98. var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message!.Value}】 :{ex.StackTrace?.ToString()}";
  99. Console.WriteLine(errorMessage);
  100. messageResult = null;
  101. }
  102. if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/)
  103. {
  104. messageFunc(messageResult);
  105. try
  106. {
  107. consumer.Commit(consumeResult);
  108. }
  109. catch (KafkaException e)
  110. {
  111. Console.WriteLine(e.Message);
  112. }
  113. }
  114. }
  115. catch (ConsumeException e)
  116. {
  117. Console.WriteLine($"Consume error: {e.Error.Reason}");
  118. }
  119. }
  120. }
  121. catch (OperationCanceledException)
  122. {
  123. Console.WriteLine("Closing consumer.");
  124. consumer.Close();
  125. }
  126. }
  127. await Task.CompletedTask;
  128. }
  129. }
  130. }