From 9a54ea05378cb1de4c95ce526e9258a444243e11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=A8=E9=9B=B7?= <284428564@QQ.com> Date: Mon, 20 May 2024 17:02:38 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- TelpoPush.Service/Mq/Kafka/KafkaService.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/TelpoPush.Service/Mq/Kafka/KafkaService.cs b/TelpoPush.Service/Mq/Kafka/KafkaService.cs index d2b5345..b52d8a8 100644 --- a/TelpoPush.Service/Mq/Kafka/KafkaService.cs +++ b/TelpoPush.Service/Mq/Kafka/KafkaService.cs @@ -44,6 +44,7 @@ namespace TelpoPush.Service.Mq.Kafka using (var consumer = new ConsumerBuilder(_consumerConfig) .SetErrorHandler((_, e) => { + logger.LogError($"Error: {e.Reason}"); Console.WriteLine($"Error: {e.Reason}"); }) .SetStatisticsHandler((_, json) => @@ -53,6 +54,7 @@ namespace TelpoPush.Service.Mq.Kafka .SetPartitionsAssignedHandler((c, partitions) => { string partitionsStr = string.Join(", ", partitions); + logger.LogInformation($" - 分配的 kafka 分区: {partitionsStr}"); Console.WriteLine($" - 分配的 kafka 分区: {partitionsStr}"); }) .SetPartitionsRevokedHandler((c, partitions) => @@ -83,7 +85,7 @@ namespace TelpoPush.Service.Mq.Kafka OperType = BitConverter.ToInt32(item.GetValueBytes(), 0); } var Headers = new { DataType, AlarmType, OperType }; - logger.LogInformation($"Consumed topic '{consumeResult.Topic}', message '{consumeResult.Message?.Value}' , headers '{JsonConvert.SerializeObject(Headers)}', at: '{consumeResult?.TopicPartitionOffset}'."); + logger.LogInformation($"[{env.EnvironmentName}]Consumed topic '{consumeResult.Topic}', message '{consumeResult.Message?.Value}' , headers '{JsonConvert.SerializeObject(Headers)}', at: '{consumeResult?.TopicPartitionOffset}'."); if (consumeResult.IsPartitionEOF) { Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");