# -*- coding: utf-8 -*-
import hashlib
import hmac
import math
import textwrap
import time
import traceback

import web

from channel.channel import Channel
import channel.wechatmp.reply as reply
import channel.wechatmp.receive as receive
from common.log import logger
from config import conf
from bridge.reply import *
from bridge.context import *
from plugins import *
  16. class WechatMPServer():
  17. def __init__(self):
  18. pass
  19. def startup(self):
  20. urls = (
  21. '/wx', 'WechatMPChannel',
  22. )
  23. app = web.application(urls, globals())
  24. app.run()
  25. cache_dict = dict()
  26. query1 = dict()
  27. query2 = dict()
  28. query3 = dict()
  29. from concurrent.futures import ThreadPoolExecutor
  30. thread_pool = ThreadPoolExecutor(max_workers=8)
  31. class WechatMPChannel(Channel):
  32. def GET(self):
  33. try:
  34. data = web.input()
  35. if len(data) == 0:
  36. return "hello, this is handle view"
  37. signature = data.signature
  38. timestamp = data.timestamp
  39. nonce = data.nonce
  40. echostr = data.echostr
  41. token = conf().get('wechatmp_token') #请按照公众平台官网\基本配置中信息填写
  42. data_list = [token, timestamp, nonce]
  43. data_list.sort()
  44. sha1 = hashlib.sha1()
  45. # map(sha1.update, data_list) #python2
  46. sha1.update("".join(data_list).encode('utf-8'))
  47. hashcode = sha1.hexdigest()
  48. print("handle/GET func: hashcode, signature: ", hashcode, signature)
  49. if hashcode == signature:
  50. return echostr
  51. else:
  52. return ""
  53. except Exception as Argument:
  54. return Argument
  55. def _do_build_reply(self, cache_key, fromUser, message):
  56. context = dict()
  57. context['session_id'] = fromUser
  58. reply_text = super().build_reply_content(message, context)
  59. # The query is done, record the cache
  60. logger.info("[threaded] Get reply for {}: {} \nA: {}".format(fromUser, message, reply_text))
  61. global cache_dict
  62. reply_cnt = math.ceil(len(reply_text) / 600)
  63. cache_dict[cache_key] = (reply_cnt, reply_text)
  64. def send(self, reply : Reply, cache_key):
  65. global cache_dict
  66. reply_cnt = math.ceil(len(reply.content) / 600)
  67. cache_dict[cache_key] = (reply_cnt, reply.content)
  68. def handle(self, context):
  69. global cache_dict
  70. try:
  71. reply = Reply()
  72. logger.debug('[wechatmp] ready to handle context: {}'.format(context))
  73. # reply的构建步骤
  74. e_context = PluginManager().emit_event(EventContext(Event.ON_HANDLE_CONTEXT, {'channel' : self, 'context': context, 'reply': reply}))
  75. reply = e_context['reply']
  76. if not e_context.is_pass():
  77. logger.debug('[wechatmp] ready to handle context: type={}, content={}'.format(context.type, context.content))
  78. if context.type == ContextType.TEXT or context.type == ContextType.IMAGE_CREATE:
  79. reply = super().build_reply_content(context.content, context)
  80. # elif context.type == ContextType.VOICE:
  81. # msg = context['msg']
  82. # file_name = TmpDir().path() + context.content
  83. # msg.download(file_name)
  84. # reply = super().build_voice_to_text(file_name)
  85. # if reply.type != ReplyType.ERROR and reply.type != ReplyType.INFO:
  86. # context.content = reply.content # 语音转文字后,将文字内容作为新的context
  87. # context.type = ContextType.TEXT
  88. # reply = super().build_reply_content(context.content, context)
  89. # if reply.type == ReplyType.TEXT:
  90. # if conf().get('voice_reply_voice'):
  91. # reply = super().build_text_to_voice(reply.content)
  92. else:
  93. logger.error('[wechatmp] unknown context type: {}'.format(context.type))
  94. return
  95. logger.debug('[wechatmp] ready to decorate reply: {}'.format(reply))
  96. # reply的包装步骤
  97. if reply and reply.type:
  98. e_context = PluginManager().emit_event(EventContext(Event.ON_DECORATE_REPLY, {'channel' : self, 'context': context, 'reply': reply}))
  99. reply=e_context['reply']
  100. if not e_context.is_pass() and reply and reply.type:
  101. if reply.type == ReplyType.TEXT:
  102. pass
  103. elif reply.type == ReplyType.ERROR or reply.type == ReplyType.INFO:
  104. reply.content = str(reply.type)+":\n" + reply.content
  105. elif reply.type == ReplyType.IMAGE_URL or reply.type == ReplyType.VOICE or reply.type == ReplyType.IMAGE:
  106. pass
  107. else:
  108. logger.error('[wechatmp] unknown reply type: {}'.format(reply.type))
  109. return
  110. # reply的发送步骤
  111. if reply and reply.type:
  112. e_context = PluginManager().emit_event(EventContext(Event.ON_SEND_REPLY, {'channel' : self, 'context': context, 'reply': reply}))
  113. reply=e_context['reply']
  114. if not e_context.is_pass() and reply and reply.type:
  115. logger.debug('[wechatmp] ready to send reply: {} to {}'.format(reply, context['receiver']))
  116. self.send(reply, context['receiver'])
  117. else:
  118. cache_dict[context['receiver']] = (1, "No reply")
  119. except Exception as exc:
  120. print(traceback.format_exc())
  121. cache_dict[context['receiver']] = (1, "ERROR")
  122. def POST(self):
  123. try:
  124. queryTime = time.time()
  125. webData = web.data()
  126. # logger.debug("[wechatmp] Receive request:\n" + webData.decode("utf-8"))
  127. recMsg = receive.parse_xml(webData)
  128. if isinstance(recMsg, receive.Msg) and recMsg.MsgType == 'text':
  129. fromUser = recMsg.FromUserName
  130. toUser = recMsg.ToUserName
  131. createTime = recMsg.CreateTime
  132. message = recMsg.Content.decode("utf-8")
  133. message_id = recMsg.MsgId
  134. logger.info("[wechatmp] {}:{} Receive post query {} {}: {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), fromUser, message_id, message))
  135. global cache_dict
  136. global query1
  137. global query2
  138. global query3
  139. cache_key = fromUser
  140. cache = cache_dict.get(cache_key)
  141. reply_text = ""
  142. # New request
  143. if cache == None:
  144. # The first query begin, reset the cache
  145. cache_dict[cache_key] = (0, "")
  146. # thread_pool.submit(self._do_build_reply, cache_key, fromUser, message)
  147. context = Context()
  148. context.kwargs = {'isgroup': False, 'receiver': fromUser, 'session_id': fromUser}
  149. img_match_prefix = check_prefix(message, conf().get('image_create_prefix'))
  150. if img_match_prefix:
  151. message = message.replace(img_match_prefix, '', 1).strip()
  152. context.type = ContextType.IMAGE_CREATE
  153. else:
  154. context.type = ContextType.TEXT
  155. context.content = message
  156. thread_pool.submit(self.handle, context)
  157. query1[cache_key] = False
  158. query2[cache_key] = False
  159. query3[cache_key] = False
  160. # Request again
  161. elif cache[0] == 0 and query1.get(cache_key) == True and query2.get(cache_key) == True and query3.get(cache_key) == True:
  162. query1[cache_key] = False #To improve waiting experience, this can be set to True.
  163. query2[cache_key] = False #To improve waiting experience, this can be set to True.
  164. query3[cache_key] = False
  165. elif cache[0] >= 1:
  166. # Skip the waiting phase
  167. query1[cache_key] = True
  168. query2[cache_key] = True
  169. query3[cache_key] = True
  170. cache = cache_dict.get(cache_key)
  171. if query1.get(cache_key) == False:
  172. # The first query from wechat official server
  173. logger.debug("[wechatmp] query1 {}".format(cache_key))
  174. query1[cache_key] = True
  175. cnt = 0
  176. while cache[0] == 0 and cnt < 45:
  177. cnt = cnt + 1
  178. time.sleep(0.1)
  179. cache = cache_dict.get(cache_key)
  180. if cnt == 45:
  181. # waiting for timeout (the POST query will be closed by wechat official server)
  182. time.sleep(5)
  183. # and do nothing
  184. return
  185. else:
  186. pass
  187. elif query2.get(cache_key) == False:
  188. # The second query from wechat official server
  189. logger.debug("[wechatmp] query2 {}".format(cache_key))
  190. query2[cache_key] = True
  191. cnt = 0
  192. while cache[0] == 0 and cnt < 45:
  193. cnt = cnt + 1
  194. time.sleep(0.1)
  195. cache = cache_dict.get(cache_key)
  196. if cnt == 45:
  197. # waiting for timeout (the POST query will be closed by wechat official server)
  198. time.sleep(5)
  199. # and do nothing
  200. return
  201. else:
  202. pass
  203. elif query3.get(cache_key) == False:
  204. # The third query from wechat official server
  205. logger.debug("[wechatmp] query3 {}".format(cache_key))
  206. query3[cache_key] = True
  207. cnt = 0
  208. while cache[0] == 0 and cnt < 45:
  209. cnt = cnt + 1
  210. time.sleep(0.1)
  211. cache = cache_dict.get(cache_key)
  212. if cnt == 45:
  213. # Have waiting for 3x5 seconds
  214. # return timeout message
  215. reply_text = "【服务器有点忙,回复任意文字再次尝试】"
  216. logger.info("[wechatmp] Three queries has finished For {}: {}".format(fromUser, message_id))
  217. replyPost = reply.TextMsg(fromUser, toUser, reply_text).send()
  218. return replyPost
  219. else:
  220. pass
  221. if float(time.time()) - float(queryTime) > 4.8:
  222. logger.info("[wechatmp] Timeout for {} {}".format(fromUser, message_id))
  223. return
  224. if cache[0] > 1:
  225. reply_text = cache[1][:600] + " 【未完待续,回复任意文字以继续】" #wechatmp auto_reply length limit
  226. cache_dict[cache_key] = (cache[0] - 1, cache[1][600:])
  227. elif cache[0] == 1:
  228. reply_text = cache[1]
  229. cache_dict.pop(cache_key)
  230. logger.info("[wechatmp] {}:{} Do send {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), reply_text))
  231. replyPost = reply.TextMsg(fromUser, toUser, reply_text).send()
  232. return replyPost
  233. elif isinstance(recMsg, receive.Event) and recMsg.MsgType == 'event':
  234. logger.info("[wechatmp] Event {} from {}".format(recMsg.Event, recMsg.FromUserName))
  235. content = textwrap.dedent("""\
  236. 感谢您的关注!
  237. 这里是ChatGPT,可以自由对话。
  238. 资源有限,回复较慢,请勿着急。
  239. 支持通用表情输入。
  240. 暂时不支持图片输入。
  241. 支持图片输出,画字开头的问题将回复图片链接。
  242. 支持角色扮演和文字冒险两种定制模式对话。
  243. 输入'#帮助' 查看详细指令。""")
  244. replyMsg = reply.TextMsg(recMsg.FromUserName, recMsg.ToUserName, content)
  245. return replyMsg.send()
  246. else:
  247. print("暂且不处理")
  248. return "success"
  249. except Exception as exc:
  250. print(exc)
  251. return exc
  252. def check_prefix(content, prefix_list):
  253. for prefix in prefix_list:
  254. if content.startswith(prefix):
  255. return prefix
  256. return None