From 6a4b21d9de39d9b883edf7bb8c26eaade6f01eae Mon Sep 17 00:00:00 2001 From: H Vs Date: Thu, 16 Jan 2025 17:44:46 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/kafka_helper.py | 16 ++++++++++------ resources/messages_resource.py | 2 +- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/common/kafka_helper.py b/common/kafka_helper.py index 2e31dc1..5543d0e 100644 --- a/common/kafka_helper.py +++ b/common/kafka_helper.py @@ -1,5 +1,5 @@ from confluent_kafka import Producer, Consumer, KafkaException, KafkaError -import os,time +import os,time,threading from config import conf # 定义全局 redis_helper kafka_client = None @@ -9,7 +9,8 @@ class KafkaClient: bootstrap_servers=conf().get("kafka_bootstrap_servers") agent_tel=os.environ.get('tel', '18029274615') - consumer_group=f'aiops-wx_{agent_tel}' + #consumer_group=f'aiops-wx_{agent_tel}' + consumer_group='ai-ops-wx' print(f'kafka消费组 {consumer_group}') topic="topic.ai.ops.wx" @@ -22,8 +23,9 @@ class KafkaClient: 'bootstrap.servers': self.bootstrap_servers, 'group.id': self.consumer_group, 'auto.offset.reset': 'earliest', - # 'enable.auto.commit': False # 禁用自动提交,使用手动提交 - 'enable.auto.commit': True + #'enable.auto.commit': False # 禁用自动提交,使用手动提交 + 'enable.auto.commit': True, + 'max.poll.interval.ms': 600000 }) def delivery_report(self, err, msg): @@ -72,7 +74,7 @@ class KafkaClient: try: while True: - msg = self.consumer.poll(0.3) + msg = self.consumer.poll(0.1) if msg is None: continue if msg.error(): @@ -84,7 +86,9 @@ class KafkaClient: # 调用业务处理逻辑 # process_callback(msg.value().decode('utf-8')) # 调用业务处理逻辑,传递 user_nickname 和消息 - process_callback(msg.value().decode('utf-8')) + + # process_callback(msg.value().decode('utf-8')) + threading.Thread(target=process_callback, args=(msg.value().decode('utf-8'),)).start() # if process_callback(user_nickname, msg.value().decode('utf-8')): # # 如果返回 True,表示处理成功,可以提交偏移量 # try: diff --git a/resources/messages_resource.py b/resources/messages_resource.py index 7e07957..d6a4651 100644 --- a/resources/messages_resource.py +++ b/resources/messages_resource.py @@ -48,7 +48,7 @@ class MessagesResource(Resource): 10002: handle_10002_msg } # (扫码进群情况)判断受否是群聊,并添加到通信录, - if check_chatroom(from_wxid) or check_chatroom(to_wxid) or msg_push_content: + if check_chatroom(from_wxid) or check_chatroom(to_wxid): logger.info('群信息') chatroom_id=from_wxid ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,3)