diff --git a/app.py b/app.py index 637b6e4..e11f46c 100644 --- a/app.py +++ b/app.py @@ -19,7 +19,6 @@ def sigterm_handler_wrap(_signo): if callable(old_handler): # check old_handler return old_handler(_signo, _stack_frame) sys.exit(0) - signal.signal(_signo, func) diff --git a/channel/wechatmp/README.md b/channel/wechatmp/README.md index 69b8037..219d276 100644 --- a/channel/wechatmp/README.md +++ b/channel/wechatmp/README.md @@ -1,9 +1,7 @@ # 微信公众号channel 鉴于个人微信号在服务器上通过itchat登录有封号风险,这里新增了微信公众号channel,提供无风险的服务。 -目前支持订阅号(个人)和服务号(企业)两种类型的公众号,它们的主要区别就是被动回复和主动回复。 -个人微信订阅号有许多接口限制,目前仅支持最基本的文本对话和语音输入,支持加载插件,支持私有api_key。 -暂未实现图片输入输出、语音输出等交互形式。 +目前支持订阅号和服务号两种类型的公众号。个人主体的微信订阅号由于无法通过微信认证,接口存在限制,目前仅支持最基本的文本交互和语音输入。通过微信认证的订阅号或者服务号可以回复图片和语音。 ## 使用方法(订阅号,服务号类似) @@ -21,37 +19,79 @@ pip3 install web.py 相关的服务器验证代码已经写好,你不需要再添加任何代码。你只需要在本项目根目录的`config.json`中添加 ``` -"channel_type": "wechatmp", -"wechatmp_token": "Token", # 微信公众平台的Token -"wechatmp_port": 8080, # 微信公众平台的端口,需要端口转发到80或443 -"wechatmp_app_id": "", # 微信公众平台的appID,仅服务号需要 -"wechatmp_app_secret": "", # 微信公众平台的appsecret,仅服务号需要 +"channel_type": "wechatmp", # 如果通过了微信认证,将"wechatmp"替换为"wechatmp_service",可极大的优化使用体验 +"wechatmp_token": "xxxx", # 微信公众平台的Token +"wechatmp_port": 8080, # 微信公众平台的端口,需要端口转发到80或443 +"wechatmp_app_id": "xxxx", # 微信公众平台的appID +"wechatmp_app_secret": "xxxx", # 微信公众平台的appsecret +"single_chat_prefix": [""], # 推荐设置,任意对话都可以触发回复,不添加前缀 +"single_chat_reply_prefix": "", # 推荐设置,回复不设置前缀 +"plugin_trigger_prefix": "&", # 推荐设置,在手机微信客户端中,$%^等符号与中文连在一起时会自动显示一段较大的间隔,用户体验不好。请不要使用管理员指令前缀"#",这会造成未知问题。 ``` -然后运行`python3 app.py`启动web服务器。这里会默认监听8080端口,但是微信公众号的服务器配置只支持80/443端口,有两种方法来解决这个问题。第一个是推荐的方法,使用端口转发命令将80端口转发到8080端口(443同理,注意需要支持SSL,也就是https的访问,在`wechatmp_channel.py`需要修改相应的证书路径): +然后运行`python3 app.py`启动web服务器。这里会默认监听8080端口,但是微信公众号的服务器配置只支持80/443端口,有两种方法来解决这个问题。第一个是推荐的方法,使用端口转发命令将80端口转发到8080端口: ``` sudo iptables -t nat -A PREROUTING -p tcp --dport 80 -j REDIRECT --to-port 8080 sudo iptables-save > /etc/iptables/rules.v4 ``` -第二个方法是让python程序直接监听80端口。这样可能会导致权限问题,在linux上需要使用`sudo`。然而这会导致后续缓存文件的权限问题,因此不是推荐的方法。 -最后在刚才的“服务器配置”中点击`提交`即可验证你的服务器。 +第二个方法是让python程序直接监听80端口,在配置文件中设置`"wechatmp_port": 80` ,在linux上需要使用`sudo python3 app.py`启动程序。然而这会导致一系列环境和权限问题,因此不是推荐的方法。 +443端口同理,注意需要支持SSL,也就是https的访问,在`wechatmp_channel.py`中需要修改相应的证书路径。 + +程序启动并监听端口后,在刚才的“服务器配置”中点击`提交`即可验证你的服务器。 随后在[微信公众平台](https://mp.weixin.qq.com)启用服务器,关闭手动填写规则的自动回复,即可实现ChatGPT的自动回复。 +如果在启用后如果遇到如下报错: +``` +'errcode': 40164, 'errmsg': 'invalid ip xx.xx.xx.xx not in whitelist rid +``` + +需要在公众号开发信息下将IP加入到IP白名单。 + ## 个人微信公众号的限制 由于人微信公众号不能通过微信认证,所以没有客服接口,因此公众号无法主动发出消息,只能被动回复。而微信官方对被动回复有5秒的时间限制,最多重试2次,因此最多只有15秒的自动回复时间窗口。因此如果问题比较复杂或者我们的服务器比较忙,ChatGPT的回答就没办法及时回复给用户。为了解决这个问题,这里做了回答缓存,它需要你在回复超时后,再次主动发送任意文字(例如1)来尝试拿到回答缓存。为了优化使用体验,目前设置了两分钟(120秒)的timeout,用户在至多两分钟后即可得到查询到回复或者错误原因。 -另外,由于微信官方的限制,自动回复有长度限制。因此这里将ChatGPT的回答拆分,分成每段600字回复(限制大约在700字)。 +另外,由于微信官方的限制,自动回复有长度限制。因此这里将ChatGPT的回答进行了拆分,以满足限制。 ## 私有api_key -公共api有访问频率限制(免费账号每分钟最多20次ChatGPT的API调用),这在服务多人的时候会遇到问题。因此这里多加了一个设置私有api_key的功能。目前通过godcmd插件的命令来设置私有api_key。 +公共api有访问频率限制(免费账号每分钟最多3次ChatGPT的API调用),这在服务多人的时候会遇到问题。因此这里多加了一个设置私有api_key的功能。目前通过godcmd插件的命令来设置私有api_key。 ## 语音输入 利用微信自带的语音识别功能,提供语音输入能力。需要在公众号管理页面的“设置与开发”->“接口权限”页面开启“接收语音识别结果”。 -## 测试范围 -目前在`RoboStyle`这个公众号上进行了测试(基于[wechatmp分支](https://github.com/JS00000/chatgpt-on-wechat/tree/wechatmp)),感兴趣的可以关注并体验。开启了godcmd, Banwords, role, dungeon, finish这五个插件,其他的插件还没有测试。百度的接口暂未测试。语音对话没有测试。图片直接以链接形式回复(没有临时素材上传接口的权限)。 +## 语音回复 +请在配置文件中添加以下词条: +``` + "voice_reply_voice": true, +``` +这样公众号将会用语音回复语音消息,实现语音对话。 + +默认的语音合成引擎是`google`,它是免费使用的。 + +如果要选择其他的语音合成引擎,请添加以下配置项: +``` +"text_to_voice": "pytts" +``` + +pytts是本地的语音合成引擎。还支持baidu,azure,这些你需要自行配置相关的依赖和key。 + +如果使用pytts,在ubuntu上需要安装如下依赖: +``` +sudo apt update +sudo apt install espeak +sudo apt install ffmpeg +python3 -m pip install pyttsx3 +``` +不是很建议开启pytts语音回复,因为它是离线本地计算,算的慢会拖垮服务器,且声音不好听。 + +## 图片回复 +现在认证公众号和非认证公众号都可以实现的图片和语音回复。但是非认证公众号使用了永久素材接口,每天有1000次的调用上限(每个月有10次重置机会,程序中已设定遇到上限会自动重置),且永久素材库存也有上限。因此对于非认证公众号,我们会在回复图片或者语音消息后的10秒内从永久素材库存内删除该素材。 + +## 测试 +目前在`RoboStyle`这个公众号上进行了测试(基于[wechatmp分支](https://github.com/JS00000/chatgpt-on-wechat/tree/wechatmp)),感兴趣的可以关注并体验。开启了godcmd, Banwords, role, dungeon, finish这五个插件,其他的插件还没有详尽测试。百度的接口暂未测试。[wechatmp-stable分支](https://github.com/JS00000/chatgpt-on-wechat/tree/wechatmp-stable)是较稳定的上个版本,但也缺少最新的功能支持。 ## TODO -* 服务号交互完善 -* 服务号使用临时素材接口,提供图片回复能力 -* 插件测试 + - [x] 语音输入 + - [ ] 图片输入 + - [x] 使用临时素材接口提供认证公众号的图片和语音回复 + - [x] 使用永久素材接口提供未认证公众号的图片和语音回复 + - [ ] 高并发支持 diff --git a/channel/wechatmp/SubscribeAccount.py b/channel/wechatmp/SubscribeAccount.py deleted file mode 100644 index 8eeedb4..0000000 --- a/channel/wechatmp/SubscribeAccount.py +++ /dev/null @@ -1,232 +0,0 @@ -import time - -import web - -import channel.wechatmp.receive as receive -import channel.wechatmp.reply as reply -from bridge.context import * -from channel.wechatmp.common import * -from channel.wechatmp.wechatmp_channel import WechatMPChannel -from common.log import logger -from config import conf - - -# This class is instantiated once per query -class Query: - def GET(self): - return verify_server(web.input()) - - def POST(self): - # Make sure to return the instance that first created, @singleton will do that. - channel = WechatMPChannel() - try: - query_time = time.time() - webData = web.data() - logger.debug("[wechatmp] Receive request:\n" + webData.decode("utf-8")) - wechatmp_msg = receive.parse_xml(webData) - if wechatmp_msg.msg_type == "text" or wechatmp_msg.msg_type == "voice": - from_user = wechatmp_msg.from_user_id - to_user = wechatmp_msg.to_user_id - message = wechatmp_msg.content.decode("utf-8") - message_id = wechatmp_msg.msg_id - - logger.info( - "[wechatmp] {}:{} Receive post query {} {}: {}".format( - web.ctx.env.get("REMOTE_ADDR"), - web.ctx.env.get("REMOTE_PORT"), - from_user, - message_id, - message, - ) - ) - supported = True - if "【收到不支持的消息类型,暂无法显示】" in message: - supported = False # not supported, used to refresh - cache_key = from_user - - reply_text = "" - # New request - if ( - cache_key not in channel.cache_dict - and cache_key not in channel.running - ): - # The first query begin, reset the cache - context = channel._compose_context( - ContextType.TEXT, message, isgroup=False, msg=wechatmp_msg - ) - logger.debug( - "[wechatmp] context: {} {}".format(context, wechatmp_msg) - ) - if message_id in channel.received_msgs: # received and finished - # no return because of bandwords or other reasons - return "success" - if supported and context: - # set private openai_api_key - # if from_user is not changed in itchat, this can be placed at chat_channel - user_data = conf().get_user_data(from_user) - context["openai_api_key"] = user_data.get( - "openai_api_key" - ) # None or user openai_api_key - channel.received_msgs[message_id] = wechatmp_msg - channel.running.add(cache_key) - channel.produce(context) - else: - trigger_prefix = conf().get("single_chat_prefix", [""])[0] - if trigger_prefix or not supported: - if trigger_prefix: - content = textwrap.dedent( - f"""\ - 请输入'{trigger_prefix}'接你想说的话跟我说话。 - 例如: - {trigger_prefix}你好,很高兴见到你。""" - ) - else: - content = textwrap.dedent( - """\ - 你好,很高兴见到你。 - 请跟我说话吧。""" - ) - else: - logger.error(f"[wechatmp] unknown error") - content = textwrap.dedent( - """\ - 未知错误,请稍后再试""" - ) - replyMsg = reply.TextMsg( - wechatmp_msg.from_user_id, wechatmp_msg.to_user_id, content - ) - return replyMsg.send() - channel.query1[cache_key] = False - channel.query2[cache_key] = False - channel.query3[cache_key] = False - # User request again, and the answer is not ready - elif ( - cache_key in channel.running - and channel.query1.get(cache_key) == True - and channel.query2.get(cache_key) == True - and channel.query3.get(cache_key) == True - ): - channel.query1[ - cache_key - ] = False # To improve waiting experience, this can be set to True. - channel.query2[ - cache_key - ] = False # To improve waiting experience, this can be set to True. - channel.query3[cache_key] = False - # User request again, and the answer is ready - elif cache_key in channel.cache_dict: - # Skip the waiting phase - channel.query1[cache_key] = True - channel.query2[cache_key] = True - channel.query3[cache_key] = True - - assert not ( - cache_key in channel.cache_dict and cache_key in channel.running - ) - - if channel.query1.get(cache_key) == False: - # The first query from wechat official server - logger.debug("[wechatmp] query1 {}".format(cache_key)) - channel.query1[cache_key] = True - cnt = 0 - while cache_key in channel.running and cnt < 45: - cnt = cnt + 1 - time.sleep(0.1) - if cnt == 45: - # waiting for timeout (the POST query will be closed by wechat official server) - time.sleep(1) - # and do nothing - return - else: - pass - elif channel.query2.get(cache_key) == False: - # The second query from wechat official server - logger.debug("[wechatmp] query2 {}".format(cache_key)) - channel.query2[cache_key] = True - cnt = 0 - while cache_key in channel.running and cnt < 45: - cnt = cnt + 1 - time.sleep(0.1) - if cnt == 45: - # waiting for timeout (the POST query will be closed by wechat official server) - time.sleep(1) - # and do nothing - return - else: - pass - elif channel.query3.get(cache_key) == False: - # The third query from wechat official server - logger.debug("[wechatmp] query3 {}".format(cache_key)) - channel.query3[cache_key] = True - cnt = 0 - while cache_key in channel.running and cnt < 40: - cnt = cnt + 1 - time.sleep(0.1) - if cnt == 40: - # Have waiting for 3x5 seconds - # return timeout message - reply_text = "【正在思考中,回复任意文字尝试获取回复】" - logger.info( - "[wechatmp] Three queries has finished For {}: {}".format( - from_user, message_id - ) - ) - replyPost = reply.TextMsg(from_user, to_user, reply_text).send() - return replyPost - else: - pass - - if ( - cache_key not in channel.cache_dict - and cache_key not in channel.running - ): - # no return because of bandwords or other reasons - return "success" - - # if float(time.time()) - float(query_time) > 4.8: - # reply_text = "【正在思考中,回复任意文字尝试获取回复】" - # logger.info("[wechatmp] Timeout for {} {}, return".format(from_user, message_id)) - # replyPost = reply.TextMsg(from_user, to_user, reply_text).send() - # return replyPost - - if cache_key in channel.cache_dict: - content = channel.cache_dict[cache_key] - if len(content.encode("utf8")) <= MAX_UTF8_LEN: - reply_text = channel.cache_dict[cache_key] - channel.cache_dict.pop(cache_key) - else: - continue_text = "\n【未完待续,回复任意文字以继续】" - splits = split_string_by_utf8_length( - content, - MAX_UTF8_LEN - len(continue_text.encode("utf-8")), - max_split=1, - ) - reply_text = splits[0] + continue_text - channel.cache_dict[cache_key] = splits[1] - logger.info( - "[wechatmp] {}:{} Do send {}".format( - web.ctx.env.get("REMOTE_ADDR"), - web.ctx.env.get("REMOTE_PORT"), - reply_text, - ) - ) - replyPost = reply.TextMsg(from_user, to_user, reply_text).send() - return replyPost - - elif wechatmp_msg.msg_type == "event": - logger.info( - "[wechatmp] Event {} from {}".format( - wechatmp_msg.content, wechatmp_msg.from_user_id - ) - ) - content = subscribe_msg() - replyMsg = reply.TextMsg( - wechatmp_msg.from_user_id, wechatmp_msg.to_user_id, content - ) - return replyMsg.send() - else: - logger.info("暂且不处理") - return "success" - except Exception as exc: - logger.exception(exc) - return exc diff --git a/channel/wechatmp/ServiceAccount.py b/channel/wechatmp/active_reply.py similarity index 75% rename from channel/wechatmp/ServiceAccount.py rename to channel/wechatmp/active_reply.py index 699581d..d8a8dde 100644 --- a/channel/wechatmp/ServiceAccount.py +++ b/channel/wechatmp/active_reply.py @@ -2,9 +2,10 @@ import time import web -import channel.wechatmp.receive as receive -import channel.wechatmp.reply as reply +from channel.wechatmp.wechatmp_message import parse_xml +from channel.wechatmp.passive_reply_message import TextMsg from bridge.context import * +from bridge.reply import ReplyType from channel.wechatmp.common import * from channel.wechatmp.wechatmp_channel import WechatMPChannel from common.log import logger @@ -22,10 +23,14 @@ class Query: try: webData = web.data() # logger.debug("[wechatmp] Receive request:\n" + webData.decode("utf-8")) - wechatmp_msg = receive.parse_xml(webData) - if wechatmp_msg.msg_type == "text" or wechatmp_msg.msg_type == "voice": + wechatmp_msg = parse_xml(webData) + if ( + wechatmp_msg.msg_type == "text" + or wechatmp_msg.msg_type == "voice" + # or wechatmp_msg.msg_type == "image" + ): from_user = wechatmp_msg.from_user_id - message = wechatmp_msg.content.decode("utf-8") + message = wechatmp_msg.content message_id = wechatmp_msg.msg_id logger.info( @@ -37,8 +42,12 @@ class Query: message, ) ) + if (wechatmp_msg.msg_type == "voice" and conf().get("voice_reply_voice") == True): + rtype = ReplyType.VOICE + else: + rtype = None context = channel._compose_context( - ContextType.TEXT, message, isgroup=False, msg=wechatmp_msg + ContextType.TEXT, message, isgroup=False, desire_rtype=rtype, msg=wechatmp_msg ) if context: # set private openai_api_key @@ -58,7 +67,7 @@ class Query: ) ) content = subscribe_msg() - replyMsg = reply.TextMsg( + replyMsg = TextMsg( wechatmp_msg.from_user_id, wechatmp_msg.to_user_id, content ) return replyMsg.send() diff --git a/channel/wechatmp/common.py b/channel/wechatmp/common.py index 192b86b..5efccfc 100644 --- a/channel/wechatmp/common.py +++ b/channel/wechatmp/common.py @@ -36,16 +36,16 @@ def verify_server(data): def subscribe_msg(): - trigger_prefix = conf().get("single_chat_prefix", [""])[0] + trigger_prefix = conf().get("single_chat_prefix", [""]) msg = textwrap.dedent( f"""\ 感谢您的关注! 这里是ChatGPT,可以自由对话。 资源有限,回复较慢,请勿着急。 - 支持通用表情输入。 + 支持语音对话。 暂时不支持图片输入。 - 支持图片输出,画字开头的问题将回复图片链接。 - 支持角色扮演和文字冒险两种定制模式对话。 + 支持图片输出,画字开头的消息将按要求创作图片。 + 支持tool、角色扮演和文字冒险等丰富的插件。 输入'{trigger_prefix}#帮助' 查看详细指令。""" ) return msg diff --git a/channel/wechatmp/passive_reply.py b/channel/wechatmp/passive_reply.py new file mode 100644 index 0000000..eca94ba --- /dev/null +++ b/channel/wechatmp/passive_reply.py @@ -0,0 +1,196 @@ +import time +import asyncio + +import web + +from channel.wechatmp.wechatmp_message import parse_xml +from channel.wechatmp.passive_reply_message import TextMsg, VoiceMsg, ImageMsg +from bridge.context import * +from bridge.reply import ReplyType +from channel.wechatmp.common import * +from channel.wechatmp.wechatmp_channel import WechatMPChannel +from common.log import logger +from config import conf + + +# This class is instantiated once per query +class Query: + def GET(self): + return verify_server(web.input()) + + def POST(self): + try: + request_time = time.time() + channel = WechatMPChannel() + webData = web.data() + logger.debug("[wechatmp] Receive post data:\n" + webData.decode("utf-8")) + wechatmp_msg = parse_xml(webData) + if wechatmp_msg.msg_type == "text" or wechatmp_msg.msg_type == "voice": + from_user = wechatmp_msg.from_user_id + to_user = wechatmp_msg.to_user_id + message = wechatmp_msg.content + message_id = wechatmp_msg.msg_id + + supported = True + if "【收到不支持的消息类型,暂无法显示】" in message: + supported = False # not supported, used to refresh + + # New request + if ( + from_user not in channel.cache_dict + and from_user not in channel.running + or message.startswith("#") + and message_id not in channel.request_cnt # insert the godcmd + ): + # The first query begin + if (wechatmp_msg.msg_type == "voice" and conf().get("voice_reply_voice") == True): + rtype = ReplyType.VOICE + else: + rtype = None + context = channel._compose_context( + ContextType.TEXT, message, isgroup=False, desire_rtype=rtype, msg=wechatmp_msg + ) + logger.debug( + "[wechatmp] context: {} {}".format(context, wechatmp_msg) + ) + + if supported and context: + # set private openai_api_key + # if from_user is not changed in itchat, this can be placed at chat_channel + user_data = conf().get_user_data(from_user) + context["openai_api_key"] = user_data.get("openai_api_key") + channel.running.add(from_user) + channel.produce(context) + else: + trigger_prefix = conf().get("single_chat_prefix", [""]) + if trigger_prefix or not supported: + if trigger_prefix: + content = textwrap.dedent( + f"""\ + 请输入'{trigger_prefix}'接你想说的话跟我说话。 + 例如: + {trigger_prefix}你好,很高兴见到你。""" + ) + else: + content = textwrap.dedent( + """\ + 你好,很高兴见到你。 + 请跟我说话吧。""" + ) + else: + logger.error(f"[wechatmp] unknown error") + content = textwrap.dedent( + """\ + 未知错误,请稍后再试""" + ) + replyPost = TextMsg(wechatmp_msg.from_user_id, wechatmp_msg.to_user_id, content).send() + return replyPost + + + # Wechat official server will request 3 times (5 seconds each), with the same message_id. + # Because the interval is 5 seconds, here assumed that do not have multithreading problems. + request_cnt = channel.request_cnt.get(message_id, 0) + 1 + channel.request_cnt[message_id] = request_cnt + logger.info( + "[wechatmp] Request {} from {} {}\n{}\n{}:{}".format( + request_cnt, + from_user, + message_id, + message, + web.ctx.env.get("REMOTE_ADDR"), + web.ctx.env.get("REMOTE_PORT"), + ) + ) + + task_running = True + waiting_until = request_time + 4 + while time.time() < waiting_until: + if from_user in channel.running: + time.sleep(0.1) + else: + task_running = False + break + + reply_text = "" + if task_running: + if request_cnt < 3: + # waiting for timeout (the POST request will be closed by Wechat official server) + time.sleep(2) + # and do nothing, waiting for the next request + return "success" + else: # request_cnt == 3: + # return timeout message + reply_text = "【正在思考中,回复任意文字尝试获取回复】" + replyPost = TextMsg(from_user, to_user, reply_text).send() + return replyPost + + # reply is ready + channel.request_cnt.pop(message_id) + + # no return because of bandwords or other reasons + if ( + from_user not in channel.cache_dict + and from_user not in channel.running + ): + return "success" + + # Only one request can access to the cached data + try: + (reply_type, content) = channel.cache_dict.pop(from_user) + except KeyError: + return "success" + + if (reply_type == "text"): + if len(content.encode("utf8")) <= MAX_UTF8_LEN: + reply_text = content + else: + continue_text = "\n【未完待续,回复任意文字以继续】" + splits = split_string_by_utf8_length( + content, + MAX_UTF8_LEN - len(continue_text.encode("utf-8")), + max_split=1, + ) + reply_text = splits[0] + continue_text + channel.cache_dict[from_user] = ("text", splits[1]) + + logger.info( + "[wechatmp] Request {} do send to {} {}: {}\n{}".format( + request_cnt, + from_user, + message_id, + message, + reply_text, + ) + ) + replyPost = TextMsg(from_user, to_user, reply_text).send() + return replyPost + + elif (reply_type == "voice"): + media_id = content + asyncio.run_coroutine_threadsafe(channel.delete_media(media_id), channel.delete_media_loop) + replyPost = VoiceMsg(from_user, to_user, media_id).send() + return replyPost + + elif (reply_type == "image"): + media_id = content + asyncio.run_coroutine_threadsafe(channel.delete_media(media_id), channel.delete_media_loop) + replyPost = ImageMsg(from_user, to_user, media_id).send() + return replyPost + + elif wechatmp_msg.msg_type == "event": + logger.info( + "[wechatmp] Event {} from {}".format( + wechatmp_msg.content, wechatmp_msg.from_user_id + ) + ) + content = subscribe_msg() + replyMsg = TextMsg( + wechatmp_msg.from_user_id, wechatmp_msg.to_user_id, content + ) + return replyMsg.send() + else: + logger.info("暂且不处理") + return "success" + except Exception as exc: + logger.exception(exc) + return exc diff --git a/channel/wechatmp/reply.py b/channel/wechatmp/passive_reply_message.py similarity index 68% rename from channel/wechatmp/reply.py rename to channel/wechatmp/passive_reply_message.py index 2f852f9..ef58d70 100644 --- a/channel/wechatmp/reply.py +++ b/channel/wechatmp/passive_reply_message.py @@ -32,6 +32,29 @@ class TextMsg(Msg): return XmlForm.format(**self.__dict) +class VoiceMsg(Msg): + def __init__(self, toUserName, fromUserName, mediaId): + self.__dict = dict() + self.__dict["ToUserName"] = toUserName + self.__dict["FromUserName"] = fromUserName + self.__dict["CreateTime"] = int(time.time()) + self.__dict["MediaId"] = mediaId + + def send(self): + XmlForm = """ + + + + {CreateTime} + + + + + + """ + return XmlForm.format(**self.__dict) + + class ImageMsg(Msg): def __init__(self, toUserName, fromUserName, mediaId): self.__dict = dict() diff --git a/channel/wechatmp/wechatmp_channel.py b/channel/wechatmp/wechatmp_channel.py index ac3c3ac..9780048 100644 --- a/channel/wechatmp/wechatmp_channel.py +++ b/channel/wechatmp/wechatmp_channel.py @@ -1,20 +1,22 @@ # -*- coding: utf-8 -*- -import json -import threading +import io +import os import time - +import imghdr import requests -import web - from bridge.context import * from bridge.reply import * from channel.chat_channel import ChatChannel +from channel.wechatmp.wechatmp_client import WechatMPClient from channel.wechatmp.common import * -from common.expired_dict import ExpiredDict from common.log import logger from common.singleton import singleton from config import conf +import asyncio +from threading import Thread + +import web # If using SSL, uncomment the following lines, and modify the certificate path. # from cheroot.server import HTTPServer # from cheroot.ssl.builtin import BuiltinSSLAdapter @@ -28,94 +30,120 @@ class WechatMPChannel(ChatChannel): def __init__(self, passive_reply=True): super().__init__() self.passive_reply = passive_reply - self.running = set() - self.received_msgs = ExpiredDict(60 * 60 * 24) + self.NOT_SUPPORT_REPLYTYPE = [] + self.client = WechatMPClient() if self.passive_reply: - self.NOT_SUPPORT_REPLYTYPE = [ReplyType.IMAGE, ReplyType.VOICE] + # Cache the reply to the user's first message self.cache_dict = dict() - self.query1 = dict() - self.query2 = dict() - self.query3 = dict() - else: - # TODO support image - self.NOT_SUPPORT_REPLYTYPE = [ReplyType.IMAGE, ReplyType.VOICE] - self.app_id = conf().get("wechatmp_app_id") - self.app_secret = conf().get("wechatmp_app_secret") - self.access_token = None - self.access_token_expires_time = 0 - self.access_token_lock = threading.Lock() - self.get_access_token() + # Record whether the current message is being processed + self.running = set() + # Count the request from wechat official server by message_id + self.request_cnt = dict() + # The permanent media need to be deleted to avoid media number limit + self.delete_media_loop = asyncio.new_event_loop() + t = Thread(target=self.start_loop, args=(self.delete_media_loop,)) + t.setDaemon(True) + t.start() + def startup(self): if self.passive_reply: - urls = ("/wx", "channel.wechatmp.SubscribeAccount.Query") + urls = ("/wx", "channel.wechatmp.passive_reply.Query") else: - urls = ("/wx", "channel.wechatmp.ServiceAccount.Query") + urls = ("/wx", "channel.wechatmp.active_reply.Query") app = web.application(urls, globals(), autoreload=False) port = conf().get("wechatmp_port", 8080) web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port)) - def wechatmp_request(self, method, url, **kwargs): - r = requests.request(method=method, url=url, **kwargs) - r.raise_for_status() - r.encoding = "utf-8" - ret = r.json() - if "errcode" in ret and ret["errcode"] != 0: - raise WeChatAPIException("{}".format(ret)) - return ret + def start_loop(self, loop): + asyncio.set_event_loop(loop) + loop.run_forever() - def get_access_token(self): - # return the access_token - if self.access_token: - if self.access_token_expires_time - time.time() > 60: - return self.access_token - - # Get new access_token - # Do not request access_token in parallel! Only the last obtained is valid. - if self.access_token_lock.acquire(blocking=False): - # Wait for other threads that have previously obtained access_token to complete the request - # This happens every 2 hours, so it doesn't affect the experience very much - time.sleep(1) - self.access_token = None - url = "https://api.weixin.qq.com/cgi-bin/token" - params = { - "grant_type": "client_credential", - "appid": self.app_id, - "secret": self.app_secret, - } - data = self.wechatmp_request(method="get", url=url, params=params) - self.access_token = data["access_token"] - self.access_token_expires_time = int(time.time()) + data["expires_in"] - logger.info("[wechatmp] access_token: {}".format(self.access_token)) - self.access_token_lock.release() - else: - # Wait for token update - while self.access_token_lock.locked(): - time.sleep(0.1) - return self.access_token + async def delete_media(self, media_id): + logger.debug("[wechatmp] permanent media {} will be deleted in 10s".format(media_id)) + await asyncio.sleep(10) + self.client.delete_permanent_media(media_id) + logger.info("[wechatmp] permanent media {} has been deleted".format(media_id)) def send(self, reply: Reply, context: Context): + receiver = context["receiver"] if self.passive_reply: - receiver = context["receiver"] - self.cache_dict[receiver] = reply.content - logger.info("[send] reply to {} saved to cache: {}".format(receiver, reply)) + if reply.type == ReplyType.TEXT or reply.type == ReplyType.INFO or reply.type == ReplyType.ERROR: + reply_text = reply.content + logger.info("[wechatmp] reply to {} cached:\n{}".format(receiver, reply_text)) + self.cache_dict[receiver] = ("text", reply_text) + elif reply.type == ReplyType.VOICE: + voice_file_path = reply.content + logger.info("[wechatmp] voice file path {}".format(voice_file_path)) + with open(voice_file_path, 'rb') as f: + filename = receiver + "-" + context["msg"].msg_id + ".mp3" + media_id = self.client.upload_permanent_media("voice", (filename, f, "audio/mpeg")) + # 根据文件大小估计一个微信自动审核的时间,审核结束前返回将会导致语音无法播放,这个估计有待验证 + f_size = os.fstat(f.fileno()).st_size + print(f_size) + time.sleep(1.0 + 2 * f_size / 1024 / 1024) + logger.info("[wechatmp] voice reply to {} uploaded: {}".format(receiver, media_id)) + self.cache_dict[receiver] = ("voice", media_id) + elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片 + img_url = reply.content + pic_res = requests.get(img_url, stream=True) + print(pic_res.headers) + image_storage = io.BytesIO() + for block in pic_res.iter_content(1024): + image_storage.write(block) + image_storage.seek(0) + image_type = imghdr.what(image_storage) + filename = receiver + "-" + context["msg"].msg_id + "." + image_type + content_type = "image/" + image_type + media_id = self.client.upload_permanent_media("image", (filename, image_storage, content_type)) + logger.info("[wechatmp] image reply to {} uploaded: {}".format(receiver, media_id)) + self.cache_dict[receiver] = ("image", media_id) + elif reply.type == ReplyType.IMAGE: # 从文件读取图片 + image_storage = reply.content + image_storage.seek(0) + image_type = imghdr.what(image_storage) + filename = receiver + "-" + context["msg"].msg_id + "." + image_type + content_type = "image/" + image_type + media_id = self.client.upload_permanent_media("image", (filename, image_storage, content_type)) + logger.info("[wechatmp] image reply to {} uploaded: {}".format(receiver, media_id)) + self.cache_dict[receiver] = ("image", media_id) else: - receiver = context["receiver"] - reply_text = reply.content - url = "https://api.weixin.qq.com/cgi-bin/message/custom/send" - params = {"access_token": self.get_access_token()} - json_data = { - "touser": receiver, - "msgtype": "text", - "text": {"content": reply_text}, - } - self.wechatmp_request( - method="post", - url=url, - params=params, - data=json.dumps(json_data, ensure_ascii=False).encode("utf8"), - ) - logger.info("[send] Do send to {}: {}".format(receiver, reply_text)) + if reply.type == ReplyType.TEXT or reply.type == ReplyType.INFO or reply.type == ReplyType.ERROR: + reply_text = reply.content + self.client.send_text(receiver, reply_text) + logger.info("[wechatmp] Do send to {}: {}".format(receiver, reply_text)) + elif reply.type == ReplyType.VOICE: + voice_file_path = reply.content + logger.info("[wechatmp] voice file path {}".format(voice_file_path)) + with open(voice_file_path, 'rb') as f: + filename = receiver + "-" + context["msg"].msg_id + ".mp3" + media_id = self.client.upload_media("voice", (filename, f, "audio/mpeg")) + self.client.send_voice(receiver, media_id) + logger.info("[wechatmp] Do send voice to {}".format(receiver)) + elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片 + img_url = reply.content + pic_res = requests.get(img_url, stream=True) + print(pic_res.headers) + image_storage = io.BytesIO() + for block in pic_res.iter_content(1024): + image_storage.write(block) + image_storage.seek(0) + image_type = imghdr.what(image_storage) + filename = receiver + "-" + context["msg"].msg_id + "." + image_type + content_type = "image/" + image_type + # content_type = pic_res.headers.get('content-type') + media_id = self.client.upload_media("image", (filename, image_storage, content_type)) + self.client.send_image(receiver, media_id) + logger.info("[wechatmp] sendImage url={}, receiver={}".format(img_url, receiver)) + elif reply.type == ReplyType.IMAGE: # 从文件读取图片 + image_storage = reply.content + image_storage.seek(0) + image_type = imghdr.what(image_storage) + filename = receiver + "-" + context["msg"].msg_id + "." + image_type + content_type = "image/" + image_type + media_id = self.client.upload_media("image", (filename, image_storage, content_type)) + self.client.send_image(receiver, media_id) + logger.info("[wechatmp] sendImage, receiver={}".format(receiver)) return def _success_callback(self, session_id, context, **kwargs): # 线程异常结束时的回调函数 diff --git a/channel/wechatmp/wechatmp_client.py b/channel/wechatmp/wechatmp_client.py new file mode 100644 index 0000000..96ebddb --- /dev/null +++ b/channel/wechatmp/wechatmp_client.py @@ -0,0 +1,180 @@ +import time +import json +import requests +import threading +from channel.wechatmp.common import * +from common.log import logger +from config import conf + + +class WechatMPClient: + def __init__(self): + self.app_id = conf().get("wechatmp_app_id") + self.app_secret = conf().get("wechatmp_app_secret") + self.access_token = None + self.access_token_expires_time = 0 + self.access_token_lock = threading.Lock() + self.get_access_token() + + + def wechatmp_request(self, method, url, **kwargs): + r = requests.request(method=method, url=url, **kwargs) + r.raise_for_status() + r.encoding = "utf-8" + ret = r.json() + if "errcode" in ret and ret["errcode"] != 0: + if ret["errcode"] == 45009: + self.clear_quota_v2() + raise WeChatAPIException("{}".format(ret)) + return ret + + def get_access_token(self): + # return the access_token + if self.access_token: + if self.access_token_expires_time - time.time() > 60: + return self.access_token + + # Get new access_token + # Do not request access_token in parallel! Only the last obtained is valid. + if self.access_token_lock.acquire(blocking=False): + # Wait for other threads that have previously obtained access_token to complete the request + # This happens every 2 hours, so it doesn't affect the experience very much + time.sleep(1) + self.access_token = None + url = "https://api.weixin.qq.com/cgi-bin/token" + params = { + "grant_type": "client_credential", + "appid": self.app_id, + "secret": self.app_secret, + } + ret = self.wechatmp_request(method="get", url=url, params=params) + self.access_token = ret["access_token"] + self.access_token_expires_time = int(time.time()) + ret["expires_in"] + logger.info("[wechatmp] access_token: {}".format(self.access_token)) + self.access_token_lock.release() + else: + # Wait for token update + while self.access_token_lock.locked(): + time.sleep(0.1) + return self.access_token + + + def send_text(self, receiver, reply_text): + url = "https://api.weixin.qq.com/cgi-bin/message/custom/send" + params = {"access_token": self.get_access_token()} + json_data = { + "touser": receiver, + "msgtype": "text", + "text": {"content": reply_text}, + } + self.wechatmp_request( + method="post", + url=url, + params=params, + data=json.dumps(json_data, ensure_ascii=False).encode("utf8"), + ) + + + def send_voice(self, receiver, media_id): + url="https://api.weixin.qq.com/cgi-bin/message/custom/send" + params = {"access_token": self.get_access_token()} + json_data = { + "touser": receiver, + "msgtype": "voice", + "voice": { + "media_id": media_id + } + } + self.wechatmp_request( + method="post", + url=url, + params=params, + data=json.dumps(json_data, ensure_ascii=False).encode("utf8"), + ) + + def send_image(self, receiver, media_id): + url="https://api.weixin.qq.com/cgi-bin/message/custom/send" + params = {"access_token": self.get_access_token()} + json_data = { + "touser": receiver, + "msgtype": "image", + "image": { + "media_id": media_id + } + } + self.wechatmp_request( + method="post", + url=url, + params=params, + data=json.dumps(json_data, ensure_ascii=False).encode("utf8"), + ) + + + def upload_media(self, media_type, media_file): + url="https://api.weixin.qq.com/cgi-bin/media/upload" + params={ + "access_token": self.get_access_token(), + "type": media_type + } + files={"media": media_file} + ret = self.wechatmp_request( + method="post", + url=url, + params=params, + files=files + ) + logger.debug("[wechatmp] media {} uploaded".format(media_file)) + return ret["media_id"] + + + def upload_permanent_media(self, media_type, media_file): + url="https://api.weixin.qq.com/cgi-bin/material/add_material" + params={ + "access_token": self.get_access_token(), + "type": media_type + } + files={"media": media_file} + ret = self.wechatmp_request( + method="post", + url=url, + params=params, + files=files + ) + logger.debug("[wechatmp] permanent media {} uploaded".format(media_file)) + return ret["media_id"] + + + def delete_permanent_media(self, media_id): + url="https://api.weixin.qq.com/cgi-bin/material/del_material" + params={ + "access_token": self.get_access_token() + } + self.wechatmp_request( + method="post", + url=url, + params=params, + data=json.dumps({"media_id": media_id}, ensure_ascii=False).encode("utf8") + ) + logger.debug("[wechatmp] permanent media {} deleted".format(media_id)) + + def clear_quota(self): + url="https://api.weixin.qq.com/cgi-bin/clear_quota" + params = { + "access_token": self.get_access_token() + } + self.wechatmp_request( + method="post", + url=url, + params=params, + data={"appid": self.app_id} + ) + logger.debug("[wechatmp] API quata has been cleard") + + def clear_quota_v2(self): + url="https://api.weixin.qq.com/cgi-bin/clear_quota/v2" + self.wechatmp_request( + method="post", + url=url, + data={"appid": self.app_id, "appsecret": self.app_secret} + ) + logger.debug("[wechatmp] API quata has been cleard") diff --git a/channel/wechatmp/receive.py b/channel/wechatmp/wechatmp_message.py similarity index 82% rename from channel/wechatmp/receive.py rename to channel/wechatmp/wechatmp_message.py index 1285fd1..d385897 100644 --- a/channel/wechatmp/receive.py +++ b/channel/wechatmp/wechatmp_message.py @@ -32,12 +32,15 @@ class WeChatMPMessage(ChatMessage): if self.msg_type == "text": self.ctype = ContextType.TEXT - self.content = xmlData.find("Content").text.encode("utf-8") + self.content = xmlData.find("Content").text elif self.msg_type == "voice": self.ctype = ContextType.TEXT - self.content = xmlData.find("Recognition").text.encode("utf-8") # 接收语音识别结果 + self.content = xmlData.find("Recognition").text # 接收语音识别结果 + # other voice_to_text method not implemented yet + if self.content == None: + self.content = "你好" elif self.msg_type == "image": - # not implemented + # not implemented yet self.pic_url = xmlData.find("PicUrl").text self.media_id = xmlData.find("MediaId").text elif self.msg_type == "event":