diff --git a/bot/chatgpt/chat_gpt_bot.py b/bot/chatgpt/chat_gpt_bot.py index 7d1253d..158392d 100644 --- a/bot/chatgpt/chat_gpt_bot.py +++ b/bot/chatgpt/chat_gpt_bot.py @@ -91,7 +91,8 @@ class ChatGPTBot(Bot,OpenAIImage): "top_p":1, "frequency_penalty":conf().get('frequency_penalty', 0.0), # [-2,2]之间,该值越大则更倾向于产生不同的内容 "presence_penalty":conf().get('presence_penalty', 0.0), # [-2,2]之间,该值越大则更倾向于产生不同的内容 - "request_timeout": conf().get('request_timeout', 30), # 请求超时时间 + "request_timeout": conf().get('request_timeout', 60), # 请求超时时间,openai接口默认设置为600,对于难问题一般需要较长时间 + "timeout": conf().get('request_timeout', 120), #重试超时时间,在这个时间内,将会自动重试 } def reply_text(self, session:ChatGPTSession, session_id, api_key, retry_count=0) -> dict: diff --git a/channel/channel_factory.py b/channel/channel_factory.py index 3d06154..3303ded 100644 --- a/channel/channel_factory.py +++ b/channel/channel_factory.py @@ -18,6 +18,6 @@ def create_channel(channel_type): from channel.terminal.terminal_channel import TerminalChannel return TerminalChannel() elif channel_type == 'wechatmp': - from channel.wechatmp.wechatmp_channel import WechatMPServer - return WechatMPServer() + from channel.wechatmp.wechatmp_channel import WechatMPChannel + return WechatMPChannel() raise RuntimeError diff --git a/channel/wechatmp/README.md b/channel/wechatmp/README.md index a01bf21..bfa8480 100644 --- a/channel/wechatmp/README.md +++ b/channel/wechatmp/README.md @@ -18,7 +18,18 @@ pip3 install web.py 然后根据[接入指南](https://developers.weixin.qq.com/doc/offiaccount/Basic_Information/Access_Overview.html)的说明,在[微信公众平台](https://mp.weixin.qq.com)的“设置与开发”-“基本配置”-“服务器配置”中填写服务器地址`URL`和令牌`Token`。这里的`URL`是`example.com/wx`的形式,不可以使用IP,`Token`是你自己编的一个特定的令牌。消息加解密方式目前选择的是明文模式。 -相关的服务器验证代码已经写好,你不需要再添加任何代码。你只需要在本项目根目录的`config.json`中添加`"channel_type": "wechatmp", "wechatmp_token": "your Token", ` 然后运行`python3 app.py`启动web服务器,然后在刚才的“服务器配置”中点击`提交`即可验证你的服务器。 +相关的服务器验证代码已经写好,你不需要再添加任何代码。你只需要在本项目根目录的`config.json`中添加 +``` +"channel_type": "wechatmp", +"wechatmp_token": "your Token", +``` +然后运行`python3 app.py`启动web服务器。这里会默认监听8080端口,但是微信公众号的服务器配置只支持80/443端口,有两种方法来解决这个问题。第一个是推荐的方法,使用端口转发命令将80端口转发到8080端口(443同理,注意需要支持SSL,也就是https的访问,在`wechatmp_channel.py`需要修改相应的证书路径): +``` +sudo iptables -t nat -A PREROUTING -p tcp --dport 80 -j REDIRECT --to-port 8080 +sudo iptables-save > /etc/iptables/rules.v4 +``` +第二个方法是让python程序直接监听80端口,可以直接使用命令`python3 app.py 80`。这样可能会导致权限问题,在linux上需要使用`sudo`。然而这会导致后续缓存文件的权限问题,因此不是推荐的方法。 +最后在刚才的“服务器配置”中点击`提交`即可验证你的服务器。 随后在[微信公众平台](https://mp.weixin.qq.com)启用服务器,关闭手动填写规则的自动回复,即可实现ChatGPT的自动回复。 diff --git a/channel/wechatmp/receive.py b/channel/wechatmp/receive.py index 40fc35f..9abcc92 100644 --- a/channel/wechatmp/receive.py +++ b/channel/wechatmp/receive.py @@ -1,47 +1,42 @@ # -*- coding: utf-8 -*-# # filename: receive.py import xml.etree.ElementTree as ET +from bridge.context import ContextType +from channel.chat_message import ChatMessage +from common.log import logger def parse_xml(web_data): if len(web_data) == 0: return None xmlData = ET.fromstring(web_data) - msg_type = xmlData.find('MsgType').text - if msg_type == 'text': - return TextMsg(xmlData) - elif msg_type == 'image': - return ImageMsg(xmlData) - elif msg_type == 'event': - return Event(xmlData) + return WeChatMPMessage(xmlData) - -class Msg(object): - def __init__(self, xmlData): - self.ToUserName = xmlData.find('ToUserName').text - self.FromUserName = xmlData.find('FromUserName').text - self.CreateTime = xmlData.find('CreateTime').text - self.MsgType = xmlData.find('MsgType').text - self.MsgId = xmlData.find('MsgId').text - - -class TextMsg(Msg): - def __init__(self, xmlData): - Msg.__init__(self, xmlData) - self.Content = xmlData.find('Content').text.encode("utf-8") - - -class ImageMsg(Msg): - def __init__(self, xmlData): - Msg.__init__(self, xmlData) - self.PicUrl = xmlData.find('PicUrl').text - self.MediaId = xmlData.find('MediaId').text - - -class Event(object): +class WeChatMPMessage(ChatMessage): def __init__(self, xmlData): - self.ToUserName = xmlData.find('ToUserName').text - self.FromUserName = xmlData.find('FromUserName').text - self.CreateTime = xmlData.find('CreateTime').text - self.MsgType = xmlData.find('MsgType').text - self.Event = xmlData.find('Event').text + super().__init__(xmlData) + self.to_user_id = xmlData.find('ToUserName').text + self.from_user_id = xmlData.find('FromUserName').text + self.create_time = xmlData.find('CreateTime').text + self.msg_type = xmlData.find('MsgType').text + self.msg_id = xmlData.find('MsgId').text + self.is_group = False + + # reply to other_user_id + self.other_user_id = self.from_user_id + + if self.msg_type == 'text': + self.ctype = ContextType.TEXT + self.content = xmlData.find('Content').text.encode("utf-8") + elif self.msg_type == 'voice': + self.ctype = ContextType.TEXT + self.content = xmlData.find('Recognition').text.encode("utf-8") # 接收语音识别结果 + elif self.msg_type == 'image': + # not implemented + self.pic_url = xmlData.find('PicUrl').text + self.media_id = xmlData.find('MediaId').text + elif self.msg_type == 'event': + self.event = xmlData.find('Event').text + else: # video, shortvideo, location, link + # not implemented + pass \ No newline at end of file diff --git a/channel/wechatmp/wechatmp_channel.py b/channel/wechatmp/wechatmp_channel.py index c2637a2..0453741 100644 --- a/channel/wechatmp/wechatmp_channel.py +++ b/channel/wechatmp/wechatmp_channel.py @@ -4,9 +4,10 @@ import time import math import hashlib import textwrap -from channel.channel import Channel +from channel.chat_channel import ChatChannel import channel.wechatmp.reply as reply import channel.wechatmp.receive as receive +from common.singleton import singleton from common.log import logger from config import conf from bridge.reply import * @@ -21,202 +22,136 @@ import traceback # certificate='/ssl/cert.pem', # private_key='/ssl/cert.key') -class WechatMPServer(): + +# from concurrent.futures import ThreadPoolExecutor +# thread_pool = ThreadPoolExecutor(max_workers=8) + +@singleton +class WechatMPChannel(ChatChannel): def __init__(self): - pass + super().__init__() + self.cache_dict = dict() + self.query1 = dict() + self.query2 = dict() + self.query3 = dict() + - def startup(self): + def startup(self): urls = ( - '/wx', 'WechatMPChannel', + '/wx', 'SubsribeAccountQuery', ) app = web.application(urls, globals()) - web.httpserver.runsimple(app.wsgifunc(), ('0.0.0.0', 80)) - -cache_dict = dict() -query1 = dict() -query2 = dict() -query3 = dict() - -from concurrent.futures import ThreadPoolExecutor -thread_pool = ThreadPoolExecutor(max_workers=8) - -class WechatMPChannel(Channel): - - def GET(self): - try: - data = web.input() - if len(data) == 0: - return "hello, this is handle view" - signature = data.signature - timestamp = data.timestamp - nonce = data.nonce - echostr = data.echostr - token = conf().get('wechatmp_token') #请按照公众平台官网\基本配置中信息填写 + app.run() - data_list = [token, timestamp, nonce] - data_list.sort() - sha1 = hashlib.sha1() - # map(sha1.update, data_list) #python2 - sha1.update("".join(data_list).encode('utf-8')) - hashcode = sha1.hexdigest() - print("handle/GET func: hashcode, signature: ", hashcode, signature) - if hashcode == signature: - return echostr - else: - return "" - except Exception as Argument: - return Argument - - - def _do_build_reply(self, cache_key, fromUser, message): - context = dict() - context['session_id'] = fromUser - reply_text = super().build_reply_content(message, context) - # The query is done, record the cache - logger.info("[threaded] Get reply for {}: {} \nA: {}".format(fromUser, message, reply_text)) - global cache_dict - reply_cnt = math.ceil(len(reply_text) / 600) - cache_dict[cache_key] = (reply_cnt, reply_text) - - def send(self, reply : Reply, cache_key): - global cache_dict + def send(self, reply: Reply, context: Context): reply_cnt = math.ceil(len(reply.content) / 600) - cache_dict[cache_key] = (reply_cnt, reply.content) - - - def handle(self, context): - global cache_dict - try: - reply = Reply() - logger.debug('[wechatmp] ready to handle context: {}'.format(context)) - - # reply的构建步骤 - e_context = PluginManager().emit_event(EventContext(Event.ON_HANDLE_CONTEXT, {'channel' : self, 'context': context, 'reply': reply})) - reply = e_context['reply'] - if not e_context.is_pass(): - logger.debug('[wechatmp] ready to handle context: type={}, content={}'.format(context.type, context.content)) - if context.type == ContextType.TEXT or context.type == ContextType.IMAGE_CREATE: - reply = super().build_reply_content(context.content, context) - # elif context.type == ContextType.VOICE: - # msg = context['msg'] - # file_name = TmpDir().path() + context.content - # msg.download(file_name) - # reply = super().build_voice_to_text(file_name) - # if reply.type != ReplyType.ERROR and reply.type != ReplyType.INFO: - # context.content = reply.content # 语音转文字后,将文字内容作为新的context - # context.type = ContextType.TEXT - # reply = super().build_reply_content(context.content, context) - # if reply.type == ReplyType.TEXT: - # if conf().get('voice_reply_voice'): - # reply = super().build_text_to_voice(reply.content) - else: - logger.error('[wechatmp] unknown context type: {}'.format(context.type)) - return - - logger.debug('[wechatmp] ready to decorate reply: {}'.format(reply)) - - # reply的包装步骤 - if reply and reply.type: - e_context = PluginManager().emit_event(EventContext(Event.ON_DECORATE_REPLY, {'channel' : self, 'context': context, 'reply': reply})) - reply=e_context['reply'] - if not e_context.is_pass() and reply and reply.type: - if reply.type == ReplyType.TEXT: - pass - elif reply.type == ReplyType.ERROR or reply.type == ReplyType.INFO: - reply.content = str(reply.type)+":\n" + reply.content - elif reply.type == ReplyType.IMAGE_URL or reply.type == ReplyType.VOICE or reply.type == ReplyType.IMAGE: - pass - else: - logger.error('[wechatmp] unknown reply type: {}'.format(reply.type)) - return - - # reply的发送步骤 - if reply and reply.type: - e_context = PluginManager().emit_event(EventContext(Event.ON_SEND_REPLY, {'channel' : self, 'context': context, 'reply': reply})) - reply=e_context['reply'] - if not e_context.is_pass() and reply and reply.type: - logger.debug('[wechatmp] ready to send reply: {} to {}'.format(reply, context['receiver'])) - self.send(reply, context['receiver']) - else: - cache_dict[context['receiver']] = (1, "No reply") - - logger.info("[threaded] Get reply for {}: {} \nA: {}".format(context['receiver'], context.content, reply.content)) - except Exception as exc: - print(traceback.format_exc()) - cache_dict[context['receiver']] = (1, "ERROR") - + receiver = context["receiver"] + self.cache_dict[receiver] = (reply_cnt, reply.content) + logger.debug("[send] reply to {} saved to cache: {}".format(receiver, reply)) + + +def verify_server(): + try: + data = web.input() + if len(data) == 0: + return "None" + signature = data.signature + timestamp = data.timestamp + nonce = data.nonce + echostr = data.echostr + token = conf().get('wechatmp_token') #请按照公众平台官网\基本配置中信息填写 + + data_list = [token, timestamp, nonce] + data_list.sort() + sha1 = hashlib.sha1() + # map(sha1.update, data_list) #python2 + sha1.update("".join(data_list).encode('utf-8')) + hashcode = sha1.hexdigest() + print("handle/GET func: hashcode, signature: ", hashcode, signature) + if hashcode == signature: + return echostr + else: + return "" + except Exception as Argument: + return Argument + + +# This class is instantiated once per query +class SubsribeAccountQuery(): + def GET(self): + return verify_server() def POST(self): + channel_instance = WechatMPChannel() try: - queryTime = time.time() + query_time = time.time() webData = web.data() # logger.debug("[wechatmp] Receive request:\n" + webData.decode("utf-8")) - recMsg = receive.parse_xml(webData) - if isinstance(recMsg, receive.Msg) and recMsg.MsgType == 'text': - fromUser = recMsg.FromUserName - toUser = recMsg.ToUserName - createTime = recMsg.CreateTime - message = recMsg.Content.decode("utf-8") - message_id = recMsg.MsgId + wechat_msg = receive.parse_xml(webData) + if wechat_msg.msg_type == 'text': + from_user = wechat_msg.from_user_id + to_user = wechat_msg.to_user_id + message = wechat_msg.content.decode("utf-8") + message_id = wechat_msg.msg_id - logger.info("[wechatmp] {}:{} Receive post query {} {}: {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), fromUser, message_id, message)) + logger.info("[wechatmp] {}:{} Receive post query {} {}: {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), from_user, message_id, message)) - global cache_dict - global query1 - global query2 - global query3 - cache_key = fromUser - cache = cache_dict.get(cache_key) + cache_key = from_user + cache = channel_instance.cache_dict.get(cache_key) reply_text = "" # New request if cache == None: # The first query begin, reset the cache - cache_dict[cache_key] = (0, "") - # thread_pool.submit(self._do_build_reply, cache_key, fromUser, message) - - context = Context() - context.kwargs = {'isgroup': False, 'receiver': fromUser, 'session_id': fromUser} - - user_data = conf().get_user_data(fromUser) - context['openai_api_key'] = user_data.get('openai_api_key') # None or user openai_api_key - - img_match_prefix = check_prefix(message, conf().get('image_create_prefix')) - if img_match_prefix: - message = message.replace(img_match_prefix, '', 1).strip() - context.type = ContextType.IMAGE_CREATE + context = channel_instance._compose_context(ContextType.TEXT, message, isgroup=False, msg=wechat_msg) + logger.debug("[wechatmp] context: {} {}".format(context, wechat_msg)) + if context: + # set private openai_api_key + # if from_user is not changed in itchat, this can be placed at chat_channel + user_data = conf().get_user_data(from_user) + context['openai_api_key'] = user_data.get('openai_api_key') # None or user openai_api_key + channel_instance.cache_dict[cache_key] = (0, "") + channel_instance.produce(context) else: - context.type = ContextType.TEXT - context.content = message - thread_pool.submit(self.handle, context) - - query1[cache_key] = False - query2[cache_key] = False - query3[cache_key] = False + trigger_prefix = conf().get('single_chat_prefix',[''])[0] + if trigger_prefix: + content = textwrap.dedent(f"""\ + 请输入'{trigger_prefix}'接你想说的话跟我说话。 + 例如: + {trigger_prefix}你好,很高兴见到你。""") + else: + logger.error(f"[wechatmp] unknown error") + content = textwrap.dedent("""\ + 未知错误,请稍后再试""") + replyMsg = reply.TextMsg(wechat_msg.from_user_id, wechat_msg.to_user_id, content) + return replyMsg.send() + channel_instance.query1[cache_key] = False + channel_instance.query2[cache_key] = False + channel_instance.query3[cache_key] = False # Request again - elif cache[0] == 0 and query1.get(cache_key) == True and query2.get(cache_key) == True and query3.get(cache_key) == True: - query1[cache_key] = False #To improve waiting experience, this can be set to True. - query2[cache_key] = False #To improve waiting experience, this can be set to True. - query3[cache_key] = False + elif cache[0] == 0 and channel_instance.query1.get(cache_key) == True and channel_instance.query2.get(cache_key) == True and channel_instance.query3.get(cache_key) == True: + channel_instance.query1[cache_key] = False #To improve waiting experience, this can be set to True. + channel_instance.query2[cache_key] = False #To improve waiting experience, this can be set to True. + channel_instance.query3[cache_key] = False elif cache[0] >= 1: # Skip the waiting phase - query1[cache_key] = True - query2[cache_key] = True - query3[cache_key] = True + channel_instance.query1[cache_key] = True + channel_instance.query2[cache_key] = True + channel_instance.query3[cache_key] = True - cache = cache_dict.get(cache_key) - if query1.get(cache_key) == False: + cache = channel_instance.cache_dict.get(cache_key) + if channel_instance.query1.get(cache_key) == False: # The first query from wechat official server logger.debug("[wechatmp] query1 {}".format(cache_key)) - query1[cache_key] = True + channel_instance.query1[cache_key] = True cnt = 0 while cache[0] == 0 and cnt < 45: cnt = cnt + 1 time.sleep(0.1) - cache = cache_dict.get(cache_key) + cache = channel_instance.cache_dict.get(cache_key) if cnt == 45: # waiting for timeout (the POST query will be closed by wechat official server) time.sleep(5) @@ -224,15 +159,15 @@ class WechatMPChannel(Channel): return else: pass - elif query2.get(cache_key) == False: + elif channel_instance.query2.get(cache_key) == False: # The second query from wechat official server logger.debug("[wechatmp] query2 {}".format(cache_key)) - query2[cache_key] = True + channel_instance.query2[cache_key] = True cnt = 0 while cache[0] == 0 and cnt < 45: cnt = cnt + 1 time.sleep(0.1) - cache = cache_dict.get(cache_key) + cache = channel_instance.cache_dict.get(cache_key) if cnt == 45: # waiting for timeout (the POST query will be closed by wechat official server) time.sleep(5) @@ -240,43 +175,44 @@ class WechatMPChannel(Channel): return else: pass - elif query3.get(cache_key) == False: + elif channel_instance.query3.get(cache_key) == False: # The third query from wechat official server logger.debug("[wechatmp] query3 {}".format(cache_key)) - query3[cache_key] = True + channel_instance.query3[cache_key] = True cnt = 0 while cache[0] == 0 and cnt < 45: cnt = cnt + 1 time.sleep(0.1) - cache = cache_dict.get(cache_key) + cache = channel_instance.cache_dict.get(cache_key) if cnt == 45: # Have waiting for 3x5 seconds # return timeout message reply_text = "【正在响应中,回复任意文字尝试获取回复】" - logger.info("[wechatmp] Three queries has finished For {}: {}".format(fromUser, message_id)) - replyPost = reply.TextMsg(fromUser, toUser, reply_text).send() + logger.info("[wechatmp] Three queries has finished For {}: {}".format(from_user, message_id)) + replyPost = reply.TextMsg(from_user, to_user, reply_text).send() return replyPost else: pass - if float(time.time()) - float(queryTime) > 4.8: - logger.info("[wechatmp] Timeout for {} {}".format(fromUser, message_id)) + if float(time.time()) - float(query_time) > 4.8: + logger.info("[wechatmp] Timeout for {} {}".format(from_user, message_id)) return if cache[0] > 1: reply_text = cache[1][:600] + "\n【未完待续,回复任意文字以继续】" #wechatmp auto_reply length limit - cache_dict[cache_key] = (cache[0] - 1, cache[1][600:]) + channel_instance.cache_dict[cache_key] = (cache[0] - 1, cache[1][600:]) elif cache[0] == 1: reply_text = cache[1] - cache_dict.pop(cache_key) + channel_instance.cache_dict.pop(cache_key) logger.info("[wechatmp] {}:{} Do send {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), reply_text)) - replyPost = reply.TextMsg(fromUser, toUser, reply_text).send() + replyPost = reply.TextMsg(from_user, to_user, reply_text).send() return replyPost - elif isinstance(recMsg, receive.Event) and recMsg.MsgType == 'event': - logger.info("[wechatmp] Event {} from {}".format(recMsg.Event, recMsg.FromUserName)) - content = textwrap.dedent("""\ + elif wechat_msg.msg_type == 'event': + logger.info("[wechatmp] Event {} from {}".format(wechat_msg.Event, wechat_msg.from_user_id)) + trigger_prefix = conf().get('single_chat_prefix',[''])[0] + content = textwrap.dedent(f"""\ 感谢您的关注! 这里是ChatGPT,可以自由对话。 资源有限,回复较慢,请勿着急。 @@ -284,8 +220,8 @@ class WechatMPChannel(Channel): 暂时不支持图片输入。 支持图片输出,画字开头的问题将回复图片链接。 支持角色扮演和文字冒险两种定制模式对话。 - 输入'#帮助' 查看详细指令。""") - replyMsg = reply.TextMsg(recMsg.FromUserName, recMsg.ToUserName, content) + 输入'{trigger_prefix}#帮助' 查看详细指令。""") + replyMsg = reply.TextMsg(wechat_msg.from_user_id, wechat_msg.to_user_id, content) return replyMsg.send() else: logger.info("暂且不处理") @@ -294,9 +230,3 @@ class WechatMPChannel(Channel): logger.exception(exc) return exc - -def check_prefix(content, prefix_list): - for prefix in prefix_list: - if content.startswith(prefix): - return prefix - return None diff --git a/config.py b/config.py index 282c071..48a6568 100644 --- a/config.py +++ b/config.py @@ -45,7 +45,8 @@ available_setting = { "top_p": 1, "frequency_penalty": 0, "presence_penalty": 0, - "request_timeout": 30, # chatgpt请求超时时间 + "request_timeout": 60, # chatgpt请求超时时间,openai接口默认设置为600,对于难问题一般需要较长时间 + "timeout": 120, # chatgpt重试超时时间,在这个时间内,将会自动重试 # 语音设置 "speech_recognition": False, # 是否开启语音识别 diff --git a/plugins/role/roles.json b/plugins/role/roles.json index ae3557f..4da8c8d 100644 --- a/plugins/role/roles.json +++ b/plugins/role/roles.json @@ -170,10 +170,10 @@ }, { "title": "群聊取名", - "description": "我希望你充当微信群聊的命名专家。根据我提供的信息和背景,为这个群聊起几个有趣顺口且贴切的名字,每个不要超过8个字。请在回答中仅给出群聊名称,不要写任何额外的解释。", - "descn": "我希望你充当微信群聊的命名专家。根据我提供的信息和背景,为这个群聊起几个有趣顺口且贴切的名字,每个不要超过8个字。请在回答中仅给出群聊名称,不要写任何额外的解释。", - "wrapper": "信息和背景是:\n\"%s\"", - "remark": "根据给出的信息和背景为群聊取名。" + "description": "我希望你充当微信群聊的命名专家。根据我提供的信息和背景,为这个群聊起几个有趣顺口且贴切的名字,每个不要超过8个字。请在回答中仅给出群聊名称,不要写任何额外的解释。", + "descn": "我希望你充当微信群聊的命名专家。根据我提供的信息和背景,为这个群聊起几个有趣顺口且贴切的名字,每个不要超过8个字。请在回答中仅给出群聊名称,不要写任何额外的解释。", + "wrapper": "信息和背景是:\n\"%s\"", + "remark": "根据给出的信息和背景为群聊取名。" }, { "title": "表情符号翻译器",