No puede seleccionar más de 25 temas Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.

114 líneas
4.3KB

  1. from confluent_kafka import Producer, Consumer, KafkaException, KafkaError
  2. import os
  3. from common.singleton import singleton
  4. from config import conf
  5. # Module-level Kafka client; stays None until start() creates the instance
  6. kafka_client = None
  7. class KafkaClient:
  8. def __init__(self):
  9. bootstrap_servers=conf().get("kafka_bootstrap_servers")
  10. agent_tel=os.environ.get('tel', '19200137635')
  11. consumer_group=f'aiops-wx_{agent_tel}'
  12. print(f'kafka消费组 {consumer_group}')
  13. topic="topic.aiops.wx"
  14. self.bootstrap_servers = bootstrap_servers
  15. self.consumer_group = consumer_group
  16. self.topic = topic
  17. self.producer = Producer({'bootstrap.servers': self.bootstrap_servers})
  18. self.consumer = Consumer({
  19. 'bootstrap.servers': self.bootstrap_servers,
  20. 'group.id': self.consumer_group,
  21. 'auto.offset.reset': 'earliest',
  22. # 'enable.auto.commit': False # 禁用自动提交,使用手动提交
  23. 'enable.auto.commit': True
  24. })
  25. def delivery_report(self, err, msg):
  26. """
  27. 回调函数,用于确认消息是否成功发送
  28. """
  29. if err is not None:
  30. print(f"Message delivery failed: {err}")
  31. else:
  32. print(f"Message delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")
  33. def produce_messages(self, messages):
  34. """
  35. 发送消息
  36. """
  37. try:
  38. for message in messages:
  39. self.producer.produce(self.topic, value=message, callback=self.delivery_report)
  40. print(f"Produced: {message}")
  41. self.producer.poll(0)
  42. except Exception as e:
  43. print(f"An error occurred: {e}")
  44. finally:
  45. self.producer.flush()
  46. def produce_message(self, message):
  47. """
  48. 发送消息
  49. """
  50. try:
  51. self.producer.produce(self.topic, value=message, callback=self.delivery_report)
  52. # print(f"Produced: {message}")
  53. self.producer.poll(0)
  54. except Exception as e:
  55. print(f"An error occurred: {e}")
  56. finally:
  57. self.producer.flush()
  58. def consume_messages(self,process_callback, user_nickname):
  59. """
  60. 消费消息并调用回调处理业务逻辑,只有当回调返回 True 时才提交偏移量
  61. :param process_callback: 业务逻辑回调函数,返回布尔值
  62. :param user_nickname: 用户昵称
  63. """
  64. self.consumer.subscribe([self.topic])
  65. try:
  66. while True:
  67. msg = self.consumer.poll(0.3)
  68. if msg is None:
  69. continue
  70. if msg.error():
  71. if msg.error().code() == KafkaError._PARTITION_EOF:
  72. print(f"End of partition {msg.partition}, offset {msg.offset()}")
  73. else:
  74. raise KafkaException(msg.error())
  75. else:
  76. # 调用业务处理逻辑
  77. # process_callback(msg.value().decode('utf-8'))
  78. # 调用业务处理逻辑,传递 user_nickname 和消息
  79. process_callback(user_nickname, msg.value().decode('utf-8'))
  80. # if process_callback(user_nickname, msg.value().decode('utf-8')):
  81. # # 如果返回 True,表示处理成功,可以提交偏移量
  82. # try:
  83. # # self.consumer.commit(msg)
  84. # self.consumer.commit(message=msg, asynchronous=True)
  85. # print(f"Manually committed offset: {msg.offset()}")
  86. # except KafkaException as e:
  87. # print(f"Error committing offset: {e}")
  88. except KeyboardInterrupt:
  89. print("消费中断")
  90. finally:
  91. self.consumer.close()
  92. # if __name__ == '__main__':
  93. # kafka_client = KafkaClient()
  94. # # Produce messages
  95. # messages_to_produce = [f"Message {i}" for i in range(10)]
  96. # kafka_client.produce_messages(messages_to_produce)
  97. # # Consume messages (callback receives the nickname and the decoded text)
  98. # kafka_client.consume_messages(lambda nickname, text: print(nickname, text), 'nickname')
  99. def start():
  100. global kafka_client
  101. kafka_client = KafkaClient()