|
- import asyncio
- from typing import Dict, Callable, Optional
- from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
- from aiokafka.errors import KafkaError
- from fastapi import FastAPI
- import json
- from common.log import logger
-
- class KafkaService:
- _instance = None
-
- def __new__(cls, *args, **kwargs):
- if not cls._instance:
- cls._instance = super().__new__(cls)
- return cls._instance
-
- def __init__(
- self,
- bootstrap_servers: str = "localhost:9092",
- producer_topic: str = "default_topic",
- consumer_topic: str = "default_topic",
- group_id: str = "fastapi-group"
- ):
- if not hasattr(self, 'initialized'):
- self.bootstrap_servers = bootstrap_servers
- self.producer_topic = producer_topic
- self.consumer_topic = consumer_topic
- self.group_id = group_id
-
- self.producer: Optional[AIOKafkaProducer] = None
- self.consumer: Optional[AIOKafkaConsumer] = None
- self.consumer_task: Optional[asyncio.Task] = None
- self.message_handlers: Dict[str, Callable] = {}
-
- self.initialized = True
-
- async def connect_producer(self):
- """Initialize Kafka producer"""
- try:
- self.producer = AIOKafkaProducer(
- bootstrap_servers=self.bootstrap_servers,
- compression_type="gzip",
- max_request_size=104857600
- )
- await self.producer.start()
- except KafkaError as e:
- print(f"Producer connection failed: {e}")
- raise
-
- async def connect_consumer(self):
- """Initialize Kafka consumer"""
- try:
- self.consumer = AIOKafkaConsumer(
- self.consumer_topic,
- bootstrap_servers=self.bootstrap_servers,
- group_id=self.group_id,
- auto_offset_reset="earliest",
- session_timeout_ms=30000, # 增加会话超时时间
- heartbeat_interval_ms=10000 # 增加心跳间隔时间
- )
- await self.consumer.start()
- except KafkaError as e:
- print(f"Consumer connection failed: {e}")
- raise
-
- async def send_message_async(self, message: str, topic: str = None):
- """Send message to Kafka topic"""
- if not self.producer:
- raise RuntimeError("Producer not initialized")
-
- target_topic = topic or self.producer_topic
- #print(f'生产者topic:{target_topic}')
- #logger.info(f"生产者topic:{target_topic}\n生产者消息:{json.dumps(json.loads(message), separators=(',', ':'), default=str, ensure_ascii=False)}")
- # print(f"生产者topic:{target_topic}\n生产者消息:{json.dumps(json.loads(message), separators=(',', ':'), default=str, ensure_ascii=False)}")
- #print(f'生产者topic:{target_topic}')
- json_str = json.dumps(json.loads(message), ensure_ascii=False)
- truncated = json_str if len(json_str) <= 300 else f"{json_str[:297]}..."
- print(f"生产者topic:{target_topic}\n生产者消息:{truncated}")
- try:
- await self.producer.send_and_wait(
- target_topic,
- message.encode('utf-8')
- )
- except KafkaError as e:
- print(f"Error sending message: {e}")
- raise
-
- # async def consume_messages(self):
- # """Start consuming messages from Kafka"""
- # if not self.consumer:
- # raise RuntimeError("Consumer not initialized")
-
- # try:
- # async for msg in self.consumer:
- # #print(f"Received message: {msg.value.decode()}")
- # logger.info(f"接收到kafka消息: {json.dumps(json.loads(msg.value.decode()), ensure_ascii=False)}")
- # topic = msg.topic
- # if topic in self.message_handlers:
- # handler = self.message_handlers[topic]
- # await handler(msg.value.decode())
- # except Exception as e:
- # print(f"Consuming error: {e}")
- # raise
- # finally:
- # await self.consumer.stop()
-
-
- # async def consume_messages(self):
- # """Start consuming messages from Kafka"""
- # if not self.consumer:
- # raise RuntimeError("Consumer not initialized")
- # try:
- # async for msg in self.consumer:
- # try:
- # logger.info(f"接收到kafka消息: {json.dumps(json.loads(msg.value.decode()), ensure_ascii=False)}")
- # topic = msg.topic
- # if topic in self.message_handlers:
- # handler = self.message_handlers[topic]
- # await handler(msg.value.decode())
- # else:
- # logger.warning(f"未处理消息类型: {topic}")
- # except Exception as e:
- # logger.error(f"处理消息失败: {e}")
- # except Exception as e:
- # logger.error(f"消费消息异常: {e}")
- # raise
- # finally:
- # await self.consumer.stop()
-
- async def consume_messages(self):
- """Start consuming messages from Kafka"""
- if not self.consumer:
- raise RuntimeError("Consumer not initialized")
- while True:
- try:
- async for msg in self.consumer:
- try:
- # logger.info(f"接收到kafka消息: {json.dumps(json.loads(msg.value.decode()), ensure_ascii=False)}")
- #logger.info(f"接收到kafka消息: {json.dumps(json.loads(msg.value.decode()), ensure_ascii=False)[:300]}")
-
- json_str = json.dumps(json.loads(msg.value.decode()), ensure_ascii=False)
- truncated = json_str if len(json_str) <= 300 else f"{json_str[:297]}..."
- logger.info(f"接收到kafka消息: {truncated}")
-
- topic = msg.topic
- if topic in self.message_handlers:
- handler = self.message_handlers[topic]
- await handler(msg.value.decode())
- else:
- logger.warning(f"未处理消息类型: {topic}")
- except Exception as e:
- logger.error(f"处理消息失败: {e}")
- except Exception as e:
- logger.error(f"消费消息异常: {e}")
- await asyncio.sleep(5) # 等待一段时间后重试
-
- def add_handler(self, topic: str, handler: Callable):
- """Add message handler for specific topic"""
- self.message_handlers[topic] = handler
-
- async def start(self):
- """Start both producer and consumer"""
- await self.connect_producer()
- await self.connect_consumer()
- self.consumer_task = asyncio.create_task(self.consume_messages())
-
- async def start_producer(self):
- """Start both producer and consumer"""
- await self.connect_producer()
-
- async def stop_producer(self):
- if self.producer:
- await self.producer.stop()
-
- async def stop(self):
- """Graceful shutdown"""
- if self.producer:
- await self.producer.stop()
- if self.consumer:
- await self.consumer.stop()
- if self.consumer_task:
- self.consumer_task.cancel()
- try:
- await self.consumer_task
- except asyncio.CancelledError:
- pass
|