From 3207258fd95f4118e4dd494827e15d5757ef295c Mon Sep 17 00:00:00 2001 From: lanvent Date: Fri, 7 Apr 2023 12:22:24 +0800 Subject: [PATCH] fix: check duplicate in wechatmp --- channel/wechatmp/wechatmp_channel.py | 42 ++++++++++++++++++---------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/channel/wechatmp/wechatmp_channel.py b/channel/wechatmp/wechatmp_channel.py index 3bf2df4..12878a7 100644 --- a/channel/wechatmp/wechatmp_channel.py +++ b/channel/wechatmp/wechatmp_channel.py @@ -7,6 +7,7 @@ import textwrap from channel.chat_channel import ChatChannel import channel.wechatmp.reply as reply import channel.wechatmp.receive as receive +from common.expired_dict import ExpiredDict from common.singleton import singleton from common.log import logger from config import conf @@ -37,7 +38,7 @@ class WechatMPChannel(ChatChannel): self.query1 = dict() self.query2 = dict() self.query3 = dict() - + self.received_msgs = ExpiredDict(60*60*24) def startup(self): urls = ( @@ -106,7 +107,9 @@ class SubsribeAccountQuery(): message_id = wechat_msg.msg_id logger.info("[wechatmp] {}:{} Receive post query {} {}: {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), from_user, message_id, message)) - + supported = True + if "【收到不支持的消息类型,暂无法显示】" in message: + supported = False # not supported, used to refresh cache_key = from_user reply_text = "" @@ -115,20 +118,28 @@ class SubsribeAccountQuery(): # The first query begin, reset the cache context = channel._compose_context(ContextType.TEXT, message, isgroup=False, msg=wechat_msg) logger.debug("[wechatmp] context: {} {}".format(context, wechat_msg)) - if context: + if message_id in channel.received_msgs: # received and finished + return + if supported and context: # set private openai_api_key # if from_user is not changed in itchat, this can be placed at chat_channel user_data = conf().get_user_data(from_user) context['openai_api_key'] = user_data.get('openai_api_key') # None or user openai_api_key + channel.received_msgs[message_id] = wechat_msg channel.running.add(cache_key) channel.produce(context) else: trigger_prefix = conf().get('single_chat_prefix',[''])[0] - if trigger_prefix: - content = textwrap.dedent(f"""\ - 请输入'{trigger_prefix}'接你想说的话跟我说话。 - 例如: - {trigger_prefix}你好,很高兴见到你。""") + if trigger_prefix or not supported: + if trigger_prefix: + content = textwrap.dedent(f"""\ + 请输入'{trigger_prefix}'接你想说的话跟我说话。 + 例如: + {trigger_prefix}你好,很高兴见到你。""") + else: + content = textwrap.dedent("""\ + 你好,很高兴见到你。 + 请跟我说话吧。""") else: logger.error(f"[wechatmp] unknown error") content = textwrap.dedent("""\ @@ -139,7 +150,7 @@ class SubsribeAccountQuery(): channel.query2[cache_key] = False channel.query3[cache_key] = False # Request again - elif cache_key in channel.running and channel.query1.get(cache_key) == True and channel.query2.get(cache_key) == True and channel.query3.get(cache_key) == True: + elif cache_key in channel.running: channel.query1[cache_key] = False #To improve waiting experience, this can be set to True. channel.query2[cache_key] = False #To improve waiting experience, this can be set to True. channel.query3[cache_key] = False @@ -149,6 +160,8 @@ class SubsribeAccountQuery(): channel.query2[cache_key] = True channel.query3[cache_key] = True + assert not (cache_key in channel.cache_dict and cache_key in channel.running) + if channel.query1.get(cache_key) == False: # The first query from wechat official server logger.debug("[wechatmp] query1 {}".format(cache_key)) @@ -159,7 +172,7 @@ class SubsribeAccountQuery(): time.sleep(0.1) if cnt == 45: # waiting for timeout (the POST query will be closed by wechat official server) - time.sleep(5) + time.sleep(1) # and do nothing return else: @@ -174,7 +187,7 @@ class SubsribeAccountQuery(): time.sleep(0.1) if cnt == 45: # waiting for timeout (the POST query will be closed by wechat official server) - time.sleep(5) + time.sleep(1) # and do nothing return else: @@ -198,9 +211,10 @@ class SubsribeAccountQuery(): pass if float(time.time()) - float(query_time) > 4.8: - logger.info("[wechatmp] Timeout for {} {}".format(from_user, message_id)) - time.sleep(1) - return + reply_text = "【正在思考中,回复任意文字尝试获取回复】" + logger.info("[wechatmp] Timeout for {} {}, return".format(from_user, message_id)) + replyPost = reply.TextMsg(from_user, to_user, reply_text).send() + return replyPost if cache_key in channel.cache_dict: content = channel.cache_dict[cache_key]