Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

103 linhas
3.9KB

  1. from confluent_kafka import Producer, Consumer, KafkaException, KafkaError
  2. import os
  3. from config import conf
# Module-level KafkaClient holder; None until start() assigns an instance.
kafka_client = None
  6. class KafkaClient:
  7. def __init__(self):
  8. bootstrap_servers=conf().get("kafka_bootstrap_servers")
  9. agent_tel=os.environ.get('tel', '18029274615')
  10. consumer_group=f'aiops-wx_{agent_tel}'
  11. print(f'kafka消费组 {consumer_group}')
  12. topic="topic.ai.ops.wx"
  13. self.bootstrap_servers = bootstrap_servers
  14. self.consumer_group = consumer_group
  15. self.topic = topic
  16. self.producer = Producer({'bootstrap.servers': self.bootstrap_servers})
  17. self.consumer = Consumer({
  18. 'bootstrap.servers': self.bootstrap_servers,
  19. 'group.id': self.consumer_group,
  20. 'auto.offset.reset': 'earliest',
  21. # 'enable.auto.commit': False # 禁用自动提交,使用手动提交
  22. 'enable.auto.commit': True
  23. })
  24. def delivery_report(self, err, msg):
  25. """
  26. 回调函数,用于确认消息是否成功发送
  27. """
  28. if err is not None:
  29. print(f"Message delivery failed: {err}")
  30. else:
  31. print(f"Message delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")
  32. def produce_messages(self, messages):
  33. """
  34. 发送消息
  35. """
  36. try:
  37. for message in messages:
  38. self.producer.produce(self.topic, value=message, callback=self.delivery_report)
  39. print(f"Produced: {message}")
  40. self.producer.poll(0)
  41. except Exception as e:
  42. print(f"An error occurred: {e}")
  43. finally:
  44. self.producer.flush()
  45. def produce_message(self, message):
  46. """
  47. 发送消息
  48. """
  49. try:
  50. self.producer.produce(self.topic, value=message, callback=self.delivery_report)
  51. # print(f"Produced: {message}")
  52. self.producer.poll(0)
  53. except Exception as e:
  54. print(f"An error occurred: {e}")
  55. finally:
  56. self.producer.flush()
  57. def consume_messages(self,process_callback):
  58. """
  59. 消费消息并调用回调处理业务逻辑,只有当回调返回 True 时才提交偏移量
  60. :param process_callback: 业务逻辑回调函数,返回布尔值
  61. :param user_nickname: 用户昵称
  62. """
  63. self.consumer.subscribe([self.topic])
  64. try:
  65. while True:
  66. msg = self.consumer.poll(0.3)
  67. if msg is None:
  68. continue
  69. if msg.error():
  70. if msg.error().code() == KafkaError._PARTITION_EOF:
  71. print(f"End of partition {msg.partition}, offset {msg.offset()}")
  72. else:
  73. raise KafkaException(msg.error())
  74. else:
  75. # 调用业务处理逻辑
  76. # process_callback(msg.value().decode('utf-8'))
  77. # 调用业务处理逻辑,传递 user_nickname 和消息
  78. process_callback(msg.value().decode('utf-8'))
  79. # if process_callback(user_nickname, msg.value().decode('utf-8')):
  80. # # 如果返回 True,表示处理成功,可以提交偏移量
  81. # try:
  82. # # self.consumer.commit(msg)
  83. # self.consumer.commit(message=msg, asynchronous=True)
  84. # print(f"Manually committed offset: {msg.offset()}")
  85. # except KafkaException as e:
  86. # print(f"Error committing offset: {e}")
  87. except KeyboardInterrupt:
  88. print("消费中断")
  89. finally:
  90. self.consumer.close()
  91. def start():
  92. global kafka_client
  93. kafka_client = KafkaClient()