您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

186 行
7.5KB

  1. import asyncio
  2. from typing import Dict, Callable, Optional
  3. from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
  4. from aiokafka.errors import KafkaError
  5. from fastapi import FastAPI
  6. import json
  7. from common.log import logger
  8. class KafkaService:
  9. _instance = None
  10. def __new__(cls, *args, **kwargs):
  11. if not cls._instance:
  12. cls._instance = super().__new__(cls)
  13. return cls._instance
  14. def __init__(
  15. self,
  16. bootstrap_servers: str = "localhost:9092",
  17. producer_topic: str = "default_topic",
  18. consumer_topic: str = "default_topic",
  19. group_id: str = "fastapi-group"
  20. ):
  21. if not hasattr(self, 'initialized'):
  22. self.bootstrap_servers = bootstrap_servers
  23. self.producer_topic = producer_topic
  24. self.consumer_topic = consumer_topic
  25. self.group_id = group_id
  26. self.producer: Optional[AIOKafkaProducer] = None
  27. self.consumer: Optional[AIOKafkaConsumer] = None
  28. self.consumer_task: Optional[asyncio.Task] = None
  29. self.message_handlers: Dict[str, Callable] = {}
  30. self.initialized = True
  31. async def connect_producer(self):
  32. """Initialize Kafka producer"""
  33. try:
  34. self.producer = AIOKafkaProducer(
  35. bootstrap_servers=self.bootstrap_servers,
  36. compression_type="gzip",
  37. max_request_size=104857600
  38. )
  39. await self.producer.start()
  40. except KafkaError as e:
  41. print(f"Producer connection failed: {e}")
  42. raise
  43. async def connect_consumer(self):
  44. """Initialize Kafka consumer"""
  45. try:
  46. self.consumer = AIOKafkaConsumer(
  47. self.consumer_topic,
  48. bootstrap_servers=self.bootstrap_servers,
  49. group_id=self.group_id,
  50. auto_offset_reset="earliest",
  51. session_timeout_ms=30000, # 增加会话超时时间
  52. heartbeat_interval_ms=10000 # 增加心跳间隔时间
  53. )
  54. await self.consumer.start()
  55. except KafkaError as e:
  56. print(f"Consumer connection failed: {e}")
  57. raise
  58. async def send_message_async(self, message: str, topic: str = None):
  59. """Send message to Kafka topic"""
  60. if not self.producer:
  61. raise RuntimeError("Producer not initialized")
  62. target_topic = topic or self.producer_topic
  63. #print(f'生产者topic:{target_topic}')
  64. #logger.info(f"生产者topic:{target_topic}\n生产者消息:{json.dumps(json.loads(message), separators=(',', ':'), default=str, ensure_ascii=False)}")
  65. # print(f"生产者topic:{target_topic}\n生产者消息:{json.dumps(json.loads(message), separators=(',', ':'), default=str, ensure_ascii=False)}")
  66. #print(f'生产者topic:{target_topic}')
  67. json_str = json.dumps(json.loads(message), ensure_ascii=False)
  68. truncated = json_str if len(json_str) <= 300 else f"{json_str[:297]}..."
  69. print(f"生产者topic:{target_topic}\n生产者消息:{truncated}")
  70. try:
  71. await self.producer.send_and_wait(
  72. target_topic,
  73. message.encode('utf-8')
  74. )
  75. except KafkaError as e:
  76. print(f"Error sending message: {e}")
  77. raise
  78. # async def consume_messages(self):
  79. # """Start consuming messages from Kafka"""
  80. # if not self.consumer:
  81. # raise RuntimeError("Consumer not initialized")
  82. # try:
  83. # async for msg in self.consumer:
  84. # #print(f"Received message: {msg.value.decode()}")
  85. # logger.info(f"接收到kafka消息: {json.dumps(json.loads(msg.value.decode()), ensure_ascii=False)}")
  86. # topic = msg.topic
  87. # if topic in self.message_handlers:
  88. # handler = self.message_handlers[topic]
  89. # await handler(msg.value.decode())
  90. # except Exception as e:
  91. # print(f"Consuming error: {e}")
  92. # raise
  93. # finally:
  94. # await self.consumer.stop()
  95. # async def consume_messages(self):
  96. # """Start consuming messages from Kafka"""
  97. # if not self.consumer:
  98. # raise RuntimeError("Consumer not initialized")
  99. # try:
  100. # async for msg in self.consumer:
  101. # try:
  102. # logger.info(f"接收到kafka消息: {json.dumps(json.loads(msg.value.decode()), ensure_ascii=False)}")
  103. # topic = msg.topic
  104. # if topic in self.message_handlers:
  105. # handler = self.message_handlers[topic]
  106. # await handler(msg.value.decode())
  107. # else:
  108. # logger.warning(f"未处理消息类型: {topic}")
  109. # except Exception as e:
  110. # logger.error(f"处理消息失败: {e}")
  111. # except Exception as e:
  112. # logger.error(f"消费消息异常: {e}")
  113. # raise
  114. # finally:
  115. # await self.consumer.stop()
  116. async def consume_messages(self):
  117. """Start consuming messages from Kafka"""
  118. if not self.consumer:
  119. raise RuntimeError("Consumer not initialized")
  120. while True:
  121. try:
  122. async for msg in self.consumer:
  123. try:
  124. # logger.info(f"接收到kafka消息: {json.dumps(json.loads(msg.value.decode()), ensure_ascii=False)}")
  125. #logger.info(f"接收到kafka消息: {json.dumps(json.loads(msg.value.decode()), ensure_ascii=False)[:300]}")
  126. json_str = json.dumps(json.loads(msg.value.decode()), ensure_ascii=False)
  127. truncated = json_str if len(json_str) <= 300 else f"{json_str[:297]}..."
  128. logger.info(f"接收到kafka消息: {truncated}")
  129. topic = msg.topic
  130. if topic in self.message_handlers:
  131. handler = self.message_handlers[topic]
  132. await handler(msg.value.decode())
  133. else:
  134. logger.warning(f"未处理消息类型: {topic}")
  135. except Exception as e:
  136. logger.error(f"处理消息失败: {e}")
  137. except Exception as e:
  138. logger.error(f"消费消息异常: {e}")
  139. await asyncio.sleep(5) # 等待一段时间后重试
  140. def add_handler(self, topic: str, handler: Callable):
  141. """Add message handler for specific topic"""
  142. self.message_handlers[topic] = handler
  143. async def start(self):
  144. """Start both producer and consumer"""
  145. await self.connect_producer()
  146. await self.connect_consumer()
  147. self.consumer_task = asyncio.create_task(self.consume_messages())
  148. async def start_producer(self):
  149. """Start both producer and consumer"""
  150. await self.connect_producer()
  151. async def stop_producer(self):
  152. if self.producer:
  153. await self.producer.stop()
  154. async def stop(self):
  155. """Graceful shutdown"""
  156. if self.producer:
  157. await self.producer.stop()
  158. if self.consumer:
  159. await self.consumer.stop()
  160. if self.consumer_task:
  161. self.consumer_task.cancel()
  162. try:
  163. await self.consumer_task
  164. except asyncio.CancelledError:
  165. pass