Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

128 linhas
4.9KB

  1. from confluent_kafka import Producer, Consumer, KafkaException, KafkaError
  2. import os,time,threading
  3. from config import conf
  4. # 定义全局 redis_helper
  5. kafka_client = None
  6. class KafkaClient:
  7. def __init__(self):
  8. bootstrap_servers=conf().get("kafka_bootstrap_servers")
  9. agent_tel=os.environ.get('tel', '18029274615')
  10. #consumer_group=f'aiops-wx_{agent_tel}'
  11. consumer_group='ai-ops-wx'
  12. print(f'kafka消费组 {consumer_group}')
  13. topic="topic.ai.ops.wx"
  14. self.bootstrap_servers = bootstrap_servers
  15. self.consumer_group = consumer_group
  16. self.topic = topic
  17. self.producer = Producer({'bootstrap.servers': self.bootstrap_servers})
  18. self.consumer = Consumer({
  19. 'bootstrap.servers': self.bootstrap_servers,
  20. 'group.id': self.consumer_group,
  21. 'auto.offset.reset': 'earliest',
  22. #'enable.auto.commit': False # 禁用自动提交,使用手动提交
  23. 'enable.auto.commit': True,
  24. 'max.poll.interval.ms': 600000
  25. })
  26. def delivery_report(self, err, msg):
  27. """
  28. 回调函数,用于确认消息是否成功发送
  29. """
  30. if err is not None:
  31. print(f"Message delivery failed: {err}")
  32. else:
  33. print(f"Message delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")
  34. def produce_messages(self, messages):
  35. """
  36. 发送消息
  37. """
  38. try:
  39. for message in messages:
  40. self.producer.produce(self.topic, value=message, callback=self.delivery_report)
  41. print(f"Produced: {message}")
  42. self.producer.poll(0)
  43. except Exception as e:
  44. print(f"An error occurred: {e}")
  45. finally:
  46. self.producer.flush()
  47. def produce_message(self, message):
  48. """
  49. 发送消息
  50. """
  51. try:
  52. self.producer.produce(self.topic, value=message, callback=self.delivery_report)
  53. # print(f"Produced: {message}")
  54. self.producer.poll(0)
  55. except Exception as e:
  56. print(f"An error occurred: {e}")
  57. finally:
  58. self.producer.flush()
  59. def consume_messages(self,process_callback):
  60. """
  61. 消费消息并调用回调处理业务逻辑,只有当回调返回 True 时才提交偏移量
  62. :param process_callback: 业务逻辑回调函数,返回布尔值
  63. :param user_nickname: 用户昵称
  64. """
  65. self.consumer.subscribe([self.topic])
  66. try:
  67. while True:
  68. msg = self.consumer.poll(0.1)
  69. if msg is None:
  70. continue
  71. if msg.error():
  72. if msg.error().code() == KafkaError._PARTITION_EOF:
  73. print(f"End of partition {msg.partition}, offset {msg.offset()}")
  74. else:
  75. raise KafkaException(msg.error())
  76. else:
  77. # 调用业务处理逻辑
  78. # process_callback(msg.value().decode('utf-8'))
  79. # 调用业务处理逻辑,传递 user_nickname 和消息
  80. # process_callback(msg.value().decode('utf-8'))
  81. threading.Thread(target=process_callback, args=(msg.value().decode('utf-8'),)).start()
  82. # if process_callback(user_nickname, msg.value().decode('utf-8')):
  83. # # 如果返回 True,表示处理成功,可以提交偏移量
  84. # try:
  85. # # self.consumer.commit(msg)
  86. # self.consumer.commit(message=msg, asynchronous=True)
  87. # print(f"Manually committed offset: {msg.offset()}")
  88. # except KafkaException as e:
  89. # print(f"Error committing offset: {e}")
  90. except KeyboardInterrupt:
  91. print("消费中断")
  92. finally:
  93. self.consumer.close()
  94. def _reconnect_consumer_with_agent_tel(self, consumer, agent_tel):
  95. """
  96. 尝试为指定的代理商重新连接 Kafka 消费者
  97. """
  98. print(f"Attempting to reconnect Kafka consumer for agent {agent_tel}...")
  99. try:
  100. consumer.close() # Close the old consumer
  101. consumer = Consumer({
  102. 'bootstrap.servers': self.bootstrap_servers,
  103. 'group.id': f'aiops-wx_{agent_tel}',
  104. 'auto.offset.reset': 'earliest',
  105. 'enable.auto.commit': True
  106. })
  107. consumer.subscribe([self.topic])
  108. print(f"Reconnected successfully for agent {agent_tel}.")
  109. except KafkaException as e:
  110. print(f"Error while reconnecting for agent {agent_tel}: {e}")
  111. time.sleep(5) # Retry after 5 seconds
  112. def start():
  113. global kafka_client
  114. kafka_client = KafkaClient()