|
- from confluent_kafka import Producer, Consumer, KafkaException, KafkaError
- import os
- from common.singleton import singleton
- from config import conf
-
# Module-level client slot; stays None until start() assigns a KafkaClient.
kafka_client: "KafkaClient | None" = None
-
class KafkaClient:
    """Bundle one Kafka producer and one Kafka consumer for a single fixed topic.

    The broker list is read from the ``kafka_bootstrap_servers`` config key.
    The consumer group id is ``aiops-wx_<tel>``, where ``tel`` comes from the
    ``tel`` environment variable (falling back to a built-in default).
    """

    def __init__(self):
        """Build the producer and consumer from config and environment."""
        bootstrap_servers = conf().get("kafka_bootstrap_servers")
        agent_tel = os.environ.get('tel', '19200137635')

        self.bootstrap_servers = bootstrap_servers
        self.consumer_group = f'aiops-wx_{agent_tel}'
        self.topic = "topic.aiops.wx"
        print(f'kafka消费组 {self.consumer_group}')

        self.producer = Producer({'bootstrap.servers': self.bootstrap_servers})
        self.consumer = Consumer({
            'bootstrap.servers': self.bootstrap_servers,
            'group.id': self.consumer_group,
            # A group with no committed offset starts from the oldest message.
            'auto.offset.reset': 'earliest',
            # Offsets are committed automatically; this class never commits
            # explicitly.
            'enable.auto.commit': True,
        })

    def delivery_report(self, err, msg):
        """Producer delivery callback: print whether a message was delivered.

        :param err: delivery error, or None on success
        :param msg: the message the report refers to
        """
        if err is not None:
            print(f"Message delivery failed: {err}")
        else:
            print(f"Message delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

    def produce_messages(self, messages):
        """Send every item of ``messages`` to the topic, then flush.

        Sending is best-effort: the first exception is printed and stops the
        loop (remaining items are not sent), but the producer is always
        flushed before returning.

        :param messages: iterable of message payloads
        """
        try:
            for message in messages:
                self.producer.produce(self.topic, value=message, callback=self.delivery_report)
                print(f"Produced: {message}")
                # Serve delivery callbacks for earlier messages without blocking.
                self.producer.poll(0)
        except Exception as e:
            print(f"An error occurred: {e}")
        finally:
            self.producer.flush()

    def produce_message(self, message):
        """Send a single message to the topic, then flush.

        Any exception raised while sending is printed rather than propagated;
        the producer is always flushed before returning.

        :param message: message payload
        """
        try:
            self.producer.produce(self.topic, value=message, callback=self.delivery_report)
            # Serve pending delivery callbacks without blocking.
            self.producer.poll(0)
        except Exception as e:
            print(f"An error occurred: {e}")
        finally:
            self.producer.flush()

    def consume_messages(self, process_callback, user_nickname):
        """Subscribe to the topic and hand each message to ``process_callback``.

        Loops until interrupted. Each payload is decoded as UTF-8 and passed as
        ``process_callback(user_nickname, text)``. The callback's return value
        is ignored: offsets are auto-committed by the consumer configuration,
        not committed conditionally here. Messages without a payload are
        skipped.

        The consumer is closed when the loop ends for any reason, so this
        method cannot be called again on the same instance.

        :param process_callback: callable taking (user_nickname, message_text)
        :param user_nickname: value forwarded unchanged to the callback
        :raises KafkaException: on any consumer error other than partition EOF
        """
        self.consumer.subscribe([self.topic])

        try:
            while True:
                msg = self.consumer.poll(0.3)
                if msg is None:
                    # Poll timed out with nothing to deliver.
                    continue

                error = msg.error()
                if error:
                    if error.code() != KafkaError._PARTITION_EOF:
                        raise KafkaException(error)
                    # partition() must be called; the bare attribute is a
                    # bound method and would print its repr.
                    print(f"End of partition {msg.partition()}, offset {msg.offset()}")
                    continue

                payload = msg.value()
                if payload is None:
                    # No payload to decode; skip instead of crashing the loop.
                    print("Skipping message with empty payload")
                    continue

                process_callback(user_nickname, payload.decode('utf-8'))
        except KeyboardInterrupt:
            print("消费中断")
        finally:
            self.consumer.close()
-
-
-
-
-
-
-
-
-
-
-
def start() -> None:
    """Create a KafkaClient and store it in the module-level ``kafka_client``.

    Every call constructs a new client and replaces whatever the global held.
    """
    global kafka_client
    kafka_client = KafkaClient()
|