From f09e10edc95b7afb29983c580e589dba080e778b Mon Sep 17 00:00:00 2001 From: H Vs Date: Thu, 3 Apr 2025 09:29:57 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/kafka_service.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/services/kafka_service.py b/services/kafka_service.py index 80ea012..a3e8322 100644 --- a/services/kafka_service.py +++ b/services/kafka_service.py @@ -39,7 +39,8 @@ class KafkaService: try: self.producer = AIOKafkaProducer( bootstrap_servers=self.bootstrap_servers, - compression_type="gzip" + compression_type="gzip", + max_request_size=104857600 ) await self.producer.start() except KafkaError as e: @@ -68,9 +69,13 @@ class KafkaService: raise RuntimeError("Producer not initialized") target_topic = topic or self.producer_topic - print(f'生产者topic:{target_topic}') + #print(f'生产者topic:{target_topic}') #logger.info(f"生产者topic:{target_topic}\n生产者消息:{json.dumps(json.loads(message), separators=(',', ':'), default=str, ensure_ascii=False)}") - print(f"生产者topic:{target_topic}\n生产者消息:{json.dumps(json.loads(message), separators=(',', ':'), default=str, ensure_ascii=False)}") + # print(f"生产者topic:{target_topic}\n生产者消息:{json.dumps(json.loads(message), separators=(',', ':'), default=str, ensure_ascii=False)}") + #print(f'生产者topic:{target_topic}') + json_str = json.dumps(json.loads(message), ensure_ascii=False) + truncated = json_str if len(json_str) <= 300 else f"{json_str[:297]}..." + print(f"生产者topic:{target_topic}\n生产者消息:{truncated}") try: await self.producer.send_and_wait( target_topic,