diff --git a/channel/channel_factory.py b/channel/channel_factory.py index 546f18a..905ea02 100644 --- a/channel/channel_factory.py +++ b/channel/channel_factory.py @@ -17,4 +17,7 @@ def create_channel(channel_type): elif channel_type == 'terminal': from channel.terminal.terminal_channel import TerminalChannel return TerminalChannel() + elif channel_type == 'mp': + from channel.wechatmp.wechatmp_channel import WechatMPServer + return WechatMPServer() raise RuntimeError diff --git a/channel/wechatmp/receive.py b/channel/wechatmp/receive.py new file mode 100644 index 0000000..40fc35f --- /dev/null +++ b/channel/wechatmp/receive.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*-# +# filename: receive.py +import xml.etree.ElementTree as ET + + +def parse_xml(web_data): + if len(web_data) == 0: + return None + xmlData = ET.fromstring(web_data) + msg_type = xmlData.find('MsgType').text + if msg_type == 'text': + return TextMsg(xmlData) + elif msg_type == 'image': + return ImageMsg(xmlData) + elif msg_type == 'event': + return Event(xmlData) + + +class Msg(object): + def __init__(self, xmlData): + self.ToUserName = xmlData.find('ToUserName').text + self.FromUserName = xmlData.find('FromUserName').text + self.CreateTime = xmlData.find('CreateTime').text + self.MsgType = xmlData.find('MsgType').text + self.MsgId = xmlData.find('MsgId').text + + +class TextMsg(Msg): + def __init__(self, xmlData): + Msg.__init__(self, xmlData) + self.Content = xmlData.find('Content').text.encode("utf-8") + + +class ImageMsg(Msg): + def __init__(self, xmlData): + Msg.__init__(self, xmlData) + self.PicUrl = xmlData.find('PicUrl').text + self.MediaId = xmlData.find('MediaId').text + + +class Event(object): + def __init__(self, xmlData): + self.ToUserName = xmlData.find('ToUserName').text + self.FromUserName = xmlData.find('FromUserName').text + self.CreateTime = xmlData.find('CreateTime').text + self.MsgType = xmlData.find('MsgType').text + self.Event = xmlData.find('Event').text diff --git a/channel/wechatmp/reply.py b/channel/wechatmp/reply.py new file mode 100644 index 0000000..5f3a934 --- /dev/null +++ b/channel/wechatmp/reply.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*-# +# filename: reply.py +import time + +class Msg(object): + def __init__(self): + pass + + def send(self): + return "success" + +class TextMsg(Msg): + def __init__(self, toUserName, fromUserName, content): + self.__dict = dict() + self.__dict['ToUserName'] = toUserName + self.__dict['FromUserName'] = fromUserName + self.__dict['CreateTime'] = int(time.time()) + self.__dict['Content'] = content + + def send(self): + XmlForm = """ + + + + {CreateTime} + + + + """ + return XmlForm.format(**self.__dict) + +class ImageMsg(Msg): + def __init__(self, toUserName, fromUserName, mediaId): + self.__dict = dict() + self.__dict['ToUserName'] = toUserName + self.__dict['FromUserName'] = fromUserName + self.__dict['CreateTime'] = int(time.time()) + self.__dict['MediaId'] = mediaId + + def send(self): + XmlForm = """ + + + + {CreateTime} + + + + + + """ + return XmlForm.format(**self.__dict) \ No newline at end of file diff --git a/channel/wechatmp/wechatmp_channel.py b/channel/wechatmp/wechatmp_channel.py new file mode 100644 index 0000000..67bad0d --- /dev/null +++ b/channel/wechatmp/wechatmp_channel.py @@ -0,0 +1,193 @@ +# -*- coding: utf-8 -*- +# filename: main.py +import web +import time +import hashlib +import textwrap +from channel.channel import Channel +import channel.wechatmp.reply as reply +import channel.wechatmp.receive as receive +from common.log import logger +from config import conf + + +class WechatMPServer(): + def __init__(self): + pass + + def startup(self): + urls = ( + '/wx', 'WechatMPChannel', + ) + app = web.application(urls, globals()) + app.run() + + +from concurrent.futures import ThreadPoolExecutor +thread_pool = ThreadPoolExecutor(max_workers=8) + +cache_dict = dict() +query1 = dict() +query2 = dict() +query3 = dict() + +class WechatMPChannel(Channel): + + def GET(self): + try: + data = web.input() + if len(data) == 0: + return "hello, this is handle view" + signature = data.signature + timestamp = data.timestamp + nonce = data.nonce + echostr = data.echostr + token = conf().get('wechatmp_token') #请按照公众平台官网\基本配置中信息填写 + + data_list = [token, timestamp, nonce] + data_list.sort() + sha1 = hashlib.sha1() + # map(sha1.update, data_list) #python2 + sha1.update("".join(data_list).encode('utf-8')) + hashcode = sha1.hexdigest() + print("handle/GET func: hashcode, signature: ", hashcode, signature) + if hashcode == signature: + return echostr + else: + return "" + except Exception as Argument: + return Argument + + + def _do_build_reply(self, cache_key, fromUser, message): + context = dict() + context['session_id'] = fromUser + reply_text = super().build_reply_content(message, context) + # The query is done, record the cache + logger.info("[threaded] Get reply_text for {}".format(message)) + global cache_dict + cache_dict[cache_key] = (1, reply_text) + + + def POST(self): + try: + queryTime = time.time() + webData = web.data() + # logger.debug("[wechatmp] Receive request:\n" + webData.decode("utf-8")) + recMsg = receive.parse_xml(webData) + if isinstance(recMsg, receive.Msg) and recMsg.MsgType == 'text': + fromUser = recMsg.FromUserName + toUser = recMsg.ToUserName + createTime = recMsg.CreateTime + message = recMsg.Content.decode("utf-8") + message_id = recMsg.MsgId + + logger.info("{}:{} [wechatmp] Receive post query {} {}: {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), fromUser, message_id, message)) + + + global cache_dict + global query1 + global query2 + global query3 + cache_key = fromUser + cache = cache_dict.get(cache_key) + + reply_text = "" + + # New request + if cache == None: + # The first query begin, reset the cache + cache_dict[cache_key] = (0, "") + thread_pool.submit(self._do_build_reply, cache_key, fromUser, message) + query1[cache_key] = False + query2[cache_key] = False + query3[cache_key] = False + # Request again + elif cache[0] == 0 and query1.get(cache_key) == True and query2.get(cache_key) == True and query3.get(cache_key) == True: + query1[cache_key] = False + query2[cache_key] = False + query3[cache_key] = False + elif cache[0] == 1: + reply_text = cache[1] + query1[cache_key] = True + query2[cache_key] = True + query3[cache_key] = True + + + cache = cache_dict.get(cache_key) + if query1.get(cache_key) == False: + # The first query from wechat official server + logger.debug("[wechatmp] query1 {}".format(cache_key)) + query1[cache_key] = True + cnt = 0 + while cache[0] == 0 and cnt < 45: + cnt = cnt + 1 + time.sleep(0.1) + cache = cache_dict.get(cache_key) + if cnt == 45: + # waiting for timeout (the POST query will be closed by wechat official server) + time.sleep(5) + # and do nothing + return + else: + reply_text = cache[1] + elif query2.get(cache_key) == False: + # The second query from wechat official server + logger.debug("[wechatmp] query2 {}".format(cache_key)) + query2[cache_key] = True + cnt = 0 + while cache[0] == 0 and cnt < 45: + cnt = cnt + 1 + time.sleep(0.1) + cache = cache_dict.get(cache_key) + if cnt == 45: + # waiting for timeout (the POST query will be closed by wechat official server) + time.sleep(5) + # and do nothing + return + else: + reply_text = cache[1] + elif query3.get(cache_key) == False: + # The third query from wechat official server + logger.debug("[wechatmp] query3 {}".format(cache_key)) + query3[cache_key] = True + cnt = 0 + while cache[0] == 0 and cnt < 45: + cnt = cnt + 1 + time.sleep(0.1) + cache = cache_dict.get(cache_key) + if cnt == 45: + # Have waiting for 3x5 seconds + # return timeout message + reply_text = "服务器有点忙,回复任意文字再次尝试。" + logger.info("[wechatmp] Three queries has finished For {}: {}".format(fromUser, message_id)) + replyPost = reply.TextMsg(fromUser, toUser, reply_text).send() + return replyPost + else: + reply_text = cache[1] + + if float(time.time()) - float(queryTime) > 4.8: + logger.info("[wechatmp] Timeout for {} {}".format(fromUser, message_id)) + return + + cache_dict.pop(cache_key) + logger.info("{}:{} [wechatmp] Do send {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), reply_text)) + replyPost = reply.TextMsg(fromUser, toUser, reply_text).send() + return replyPost + + elif isinstance(recMsg, receive.Event) and recMsg.MsgType == 'event': + toUser = recMsg.FromUserName + fromUser = recMsg.ToUserName + content = textwrap.dedent("""\ + 感谢您的关注! + 这里是ChatGPT。 + 资源有限,回复较慢,请不要着急。 + """) + replyMsg = reply.TextMsg(toUser, fromUser, content) + return replyMsg.send() + else: + print("暂且不处理") + return "success" + except Exception as Argment: + print(Argment) + return Argment