from confluent_kafka import Producer, Consumer, KafkaException, KafkaError import os from common.singleton import singleton from config import conf # 定义全局 redis_helper kafka_client = None class KafkaClient: def __init__(self): bootstrap_servers=conf().get("kafka_bootstrap_servers") agent_tel=os.environ.get('tel', '19200137635') consumer_group=f'aiops-wx_{agent_tel}' print(f'kafka消费组 {consumer_group}') topic="topic.aiops.wx" self.bootstrap_servers = bootstrap_servers self.consumer_group = consumer_group self.topic = topic self.producer = Producer({'bootstrap.servers': self.bootstrap_servers}) self.consumer = Consumer({ 'bootstrap.servers': self.bootstrap_servers, 'group.id': self.consumer_group, 'auto.offset.reset': 'earliest', # 'enable.auto.commit': False # 禁用自动提交,使用手动提交 'enable.auto.commit': True }) def delivery_report(self, err, msg): """ 回调函数,用于确认消息是否成功发送 """ if err is not None: print(f"Message delivery failed: {err}") else: print(f"Message delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}") def produce_messages(self, messages): """ 发送消息 """ try: for message in messages: self.producer.produce(self.topic, value=message, callback=self.delivery_report) print(f"Produced: {message}") self.producer.poll(0) except Exception as e: print(f"An error occurred: {e}") finally: self.producer.flush() def produce_message(self, message): """ 发送消息 """ try: self.producer.produce(self.topic, value=message, callback=self.delivery_report) # print(f"Produced: {message}") self.producer.poll(0) except Exception as e: print(f"An error occurred: {e}") finally: self.producer.flush() def consume_messages(self,process_callback, user_nickname): """ 消费消息并调用回调处理业务逻辑,只有当回调返回 True 时才提交偏移量 :param process_callback: 业务逻辑回调函数,返回布尔值 :param user_nickname: 用户昵称 """ self.consumer.subscribe([self.topic]) try: while True: msg = self.consumer.poll(0.3) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: print(f"End of partition {msg.partition}, offset {msg.offset()}") else: raise KafkaException(msg.error()) else: # 调用业务处理逻辑 # process_callback(msg.value().decode('utf-8')) # 调用业务处理逻辑,传递 user_nickname 和消息 process_callback(user_nickname, msg.value().decode('utf-8')) # if process_callback(user_nickname, msg.value().decode('utf-8')): # # 如果返回 True,表示处理成功,可以提交偏移量 # try: # # self.consumer.commit(msg) # self.consumer.commit(message=msg, asynchronous=True) # print(f"Manually committed offset: {msg.offset()}") # except KafkaException as e: # print(f"Error committing offset: {e}") except KeyboardInterrupt: print("消费中断") finally: self.consumer.close() # if __name__ == '__main__': # kafka_client = KafkaClient(bootstrap_servers='localhost:9092', consumer_group='my-consumer-group', topic='my_topic') # # 生产消息 # messages_to_produce = [f"Message {i}" for i in range(10)] # kafka_client.produce_messages(messages_to_produce) # # 消费消息 # kafka_client.consume_messages() def start(): global kafka_client kafka_client = KafkaClient()