import os
import time

from confluent_kafka import Consumer, KafkaError, KafkaException, Producer

from config import conf

# Module-level KafkaClient instance; stays None until start() is called.
kafka_client = None


class KafkaClient:
    """Producer/consumer wrapper bound to the ``topic.ai.ops.wx`` topic.

    The constructor creates one producer and one consumer whose group id is
    ``aiops-wx_<tel>``, where ``<tel>`` comes from the ``tel`` environment
    variable.  ``consume_messages`` creates its own, per-agent consumer.
    """

    def __init__(self):
        """Read the broker list from ``conf()`` and build the Kafka clients."""
        bootstrap_servers = conf().get("kafka_bootstrap_servers")
        agent_tel = os.environ.get('tel', '18029274615')
        consumer_group = f'aiops-wx_{agent_tel}'
        print(f'kafka消费组 {consumer_group}')
        topic = "topic.ai.ops.wx"

        self.bootstrap_servers = bootstrap_servers
        self.consumer_group = consumer_group
        self.topic = topic

        self.producer = Producer({'bootstrap.servers': self.bootstrap_servers})
        self.consumer = Consumer(self._consumer_config(self.consumer_group))

    def _consumer_config(self, group_id):
        """Return the consumer configuration used everywhere in this class.

        :param group_id: value for the ``group.id`` setting
        """
        return {
            'bootstrap.servers': self.bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': 'earliest',
            # Offsets are committed automatically; nothing in this class
            # commits manually.
            'enable.auto.commit': True,
        }

    def delivery_report(self, err, msg):
        """Delivery callback: print whether a produced message was delivered.

        :param err: delivery error, or None on success
        :param msg: the message the report is about
        """
        if err is not None:
            print(f"Message delivery failed: {err}")
        else:
            print(f"Message delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

    def produce_messages(self, messages):
        """Produce every item of ``messages`` to the topic, then flush.

        Any exception raised while producing is printed and swallowed; the
        producer is flushed in all cases.

        :param messages: iterable of payloads, each passed as ``value=`` to
            ``Producer.produce`` (presumably str or bytes -- confirm with callers)
        """
        try:
            for message in messages:
                self.producer.produce(self.topic, value=message, callback=self.delivery_report)
                print(f"Produced: {message}")
                # Serve pending delivery callbacks without blocking.
                self.producer.poll(0)
        except Exception as e:
            print(f"An error occurred: {e}")
        finally:
            self.producer.flush()

    def produce_message(self, message):
        """Produce a single payload to the topic, then flush.

        Any exception raised while producing is printed and swallowed; the
        producer is flushed in all cases.

        :param message: payload passed as ``value=`` to ``Producer.produce``
        """
        try:
            self.producer.produce(self.topic, value=message, callback=self.delivery_report)
            # Serve pending delivery callbacks without blocking.
            self.producer.poll(0)
        except Exception as e:
            print(f"An error occurred: {e}")
        finally:
            self.producer.flush()

    @staticmethod
    def _is_all_brokers_down(exc):
        """Return True when ``exc`` wraps a ``_ALL_BROKERS_DOWN`` KafkaError."""
        kafka_error = exc.args[0] if exc.args else None
        return (
            isinstance(kafka_error, KafkaError)
            and kafka_error.code() == KafkaError._ALL_BROKERS_DOWN
        )

    def _handle_polled_message(self, consumer, agent_tel, process_callback):
        """Poll ``consumer`` once and dispatch the result.

        :raises KafkaException: when the polled message carries an error other
            than end-of-partition
        """
        msg = consumer.poll(0.3)
        if msg is None:
            return

        error = msg.error()
        if error is None:
            process_callback(agent_tel, msg.value().decode('utf-8'))
        elif error.code() == KafkaError._PARTITION_EOF:
            print(f"End of partition {msg.partition()}, offset {msg.offset()}")
        else:
            raise KafkaException(error)

    def consume_messages(self, agent_tel, process_callback):
        """Consume the topic for one agent and hand each message to a callback.

        A dedicated consumer with group id ``aiops-wx_<agent_tel>`` is created
        for the call.  Offsets are auto-committed; the callback's return value
        is ignored.

        The loop ends (and the method returns) after a Kafka error other than
        "all brokers down", after any other exception (following a 5 second
        pause), or when reconnecting fails.  When all brokers are down the
        consumer is replaced after 5 seconds and consumption continues.  The
        consumer in use is closed on every exit path.

        :param agent_tel: agent phone number; used in the group id and passed
            as the first argument to ``process_callback``
        :param process_callback: callable invoked as
            ``process_callback(agent_tel, text)`` with the UTF-8 decoded value
        """
        consumer = Consumer(self._consumer_config(f'aiops-wx_{agent_tel}'))
        try:
            consumer.subscribe([self.topic])
            while consumer is not None:
                try:
                    self._handle_polled_message(consumer, agent_tel, process_callback)
                except KafkaException as e:
                    print(f"Kafka exception occurred: {e}")
                    if not self._is_all_brokers_down(e):
                        return
                    print(f"Kafka brokers for agent {agent_tel} are down, retrying in 5 seconds...")
                    time.sleep(5)
                    # The helper closes the stale consumer; drop our reference
                    # first so the ``finally`` below never closes it twice.
                    stale, consumer = consumer, None
                    consumer = self._reconnect_consumer_with_agent_tel(stale, agent_tel)
                except Exception as e:
                    print(f"An unexpected error occurred: {e}")
                    time.sleep(5)
                    return
        finally:
            if consumer is not None:
                consumer.close()

    def _reconnect_consumer_with_agent_tel(self, consumer, agent_tel):
        """Close ``consumer`` and build a replacement subscribed to the topic.

        :param consumer: the consumer to discard; it is closed first
        :param agent_tel: agent phone number used to rebuild the group id
        :return: the new consumer, or None if a KafkaException occurred (in
            which case the method sleeps 5 seconds before returning)
        """
        print(f"Attempting to reconnect Kafka consumer for agent {agent_tel}...")
        try:
            consumer.close()
            replacement = Consumer(self._consumer_config(f'aiops-wx_{agent_tel}'))
            replacement.subscribe([self.topic])
        except KafkaException as e:
            print(f"Error while reconnecting for agent {agent_tel}: {e}")
            # Back off before handing control back; no retry happens here.
            time.sleep(5)
            return None
        print(f"Reconnected successfully for agent {agent_tel}.")
        return replacement


def start():
    """Create the module-level ``kafka_client`` instance."""
    global kafka_client
    kafka_client = KafkaClient()