Position push service


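KafkaService below wraps a Confluent.Kafka consumer: it subscribes to the configured topics, hands each record (topic, value, headers) to a caller-supplied callback, and commits offsets manually after the callback returns. It references two types defined elsewhere in the repository, ServiceConfig and KafkaHeader. The following is a minimal sketch of what those types might look like, inferred purely from how they are used in this file; the literal header key strings are an assumption.

// Sketch only (assumed shapes, not the repository's actual definitions):
using System.Collections.Generic;

namespace TelpoPush.Position.Worker.Models.Config
{
    public class ServiceConfig
    {
        public string KafkaBootstrapServers { get; set; }   // e.g. "broker1:9092,broker2:9092"
        public string KafkaGroupId { get; set; }             // consumer group id
        public List<string> KafkaTopics { get; set; }        // topics to subscribe to
    }
}

// Header key constants; the consumer reads each value as an Int32 via BitConverter.ToInt32.
// The literal key strings below are assumed for illustration.
public static class KafkaHeader
{
    public const string DataType = "DataType";
    public const string AlarmType = "AlarmType";
    public const string OperType = "OperType";
}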
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using TelpoPush.Position.Worker.Models.Config;

namespace TelpoPush.Position.Worker.Service.Mq
{
    public class KafkaService : IKafkaService
    {
        private readonly ConsumerConfig _consumerConfig;
        private readonly IHostEnvironment env;
        private readonly ILogger<KafkaService> logger;
        private readonly ServiceConfig _configService;

        public KafkaService(ILogger<KafkaService> _logger, IHostEnvironment _env, IOptions<ServiceConfig> optConfigService)
        {
            _configService = optConfigService.Value;
            env = _env;
            logger = _logger;
            _consumerConfig = new ConsumerConfig
            {
                BootstrapServers = _configService.KafkaBootstrapServers,
                GroupId = _configService.KafkaGroupId,
                EnableAutoCommit = false,                    // disable auto-commit; offsets are committed manually after processing
                Acks = Acks.Leader,                          // assume only the partition leader's acknowledgement is required
                AutoOffsetReset = AutoOffsetReset.Earliest,  // start from the earliest offset when no committed offset exists
                CancellationDelayMaxMs = 1                   // act on cancellation requests within at most 1 ms
            };
            // Alternative configuration kept for reference (enables statistics and partition-EOF events):
            //_consumerConfig = new ConsumerConfig
            //{
            //    BootstrapServers = _configService.KafkaBootstrapServers,
            //    GroupId = _configService.KafkaGroupId,
            //    EnableAutoCommit = false,
            //    StatisticsIntervalMs = 5000,
            //    SessionTimeoutMs = 6000,
            //    AutoOffsetReset = AutoOffsetReset.Earliest,
            //    EnablePartitionEof = true,
            //    CancellationDelayMaxMs = 1
            //};
        }
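
        // Because EnableAutoCommit is false, SubscribeAsync commits each record's offset only
        // after the messageFunc callback returns; a failure between the callback and Commit()
        // redelivers the record on restart, giving at-least-once delivery semantics.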
        public async Task SubscribeAsync(Action<string, string, Headers> messageFunc, CancellationToken cancellationToken)
        {
            List<string> topics = _configService.KafkaTopics;
            using (var consumer = new ConsumerBuilder<Ignore, string>(_consumerConfig)
                .SetErrorHandler((_, e) =>
                {
                    logger.LogError($"Error: {e.Reason}");
                })
                .SetStatisticsHandler((_, json) =>
                {
                    // Fires only when StatisticsIntervalMs is configured.
                    logger.LogInformation($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > listening for messages..");
                })
                .SetPartitionsAssignedHandler((c, partitions) =>
                {
                    string partitionsStr = string.Join(", ", partitions);
                    logger.LogInformation($" - Assigned Kafka partitions: {partitionsStr}");
                })
                .SetPartitionsRevokedHandler((c, partitions) =>
                {
                    string partitionsStr = string.Join(", ", partitions);
                    logger.LogInformation($" - Revoked Kafka partitions: {partitionsStr}");
                })
                .Build())
            {
                consumer.Subscribe(topics);
                try
                {
                    while (true)
                    {
                        try
                        {
                            var consumeResult = consumer.Consume(cancellationToken);
                            string topic = consumeResult.Topic;
                            string messageResult = consumeResult.Message.Value;
                            Headers headers = consumeResult.Message.Headers;
                            bool isPartitionEOF = consumeResult.IsPartitionEOF;
                            var partition = consumeResult.Partition;

                            // Extract the custom Int32 header values written by the producer.
                            int dataType = -1, alarmType = -1, operType = -1;
                            foreach (var item in headers)
                            {
                                if (item.Key == KafkaHeader.DataType)
                                    dataType = BitConverter.ToInt32(item.GetValueBytes(), 0);
                                else if (item.Key == KafkaHeader.AlarmType)
                                    alarmType = BitConverter.ToInt32(item.GetValueBytes(), 0);
                                else if (item.Key == KafkaHeader.OperType)
                                    operType = BitConverter.ToInt32(item.GetValueBytes(), 0);
                            }
                            var headerValues = new { DataType = dataType, AlarmType = alarmType, OperType = operType };
                            logger.LogInformation($"Consumed topic '{topic}', message '{messageResult}', headers '{JsonConvert.SerializeObject(headerValues)}', at '{consumeResult.TopicPartitionOffset}'.");

                            // Reported only when EnablePartitionEof is set in the consumer config.
                            if (isPartitionEOF)
                            {
                                logger.LogInformation($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} Reached end of topic {topic}, partition {partition}, offset {consumeResult.Offset}.");
                                continue;
                            }

                            if (!string.IsNullOrEmpty(messageResult))
                            {
                                messageFunc(topic, messageResult, headers);
                                try
                                {
                                    consumer.Commit(consumeResult);
                                }
                                catch (KafkaException e)
                                {
                                    logger.LogError($" - {e.Message}.");
                                }
                            }
                            #region Legacy code (commented out)
                            //string messageResult = null;
                            //Headers headers = null;
                            //try
                            //{
                            //    messageResult = consumeResult.Message.Value;
                            //    headers = consumeResult.Message.Headers;
                            //}
                            //catch (Exception ex)
                            //{
                            //    var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} [Exception: failed to deserialize message, Value: {consumeResult.Message.Value}] :{ex.StackTrace?.ToString()}";
                            //    Console.WriteLine(errorMessage);
                            //    logger.LogError(errorMessage);
                            //    messageResult = null;
                            //}
                            //if (!string.IsNullOrEmpty(messageResult)/* && consumeResult.Offset % commitPeriod == 0*/)
                            //{
                            //    string topic = consumeResult.Topic;
                            //    messageFunc(topic, messageResult, headers);
                            //    //try
                            //    //{
                            //    //    consumer.Commit(consumeResult);
                            //    //}
                            //    //catch (KafkaException e)
                            //    //{
                            //    //    Console.WriteLine(e.Message);
                            //    //}
                            //}
                            #endregion
                        }
                        catch (ConsumeException e)
                        {
                            logger.LogError($"Consume error: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Graceful shutdown: leave the consumer group cleanly.
                    logger.LogInformation("Closing consumer.");
                    consumer.Close();
                }
            }
            await Task.CompletedTask;
        }
    }
}
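
The IKafkaService interface and the worker's host setup are not shown in this file. The sketch below illustrates one way the service could be wired up, assuming IKafkaService exposes the SubscribeAsync method implemented above; the worker class name, the configuration section name, and the handler body are hypothetical.

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using TelpoPush.Position.Worker.Service.Mq;

// Hypothetical background worker that drives the consume loop.
public class PositionPushWorker : BackgroundService
{
    private readonly IKafkaService _kafka;
    private readonly ILogger<PositionPushWorker> _logger;

    public PositionPushWorker(IKafkaService kafka, ILogger<PositionPushWorker> logger)
    {
        _kafka = kafka;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // SubscribeAsync runs a blocking consume loop, so yield first to avoid
        // stalling host startup on this thread.
        await Task.Yield();
        await _kafka.SubscribeAsync((topic, message, headers) =>
        {
            _logger.LogInformation("Consumed message from {Topic}", topic);
            // ... deserialize the payload and push the position update here ...
        }, stoppingToken);
    }
}

// Registration sketch for Program.cs (the "ServiceConfig" section name is assumed):
// builder.Services.Configure<ServiceConfig>(builder.Configuration.GetSection("ServiceConfig"));
// builder.Services.AddSingleton<IKafkaService, KafkaService>();
// builder.Services.AddHostedService<PositionPushWorker>();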