|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- from confluent_kafka import Producer, Consumer, KafkaException, KafkaError
- import os
- from common.singleton import singleton
- from config import conf
- # 定义全局 redis_helper
- kafka_client = None
-
- class KafkaClient:
- def __init__(self):
-
- bootstrap_servers=conf().get("kafka_bootstrap_servers")
- agent_tel=os.environ.get('tel', '19200137635')
- consumer_group=f'aiops-wx_{agent_tel}'
- print(f'kafka消费组 {consumer_group}')
- topic="topic.aiops.wx"
-
- self.bootstrap_servers = bootstrap_servers
- self.consumer_group = consumer_group
- self.topic = topic
-
- self.producer = Producer({'bootstrap.servers': self.bootstrap_servers})
- self.consumer = Consumer({
- 'bootstrap.servers': self.bootstrap_servers,
- 'group.id': self.consumer_group,
- 'auto.offset.reset': 'earliest',
- # 'enable.auto.commit': False # 禁用自动提交,使用手动提交
- 'enable.auto.commit': True
- })
-
- def delivery_report(self, err, msg):
- """
- 回调函数,用于确认消息是否成功发送
- """
- if err is not None:
- print(f"Message delivery failed: {err}")
- else:
- print(f"Message delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")
-
- def produce_messages(self, messages):
- """
- 发送消息
- """
- try:
- for message in messages:
- self.producer.produce(self.topic, value=message, callback=self.delivery_report)
- print(f"Produced: {message}")
- self.producer.poll(0)
- except Exception as e:
- print(f"An error occurred: {e}")
- finally:
- self.producer.flush()
-
- def produce_message(self, message):
- """
- 发送消息
- """
- try:
- self.producer.produce(self.topic, value=message, callback=self.delivery_report)
- # print(f"Produced: {message}")
- self.producer.poll(0)
- except Exception as e:
- print(f"An error occurred: {e}")
- finally:
- self.producer.flush()
-
- def consume_messages(self,process_callback, user_nickname):
- """
- 消费消息并调用回调处理业务逻辑,只有当回调返回 True 时才提交偏移量
- :param process_callback: 业务逻辑回调函数,返回布尔值
- :param user_nickname: 用户昵称
- """
- self.consumer.subscribe([self.topic])
-
- try:
- while True:
- msg = self.consumer.poll(0.3)
- if msg is None:
- continue
- if msg.error():
- if msg.error().code() == KafkaError._PARTITION_EOF:
- print(f"End of partition {msg.partition}, offset {msg.offset()}")
- else:
- raise KafkaException(msg.error())
- else:
- # 调用业务处理逻辑
- # process_callback(msg.value().decode('utf-8'))
- # 调用业务处理逻辑,传递 user_nickname 和消息
- process_callback(user_nickname, msg.value().decode('utf-8'))
- # if process_callback(user_nickname, msg.value().decode('utf-8')):
- # # 如果返回 True,表示处理成功,可以提交偏移量
- # try:
- # # self.consumer.commit(msg)
- # self.consumer.commit(message=msg, asynchronous=True)
- # print(f"Manually committed offset: {msg.offset()}")
- # except KafkaException as e:
- # print(f"Error committing offset: {e}")
- except KeyboardInterrupt:
- print("消费中断")
- finally:
- self.consumer.close()
-
- # if __name__ == '__main__':
- # kafka_client = KafkaClient(bootstrap_servers='localhost:9092', consumer_group='my-consumer-group', topic='my_topic')
-
- # # 生产消息
- # messages_to_produce = [f"Message {i}" for i in range(10)]
- # kafka_client.produce_messages(messages_to_produce)
-
- # # 消费消息
- # kafka_client.consume_messages()
-
- def start():
- global kafka_client
- kafka_client = KafkaClient()
|