選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

KafkaService.cs 7.0KB

6ヶ月前
6ヶ月前
6ヶ月前
6ヶ月前
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. using Confluent.Kafka;
  2. using Microsoft.Extensions.Hosting;
  3. using Microsoft.Extensions.Logging;
  4. using Microsoft.Extensions.Options;
  5. using Newtonsoft.Json;
  6. using TelpoPush.Models.Config;
  7. namespace TelpoPush.Service.Mq.Kafka
  8. {
  /// <summary>
  /// Kafka consumer service: subscribes to the configured topics over SASL/SSL and
  /// hands every non-empty message to a caller-supplied callback, committing the
  /// offset manually once the callback has returned.
  /// </summary>
  public class KafkaService : IKafkaService
  {
      private readonly ConsumerConfig _consumerConfig;
      private readonly IHostEnvironment env;
      private readonly ILogger<KafkaService> logger;
      private readonly ServiceConfig _configService;

      /// <summary>
      /// Builds the consumer configuration from the injected <see cref="ServiceConfig"/>.
      /// </summary>
      /// <param name="_logger">Logger used for consumer diagnostics.</param>
      /// <param name="_env">Host environment; its name is included in the per-message log line.</param>
      /// <param name="optConfigService">Options wrapper providing the Kafka connection settings.</param>
      public KafkaService(ILogger<KafkaService> _logger, IHostEnvironment _env, IOptions<ServiceConfig> optConfigService)
      {
          _configService = optConfigService.Value;
          env = _env;
          logger = _logger;
          _consumerConfig = new ConsumerConfig
          {
              BootstrapServers = _configService.KafkaBootstrapServers,
              SecurityProtocol = SecurityProtocol.SaslSsl,
              SaslMechanism = SaslMechanism.ScramSha256,
              GroupId = _configService.KafkaGroupId,
              SaslUsername = _configService.KafkaSaslUsername,
              SaslPassword = _configService.KafkaSaslPassword,
              SslCaLocation = _configService.KafkaSslCaLocation,
              // Auto-commit is disabled: offsets are committed explicitly in SubscribeAsync.
              EnableAutoCommit = false,
              // Start from the earliest available offset when the group has no committed offset.
              AutoOffsetReset = AutoOffsetReset.Earliest,
              // Lowest cancellation delay the client accepts, so Consume reacts to
              // cancellation promptly (NOTE(review): low values may raise CPU usage - confirm acceptable).
              CancellationDelayMaxMs = 1
          };
      }

      /// <summary>
      /// Subscribes to the configured topics and consumes messages until
      /// <paramref name="cancellationToken"/> is cancelled.
      /// </summary>
      /// <remarks>
      /// The consume loop is synchronous: despite the <c>Async</c> suffix, this method
      /// occupies the calling thread until cancellation (or until an exception escapes).
      /// An offset is committed only after <paramref name="messageFunc"/> returns; an
      /// exception thrown by the callback is not caught here and ends the loop.
      /// </remarks>
      /// <param name="messageFunc">Callback receiving (topic, message value, message headers).</param>
      /// <param name="cancellationToken">Token that stops the loop and closes the consumer.</param>
      public async Task SubscribeAsync(Action<string, string, Headers> messageFunc, CancellationToken cancellationToken)
      {
          List<string> topics = _configService.KafkaTopics;
          using (var consumer = new ConsumerBuilder<Ignore, string>(_consumerConfig)
              .SetErrorHandler((_, e) =>
              {
                  logger.LogError($"Error: {e.Reason}");
                  Console.WriteLine($"Error: {e.Reason}");
              })
              .SetStatisticsHandler((_, json) =>
              {
                  Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
              })
              .SetPartitionsAssignedHandler((c, partitions) =>
              {
                  string partitionsStr = string.Join(", ", partitions);
                  logger.LogInformation($" - 分配的 kafka 分区: {partitionsStr}");
                  Console.WriteLine($" - 分配的 kafka 分区: {partitionsStr}");
              })
              .SetPartitionsRevokedHandler((c, partitions) =>
              {
                  string partitionsStr = string.Join(", ", partitions);
                  Console.WriteLine($" - 回收了 kafka 的分区: {partitionsStr}");
              })
              .Build())
          {
              consumer.Subscribe(topics);
              try
              {
                  while (true)
                  {
                      try
                      {
                          var consumeResult = consumer.Consume(cancellationToken);
                          if (consumeResult == null)
                          {
                              continue;
                          }

                          // Handle end-of-partition markers first: they carry no message,
                          // so nothing below (headers, value) may be touched for them.
                          if (consumeResult.IsPartitionEOF)
                          {
                              Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                              continue;
                          }

                          var message = consumeResult.Message;
                          Headers headers = message?.Headers;

                          // -1 means "header absent or not a valid 4-byte integer".
                          int dataType = -1, alarmType = -1, operType = -1;
                          if (headers != null)
                          {
                              foreach (var item in headers)
                              {
                                  if (item.Key == KafkaHeader.DataType)
                                  {
                                      dataType = ReadInt32Header(item);
                                  }
                                  else if (item.Key == KafkaHeader.AlarmType)
                                  {
                                      alarmType = ReadInt32Header(item);
                                  }
                                  else if (item.Key == KafkaHeader.OperType)
                                  {
                                      operType = ReadInt32Header(item);
                                  }
                              }
                          }

                          // Property names are kept explicit so the serialized JSON keys stay
                          // "DataType" / "AlarmType" / "OperType".
                          var headerSummary = new { DataType = dataType, AlarmType = alarmType, OperType = operType };
                          logger.LogInformation($"[{env.EnvironmentName}]Consumed topic '{consumeResult.Topic}', message '{message?.Value}' , headers '{JsonConvert.SerializeObject(headerSummary)}', at: '{consumeResult.TopicPartitionOffset}'.");

                          string messageResult = message?.Value;
                          if (string.IsNullOrEmpty(messageResult))
                          {
                              // Empty payloads are neither dispatched nor committed.
                              continue;
                          }

                          messageFunc(consumeResult.Topic, messageResult, headers);
                          try
                          {
                              consumer.Commit(consumeResult);
                          }
                          catch (KafkaException e)
                          {
                              // A failed commit is reported but does not stop consumption.
                              logger.LogError(e, "Commit error: {Reason}", e.Message);
                              Console.WriteLine(e.Message);
                          }
                      }
                      catch (ConsumeException e)
                      {
                          // Consume errors are reported and the loop keeps polling.
                          logger.LogError(e, "Consume error: {Reason}", e.Error.Reason);
                          Console.WriteLine($"Consume error: {e.Error.Reason}");
                      }
                  }
              }
              catch (OperationCanceledException)
              {
                  Console.WriteLine("Closing consumer.");
                  consumer.Close();
              }
          }
          await Task.CompletedTask;
      }

      /// <summary>
      /// Reads a header value as a 32-bit integer, returning -1 when the value is
      /// missing or shorter than four bytes instead of throwing.
      /// </summary>
      private static int ReadInt32Header(IHeader header)
      {
          byte[] raw = header.GetValueBytes();
          return raw != null && raw.Length >= sizeof(int) ? BitConverter.ToInt32(raw, 0) : -1;
      }
  }
  132. }