# -*- coding: utf-8 -*-
# filename: main.py
"""WeChat Official Account (wechatmp) channel served through web.py.

Replies are produced on a background thread pool because building one can
take longer than a single HTTP request from the WeChat server is kept open.
The POST handler therefore polls a per-user cache across the repeated
deliveries of the same message and hands the reply out once it is ready,
600 characters at a time.
"""
import hashlib
import hmac
import math
import textwrap
import time
import traceback
from concurrent.futures import ThreadPoolExecutor

import web

import channel.wechatmp.receive as receive
import channel.wechatmp.reply as reply
from channel.channel import Channel
from common.log import logger
from config import conf


class WechatMPServer():
    """Thin launcher that maps ``/wx`` to :class:`WechatMPChannel`."""

    def __init__(self):
        pass

    def startup(self):
        """Build the web.py application and run it (blocks the caller)."""
        urls = (
            '/wx', 'WechatMPChannel',
        )
        # web.py resolves the handler class name through this module's globals.
        app = web.application(urls, globals())
        app.run()


# Worker pool on which replies are generated.
thread_pool = ThreadPoolExecutor(max_workers=8)

# cache_key -> (remaining_chunk_count, remaining_text).
# A count of 0 means "reply still being generated".
cache_dict = dict()
# Per-user flags recording which of the three deliveries of a message
# have already been seen (False = not yet handled).
query1 = dict()
query2 = dict()
query3 = dict()

# Maximum number of characters sent in a single reply message.
_REPLY_CHUNK_SIZE = 600
# Polling schedule while waiting for the worker: 45 polls * 0.1 s = 4.5 s.
_POLL_INTERVAL = 0.1
_MAX_POLLS = 45
# Sent when no reply could be produced in time (or at all).
_BUSY_REPLY = "【服务器有点忙,回复任意文字再次尝试】"


class WechatMPChannel(Channel):
    """web.py handler for the WeChat Official Account callback URL."""

    def GET(self):
        """Answer the server-verification handshake.

        Returns ``echostr`` when the SHA-1 of the sorted
        ``token``/``timestamp``/``nonce`` triple equals ``signature``, an
        empty string when it does not, and a greeting when the request has
        no query parameters. Any exception is returned to the caller, as
        before.
        """
        try:
            data = web.input()
            if not data:
                return "hello, this is handle view"
            signature = data.signature
            timestamp = data.timestamp
            nonce = data.nonce
            echostr = data.echostr
            # Must match the token entered on the platform's
            # "Basic Configuration" page.
            token = conf().get('wechatmp_token')

            data_list = [token, timestamp, nonce]
            data_list.sort()
            sha1 = hashlib.sha1()
            sha1.update("".join(data_list).encode('utf-8'))
            hashcode = sha1.hexdigest()
            print("handle/GET func: hashcode, signature: ", hashcode, signature)
            # Constant-time comparison; bytes are used so that a non-ASCII
            # signature simply fails the check instead of raising.
            if hmac.compare_digest(hashcode.encode('utf-8'),
                                   signature.encode('utf-8')):
                return echostr
            else:
                return ""
        except Exception as Argument:
            return Argument

    def _do_build_reply(self, cache_key, fromUser, message):
        """Generate the reply on a worker thread and publish it to the cache.

        Always leaves ``cache_dict[cache_key]`` with a chunk count >= 1.
        A count of 0 is the "still pending" marker, so an empty reply or a
        failed build must not leave it in place, otherwise the user's
        session could never complete or start a new request.
        """
        context = dict()
        context['session_id'] = fromUser
        try:
            reply_text = super().build_reply_content(message, context)
            # The query is done, record the cache
            logger.info("[threaded] Get reply for {}: {} \nA: {}".format(fromUser, message, reply_text))
            reply_cnt = max(1, math.ceil(len(reply_text) / _REPLY_CHUNK_SIZE))
        except Exception:
            # The executor would otherwise swallow this silently.
            logger.info("[threaded] Failed to build reply for {}: {}".format(fromUser, traceback.format_exc()))
            reply_text = _BUSY_REPLY
            reply_cnt = 1
        cache_dict[cache_key] = (reply_cnt, reply_text)

    def _wait_for_reply(self, cache_key, cache):
        """Poll the cache until the reply is ready or the poll budget is spent.

        Returns the most recently read cache entry; the caller checks
        ``entry[0] == 0`` to find out whether the wait timed out.
        """
        polls = 0
        while cache[0] == 0 and polls < _MAX_POLLS:
            polls += 1
            time.sleep(_POLL_INTERVAL)
            cache = cache_dict.get(cache_key)
        return cache

    def POST(self):
        """Handle an incoming message or event pushed by the WeChat server.

        Text messages: the first delivery starts reply generation on the
        thread pool. Each of the (up to three) deliveries of the message
        waits up to 4.5 s for the result. The first two give up silently on
        timeout so that the request is delivered again; the third answers
        with a "server busy" text. Ready replies are returned in 600
        character chunks, one chunk per user message.

        Events: answered with a fixed welcome text.
        Anything else: answered with ``"success"``.
        Any exception is printed and returned to the caller, as before.
        """
        try:
            queryTime = time.time()
            webData = web.data()
            # logger.debug("[wechatmp] Receive request:\n" + webData.decode("utf-8"))
            recMsg = receive.parse_xml(webData)
            if isinstance(recMsg, receive.Msg) and recMsg.MsgType == 'text':
                fromUser = recMsg.FromUserName
                toUser = recMsg.ToUserName
                message = recMsg.Content.decode("utf-8")
                message_id = recMsg.MsgId

                logger.info("[wechatmp] {}:{} Receive post query {} {}: {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), fromUser, message_id, message))

                cache_key = fromUser
                cache = cache_dict.get(cache_key)

                reply_text = ""
                # New request
                if cache is None:
                    # The first query begin, reset the cache
                    cache_dict[cache_key] = (0, "")
                    thread_pool.submit(self._do_build_reply, cache_key, fromUser, message)

                    query1[cache_key] = False
                    query2[cache_key] = False
                    query3[cache_key] = False
                # Request again
                elif (cache[0] == 0
                        and query1.get(cache_key) is True
                        and query2.get(cache_key) is True
                        and query3.get(cache_key) is True):
                    query1[cache_key] = False  # To improve waiting experience, this can be set to True.
                    query2[cache_key] = False  # To improve waiting experience, this can be set to True.
                    query3[cache_key] = False
                elif cache[0] >= 1:
                    # Skip the waiting phase
                    query1[cache_key] = True
                    query2[cache_key] = True
                    query3[cache_key] = True

                cache = cache_dict.get(cache_key)
                if query1.get(cache_key) is False:
                    # The first query from wechat official server
                    logger.debug("[wechatmp] query1 {}".format(cache_key))
                    query1[cache_key] = True
                    cache = self._wait_for_reply(cache_key, cache)
                    # Decide on the cache state, not the poll counter, so a
                    # reply that lands on the very last poll is still used.
                    if cache[0] == 0:
                        # waiting for timeout (the POST query will be closed by wechat official server)
                        time.sleep(5)
                        # and do nothing
                        return
                elif query2.get(cache_key) is False:
                    # The second query from wechat official server
                    logger.debug("[wechatmp] query2 {}".format(cache_key))
                    query2[cache_key] = True
                    cache = self._wait_for_reply(cache_key, cache)
                    if cache[0] == 0:
                        # waiting for timeout (the POST query will be closed by wechat official server)
                        time.sleep(5)
                        # and do nothing
                        return
                elif query3.get(cache_key) is False:
                    # The third query from wechat official server
                    logger.debug("[wechatmp] query3 {}".format(cache_key))
                    query3[cache_key] = True
                    cache = self._wait_for_reply(cache_key, cache)
                    if cache[0] == 0:
                        # Have waiting for 3x5 seconds
                        # return timeout message
                        reply_text = _BUSY_REPLY
                        logger.info("[wechatmp] Three queries has finished For {}: {}".format(fromUser, message_id))
                        replyPost = reply.TextMsg(fromUser, toUser, reply_text).send()
                        return replyPost

                # Too late to answer within this request; leave the reply
                # in the cache for the next delivery.
                if time.time() - queryTime > 4.8:
                    logger.info("[wechatmp] Timeout for {} {}".format(fromUser, message_id))
                    return

                if cache[0] > 1:
                    # wechatmp auto_reply length limit
                    reply_text = cache[1][:_REPLY_CHUNK_SIZE] + " 【未完待续,回复任意文字以继续】"
                    cache_dict[cache_key] = (cache[0] - 1, cache[1][_REPLY_CHUNK_SIZE:])
                elif cache[0] == 1:
                    reply_text = cache[1]
                    cache_dict.pop(cache_key)
                logger.info("[wechatmp] {}:{} Do send {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), reply_text))
                replyPost = reply.TextMsg(fromUser, toUser, reply_text).send()
                return replyPost

            elif isinstance(recMsg, receive.Event) and recMsg.MsgType == 'event':
                # The reply travels in the opposite direction of the event.
                toUser = recMsg.FromUserName
                fromUser = recMsg.ToUserName
                content = textwrap.dedent("""\
                    感谢您的关注!
                    这里是ChatGPT,可以自由对话。
                    资源有限,回复较慢,请不要着急。
                    暂时不支持图片输入输出,但是支持通用表情输入。""")
                replyMsg = reply.TextMsg(toUser, fromUser, content)
                return replyMsg.send()
            else:
                print("暂且不处理")
                return "success"
        except Exception as Argment:
            print(Argment)
            return Argment