diff --git a/TelpoPush.Service/Mq/Kafka/KafkaService.cs b/TelpoPush.Service/Mq/Kafka/KafkaService.cs index d2b5345..b52d8a8 100644 --- a/TelpoPush.Service/Mq/Kafka/KafkaService.cs +++ b/TelpoPush.Service/Mq/Kafka/KafkaService.cs @@ -44,6 +44,7 @@ namespace TelpoPush.Service.Mq.Kafka using (var consumer = new ConsumerBuilder(_consumerConfig) .SetErrorHandler((_, e) => { + logger.LogError($"Error: {e.Reason}"); Console.WriteLine($"Error: {e.Reason}"); }) .SetStatisticsHandler((_, json) => @@ -53,6 +54,7 @@ namespace TelpoPush.Service.Mq.Kafka .SetPartitionsAssignedHandler((c, partitions) => { string partitionsStr = string.Join(", ", partitions); + logger.LogInformation($" - 分配的 kafka 分区: {partitionsStr}"); Console.WriteLine($" - 分配的 kafka 分区: {partitionsStr}"); }) .SetPartitionsRevokedHandler((c, partitions) => @@ -83,7 +85,7 @@ namespace TelpoPush.Service.Mq.Kafka OperType = BitConverter.ToInt32(item.GetValueBytes(), 0); } var Headers = new { DataType, AlarmType, OperType }; - logger.LogInformation($"Consumed topic '{consumeResult.Topic}', message '{consumeResult.Message?.Value}' , headers '{JsonConvert.SerializeObject(Headers)}', at: '{consumeResult?.TopicPartitionOffset}'."); + logger.LogInformation($"[{env.EnvironmentName}]Consumed topic '{consumeResult.Topic}', message '{consumeResult.Message?.Value}' , headers '{JsonConvert.SerializeObject(Headers)}', at: '{consumeResult?.TopicPartitionOffset}'."); if (consumeResult.IsPartitionEOF) { Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");