You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

123 lines
5.0KB

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.DirectoryServices.Protocols;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using HealthMonitor.Common;
using HealthMonitor.Model.Config;
using Microsoft.EntityFrameworkCore.Diagnostics;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
  15. namespace HealthMonitor.Service.MessageQueue.Kafka
  16. {
  17. public class KafkaService : IKafkaService
  18. {
  19. private readonly ILogger<KafkaService> _logger;
  20. private readonly ServiceConfig _configService;
  21. public KafkaService(IOptions<ServiceConfig> _optConfigService, ILogger<KafkaService> logger)
  22. {
  23. _configService = _optConfigService.Value;
  24. _logger = logger;
  25. }
  26. public async Task PublishAsync<T>(string topicName, T message) where T : class
  27. {
  28. try
  29. {
  30. Type messageType = typeof(T);
  31. var config = new ProducerConfig
  32. {
  33. BootstrapServers = _configService.KafkaServerAddress,
  34. EnableIdempotence = true,
  35. Acks = Acks.All,
  36. MessageSendMaxRetries = 3
  37. };
  38. using var producer = new ProducerBuilder<string, string>(config).Build();
  39. await producer.ProduceAsync(topicName, new Message<string, string>
  40. {
  41. Key = Guid.NewGuid().ToString(),
  42. Value = JsonConvert.SerializeObject(message)
  43. });
  44. }
  45. catch (ProduceException<Null, string> ex)
  46. {
  47. _logger.LogError($"推送到kafka失败,topic: {topicName},\n message:{JsonConvert.SerializeObject(message)}: \n{ex.Error.Reason}");
  48. }
  49. }
  50. public async Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class
  51. {
  52. var config = new ConsumerConfig
  53. {
  54. BootstrapServers = _configService.KafkaServerAddress,
  55. GroupId = "Consumer",
  56. EnableAutoCommit = false, // 禁止AutoCommit
  57. Acks = Acks.Leader, // 假设只需要Leader响应即可
  58. AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起
  59. };
  60. using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
  61. {
  62. consumer.Subscribe(topics);
  63. try
  64. {
  65. while (true)
  66. {
  67. try
  68. {
  69. var consumeResult = consumer.Consume(cancellationToken);
  70. Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'.");
  71. if (consumeResult!.IsPartitionEOF)
  72. {
  73. Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
  74. continue;
  75. }
  76. T? messageResult = null;
  77. try
  78. {
  79. messageResult = JsonConvert.DeserializeObject<T>(consumeResult.Message!.Value)!;
  80. }
  81. catch (Exception ex)
  82. {
  83. var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message!.Value}】 :{ex.StackTrace?.ToString()}";
  84. Console.WriteLine(errorMessage);
  85. messageResult = null;
  86. }
  87. if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/)
  88. {
  89. messageFunc(messageResult);
  90. try
  91. {
  92. consumer.Commit(consumeResult);
  93. }
  94. catch (KafkaException e)
  95. {
  96. Console.WriteLine(e.Message);
  97. }
  98. }
  99. }
  100. catch (ConsumeException e)
  101. {
  102. Console.WriteLine($"Consume error: {e.Error.Reason}");
  103. }
  104. }
  105. }
  106. catch (OperationCanceledException)
  107. {
  108. Console.WriteLine("Closing consumer.");
  109. consumer.Close();
  110. }
  111. }
  112. await Task.CompletedTask;
  113. }
  114. }
  115. }