import asyncio
import json
from typing import Any, Awaitable, Callable, Dict, Optional

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from aiokafka.errors import KafkaError
from fastapi import FastAPI

from common.log import logger

# A handler receives the decoded message text and is awaited by the consume loop.
MessageHandler = Callable[[str], Awaitable[Any]]

# Maximum number of characters of a message that is written to the log/stdout.
_PREVIEW_LIMIT = 300

# Seconds to wait before re-entering the consume loop after an unexpected error.
_RETRY_DELAY_SECONDS = 5


class KafkaService:
    """Process-wide singleton wrapping one aiokafka producer and one consumer.

    Every ``KafkaService(...)`` call returns the same instance. The constructor
    arguments are only honoured on the first construction; later calls leave
    the existing configuration untouched.
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(
        self,
        bootstrap_servers: str = "localhost:9092",
        producer_topic: str = "default_topic",
        consumer_topic: str = "default_topic",
        group_id: str = "fastapi-group"
    ):
        """Store the connection settings (first construction only).

        Args:
            bootstrap_servers: Broker address list passed to aiokafka.
            producer_topic: Topic used by ``send_message_async`` when the
                caller does not name one.
            consumer_topic: Topic the consumer subscribes to.
            group_id: Consumer group id.
        """
        # __init__ runs on every KafkaService() call; the flag keeps the
        # already-configured singleton from being reset.
        if hasattr(self, 'initialized'):
            return

        self.bootstrap_servers = bootstrap_servers
        self.producer_topic = producer_topic
        self.consumer_topic = consumer_topic
        self.group_id = group_id
        self.producer: Optional[AIOKafkaProducer] = None
        self.consumer: Optional[AIOKafkaConsumer] = None
        self.consumer_task: Optional[asyncio.Task] = None
        self.message_handlers: Dict[str, MessageHandler] = {}
        self.initialized = True

    @staticmethod
    def _preview(raw: str, limit: int = _PREVIEW_LIMIT) -> str:
        """Return ``raw`` re-serialized as compact-ish JSON, cut to ``limit`` chars.

        Raises:
            json.JSONDecodeError: If ``raw`` is not valid JSON.
        """
        normalized = json.dumps(json.loads(raw), ensure_ascii=False)
        if len(normalized) <= limit:
            return normalized
        return f"{normalized[:limit - 3]}..."

    async def connect_producer(self):
        """Create and start the Kafka producer.

        Does nothing when a producer is already connected, so repeated calls
        do not leak a previously started client.

        Raises:
            KafkaError: If the producer cannot be started (logged, re-raised).
        """
        if self.producer is not None:
            return

        producer = AIOKafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            compression_type="gzip",
            max_request_size=104857600
        )
        try:
            await producer.start()
        except KafkaError as e:
            logger.error(f"Producer connection failed: {e}")
            raise
        # Publish the producer only once it is usable, so a failed start does
        # not leave send_message_async believing a producer exists.
        self.producer = producer

    async def connect_consumer(self):
        """Create and start the Kafka consumer for ``consumer_topic``.

        Does nothing when a consumer is already connected.

        Raises:
            KafkaError: If the consumer cannot be started (logged, re-raised).
        """
        if self.consumer is not None:
            return

        consumer = AIOKafkaConsumer(
            self.consumer_topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id=self.group_id,
            auto_offset_reset="earliest",
            session_timeout_ms=30000,  # increased session timeout
            heartbeat_interval_ms=10000  # increased heartbeat interval
        )
        try:
            await consumer.start()
        except KafkaError as e:
            logger.error(f"Consumer connection failed: {e}")
            raise
        # Same reasoning as in connect_producer: assign only after success.
        self.consumer = consumer

    async def send_message_async(self, message: str, topic: Optional[str] = None):
        """Send ``message`` to ``topic`` and wait for the broker to accept it.

        Args:
            message: Message text. It must be valid JSON because a truncated,
                re-serialized preview of it is printed before sending.
            topic: Target topic; falls back to ``producer_topic`` when empty.

        Raises:
            RuntimeError: If the producer has not been connected.
            json.JSONDecodeError: If ``message`` is not valid JSON.
            KafkaError: If the send fails (logged, re-raised).
        """
        if not self.producer:
            raise RuntimeError("Producer not initialized")

        target_topic = topic or self.producer_topic
        # NOTE(review): this preview goes to stdout while the consumer side
        # uses ``logger`` - confirm whether that is intentional.
        print(f"生产者topic:{target_topic}\n生产者消息:{self._preview(message)}")

        try:
            await self.producer.send_and_wait(
                target_topic,
                message.encode('utf-8')
            )
        except KafkaError as e:
            logger.error(f"Error sending message: {e}")
            raise

    async def _handle_message(self, msg) -> None:
        """Log one consumed record and pass its text to the topic's handler."""
        payload = msg.value.decode()
        # _preview raises on non-JSON payloads, so such records are reported
        # by the caller as a processing failure and never reach a handler.
        logger.info(f"接收到kafka消息: {self._preview(payload)}")

        handler = self.message_handlers.get(msg.topic)
        if handler is None:
            logger.warning(f"未处理消息类型: {msg.topic}")
            return
        await handler(payload)

    async def consume_messages(self):
        """Consume records until the consumer is stopped or the task is cancelled.

        A failure while processing a single record is logged and the loop moves
        on to the next record. A failure of the consumer iteration itself is
        logged and retried after a short delay.

        Raises:
            RuntimeError: If the consumer has not been connected.
        """
        if not self.consumer:
            raise RuntimeError("Consumer not initialized")

        while self.consumer is not None:
            try:
                async for msg in self.consumer:
                    try:
                        await self._handle_message(msg)
                    except asyncio.CancelledError:
                        # Never swallow cancellation (it is an Exception
                        # subclass on Python 3.7).
                        raise
                    except Exception as e:
                        logger.error(f"处理消息失败: {e}")
            except asyncio.CancelledError:
                raise
            except Exception as e:
                logger.error(f"消费消息异常: {e}")
                # wait a while before retrying
                await asyncio.sleep(_RETRY_DELAY_SECONDS)
            else:
                # The async iterator finishes once the consumer has been
                # stopped. Re-entering the loop would spin without ever
                # yielding to the event loop, so leave instead.
                return

    def add_handler(self, topic: str, handler: MessageHandler):
        """Register ``handler`` for ``topic``, replacing any previous handler.

        The handler is awaited with the decoded message text.
        """
        self.message_handlers[topic] = handler

    async def start(self):
        """Connect producer and consumer and start the background consume task."""
        await self.connect_producer()
        await self.connect_consumer()
        # Avoid spawning a second consume loop when start() is called again.
        if self.consumer_task is None or self.consumer_task.done():
            self.consumer_task = asyncio.create_task(self.consume_messages())

    async def start_producer(self):
        """Connect only the producer (no consumer, no consume task)."""
        await self.connect_producer()

    async def stop_producer(self):
        """Stop the producer, if one is connected, and forget it."""
        producer, self.producer = self.producer, None
        if producer:
            await producer.stop()

    async def stop(self):
        """Gracefully shut down the consume task, the consumer and the producer.

        The consume task is cancelled *before* the consumer is stopped so the
        loop cannot keep iterating over an already-stopped consumer.
        """
        task, self.consumer_task = self.consumer_task, None
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
            except Exception as e:
                # A consume task that already died must not block shutdown of
                # the underlying clients.
                logger.error(f"Consumer task ended with error: {e}")

        consumer, self.consumer = self.consumer, None
        try:
            if consumer:
                await consumer.stop()
        finally:
            # Stop the producer even if stopping the consumer failed.
            await self.stop_producer()