定位推送服务
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

136 lines
6.9KB

  1. using Confluent.Kafka;
  2. using Microsoft.Extensions.Options;
  3. using Newtonsoft.Json;
  4. using TelpoPush.Position.Worker.Models.Config;
  5. namespace TelpoPush.Position.Worker.Service.Mq
  6. {
  /// <summary>
  /// Kafka consumer service: subscribes to the configured topics and passes every
  /// non-empty message to a caller-supplied callback, committing offsets manually
  /// after the callback returns.
  /// </summary>
  public class KafkaService : IKafkaService
  {
      private readonly ConsumerConfig _consumerConfig;
      private readonly IHostEnvironment env;
      private readonly ILogger<KafkaService> logger;
      private readonly ServiceConfig _configService;

      /// <summary>
      /// Stores the injected dependencies and builds the consumer configuration
      /// from the bound <see cref="ServiceConfig"/> options.
      /// </summary>
      /// <param name="_logger">Logger used for all consumer diagnostics.</param>
      /// <param name="_env">Host environment (stored; not used by the members of this class).</param>
      /// <param name="optConfigService">Options supplying bootstrap servers, group id and topics.</param>
      public KafkaService(ILogger<KafkaService> _logger, IHostEnvironment _env, IOptions<ServiceConfig> optConfigService)
      {
          _configService = optConfigService.Value;
          env = _env;
          logger = _logger;
          _consumerConfig = new ConsumerConfig
          {
              BootstrapServers = _configService.KafkaBootstrapServers,
              GroupId = _configService.KafkaGroupId,
              // Offsets are committed explicitly in SubscribeAsync once the callback has returned.
              EnableAutoCommit = false,
              // Triggers the statistics handler, which SubscribeAsync uses as a periodic "still listening" log line.
              StatisticsIntervalMs = 5000,
              SessionTimeoutMs = 6000,
              AutoOffsetReset = AutoOffsetReset.Earliest,
              // SubscribeAsync checks IsPartitionEOF on every consume result because of this flag.
              EnablePartitionEof = true,
              CancellationDelayMaxMs = 1
          };
      }

      /// <summary>
      /// Subscribes to the configured topics and consumes messages until
      /// <paramref name="cancellationToken"/> is cancelled.
      /// </summary>
      /// <remarks>
      /// The consume loop runs synchronously on the calling thread; the returned task
      /// only completes after the loop has exited. Exceptions thrown by
      /// <paramref name="messageFunc"/> are not caught here and end the loop.
      /// </remarks>
      /// <param name="messageFunc">
      /// Callback invoked with (topic, message value, message headers) for every message
      /// whose value is not null or empty. The offset is committed after it returns.
      /// </param>
      /// <param name="cancellationToken">Token that stops the loop and closes the consumer.</param>
      /// <exception cref="ArgumentNullException"><paramref name="messageFunc"/> is null.</exception>
      public async Task SubscribeAsync(Action<string, string, Headers> messageFunc, CancellationToken cancellationToken)
      {
          // Fail fast instead of hitting a NullReferenceException on the first message.
          if (messageFunc is null)
          {
              throw new ArgumentNullException(nameof(messageFunc));
          }

          List<string> topics = _configService.KafkaTopics;
          using (var consumer = new ConsumerBuilder<Ignore, string>(_consumerConfig)
              .SetErrorHandler((_, e) =>
              {
                  Console.WriteLine($"Error: {e.Reason}");
                  logger.LogError($"Error: {e.Reason}");
              })
              .SetStatisticsHandler((_, json) =>
              {
                  // The statistics payload itself is ignored; the callback only serves as a heartbeat log.
                  Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
                  logger.LogInformation($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
              })
              .SetPartitionsAssignedHandler((c, partitions) =>
              {
                  string partitionsStr = string.Join(", ", partitions);
                  logger.LogInformation($" - 分配的 kafka 分区: {partitionsStr}");
              })
              .SetPartitionsRevokedHandler((c, partitions) =>
              {
                  string partitionsStr = string.Join(", ", partitions);
                  Console.WriteLine($" - 回收了 kafka 的分区: {partitionsStr}");
                  logger.LogInformation($" - 回收了 kafka 分区: {partitionsStr}");
              })
              .Build())
          {
              consumer.Subscribe(topics);
              try
              {
                  while (true)
                  {
                      try
                      {
                          var consumeResult = consumer.Consume(cancellationToken);
                          if (consumeResult is null)
                          {
                              continue;
                          }

                          // End-of-partition results must be handled before anything touches the
                          // message: they carry no message, so reading headers from them would throw
                          // an exception that nothing below catches and terminate the loop.
                          if (consumeResult.IsPartitionEOF)
                          {
                              string eofMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.";
                              logger.LogInformation(eofMessage);
                              Console.WriteLine(eofMessage);
                              continue;
                          }

                          Headers headers = consumeResult.Message?.Headers;

                          // Decode the well-known integer headers purely for the log line below;
                          // the callback receives the raw headers unchanged.
                          int dataType = -1, alarmType = -1, operType = -1;
                          if (headers != null)
                          {
                              foreach (var item in headers)
                              {
                                  if (item.Key == KafkaHeader.DataType)
                                      dataType = ReadInt32Header(item, dataType);
                                  else if (item.Key == KafkaHeader.AlarmType)
                                      alarmType = ReadInt32Header(item, alarmType);
                                  else if (item.Key == KafkaHeader.OperType)
                                      operType = ReadInt32Header(item, operType);
                              }
                          }

                          // Property names are kept as DataType/AlarmType/OperType so the serialized log text is unchanged.
                          var headerSummary = new { DataType = dataType, AlarmType = alarmType, OperType = operType };
                          logger.LogInformation($"Consumed topic '{consumeResult.Topic}', message '{consumeResult.Message?.Value}' , headers '{JsonConvert.SerializeObject(headerSummary)}', at: '{consumeResult.TopicPartitionOffset}'.");

                          string messageResult = consumeResult.Message?.Value;
                          if (!string.IsNullOrEmpty(messageResult))
                          {
                              messageFunc(consumeResult.Topic, messageResult, headers);
                              try
                              {
                                  // Manual commit: the offset is only stored after the callback returned.
                                  consumer.Commit(consumeResult);
                              }
                              catch (KafkaException e)
                              {
                                  Console.WriteLine(e.Message);
                                  logger.LogError(e, "Commit error: {Reason}", e.Error.Reason);
                              }
                          }
                      }
                      catch (ConsumeException e)
                      {
                          // A failed consume is logged and the loop keeps polling.
                          logger.LogError($"Consume error: {e.Error.Reason}");
                          Console.WriteLine($"Consume error: {e.Error.Reason}");
                      }
                  }
              }
              catch (OperationCanceledException)
              {
                  // Cancellation is the normal shutdown path, so it is logged as information, not as an error.
                  logger.LogInformation("Closing consumer.");
                  Console.WriteLine("Closing consumer.");
                  consumer.Close();
              }
          }
          await Task.CompletedTask;
      }

      /// <summary>
      /// Decodes a header value as a 32-bit integer, returning <paramref name="fallback"/>
      /// when the value is missing or shorter than four bytes instead of throwing.
      /// </summary>
      private static int ReadInt32Header(IHeader header, int fallback)
      {
          byte[] raw = header.GetValueBytes();
          return raw != null && raw.Length >= sizeof(int)
              ? BitConverter.ToInt32(raw, 0)
              : fallback;
      }
  }
  128. }