|
- using Confluent.Kafka;
- using Microsoft.Extensions.Hosting;
- using Microsoft.Extensions.Logging;
- using Microsoft.Extensions.Options;
- using Newtonsoft.Json;
- using TelpoPush.Models.Config;
-
- namespace TelpoPush.Service.Mq.Kafka
- {
-     /// <summary>
-     /// Kafka consumer service. Builds a SASL_SSL / SCRAM-SHA-256 consumer configuration from
-     /// <see cref="ServiceConfig"/> and runs a consume loop that passes each non-empty message
-     /// to a caller-supplied callback, committing the offset after the callback returns.
-     /// </summary>
-     public class KafkaService: IKafkaService
-     {
-         private readonly ConsumerConfig _consumerConfig;
-         private readonly IHostEnvironment env;
-         private readonly ILogger<KafkaService> logger;
-         private readonly ServiceConfig _configService;
-
-         /// <summary>
-         /// Stores the injected dependencies and prepares the consumer configuration.
-         /// </summary>
-         /// <param name="_logger">Logger used for consumer diagnostics.</param>
-         /// <param name="_env">Host environment; its name is included in the per-message log line.</param>
-         /// <param name="optConfigService">Options wrapper providing the Kafka connection settings and topic list.</param>
-         public KafkaService(ILogger<KafkaService> _logger, IHostEnvironment _env, IOptions<ServiceConfig> optConfigService)
-         {
-             _configService = optConfigService.Value;
-             env = _env;
-             logger = _logger;
-
-             _consumerConfig = new ConsumerConfig
-             {
-                 BootstrapServers = _configService.KafkaBootstrapServers,
-                 SecurityProtocol = SecurityProtocol.SaslSsl,
-                 SaslMechanism = SaslMechanism.ScramSha256,
-                 GroupId = _configService.KafkaGroupId,
-                 SaslUsername = _configService.KafkaSaslUsername,
-                 SaslPassword = _configService.KafkaSaslPassword,
-                 SslCaLocation = _configService.KafkaSslCaLocation,
-                 EnableAutoCommit = false, // auto-commit disabled; offsets are committed explicitly in the consume loop
-                 AutoOffsetReset = AutoOffsetReset.Earliest, // start consuming from the earliest offset
-                 // NOTE(review): 1 ms is the lowest value the client accepts; the Confluent documentation
-                 // warns that low values raise CPU usage (default is 100) - confirm this is intended.
-                 CancellationDelayMaxMs = 1
-             };
-         }
-
-         /// <summary>
-         /// Subscribes to the configured topics and consumes messages until
-         /// <paramref name="cancellationToken"/> is cancelled.
-         /// </summary>
-         /// <remarks>
-         /// The blocking consume loop runs on a dedicated long-running task, so the caller's thread is
-         /// released immediately and <paramref name="messageFunc"/> is invoked on that dedicated thread.
-         /// Exceptions thrown by <paramref name="messageFunc"/> are not caught here: they end the loop
-         /// and fault the returned task. Cancellation closes the consumer and completes the task normally.
-         /// </remarks>
-         /// <param name="messageFunc">Callback receiving (topic, message value, headers) for every message with a non-empty value.</param>
-         /// <param name="cancellationToken">Token that stops the consume loop.</param>
-         /// <returns>A task that completes when the consume loop has stopped.</returns>
-         /// <exception cref="ArgumentNullException"><paramref name="messageFunc"/> is null.</exception>
-         public async Task SubscribeAsync(Action<string, string, Headers> messageFunc, CancellationToken cancellationToken)
-         {
-             ArgumentNullException.ThrowIfNull(messageFunc);
-
-             // The token is deliberately NOT handed to StartNew: an already-cancelled token must still
-             // reach Consume() so the loop takes its normal "close consumer" path instead of the task
-             // being cancelled before it starts.
-             await Task.Factory.StartNew(
-                 () => ConsumeLoop(messageFunc, cancellationToken),
-                 CancellationToken.None,
-                 TaskCreationOptions.LongRunning,
-                 TaskScheduler.Default);
-         }
-
-         /// <summary>
-         /// Blocking consume loop: consume, log, dispatch to the callback, commit.
-         /// </summary>
-         private void ConsumeLoop(Action<string, string, Headers> messageFunc, CancellationToken cancellationToken)
-         {
-             List<string> topics = _configService.KafkaTopics;
-
-             using (var consumer = BuildConsumer())
-             {
-                 consumer.Subscribe(topics);
-                 try
-                 {
-                     while (true)
-                     {
-                         try
-                         {
-                             var consumeResult = consumer.Consume(cancellationToken);
-                             if (consumeResult == null)
-                             {
-                                 continue;
-                             }
-
-                             // End-of-partition results carry no message, so handle them before
-                             // touching the value or the headers.
-                             if (consumeResult.IsPartitionEOF)
-                             {
-                                 Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
-                                 continue;
-                             }
-
-                             string messageResult = consumeResult.Message?.Value;
-                             Headers headers = consumeResult.Message?.Headers;
-
-                             logger.LogInformation(
-                                 "[{Environment}]Consumed topic '{Topic}', message '{Message}' , headers '{Headers}', at: '{TopicPartitionOffset}'.",
-                                 env.EnvironmentName,
-                                 consumeResult.Topic,
-                                 messageResult,
-                                 DescribeHeaders(headers),
-                                 consumeResult.TopicPartitionOffset);
-
-                             // Messages without a value are neither dispatched nor committed individually.
-                             if (string.IsNullOrEmpty(messageResult))
-                             {
-                                 continue;
-                             }
-
-                             messageFunc(consumeResult.Topic, messageResult, headers);
-
-                             try
-                             {
-                                 consumer.Commit(consumeResult);
-                             }
-                             catch (KafkaException e)
-                             {
-                                 // A failed commit is reported but does not stop consumption.
-                                 logger.LogError(e, "Commit failed at '{TopicPartitionOffset}': {Reason}", consumeResult.TopicPartitionOffset, e.Message);
-                                 Console.WriteLine(e.Message);
-                             }
-                         }
-                         catch (ConsumeException e)
-                         {
-                             logger.LogError(e, "Consume error: {Reason}", e.Error.Reason);
-                             Console.WriteLine($"Consume error: {e.Error.Reason}");
-                         }
-                     }
-                 }
-                 catch (OperationCanceledException)
-                 {
-                     Console.WriteLine("Closing consumer.");
-                     consumer.Close();
-                 }
-             }
-         }
-
-         /// <summary>
-         /// Creates the consumer and wires the error, statistics and partition assignment/revocation handlers.
-         /// </summary>
-         private IConsumer<Ignore, string> BuildConsumer()
-         {
-             return new ConsumerBuilder<Ignore, string>(_consumerConfig)
-                 .SetErrorHandler((_, e) =>
-                 {
-                     logger.LogError("Error: {Reason}", e.Reason);
-                     Console.WriteLine($"Error: {e.Reason}");
-                 })
-                 .SetStatisticsHandler((_, json) =>
-                 {
-                     // The statistics payload itself is ignored; only a "still listening" line is printed.
-                     Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
-                 })
-                 .SetPartitionsAssignedHandler((c, partitions) =>
-                 {
-                     string partitionsStr = string.Join(", ", partitions);
-                     logger.LogInformation(" - 分配的 kafka 分区: {Partitions}", partitionsStr);
-                     Console.WriteLine($" - 分配的 kafka 分区: {partitionsStr}");
-                 })
-                 .SetPartitionsRevokedHandler((c, partitions) =>
-                 {
-                     string partitionsStr = string.Join(", ", partitions);
-                     logger.LogInformation(" - 回收了 kafka 的分区: {Partitions}", partitionsStr);
-                     Console.WriteLine($" - 回收了 kafka 的分区: {partitionsStr}");
-                 })
-                 .Build();
-         }
-
-         /// <summary>
-         /// Extracts the DataType / AlarmType / OperType headers as 32-bit integers and returns them
-         /// serialized as JSON for logging. A header that is absent, or whose value is null or shorter
-         /// than four bytes, leaves the corresponding field at -1.
-         /// </summary>
-         private static string DescribeHeaders(Headers headers)
-         {
-             int dataType = -1, alarmType = -1, operType = -1;
-
-             if (headers != null)
-             {
-                 foreach (var item in headers)
-                 {
-                     byte[] raw = item.GetValueBytes();
-                     if (raw == null || raw.Length < sizeof(int))
-                     {
-                         // BitConverter.ToInt32 would throw on such a value and end the consume loop.
-                         continue;
-                     }
-
-                     if (item.Key == KafkaHeader.DataType)
-                     {
-                         dataType = BitConverter.ToInt32(raw, 0);
-                     }
-                     else if (item.Key == KafkaHeader.AlarmType)
-                     {
-                         alarmType = BitConverter.ToInt32(raw, 0);
-                     }
-                     else if (item.Key == KafkaHeader.OperType)
-                     {
-                         operType = BitConverter.ToInt32(raw, 0);
-                     }
-                 }
-             }
-
-             return JsonConvert.SerializeObject(new { DataType = dataType, AlarmType = alarmType, OperType = operType });
-         }
-     }
- }
|