You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

kafka_service.py 6.3KB

1 月之前
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. import asyncio
  2. from typing import Dict, Callable, Optional
  3. from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
  4. from aiokafka.errors import KafkaError
  5. from fastapi import FastAPI
  6. import json
  7. from common.log import logger
  8. class KafkaService:
  9. _instance = None
  10. def __new__(cls, *args, **kwargs):
  11. if not cls._instance:
  12. cls._instance = super().__new__(cls)
  13. return cls._instance
  14. def __init__(
  15. self,
  16. bootstrap_servers: str = "localhost:9092",
  17. producer_topic: str = "default_topic",
  18. consumer_topic: str = "default_topic",
  19. group_id: str = "fastapi-group"
  20. ):
  21. if not hasattr(self, 'initialized'):
  22. self.bootstrap_servers = bootstrap_servers
  23. self.producer_topic = producer_topic
  24. self.consumer_topic = consumer_topic
  25. self.group_id = group_id
  26. self.producer: Optional[AIOKafkaProducer] = None
  27. self.consumer: Optional[AIOKafkaConsumer] = None
  28. self.consumer_task: Optional[asyncio.Task] = None
  29. self.message_handlers: Dict[str, Callable] = {}
  30. self.initialized = True
  31. async def connect_producer(self):
  32. """Initialize Kafka producer"""
  33. try:
  34. self.producer = AIOKafkaProducer(
  35. bootstrap_servers=self.bootstrap_servers,
  36. compression_type="gzip"
  37. )
  38. await self.producer.start()
  39. except KafkaError as e:
  40. print(f"Producer connection failed: {e}")
  41. raise
  42. async def connect_consumer(self):
  43. """Initialize Kafka consumer"""
  44. try:
  45. self.consumer = AIOKafkaConsumer(
  46. self.consumer_topic,
  47. bootstrap_servers=self.bootstrap_servers,
  48. group_id=self.group_id,
  49. auto_offset_reset="earliest",
  50. session_timeout_ms=30000, # 增加会话超时时间
  51. heartbeat_interval_ms=10000 # 增加心跳间隔时间
  52. )
  53. await self.consumer.start()
  54. except KafkaError as e:
  55. print(f"Consumer connection failed: {e}")
  56. raise
  57. async def send_message_async(self, message: str, topic: str = None):
  58. """Send message to Kafka topic"""
  59. if not self.producer:
  60. raise RuntimeError("Producer not initialized")
  61. target_topic = topic or self.producer_topic
  62. print(f'生产者topic:{target_topic}')
  63. logger.info(f"生产者topic:{target_topic}\n生产者消息:{json.dumps(json.loads(message), separators=(',', ':'), default=str, ensure_ascii=False)}")
  64. try:
  65. await self.producer.send_and_wait(
  66. target_topic,
  67. message.encode('utf-8')
  68. )
  69. except KafkaError as e:
  70. print(f"Error sending message: {e}")
  71. raise
  72. # async def consume_messages(self):
  73. # """Start consuming messages from Kafka"""
  74. # if not self.consumer:
  75. # raise RuntimeError("Consumer not initialized")
  76. # try:
  77. # async for msg in self.consumer:
  78. # #print(f"Received message: {msg.value.decode()}")
  79. # logger.info(f"接收到kafka消息: {json.dumps(json.loads(msg.value.decode()), ensure_ascii=False)}")
  80. # topic = msg.topic
  81. # if topic in self.message_handlers:
  82. # handler = self.message_handlers[topic]
  83. # await handler(msg.value.decode())
  84. # except Exception as e:
  85. # print(f"Consuming error: {e}")
  86. # raise
  87. # finally:
  88. # await self.consumer.stop()
  89. # async def consume_messages(self):
  90. # """Start consuming messages from Kafka"""
  91. # if not self.consumer:
  92. # raise RuntimeError("Consumer not initialized")
  93. # try:
  94. # async for msg in self.consumer:
  95. # try:
  96. # logger.info(f"接收到kafka消息: {json.dumps(json.loads(msg.value.decode()), ensure_ascii=False)}")
  97. # topic = msg.topic
  98. # if topic in self.message_handlers:
  99. # handler = self.message_handlers[topic]
  100. # await handler(msg.value.decode())
  101. # else:
  102. # logger.warning(f"未处理消息类型: {topic}")
  103. # except Exception as e:
  104. # logger.error(f"处理消息失败: {e}")
  105. # except Exception as e:
  106. # logger.error(f"消费消息异常: {e}")
  107. # raise
  108. # finally:
  109. # await self.consumer.stop()
  110. async def consume_messages(self):
  111. """Start consuming messages from Kafka"""
  112. if not self.consumer:
  113. raise RuntimeError("Consumer not initialized")
  114. while True:
  115. try:
  116. async for msg in self.consumer:
  117. try:
  118. logger.info(f"接收到kafka消息: {json.dumps(json.loads(msg.value.decode()), ensure_ascii=False)}")
  119. topic = msg.topic
  120. if topic in self.message_handlers:
  121. handler = self.message_handlers[topic]
  122. await handler(msg.value.decode())
  123. else:
  124. logger.warning(f"未处理消息类型: {topic}")
  125. except Exception as e:
  126. logger.error(f"处理消息失败: {e}")
  127. except Exception as e:
  128. logger.error(f"消费消息异常: {e}")
  129. await asyncio.sleep(5) # 等待一段时间后重试
  130. def add_handler(self, topic: str, handler: Callable):
  131. """Add message handler for specific topic"""
  132. self.message_handlers[topic] = handler
  133. async def start(self):
  134. """Start both producer and consumer"""
  135. await self.connect_producer()
  136. await self.connect_consumer()
  137. self.consumer_task = asyncio.create_task(self.consume_messages())
  138. async def stop(self):
  139. """Graceful shutdown"""
  140. if self.producer:
  141. await self.producer.stop()
  142. if self.consumer:
  143. await self.consumer.stop()
  144. if self.consumer_task:
  145. self.consumer_task.cancel()
  146. try:
  147. await self.consumer_task
  148. except asyncio.CancelledError:
  149. pass