H Vs 2 tygodni temu
rodzic
commit
f09e10edc9
1 zmienionych plików z 8 dodań i 3 usunięć
  1. +8
    -3
      services/kafka_service.py

+ 8
- 3
services/kafka_service.py Wyświetl plik

@@ -39,7 +39,8 @@ class KafkaService:
try:
self.producer = AIOKafkaProducer(
bootstrap_servers=self.bootstrap_servers,
compression_type="gzip"
compression_type="gzip",
max_request_size=104857600
)
await self.producer.start()
except KafkaError as e:
@@ -68,9 +69,13 @@ class KafkaService:
raise RuntimeError("Producer not initialized")
target_topic = topic or self.producer_topic
print(f'生产者topic:{target_topic}')
#print(f'生产者topic:{target_topic}')
#logger.info(f"生产者topic:{target_topic}\n生产者消息:{json.dumps(json.loads(message), separators=(',', ':'), default=str, ensure_ascii=False)}")
print(f"生产者topic:{target_topic}\n生产者消息:{json.dumps(json.loads(message), separators=(',', ':'), default=str, ensure_ascii=False)}")
# print(f"生产者topic:{target_topic}\n生产者消息:{json.dumps(json.loads(message), separators=(',', ':'), default=str, ensure_ascii=False)}")
#print(f'生产者topic:{target_topic}')
json_str = json.dumps(json.loads(message), ensure_ascii=False)
truncated = json_str if len(json_str) <= 300 else f"{json_str[:297]}..."
print(f"生产者topic:{target_topic}\n生产者消息:{truncated}")
try:
await self.producer.send_and_wait(
target_topic,


Ładowanie…
Anuluj
Zapisz