diff --git a/TelpoPush.Position.Worker/Service/Mq/KafkaService.cs b/TelpoPush.Position.Worker/Service/Mq/KafkaService.cs index 688aa97..796d8d2 100644 --- a/TelpoPush.Position.Worker/Service/Mq/KafkaService.cs +++ b/TelpoPush.Position.Worker/Service/Mq/KafkaService.cs @@ -97,7 +97,17 @@ namespace TelpoPush.Position.Worker.Service.Mq continue; } if (!string.IsNullOrEmpty(messageResult)) + { messageFunc(topic, messageResult, headers); + try + { + consumer.Commit(consumeResult); + } + catch (KafkaException e) + { + logger.LogError($" - {e.Message}."); + } + } #region 注释 //string messageResult = null; //Headers headers = null;