using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using TelpoPush.Models.Config;

namespace TelpoPush.Service.Mq.Kafka
{
    public class KafkaService : IKafkaService
    {
        private readonly ConsumerConfig _consumerConfig;
        private readonly IHostEnvironment _env;
        private readonly ILogger<KafkaService> _logger;
        private readonly ServiceConfig _configService;

        public KafkaService(ILogger<KafkaService> logger, IHostEnvironment env, IOptions<ServiceConfig> optConfigService)
        {
            _configService = optConfigService.Value;
            _env = env;
            _logger = logger;
            _consumerConfig = new ConsumerConfig
            {
                BootstrapServers = _configService.KafkaBootstrapServers,
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SaslMechanism = SaslMechanism.ScramSha256,
                GroupId = _configService.KafkaGroupId,
                SaslUsername = _configService.KafkaSaslUsername,
                SaslPassword = _configService.KafkaSaslPassword,
                SslCaLocation = _configService.KafkaSslCaLocation,
                //SslCaLocation = @"D:\THOMAS\Project\SSJL\C#\Net8\TelpoPushThirdSsl\pem\ca-root.pem",
                EnableAutoCommit = false,                   // disable auto-commit; offsets are committed manually after processing
                AutoOffsetReset = AutoOffsetReset.Earliest, // start from the earliest offset when no committed offset exists
                CancellationDelayMaxMs = 1
            };
        }

        public async Task SubscribeAsync(Action<string, string, Headers> messageFunc, CancellationToken cancellationToken)
        {
            List<string> topics = _configService.KafkaTopics;
            using (var consumer = new ConsumerBuilder<Ignore, string>(_consumerConfig)
                .SetErrorHandler((_, e) =>
                {
                    _logger.LogError($"Error: {e.Reason}");
                    Console.WriteLine($"Error: {e.Reason}");
                })
                .SetStatisticsHandler((_, json) =>
                {
                    Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > listening for messages..");
                })
                .SetPartitionsAssignedHandler((c, partitions) =>
                {
                    string partitionsStr = string.Join(", ", partitions);
                    _logger.LogInformation($" - assigned kafka partitions: {partitionsStr}");
                    Console.WriteLine($" - assigned kafka partitions: {partitionsStr}");
                })
                .SetPartitionsRevokedHandler((c, partitions) =>
                {
                    string partitionsStr = string.Join(", ", partitions);
                    Console.WriteLine($" - revoked kafka partitions: {partitionsStr}");
                })
                .Build())
            {
                consumer.Subscribe(topics);
                try
                {
                    while (true)
                    {
                        try
                        {
                            var consumeResult = consumer.Consume(cancellationToken);

                            // Read the custom headers (DataType / AlarmType / OperType) attached by the producer.
                            int dataType = -1, alarmType = -1, operType = -1;
                            if (consumeResult.Message?.Headers != null)
                            {
                                foreach (var item in consumeResult.Message.Headers)
                                {
                                    if (item.Key == KafkaHeader.DataType)
                                        dataType = BitConverter.ToInt32(item.GetValueBytes(), 0);
                                    else if (item.Key == KafkaHeader.AlarmType)
                                        alarmType = BitConverter.ToInt32(item.GetValueBytes(), 0);
                                    else if (item.Key == KafkaHeader.OperType)
                                        operType = BitConverter.ToInt32(item.GetValueBytes(), 0);
                                }
                            }
                            var headerValues = new { DataType = dataType, AlarmType = alarmType, OperType = operType };
                            _logger.LogInformation($"[{_env.EnvironmentName}] Consumed topic '{consumeResult.Topic}', message '{consumeResult.Message?.Value}', headers '{JsonConvert.SerializeObject(headerValues)}', at: '{consumeResult.TopicPartitionOffset}'.");

                            if (consumeResult.IsPartitionEOF)
                            {
                                Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} reached end of partition: {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                                continue;
                            }

                            // A topic could be chosen dynamically from the message content here:
                            //var dynamicTopic = GetDynamicTopic(consumeResult.Message?.Value);
                            //consumer.Subscribe(new List<string>() { dynamicTopic });

                            string messageResult = null;
                            Headers headers = null;
                            try
                            {
                                messageResult = consumeResult.Message.Value;
                                headers = consumeResult.Message.Headers;
                            }
                            catch (Exception ex)
                            {
                                var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} [Exception: failed to read message, Value: {consumeResult.Message?.Value}]: {ex.StackTrace}";
                                Console.WriteLine(errorMessage);
                                messageResult = null;
                            }

                            if (!string.IsNullOrEmpty(messageResult)/* && consumeResult.Offset % commitPeriod == 0*/)
                            {
                                string topic = consumeResult.Topic;
                                messageFunc(topic, messageResult, headers);
                                try
                                {
                                    // Commit the offset only after the message has been handed off for processing.
                                    consumer.Commit(consumeResult);
                                }
                                catch (KafkaException e)
                                {
                                    Console.WriteLine(e.Message);
                                }
                            }
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Consume error: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    Console.WriteLine("Closing consumer.");
                    consumer.Close();
                }
            }
            // The consume loop itself is synchronous; this keeps the async signature satisfied.
            await Task.CompletedTask;
        }
    }
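
    // -----------------------------------------------------------------------
    // Note: IKafkaService and KafkaHeader are referenced above but defined
    // elsewhere in the TelpoPush solution. The sketches below are assumptions
    // reconstructed only from the members this file uses; the real definitions
    // (including the actual header-key strings) may differ.
    // -----------------------------------------------------------------------

    // Assumed shape of the consumer abstraction implemented by KafkaService.
    public interface IKafkaService
    {
        Task SubscribeAsync(Action<string, string, Headers> messageFunc, CancellationToken cancellationToken);
    }

    // Assumed header-key constants; only the three keys read in SubscribeAsync
    // are listed, and the string values here are placeholders.
    public static class KafkaHeader
    {
        public const string DataType = "DataType";
        public const string AlarmType = "AlarmType";
        public const string OperType = "OperType";
    }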
}
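
For context, a minimal sketch of how this service could be wired into a .NET generic-host application. The ServiceConfig shape is an assumption reconstructed from the properties KafkaService reads (KafkaBootstrapServers, KafkaGroupId, KafkaSaslUsername, KafkaSaslPassword, KafkaSslCaLocation, KafkaTopics); the KafkaConsumerWorker class and the registration lines are illustrative only and are not part of the original file. Implicit usings (.NET 8) are assumed, as in the file above.

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using TelpoPush.Service.Mq.Kafka;

// Sketch only: assumed shape of ServiceConfig, based on the properties read above.
namespace TelpoPush.Models.Config
{
    public class ServiceConfig
    {
        public string KafkaBootstrapServers { get; set; }
        public string KafkaGroupId { get; set; }
        public string KafkaSaslUsername { get; set; }
        public string KafkaSaslPassword { get; set; }
        public string KafkaSslCaLocation { get; set; }
        public List<string> KafkaTopics { get; set; }
    }
}

// Sketch only: a hypothetical BackgroundService that drives the consumer loop.
public class KafkaConsumerWorker : BackgroundService
{
    private readonly IKafkaService _kafkaService;
    private readonly ILogger<KafkaConsumerWorker> _logger;

    public KafkaConsumerWorker(IKafkaService kafkaService, ILogger<KafkaConsumerWorker> logger)
    {
        _kafkaService = kafkaService;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Hand each consumed message to application-specific processing.
        await _kafkaService.SubscribeAsync((topic, message, headers) =>
        {
            _logger.LogInformation("Handling message from {Topic}: {Message}", topic, message);
        }, stoppingToken);
    }
}

// Sketch only: hypothetical registration in Program.cs.
// builder.Services.Configure<ServiceConfig>(builder.Configuration.GetSection("ServiceConfig"));
// builder.Services.AddSingleton<IKafkaService, KafkaService>();
// builder.Services.AddHostedService<KafkaConsumerWorker>();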