No puede seleccionar más de 25 temas. Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.

140 líneas
6.8KB

  1. using Confluent.Kafka;
  2. using Microsoft.Extensions.Hosting;
  3. using Microsoft.Extensions.Logging;
  4. using Microsoft.Extensions.Options;
  5. using Newtonsoft.Json;
  6. using TelpoPush.Models.Config;
  7. namespace TelpoPush.Service.Mq.Kafka
  8. {
  /// <summary>
  /// Kafka consumer service. Builds a SASL_SSL / SCRAM-SHA-256 consumer configuration from
  /// <see cref="ServiceConfig"/> and runs a consume loop that passes every non-empty message
  /// to a caller-supplied callback, committing the offset once the callback has returned.
  /// </summary>
  public class KafkaService : IKafkaService
  {
      private readonly ConsumerConfig _consumerConfig;
      private readonly IHostEnvironment env;
      private readonly ILogger<KafkaService> logger;
      private readonly ServiceConfig _configService;

      /// <summary>
      /// Stores the injected dependencies and prepares the consumer configuration.
      /// </summary>
      /// <param name="_logger">Logger used for consumed-message and diagnostic entries.</param>
      /// <param name="_env">Host environment (stored; not read anywhere in this class).</param>
      /// <param name="optConfigService">Options wrapper providing the Kafka connection settings and topic list.</param>
      public KafkaService(ILogger<KafkaService> _logger, IHostEnvironment _env, IOptions<ServiceConfig> optConfigService)
      {
          _configService = optConfigService.Value;
          env = _env;
          logger = _logger;
          _consumerConfig = new ConsumerConfig
          {
              BootstrapServers = _configService.KafkaBootstrapServers,
              SecurityProtocol = SecurityProtocol.SaslSsl,
              SaslMechanism = SaslMechanism.ScramSha256,
              GroupId = _configService.KafkaGroupId,
              SaslUsername = _configService.KafkaSaslUsername,
              SaslPassword = _configService.KafkaSaslPassword,
              SslCaLocation = _configService.KafkaSslCaLocation,
              // Auto-commit is disabled: offsets are committed explicitly in SubscribeAsync
              // after the callback has processed the message.
              EnableAutoCommit = false,
              // Without a committed offset, start consuming from the earliest available message.
              AutoOffsetReset = AutoOffsetReset.Earliest,
              // Lowest permitted value, so a cancellation request is acted on almost immediately.
              CancellationDelayMaxMs = 1
          };
      }

      /// <summary>
      /// Subscribes to the configured topics and consumes messages until
      /// <paramref name="cancellationToken"/> is cancelled.
      /// </summary>
      /// <remarks>
      /// Despite the <c>Async</c> signature the loop is synchronous: <c>Consume</c> blocks the
      /// calling thread, and the only <c>await</c> is on an already-completed task after the loop
      /// has ended. Exceptions thrown by <paramref name="messageFunc"/> are not caught here; they
      /// end the loop and the offset of that message is not committed.
      /// </remarks>
      /// <param name="messageFunc">Callback invoked with (topic, message value, raw headers) for each message whose value is not null or empty.</param>
      /// <param name="cancellationToken">Token that stops the loop; cancellation is swallowed and the method returns normally.</param>
      /// <exception cref="ArgumentNullException"><paramref name="messageFunc"/> is null.</exception>
      public async Task SubscribeAsync(Action<string, string, Headers> messageFunc, CancellationToken cancellationToken)
      {
          if (messageFunc == null)
          {
              throw new ArgumentNullException(nameof(messageFunc));
          }

          List<string> topics = _configService.KafkaTopics;
          using (var consumer = new ConsumerBuilder<Ignore, string>(_consumerConfig)
              .SetErrorHandler((_, e) =>
              {
                  Console.WriteLine($"Error: {e.Reason}");
              })
              .SetStatisticsHandler((_, json) =>
              {
                  Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
              })
              .SetPartitionsAssignedHandler((c, partitions) =>
              {
                  string partitionsStr = string.Join(", ", partitions);
                  Console.WriteLine($" - 分配的 kafka 分区: {partitionsStr}");
              })
              .SetPartitionsRevokedHandler((c, partitions) =>
              {
                  string partitionsStr = string.Join(", ", partitions);
                  Console.WriteLine($" - 回收了 kafka 的分区: {partitionsStr}");
              })
              .Build())
          {
              consumer.Subscribe(topics);
              try
              {
                  while (true)
                  {
                      try
                      {
                          var consumeResult = consumer.Consume(cancellationToken);

                          // An end-of-partition result carries no message, so it must be handled
                          // before anything touches Message or its headers.
                          if (consumeResult.IsPartitionEOF)
                          {
                              Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                              continue;
                          }

                          var message = consumeResult.Message;
                          string messageResult = message.Value;
                          Headers headers = message.Headers;

                          // The three typed headers are decoded for the log entry only; the callback
                          // receives the raw header collection. -1 means "absent or unreadable".
                          int dataType = -1, alarmType = -1, operType = -1;
                          if (headers != null)
                          {
                              foreach (var item in headers)
                              {
                                  if (item.Key == KafkaHeader.DataType)
                                  {
                                      dataType = ReadInt32Header(item.Key, item.GetValueBytes());
                                  }
                                  else if (item.Key == KafkaHeader.AlarmType)
                                  {
                                      alarmType = ReadInt32Header(item.Key, item.GetValueBytes());
                                  }
                                  else if (item.Key == KafkaHeader.OperType)
                                  {
                                      operType = ReadInt32Header(item.Key, item.GetValueBytes());
                                  }
                              }
                          }

                          var headerSummary = new { DataType = dataType, AlarmType = alarmType, OperType = operType };
                          logger.LogInformation($"Consumed topic '{consumeResult.Topic}', message '{messageResult}' , headers '{JsonConvert.SerializeObject(headerSummary)}', at: '{consumeResult.TopicPartitionOffset}'.");

                          if (!string.IsNullOrEmpty(messageResult))
                          {
                              messageFunc(consumeResult.Topic, messageResult, headers);
                              try
                              {
                                  consumer.Commit(consumeResult);
                              }
                              catch (KafkaException e)
                              {
                                  // A failed commit is reported but does not stop consumption.
                                  Console.WriteLine(e.Message);
                              }
                          }
                      }
                      catch (ConsumeException e)
                      {
                          Console.WriteLine($"Consume error: {e.Error.Reason}");
                      }
                  }
              }
              catch (OperationCanceledException)
              {
                  Console.WriteLine("Closing consumer.");
              }
              finally
              {
                  // Close on every exit path, not only on cancellation, so the consumer leaves
                  // the group explicitly instead of relying on Dispose alone.
                  try
                  {
                      consumer.Close();
                  }
                  catch (KafkaException e)
                  {
                      // Do not let a failing Close hide the exception that ended the loop.
                      logger.LogError(e, "Failed to close the Kafka consumer cleanly.");
                  }
              }
          }
          await Task.CompletedTask;
      }

      /// <summary>
      /// Decodes a header value as an Int32 using <see cref="BitConverter"/> (platform byte order).
      /// </summary>
      /// <param name="key">Header key, used only in the warning message.</param>
      /// <param name="value">Raw header bytes; may be null.</param>
      /// <returns>The decoded value, or -1 when the value is null or shorter than four bytes.</returns>
      private int ReadInt32Header(string key, byte[] value)
      {
          if (value == null || value.Length < sizeof(int))
          {
              logger.LogWarning("Kafka header '{Key}' does not carry a 4-byte Int32 value; ignoring it.", key);
              return -1;
          }

          return BitConverter.ToInt32(value, 0);
      }
  }
  130. }