Du kan inte välja fler än 25 ämnen. Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

139 lines
6.8KB

  1. using Confluent.Kafka;
  2. using Microsoft.Extensions.Hosting;
  3. using Microsoft.Extensions.Logging;
  4. using Microsoft.Extensions.Options;
  5. using Newtonsoft.Json;
  6. using TelpoPush.Models.Config;
  7. namespace TelpoPush.Service.Mq.Kafka
  8. {
  /// <summary>
  /// Kafka consumer service: subscribes to the topics listed in <see cref="ServiceConfig"/>,
  /// passes every non-empty message to a caller-supplied callback and commits the offset
  /// manually after the callback returns.
  /// </summary>
  public class KafkaService: IKafkaService
  {
      private readonly ConsumerConfig _consumerConfig;
      private readonly IHostEnvironment env;
      private readonly ILogger<KafkaService> logger;
      private readonly ServiceConfig _configService;

      /// <summary>
      /// Builds the consumer configuration (SASL_SSL with SCRAM-SHA-256) from the bound service options.
      /// </summary>
      /// <param name="_logger">Logger used for the per-message "Consumed ..." entries.</param>
      /// <param name="_env">Host environment; stored but not read by the code in this class.</param>
      /// <param name="optConfigService">Options wrapper providing the Kafka connection settings and topic list.</param>
      public KafkaService(ILogger<KafkaService> _logger, IHostEnvironment _env, IOptions<ServiceConfig> optConfigService)
      {
          _configService = optConfigService.Value;
          env = _env;
          logger = _logger;
          _consumerConfig = new ConsumerConfig
          {
              BootstrapServers = _configService.KafkaBootstrapServers,
              SecurityProtocol = SecurityProtocol.SaslSsl,
              SaslMechanism = SaslMechanism.ScramSha256,
              GroupId = _configService.KafkaGroupId,
              SaslUsername = _configService.KafkaSaslUsername,
              SaslPassword = _configService.KafkaSaslPassword,
              SslCaLocation = _configService.KafkaSslCaLocation,
              EnableAutoCommit = false, // auto-commit disabled: offsets are committed explicitly in SubscribeAsync
              AutoOffsetReset = AutoOffsetReset.Earliest, // start consuming from the earliest available offset
          };
      }

      /// <summary>
      /// Subscribes to the configured topics and consumes messages until
      /// <paramref name="cancellationToken"/> is cancelled.
      /// </summary>
      /// <remarks>
      /// The consume loop is synchronous: although the method returns a <see cref="Task"/>,
      /// it occupies the calling thread until cancellation (the trailing
      /// <c>await Task.CompletedTask</c> only satisfies the async signature).
      /// Exceptions thrown by <paramref name="messageFunc"/> are not caught here and end the loop;
      /// the offset of that message is then not committed.
      /// </remarks>
      /// <param name="messageFunc">Callback invoked with (topic, message value, message headers) for each non-empty message.</param>
      /// <param name="cancellationToken">Token that stops the consume loop and closes the consumer.</param>
      /// <exception cref="ArgumentNullException"><paramref name="messageFunc"/> is null.</exception>
      public async Task SubscribeAsync(Action<string, string, Headers> messageFunc, CancellationToken cancellationToken)
      {
          if (messageFunc == null)
          {
              throw new ArgumentNullException(nameof(messageFunc));
          }

          List<string> topics = _configService.KafkaTopics;
          using (var consumer = new ConsumerBuilder<Ignore, string>(_consumerConfig)
              .SetErrorHandler((_, e) =>
              {
                  Console.WriteLine($"Error: {e.Reason}");
              })
              .SetStatisticsHandler((_, json) =>
              {
                  Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
              })
              .SetPartitionsAssignedHandler((c, partitions) =>
              {
                  string partitionsStr = string.Join(", ", partitions);
                  Console.WriteLine($" - 分配的 kafka 分区: {partitionsStr}");
              })
              .SetPartitionsRevokedHandler((c, partitions) =>
              {
                  string partitionsStr = string.Join(", ", partitions);
                  Console.WriteLine($" - 回收了 kafka 的分区: {partitionsStr}");
              })
              .Build())
          {
              consumer.Subscribe(topics);
              try
              {
                  while (true)
                  {
                      try
                      {
                          var consumeResult = consumer.Consume(cancellationToken);
                          if (consumeResult == null)
                          {
                              continue;
                          }

                          // Check for end-of-partition BEFORE touching the message or its headers:
                          // an EOF result is only a marker and is not expected to carry a message.
                          if (consumeResult.IsPartitionEOF)
                          {
                              Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                              continue;
                          }

                          string messageResult = consumeResult.Message?.Value;
                          Headers headers = consumeResult.Message?.Headers;

                          // -1 means "header absent or not a readable 32-bit value".
                          int dataType = -1, alarmType = -1, operType = -1;
                          if (headers != null)
                          {
                              foreach (var item in headers)
                              {
                                  if (item.Key == KafkaHeader.DataType)
                                      dataType = ParseInt32Header(item.GetValueBytes(), dataType);
                                  else if (item.Key == KafkaHeader.AlarmType)
                                      alarmType = ParseInt32Header(item.GetValueBytes(), alarmType);
                                  else if (item.Key == KafkaHeader.OperType)
                                      operType = ParseInt32Header(item.GetValueBytes(), operType);
                              }
                          }

                          var headerSummary = new { DataType = dataType, AlarmType = alarmType, OperType = operType };
                          logger.LogInformation(
                              "Consumed topic '{Topic}', message '{Message}' , headers '{Headers}', at: '{TopicPartitionOffset}'.",
                              consumeResult.Topic,
                              messageResult,
                              JsonConvert.SerializeObject(headerSummary),
                              consumeResult.TopicPartitionOffset);

                          // Empty payloads are skipped without invoking the callback or committing.
                          if (string.IsNullOrEmpty(messageResult))
                          {
                              continue;
                          }

                          messageFunc(consumeResult.Topic, messageResult, headers);
                          try
                          {
                              // Commit only after the callback has returned.
                              consumer.Commit(consumeResult);
                          }
                          catch (KafkaException e)
                          {
                              Console.WriteLine(e.Message);
                          }
                      }
                      catch (ConsumeException e)
                      {
                          // A failed consume call is reported and the loop keeps polling.
                          Console.WriteLine($"Consume error: {e.Error.Reason}");
                      }
                  }
              }
              catch (OperationCanceledException)
              {
                  Console.WriteLine("Closing consumer.");
                  consumer.Close();
              }
          }
          await Task.CompletedTask;
      }

      /// <summary>
      /// Reads a 32-bit integer from the start of a header value, in the byte order used by <see cref="BitConverter"/>.
      /// </summary>
      /// <param name="bytes">Raw header value; may be null.</param>
      /// <param name="fallback">Value returned when <paramref name="bytes"/> is null or shorter than four bytes.</param>
      /// <returns>The decoded integer, or <paramref name="fallback"/> if the value cannot be decoded.</returns>
      private static int ParseInt32Header(byte[] bytes, int fallback)
      {
          // BitConverter.ToInt32 throws on null or short input; a malformed header must not stop the consumer.
          if (bytes == null || bytes.Length < sizeof(int))
          {
              return fallback;
          }

          return BitConverter.ToInt32(bytes, 0);
      }
  }
  129. }