From d35d01f9808a7d4568594d3e37c578590af0e359 Mon Sep 17 00:00:00 2001 From: JS00000 Date: Fri, 7 Apr 2023 19:47:50 +0800 Subject: [PATCH] Add wechatmp_service channel --- app.py | 2 +- channel/channel_factory.py | 5 +- channel/wechatmp/ServiceAccount.py | 51 +++++ channel/wechatmp/SubscribeAccount.py | 144 ++++++++++++++ channel/wechatmp/common.py | 43 ++++ channel/wechatmp/wechatmp_channel.py | 282 +++++++++------------------ config.py | 6 +- 7 files changed, 336 insertions(+), 197 deletions(-) create mode 100644 channel/wechatmp/ServiceAccount.py create mode 100644 channel/wechatmp/SubscribeAccount.py create mode 100644 channel/wechatmp/common.py diff --git a/app.py b/app.py index 4e51ab9..989762b 100644 --- a/app.py +++ b/app.py @@ -28,7 +28,7 @@ def run(): # os.environ['WECHATY_PUPPET_SERVICE_ENDPOINT'] = '127.0.0.1:9001' channel = channel_factory.create_channel(channel_name) - if channel_name in ['wx','wxy','wechatmp']: + if channel_name in ['wx','wxy','wechatmp','wechatmp_service']: PluginManager().load_plugins() # startup channel diff --git a/channel/channel_factory.py b/channel/channel_factory.py index 3303ded..e272206 100644 --- a/channel/channel_factory.py +++ b/channel/channel_factory.py @@ -19,5 +19,8 @@ def create_channel(channel_type): return TerminalChannel() elif channel_type == 'wechatmp': from channel.wechatmp.wechatmp_channel import WechatMPChannel - return WechatMPChannel() + return WechatMPChannel(passive_reply = True) + elif channel_type == 'wechatmp_service': + from channel.wechatmp.wechatmp_channel import WechatMPChannel + return WechatMPChannel(passive_reply = False) raise RuntimeError diff --git a/channel/wechatmp/ServiceAccount.py b/channel/wechatmp/ServiceAccount.py new file mode 100644 index 0000000..db9dff3 --- /dev/null +++ b/channel/wechatmp/ServiceAccount.py @@ -0,0 +1,51 @@ +import web +import time +import channel.wechatmp.reply as reply +import channel.wechatmp.receive as receive +from config import conf +from common.log import logger +from bridge.context import * +from channel.wechatmp.common import * +from channel.wechatmp.wechatmp_channel import WechatMPChannel + +# This class is instantiated once per query +class Query(): + + def GET(self): + return verify_server(web.input()) + + def POST(self): + # Make sure to return the instance that first created, @singleton will do that. + channel_instance = WechatMPChannel() + try: + webData = web.data() + # logger.debug("[wechatmp] Receive request:\n" + webData.decode("utf-8")) + wechatmp_msg = receive.parse_xml(webData) + if wechatmp_msg.msg_type == 'text': + from_user = wechatmp_msg.from_user_id + message = wechatmp_msg.content.decode("utf-8") + message_id = wechatmp_msg.msg_id + + logger.info("[wechatmp] {}:{} Receive post query {} {}: {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), from_user, message_id, message)) + context = channel_instance._compose_context(ContextType.TEXT, message, isgroup=False, msg=wechatmp_msg) + if context: + # set private openai_api_key + # if from_user is not changed in itchat, this can be placed at chat_channel + user_data = conf().get_user_data(from_user) + context['openai_api_key'] = user_data.get('openai_api_key') # None or user openai_api_key + channel_instance.produce(context) + # The reply will be sent by channel_instance.send() in another thread + return "success" + + elif wechatmp_msg.msg_type == 'event': + logger.info("[wechatmp] Event {} from {}".format(wechatmp_msg.Event, wechatmp_msg.from_user_id)) + content = subscribe_msg() + replyMsg = reply.TextMsg(wechatmp_msg.from_user_id, wechatmp_msg.to_user_id, content) + return replyMsg.send() + else: + logger.info("暂且不处理") + return "success" + except Exception as exc: + logger.exception(exc) + return exc + diff --git a/channel/wechatmp/SubscribeAccount.py b/channel/wechatmp/SubscribeAccount.py new file mode 100644 index 0000000..9035887 --- /dev/null +++ b/channel/wechatmp/SubscribeAccount.py @@ -0,0 +1,144 @@ +import web +import time +import channel.wechatmp.reply as reply +import channel.wechatmp.receive as receive +from config import conf +from common.log import logger +from bridge.context import * +from channel.wechatmp.common import * +from channel.wechatmp.wechatmp_channel import WechatMPChannel + +# This class is instantiated once per query +class Query(): + + def GET(self): + return verify_server(web.input()) + + def POST(self): + # Make sure to return the instance that first created, @singleton will do that. + channel_instance = WechatMPChannel() + try: + query_time = time.time() + webData = web.data() + # logger.debug("[wechatmp] Receive request:\n" + webData.decode("utf-8")) + wechatmp_msg = receive.parse_xml(webData) + if wechatmp_msg.msg_type == 'text': + from_user = wechatmp_msg.from_user_id + to_user = wechatmp_msg.to_user_id + message = wechatmp_msg.content.decode("utf-8") + message_id = wechatmp_msg.msg_id + + logger.info("[wechatmp] {}:{} Receive post query {} {}: {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), from_user, message_id, message)) + + cache_key = from_user + cache = channel_instance.cache_dict.get(cache_key) + + reply_text = "" + # New request + if cache == None: + # The first query begin, reset the cache + channel_instance.cache_dict[cache_key] = (0, "") + + context = channel_instance._compose_context(ContextType.TEXT, message, isgroup=False, msg=wechatmp_msg) + if context: + # set private openai_api_key + # if from_user is not changed in itchat, this can be placed at chat_channel + user_data = conf().get_user_data(from_user) + context['openai_api_key'] = user_data.get('openai_api_key') # None or user openai_api_key + channel_instance.produce(context) + + + channel_instance.query1[cache_key] = False + channel_instance.query2[cache_key] = False + channel_instance.query3[cache_key] = False + # Request again + elif cache[0] == 0 and channel_instance.query1.get(cache_key) == True and channel_instance.query2.get(cache_key) == True and channel_instance.query3.get(cache_key) == True: + channel_instance.query1[cache_key] = False #To improve waiting experience, this can be set to True. + channel_instance.query2[cache_key] = False #To improve waiting experience, this can be set to True. + channel_instance.query3[cache_key] = False + elif cache[0] >= 1: + # Skip the waiting phase + channel_instance.query1[cache_key] = True + channel_instance.query2[cache_key] = True + channel_instance.query3[cache_key] = True + + + cache = channel_instance.cache_dict.get(cache_key) + if channel_instance.query1.get(cache_key) == False: + # The first query from wechat official server + logger.debug("[wechatmp] query1 {}".format(cache_key)) + channel_instance.query1[cache_key] = True + cnt = 0 + while cache[0] == 0 and cnt < 45: + cnt = cnt + 1 + time.sleep(0.1) + cache = channel_instance.cache_dict.get(cache_key) + if cnt == 45: + # waiting for timeout (the POST query will be closed by wechat official server) + time.sleep(5) + # and do nothing + return + else: + pass + elif channel_instance.query2.get(cache_key) == False: + # The second query from wechat official server + logger.debug("[wechatmp] query2 {}".format(cache_key)) + channel_instance.query2[cache_key] = True + cnt = 0 + while cache[0] == 0 and cnt < 45: + cnt = cnt + 1 + time.sleep(0.1) + cache = channel_instance.cache_dict.get(cache_key) + if cnt == 45: + # waiting for timeout (the POST query will be closed by wechat official server) + time.sleep(5) + # and do nothing + return + else: + pass + elif channel_instance.query3.get(cache_key) == False: + # The third query from wechat official server + logger.debug("[wechatmp] query3 {}".format(cache_key)) + channel_instance.query3[cache_key] = True + cnt = 0 + while cache[0] == 0 and cnt < 45: + cnt = cnt + 1 + time.sleep(0.1) + cache = channel_instance.cache_dict.get(cache_key) + if cnt == 45: + # Have waiting for 3x5 seconds + # return timeout message + reply_text = "【正在响应中,回复任意文字尝试获取回复】" + logger.info("[wechatmp] Three queries has finished For {}: {}".format(from_user, message_id)) + replyPost = reply.TextMsg(from_user, to_user, reply_text).send() + return replyPost + else: + pass + + if float(time.time()) - float(query_time) > 4.8: + logger.info("[wechatmp] Timeout for {} {}".format(from_user, message_id)) + return + + + if cache[0] > 1: + reply_text = cache[1][:600] + "\n【未完待续,回复任意文字以继续】" #wechatmp auto_reply length limit + channel_instance.cache_dict[cache_key] = (cache[0] - 1, cache[1][600:]) + elif cache[0] == 1: + reply_text = cache[1] + channel_instance.cache_dict.pop(cache_key) + logger.info("[wechatmp] {}:{} Do send {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), reply_text)) + replyPost = reply.TextMsg(from_user, to_user, reply_text).send() + return replyPost + + elif wechatmp_msg.msg_type == 'event': + logger.info("[wechatmp] Event {} from {}".format(wechatmp_msg.Event, wechatmp_msg.from_user_id)) + content = subscribe_msg() + replyMsg = reply.TextMsg(wechatmp_msg.from_user_id, wechatmp_msg.to_user_id, content) + return replyMsg.send() + else: + logger.info("暂且不处理") + return "success" + except Exception as exc: + logger.exception(exc) + return exc + diff --git a/channel/wechatmp/common.py b/channel/wechatmp/common.py new file mode 100644 index 0000000..6a24ef2 --- /dev/null +++ b/channel/wechatmp/common.py @@ -0,0 +1,43 @@ +from config import conf +import hashlib +import textwrap + + +class WeChatAPIException(Exception): + pass + +def verify_server(data): + try: + if len(data) == 0: + return "None" + signature = data.signature + timestamp = data.timestamp + nonce = data.nonce + echostr = data.echostr + token = conf().get('wechatmp_token') #请按照公众平台官网\基本配置中信息填写 + + data_list = [token, timestamp, nonce] + data_list.sort() + sha1 = hashlib.sha1() + # map(sha1.update, data_list) #python2 + sha1.update("".join(data_list).encode('utf-8')) + hashcode = sha1.hexdigest() + print("handle/GET func: hashcode, signature: ", hashcode, signature) + if hashcode == signature: + return echostr + else: + return "" + except Exception as Argument: + return Argument + +def subscribe_msg(): + msg = textwrap.dedent("""\ + 感谢您的关注! + 这里是ChatGPT,可以自由对话。 + 资源有限,回复较慢,请勿着急。 + 支持通用表情输入。 + 暂时不支持图片输入。 + 支持图片输出,画字开头的问题将回复图片或链接。 + 支持角色扮演和文字冒险两种定制模式对话。 + 输入'#帮助' 查看详细指令。""") + return msg \ No newline at end of file diff --git a/channel/wechatmp/wechatmp_channel.py b/channel/wechatmp/wechatmp_channel.py index cdd8673..36e9297 100644 --- a/channel/wechatmp/wechatmp_channel.py +++ b/channel/wechatmp/wechatmp_channel.py @@ -1,19 +1,18 @@ # -*- coding: utf-8 -*- +import sys import web -import time import math -import hashlib -import textwrap -from channel.chat_channel import ChatChannel -import channel.wechatmp.reply as reply -import channel.wechatmp.receive as receive +import time +import json +import requests +import threading from common.singleton import singleton from common.log import logger from config import conf from bridge.reply import * from bridge.context import * -from plugins import * -import traceback +from channel.chat_channel import ChatChannel +from channel.wechatmp.common import * # If using SSL, uncomment the following lines, and modify the certificate path. # from cheroot.server import HTTPServer @@ -22,199 +21,96 @@ import traceback # certificate='/ssl/cert.pem', # private_key='/ssl/cert.key') - -# from concurrent.futures import ThreadPoolExecutor -# thread_pool = ThreadPoolExecutor(max_workers=8) - @singleton class WechatMPChannel(ChatChannel): - def __init__(self): + def __init__(self, passive_reply = True): super().__init__() - self.cache_dict = dict() - self.query1 = dict() - self.query2 = dict() - self.query3 = dict() - + self.passive_reply = passive_reply + if self.passive_reply: + self.cache_dict = dict() + self.query1 = dict() + self.query2 = dict() + self.query3 = dict() + else: + self.app_id = conf().get('wechatmp_app_id') + self.app_secret = conf().get('wechatmp_app_secret') + self.access_token = None + self.access_token_expires_time = 0 + self.access_token_lock = threading.Lock() + self.get_access_token() def startup(self): - urls = ( - '/wx', 'SubsribeAccountQuery', - ) + if self.passive_reply: + urls = ('/wx', 'channel.wechatmp.SubscribeAccount.Query') + else: + urls = ('/wx', 'channel.wechatmp.ServiceAccount.Query') app = web.application(urls, globals()) app.run() - def send(self, reply: Reply, context: Context): - reply_cnt = math.ceil(len(reply.content) / 600) - receiver = context["receiver"] - self.cache_dict[receiver] = (reply_cnt, reply.content) - logger.debug("[send] reply to {} saved to cache: {}".format(receiver, reply)) - - -def verify_server(): - try: - data = web.input() - if len(data) == 0: - return "None" - signature = data.signature - timestamp = data.timestamp - nonce = data.nonce - echostr = data.echostr - token = conf().get('wechatmp_token') #请按照公众平台官网\基本配置中信息填写 - - data_list = [token, timestamp, nonce] - data_list.sort() - sha1 = hashlib.sha1() - # map(sha1.update, data_list) #python2 - sha1.update("".join(data_list).encode('utf-8')) - hashcode = sha1.hexdigest() - print("handle/GET func: hashcode, signature: ", hashcode, signature) - if hashcode == signature: - return echostr + def wechatmp_request(self, method, url, **kwargs): + r = requests.request(method=method, url=url, **kwargs) + r.raise_for_status() + r.encoding = "utf-8" + ret = r.json() + if "errcode" in ret and ret["errcode"] != 0: + raise WeChatAPIException("{}".format(ret)) + return ret + + def get_access_token(self): + + # return the access_token + if self.access_token: + if self.access_token_expires_time - time.time() > 60: + return self.access_token + + # Get new access_token + # Do not request access_token in parallel! Only the last obtained is valid. + if self.access_token_lock.acquire(blocking=False): + # Wait for other threads that have previously obtained access_token to complete the request + # This happens every 2 hours, so it doesn't affect the experience very much + time.sleep(1) + self.access_token = None + url="https://api.weixin.qq.com/cgi-bin/token" + params={ + "grant_type": "client_credential", + "appid": self.app_id, + "secret": self.app_secret + } + data = self.wechatmp_request(method='get', url=url, params=params) + self.access_token = data['access_token'] + self.access_token_expires_time = int(time.time()) + data['expires_in'] + logger.info("[wechatmp] access_token: {}".format(self.access_token)) + self.access_token_lock.release() else: - return "" - except Exception as Argument: - return Argument - - -# This class is instantiated once per query -class SubsribeAccountQuery(): - - def GET(self): - return verify_server() - - def POST(self): - channel_instance = WechatMPChannel() - try: - query_time = time.time() - webData = web.data() - # logger.debug("[wechatmp] Receive request:\n" + webData.decode("utf-8")) - wechat_msg = receive.parse_xml(webData) - if wechat_msg.msg_type == 'text': - from_user = wechat_msg.from_user_id - to_user = wechat_msg.to_user_id - message = wechat_msg.content.decode("utf-8") - message_id = wechat_msg.msg_id - - logger.info("[wechatmp] {}:{} Receive post query {} {}: {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), from_user, message_id, message)) - - cache_key = from_user - cache = channel_instance.cache_dict.get(cache_key) - - reply_text = "" - # New request - if cache == None: - # The first query begin, reset the cache - channel_instance.cache_dict[cache_key] = (0, "") - - context = channel_instance._compose_context(ContextType.TEXT, message, isgroup=False, msg=wechat_msg) - if context: - # set private openai_api_key - # if from_user is not changed in itchat, this can be placed at chat_channel - user_data = conf().get_user_data(from_user) - context['openai_api_key'] = user_data.get('openai_api_key') # None or user openai_api_key - channel_instance.produce(context) - - - channel_instance.query1[cache_key] = False - channel_instance.query2[cache_key] = False - channel_instance.query3[cache_key] = False - # Request again - elif cache[0] == 0 and channel_instance.query1.get(cache_key) == True and channel_instance.query2.get(cache_key) == True and channel_instance.query3.get(cache_key) == True: - channel_instance.query1[cache_key] = False #To improve waiting experience, this can be set to True. - channel_instance.query2[cache_key] = False #To improve waiting experience, this can be set to True. - channel_instance.query3[cache_key] = False - elif cache[0] >= 1: - # Skip the waiting phase - channel_instance.query1[cache_key] = True - channel_instance.query2[cache_key] = True - channel_instance.query3[cache_key] = True - - - cache = channel_instance.cache_dict.get(cache_key) - if channel_instance.query1.get(cache_key) == False: - # The first query from wechat official server - logger.debug("[wechatmp] query1 {}".format(cache_key)) - channel_instance.query1[cache_key] = True - cnt = 0 - while cache[0] == 0 and cnt < 45: - cnt = cnt + 1 - time.sleep(0.1) - cache = channel_instance.cache_dict.get(cache_key) - if cnt == 45: - # waiting for timeout (the POST query will be closed by wechat official server) - time.sleep(5) - # and do nothing - return - else: - pass - elif channel_instance.query2.get(cache_key) == False: - # The second query from wechat official server - logger.debug("[wechatmp] query2 {}".format(cache_key)) - channel_instance.query2[cache_key] = True - cnt = 0 - while cache[0] == 0 and cnt < 45: - cnt = cnt + 1 - time.sleep(0.1) - cache = channel_instance.cache_dict.get(cache_key) - if cnt == 45: - # waiting for timeout (the POST query will be closed by wechat official server) - time.sleep(5) - # and do nothing - return - else: - pass - elif channel_instance.query3.get(cache_key) == False: - # The third query from wechat official server - logger.debug("[wechatmp] query3 {}".format(cache_key)) - channel_instance.query3[cache_key] = True - cnt = 0 - while cache[0] == 0 and cnt < 45: - cnt = cnt + 1 - time.sleep(0.1) - cache = channel_instance.cache_dict.get(cache_key) - if cnt == 45: - # Have waiting for 3x5 seconds - # return timeout message - reply_text = "【正在响应中,回复任意文字尝试获取回复】" - logger.info("[wechatmp] Three queries has finished For {}: {}".format(from_user, message_id)) - replyPost = reply.TextMsg(from_user, to_user, reply_text).send() - return replyPost - else: - pass - - if float(time.time()) - float(query_time) > 4.8: - logger.info("[wechatmp] Timeout for {} {}".format(from_user, message_id)) - return - - - if cache[0] > 1: - reply_text = cache[1][:600] + "\n【未完待续,回复任意文字以继续】" #wechatmp auto_reply length limit - channel_instance.cache_dict[cache_key] = (cache[0] - 1, cache[1][600:]) - elif cache[0] == 1: - reply_text = cache[1] - channel_instance.cache_dict.pop(cache_key) - logger.info("[wechatmp] {}:{} Do send {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), reply_text)) - replyPost = reply.TextMsg(from_user, to_user, reply_text).send() - return replyPost - - elif wechat_msg.msg_type == 'event': - logger.info("[wechatmp] Event {} from {}".format(wechat_msg.Event, wechat_msg.from_user_id)) - content = textwrap.dedent("""\ - 感谢您的关注! - 这里是ChatGPT,可以自由对话。 - 资源有限,回复较慢,请勿着急。 - 支持通用表情输入。 - 暂时不支持图片输入。 - 支持图片输出,画字开头的问题将回复图片链接。 - 支持角色扮演和文字冒险两种定制模式对话。 - 输入'#帮助' 查看详细指令。""") - replyMsg = reply.TextMsg(wechat_msg.from_user_id, wechat_msg.to_user_id, content) - return replyMsg.send() - else: - logger.info("暂且不处理") - return "success" - except Exception as exc: - logger.exception(exc) - return exc + # Wait for token update + while self.access_token_lock.locked(): + time.sleep(0.1) + return self.access_token + def send(self, reply: Reply, context: Context): + if self.passive_reply: + receiver = context["receiver"] + reply_text = reply.content + reply_cnt = math.ceil(len(reply_text) / 600) + self.cache_dict[receiver] = (reply_cnt, reply_text) + logger.debug("[send] reply to {} saved to cache: {}".format(receiver, reply_text)) + else: + receiver = context["receiver"] + reply_text = reply.content + url="https://api.weixin.qq.com/cgi-bin/message/custom/send" + params = { + "access_token": self.get_access_token() + } + json_data = { + "touser": receiver, + "msgtype": "text", + "text": {"content": reply_text} + } + self.wechatmp_request(method='post', url=url, params=params, data=json.dumps(json_data, ensure_ascii=False).encode('utf8')) + logger.info("[send] Do send to {}: {}".format(receiver, reply_text)) + return + +# Last import to avoid circular import +import channel.wechatmp.SubscribeAccount +import channel.wechatmp.ServiceAccount diff --git a/config.py b/config.py index 13f5a10..2ef57af 100644 --- a/config.py +++ b/config.py @@ -79,13 +79,15 @@ available_setting = { "wechaty_puppet_service_token": "", # wechaty的token # wechatmp的配置 - "wechatmp_token": "", # 微信公众平台的Token + "wechatmp_token": "", # 微信公众平台的Token + "wechatmp_app_id": "", # 微信公众平台的appID + "wechatmp_app_secret": "", # 微信公众平台的appsecret # chatgpt指令自定义触发词 "clear_memory_commands": ['#清除记忆'], # 重置会话指令,必须以#开头 # channel配置 - "channel_type": "wx", # 通道类型,支持:{wx,wxy,terminal,wechatmp} + "channel_type": "wx", # 通道类型,支持:{wx,wxy,terminal,wechatmp,wechatmp_service} "debug": False, # 是否开启debug模式,开启后会打印更多日志