万佳安设备数据
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

162 lines
7.9KB

  1. using Confluent.Kafka;
  2. using Microsoft.Extensions.Options;
  3. using Newtonsoft.Json;
  4. using TelpoPush.WanJiaAn.Worker.Models.Config;
  5. namespace TelpoPush.WanJiaAn.Worker.Service.Mq
  6. {
  7. public class KafkaService : IKafkaService
  8. {
  9. private readonly ConsumerConfig _consumerConfig;
  10. private readonly IHostEnvironment env;
  11. private readonly ILogger<KafkaService> logger;
  12. private readonly ServiceConfig _configService;
  13. public KafkaService(ILogger<KafkaService> _logger, IHostEnvironment _env, IOptions<ServiceConfig> optConfigService)
  14. {
  15. _configService = optConfigService.Value;
  16. env = _env;
  17. logger = _logger;
  18. //_consumerConfig = new ConsumerConfig
  19. //{
  20. // BootstrapServers = _configService.KafkaBootstrapServers,
  21. // GroupId = _configService.KafkaGroupId,
  22. // EnableAutoCommit = false, // 禁止AutoCommit
  23. // Acks = Acks.Leader, // 假设只需要Leader响应即可
  24. // AutoOffsetReset = AutoOffsetReset.Earliest,// 从最早的开始消费起
  25. // CancellationDelayMaxMs = 1//set CancellationDelayMaxMs
  26. //};
  27. _consumerConfig = new ConsumerConfig
  28. {
  29. BootstrapServers = _configService.KafkaBootstrapServers,
  30. SecurityProtocol = SecurityProtocol.SaslPlaintext,
  31. SaslMechanism = SaslMechanism.Plain,
  32. GroupId = _configService.KafkaGroupId,
  33. SaslUsername = _configService.KafkaUserName,
  34. SaslPassword = _configService.KafkaPassword,
  35. EnableAutoCommit = false, // 禁止AutoCommit
  36. AutoOffsetReset = AutoOffsetReset.Earliest, // 从最早的开始消费起
  37. CancellationDelayMaxMs = 1
  38. };
  39. }
  40. public async Task SubscribeAsync(Action<string, string, Headers> messageFunc, CancellationToken cancellationToken)
  41. {
  42. List<string> topics = _configService.KafkaTopics;
  43. using (var consumer = new ConsumerBuilder<Ignore, string>(_consumerConfig)
  44. .SetErrorHandler((_, e) =>
  45. {
  46. logger.LogError($"Error: {e.Reason}");
  47. })
  48. .SetStatisticsHandler((_, json) =>
  49. {
  50. logger.LogInformation($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
  51. })
  52. .SetPartitionsAssignedHandler((c, partitions) =>
  53. {
  54. string partitionsStr = string.Join(", ", partitions);
  55. logger.LogInformation($" - 分配的 kafka 分区: {partitionsStr}");
  56. })
  57. .SetPartitionsRevokedHandler((c, partitions) =>
  58. {
  59. string partitionsStr = string.Join(", ", partitions);
  60. logger.LogInformation($" - 回收了 kafka 分区: {partitionsStr}");
  61. })
  62. .Build())
  63. {
  64. consumer.Subscribe(topics);
  65. try
  66. {
  67. while (true)
  68. {
  69. try
  70. {
  71. var consumeResult = consumer.Consume(cancellationToken);
  72. string topic = consumeResult.Topic;
  73. string messageResult = consumeResult.Message.Value;
  74. Headers headers = consumeResult.Message.Headers;
  75. bool isPartitionEOF = consumeResult.IsPartitionEOF;
  76. var partition = consumeResult.Partition;
  77. int DataType = -1, AlarmType = -1, OperType = -1;
  78. foreach (var item in headers)
  79. {
  80. if (item.Key == KafkaHeader.DataType)
  81. DataType = BitConverter.ToInt32(item.GetValueBytes(), 0);
  82. else if (item.Key == KafkaHeader.AlarmType)
  83. AlarmType = BitConverter.ToInt32(item.GetValueBytes(), 0);
  84. else if (item.Key == KafkaHeader.OperType)
  85. OperType = BitConverter.ToInt32(item.GetValueBytes(), 0);
  86. }
  87. var Headers = new { DataType, AlarmType, OperType };
  88. logger.LogInformation($"Consumed topic '{topic}' , message '{messageResult}' , headers '{JsonConvert.SerializeObject(Headers)}', at '{consumeResult?.TopicPartitionOffset}'.");
  89. if (isPartitionEOF)
  90. {
  91. logger.LogInformation($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{topic}, partition {partition}, offset {consumeResult?.Offset}.");
  92. continue;
  93. }
  94. if (!string.IsNullOrEmpty(messageResult))
  95. {
  96. messageFunc(topic, messageResult, headers);
  97. try
  98. {
  99. consumer.Commit(consumeResult);
  100. }
  101. catch (KafkaException e)
  102. {
  103. logger.LogError($" - {e.Message}.");
  104. }
  105. }
  106. #region 注释
  107. //string messageResult = null;
  108. //Headers headers = null;
  109. //try
  110. //{
  111. // messageResult = consumeResult.Message.Value;
  112. // headers = consumeResult.Message.Headers;
  113. //}
  114. //catch (Exception ex)
  115. //{
  116. // var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}";
  117. // Console.WriteLine(errorMessage);
  118. // logger.LogError(errorMessage);
  119. // messageResult = null;
  120. //}
  121. //if (!string.IsNullOrEmpty(messageResult)/* && consumeResult.Offset % commitPeriod == 0*/)
  122. //{
  123. // string topic = consumeResult.Topic;
  124. // messageFunc(topic, messageResult, headers);
  125. // //try
  126. // //{
  127. // // consumer.Commit(consumeResult);
  128. // //}
  129. // //catch (KafkaException e)
  130. // //{
  131. // // Console.WriteLine(e.Message);
  132. // //}
  133. //}
  134. #endregion
  135. }
  136. catch (ConsumeException e)
  137. {
  138. logger.LogError($"Consume error: {e.Error.Reason}");
  139. }
  140. }
  141. }
  142. catch (OperationCanceledException)
  143. {
  144. logger.LogError("Closing consumer.");
  145. consumer.Close();
  146. }
  147. }
  148. await Task.CompletedTask;
  149. }
  150. }
  151. }