|
- from confluent_kafka import Producer, Consumer, KafkaException, KafkaError
- import os,time
- from config import conf
- # 定义全局 redis_helper
- kafka_client = None
-
- class KafkaClient:
- def __init__(self):
-
- bootstrap_servers=conf().get("kafka_bootstrap_servers")
- agent_tel=os.environ.get('tel', '18029274615')
- consumer_group=f'aiops-wx_{agent_tel}'
- print(f'kafka消费组 {consumer_group}')
- topic="topic.ai.ops.wx"
-
- self.bootstrap_servers = bootstrap_servers
- self.consumer_group = consumer_group
- self.topic = topic
-
- self.producer = Producer({'bootstrap.servers': self.bootstrap_servers})
- self.consumer = Consumer({
- 'bootstrap.servers': self.bootstrap_servers,
- 'group.id': self.consumer_group,
- 'auto.offset.reset': 'earliest',
- # 'enable.auto.commit': False # 禁用自动提交,使用手动提交
- 'enable.auto.commit': True
- })
-
- def delivery_report(self, err, msg):
- """
- 回调函数,用于确认消息是否成功发送
- """
- if err is not None:
- print(f"Message delivery failed: {err}")
- else:
- print(f"Message delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")
-
- def produce_messages(self, messages):
- """
- 发送消息
- """
- try:
- for message in messages:
- self.producer.produce(self.topic, value=message, callback=self.delivery_report)
- print(f"Produced: {message}")
- self.producer.poll(0)
- except Exception as e:
- print(f"An error occurred: {e}")
- finally:
- self.producer.flush()
-
- def produce_message(self, message):
- """
- 发送消息
- """
- try:
- self.producer.produce(self.topic, value=message, callback=self.delivery_report)
- # print(f"Produced: {message}")
- self.producer.poll(0)
- except Exception as e:
- print(f"An error occurred: {e}")
- finally:
- self.producer.flush()
-
- def consume_messages(self,process_callback):
- """
- 消费消息并调用回调处理业务逻辑,只有当回调返回 True 时才提交偏移量
- :param process_callback: 业务逻辑回调函数,返回布尔值
- :param user_nickname: 用户昵称
- """
- self.consumer.subscribe([self.topic])
-
- try:
- while True:
- msg = self.consumer.poll(0.3)
- if msg is None:
- continue
- if msg.error():
- if msg.error().code() == KafkaError._PARTITION_EOF:
- print(f"End of partition {msg.partition}, offset {msg.offset()}")
- else:
- raise KafkaException(msg.error())
- else:
- # 调用业务处理逻辑
- # process_callback(msg.value().decode('utf-8'))
- # 调用业务处理逻辑,传递 user_nickname 和消息
- process_callback(msg.value().decode('utf-8'))
- # if process_callback(user_nickname, msg.value().decode('utf-8')):
- # # 如果返回 True,表示处理成功,可以提交偏移量
- # try:
- # # self.consumer.commit(msg)
- # self.consumer.commit(message=msg, asynchronous=True)
- # print(f"Manually committed offset: {msg.offset()}")
- # except KafkaException as e:
- # print(f"Error committing offset: {e}")
- except KeyboardInterrupt:
- print("消费中断")
- finally:
- self.consumer.close()
-
- def consume_messages(self,agent_tel,process_callback):
- """
- 消费消息并调用回调处理业务逻辑,只有当回调返回 True 时才提交偏移量
- :param process_callback: 业务逻辑回调函数,返回布尔值
- :param agent_tel: 代理商手机号
- """
- consumer=Consumer({
- 'bootstrap.servers': self.bootstrap_servers,
- 'group.id': f'aiops-wx_{agent_tel}',
- 'auto.offset.reset': 'earliest',
- # 'enable.auto.commit': False # 禁用自动提交,使用手动提交
- 'enable.auto.commit': True
- })
- consumer.subscribe([self.topic])
-
- try:
- while True:
- msg = consumer.poll(0.3)
- if msg is None:
- continue
- if msg.error():
- if msg.error().code() == KafkaError._PARTITION_EOF:
- print(f"End of partition {msg.partition}, offset {msg.offset()}")
- else:
- raise KafkaException(msg.error())
- else:
- # 调用业务处理逻辑
- # process_callback(msg.value().decode('utf-8'))
- # 调用业务处理逻辑,传递 user_nickname 和消息
- process_callback(agent_tel,msg.value().decode('utf-8'))
- # if process_callback(user_nickname, msg.value().decode('utf-8')):
- # # 如果返回 True,表示处理成功,可以提交偏移量
- # try:
- # # self.consumer.commit(msg)
- # self.consumer.commit(message=msg, asynchronous=True)
- # print(f"Manually committed offset: {msg.offset()}")
- # except KafkaException as e:
- # print(f"Error committing offset: {e}")
- except KafkaException as e:
- print(f"Kafka exception occurred: {e}")
- if 'KafkaError._ALL_BROKERS_DOWN' in str(e):
- print(f"Kafka brokers for agent {agent_tel} are down, retrying in 5 seconds...")
- time.sleep(5)
- self._reconnect_consumer_with_agent_tel(consumer, agent_tel)
- except Exception as e:
- print(f"An unexpected error occurred: {e}")
- time.sleep(5)
-
-
- def _reconnect_consumer_with_agent_tel(self, consumer, agent_tel):
- """
- 尝试为指定的代理商重新连接 Kafka 消费者
- """
- print(f"Attempting to reconnect Kafka consumer for agent {agent_tel}...")
- try:
- consumer.close() # Close the old consumer
- consumer = Consumer({
- 'bootstrap.servers': self.bootstrap_servers,
- 'group.id': f'aiops-wx_{agent_tel}',
- 'auto.offset.reset': 'earliest',
- 'enable.auto.commit': True
- })
- consumer.subscribe([self.topic])
- print(f"Reconnected successfully for agent {agent_tel}.")
- except KafkaException as e:
- print(f"Error while reconnecting for agent {agent_tel}: {e}")
- time.sleep(5) # Retry after 5 seconds
-
-
- def start():
- global kafka_client
- kafka_client = KafkaClient()
|