Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

wechatmp_channel.py 13KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. # -*- coding: utf-8 -*-
  2. import web
  3. import time
  4. import math
  5. import hashlib
  6. import textwrap
  7. from channel.channel import Channel
  8. import channel.wechatmp.reply as reply
  9. import channel.wechatmp.receive as receive
  10. from common.log import logger
  11. from config import conf
  12. from bridge.reply import *
  13. from bridge.context import *
  14. from plugins import *
  15. import traceback
  16. # If using SSL, uncomment the following lines, and modify the certificate path.
  17. # from cheroot.server import HTTPServer
  18. # from cheroot.ssl.builtin import BuiltinSSLAdapter
  19. # HTTPServer.ssl_adapter = BuiltinSSLAdapter(
  20. # certificate='/ssl/cert.pem',
  21. # private_key='/ssl/cert.key')
  22. class WechatMPServer():
  23. def __init__(self):
  24. pass
  25. def startup(self):
  26. urls = (
  27. '/wx', 'WechatMPChannel',
  28. )
  29. app = web.application(urls, globals())
  30. web.httpserver.runsimple(app.wsgifunc(), ('0.0.0.0', 80))
  31. cache_dict = dict()
  32. query1 = dict()
  33. query2 = dict()
  34. query3 = dict()
  35. from concurrent.futures import ThreadPoolExecutor
  36. thread_pool = ThreadPoolExecutor(max_workers=8)
  37. class WechatMPChannel(Channel):
  38. def GET(self):
  39. try:
  40. data = web.input()
  41. if len(data) == 0:
  42. return "hello, this is handle view"
  43. signature = data.signature
  44. timestamp = data.timestamp
  45. nonce = data.nonce
  46. echostr = data.echostr
  47. token = conf().get('wechatmp_token') #请按照公众平台官网\基本配置中信息填写
  48. data_list = [token, timestamp, nonce]
  49. data_list.sort()
  50. sha1 = hashlib.sha1()
  51. # map(sha1.update, data_list) #python2
  52. sha1.update("".join(data_list).encode('utf-8'))
  53. hashcode = sha1.hexdigest()
  54. print("handle/GET func: hashcode, signature: ", hashcode, signature)
  55. if hashcode == signature:
  56. return echostr
  57. else:
  58. return ""
  59. except Exception as Argument:
  60. return Argument
  61. def _do_build_reply(self, cache_key, fromUser, message):
  62. context = dict()
  63. context['session_id'] = fromUser
  64. reply_text = super().build_reply_content(message, context)
  65. # The query is done, record the cache
  66. logger.info("[threaded] Get reply for {}: {} \nA: {}".format(fromUser, message, reply_text))
  67. global cache_dict
  68. reply_cnt = math.ceil(len(reply_text) / 600)
  69. cache_dict[cache_key] = (reply_cnt, reply_text)
  70. def send(self, reply : Reply, cache_key):
  71. global cache_dict
  72. reply_cnt = math.ceil(len(reply.content) / 600)
  73. cache_dict[cache_key] = (reply_cnt, reply.content)
  74. def handle(self, context):
  75. global cache_dict
  76. try:
  77. reply = Reply()
  78. logger.debug('[wechatmp] ready to handle context: {}'.format(context))
  79. # reply的构建步骤
  80. e_context = PluginManager().emit_event(EventContext(Event.ON_HANDLE_CONTEXT, {'channel' : self, 'context': context, 'reply': reply}))
  81. reply = e_context['reply']
  82. if not e_context.is_pass():
  83. logger.debug('[wechatmp] ready to handle context: type={}, content={}'.format(context.type, context.content))
  84. if context.type == ContextType.TEXT or context.type == ContextType.IMAGE_CREATE:
  85. reply = super().build_reply_content(context.content, context)
  86. # elif context.type == ContextType.VOICE:
  87. # msg = context['msg']
  88. # file_name = TmpDir().path() + context.content
  89. # msg.download(file_name)
  90. # reply = super().build_voice_to_text(file_name)
  91. # if reply.type != ReplyType.ERROR and reply.type != ReplyType.INFO:
  92. # context.content = reply.content # 语音转文字后,将文字内容作为新的context
  93. # context.type = ContextType.TEXT
  94. # reply = super().build_reply_content(context.content, context)
  95. # if reply.type == ReplyType.TEXT:
  96. # if conf().get('voice_reply_voice'):
  97. # reply = super().build_text_to_voice(reply.content)
  98. else:
  99. logger.error('[wechatmp] unknown context type: {}'.format(context.type))
  100. return
  101. logger.debug('[wechatmp] ready to decorate reply: {}'.format(reply))
  102. # reply的包装步骤
  103. if reply and reply.type:
  104. e_context = PluginManager().emit_event(EventContext(Event.ON_DECORATE_REPLY, {'channel' : self, 'context': context, 'reply': reply}))
  105. reply=e_context['reply']
  106. if not e_context.is_pass() and reply and reply.type:
  107. if reply.type == ReplyType.TEXT:
  108. pass
  109. elif reply.type == ReplyType.ERROR or reply.type == ReplyType.INFO:
  110. reply.content = str(reply.type)+":\n" + reply.content
  111. elif reply.type == ReplyType.IMAGE_URL or reply.type == ReplyType.VOICE or reply.type == ReplyType.IMAGE:
  112. pass
  113. else:
  114. logger.error('[wechatmp] unknown reply type: {}'.format(reply.type))
  115. return
  116. # reply的发送步骤
  117. if reply and reply.type:
  118. e_context = PluginManager().emit_event(EventContext(Event.ON_SEND_REPLY, {'channel' : self, 'context': context, 'reply': reply}))
  119. reply=e_context['reply']
  120. if not e_context.is_pass() and reply and reply.type:
  121. logger.debug('[wechatmp] ready to send reply: {} to {}'.format(reply, context['receiver']))
  122. self.send(reply, context['receiver'])
  123. else:
  124. cache_dict[context['receiver']] = (1, "No reply")
  125. logger.info("[threaded] Get reply for {}: {} \nA: {}".format(context['receiver'], context.content, reply.content))
  126. except Exception as exc:
  127. print(traceback.format_exc())
  128. cache_dict[context['receiver']] = (1, "ERROR")
  129. def POST(self):
  130. try:
  131. queryTime = time.time()
  132. webData = web.data()
  133. # logger.debug("[wechatmp] Receive request:\n" + webData.decode("utf-8"))
  134. recMsg = receive.parse_xml(webData)
  135. if isinstance(recMsg, receive.Msg) and recMsg.MsgType == 'text':
  136. fromUser = recMsg.FromUserName
  137. toUser = recMsg.ToUserName
  138. createTime = recMsg.CreateTime
  139. message = recMsg.Content.decode("utf-8")
  140. message_id = recMsg.MsgId
  141. logger.info("[wechatmp] {}:{} Receive post query {} {}: {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), fromUser, message_id, message))
  142. global cache_dict
  143. global query1
  144. global query2
  145. global query3
  146. cache_key = fromUser
  147. cache = cache_dict.get(cache_key)
  148. reply_text = ""
  149. # New request
  150. if cache == None:
  151. # The first query begin, reset the cache
  152. cache_dict[cache_key] = (0, "")
  153. # thread_pool.submit(self._do_build_reply, cache_key, fromUser, message)
  154. context = Context()
  155. context.kwargs = {'isgroup': False, 'receiver': fromUser, 'session_id': fromUser}
  156. user_data = conf().get_user_data(fromUser)
  157. context['openai_api_key'] = user_data.get('openai_api_key') # None or user openai_api_key
  158. img_match_prefix = check_prefix(message, conf().get('image_create_prefix'))
  159. if img_match_prefix:
  160. message = message.replace(img_match_prefix, '', 1).strip()
  161. context.type = ContextType.IMAGE_CREATE
  162. else:
  163. context.type = ContextType.TEXT
  164. context.content = message
  165. thread_pool.submit(self.handle, context)
  166. query1[cache_key] = False
  167. query2[cache_key] = False
  168. query3[cache_key] = False
  169. # Request again
  170. elif cache[0] == 0 and query1.get(cache_key) == True and query2.get(cache_key) == True and query3.get(cache_key) == True:
  171. query1[cache_key] = False #To improve waiting experience, this can be set to True.
  172. query2[cache_key] = False #To improve waiting experience, this can be set to True.
  173. query3[cache_key] = False
  174. elif cache[0] >= 1:
  175. # Skip the waiting phase
  176. query1[cache_key] = True
  177. query2[cache_key] = True
  178. query3[cache_key] = True
  179. cache = cache_dict.get(cache_key)
  180. if query1.get(cache_key) == False:
  181. # The first query from wechat official server
  182. logger.debug("[wechatmp] query1 {}".format(cache_key))
  183. query1[cache_key] = True
  184. cnt = 0
  185. while cache[0] == 0 and cnt < 45:
  186. cnt = cnt + 1
  187. time.sleep(0.1)
  188. cache = cache_dict.get(cache_key)
  189. if cnt == 45:
  190. # waiting for timeout (the POST query will be closed by wechat official server)
  191. time.sleep(5)
  192. # and do nothing
  193. return
  194. else:
  195. pass
  196. elif query2.get(cache_key) == False:
  197. # The second query from wechat official server
  198. logger.debug("[wechatmp] query2 {}".format(cache_key))
  199. query2[cache_key] = True
  200. cnt = 0
  201. while cache[0] == 0 and cnt < 45:
  202. cnt = cnt + 1
  203. time.sleep(0.1)
  204. cache = cache_dict.get(cache_key)
  205. if cnt == 45:
  206. # waiting for timeout (the POST query will be closed by wechat official server)
  207. time.sleep(5)
  208. # and do nothing
  209. return
  210. else:
  211. pass
  212. elif query3.get(cache_key) == False:
  213. # The third query from wechat official server
  214. logger.debug("[wechatmp] query3 {}".format(cache_key))
  215. query3[cache_key] = True
  216. cnt = 0
  217. while cache[0] == 0 and cnt < 45:
  218. cnt = cnt + 1
  219. time.sleep(0.1)
  220. cache = cache_dict.get(cache_key)
  221. if cnt == 45:
  222. # Have waiting for 3x5 seconds
  223. # return timeout message
  224. reply_text = "【正在响应中,回复任意文字尝试获取回复】"
  225. logger.info("[wechatmp] Three queries has finished For {}: {}".format(fromUser, message_id))
  226. replyPost = reply.TextMsg(fromUser, toUser, reply_text).send()
  227. return replyPost
  228. else:
  229. pass
  230. if float(time.time()) - float(queryTime) > 4.8:
  231. logger.info("[wechatmp] Timeout for {} {}".format(fromUser, message_id))
  232. return
  233. if cache[0] > 1:
  234. reply_text = cache[1][:600] + "\n【未完待续,回复任意文字以继续】" #wechatmp auto_reply length limit
  235. cache_dict[cache_key] = (cache[0] - 1, cache[1][600:])
  236. elif cache[0] == 1:
  237. reply_text = cache[1]
  238. cache_dict.pop(cache_key)
  239. logger.info("[wechatmp] {}:{} Do send {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), reply_text))
  240. replyPost = reply.TextMsg(fromUser, toUser, reply_text).send()
  241. return replyPost
  242. elif isinstance(recMsg, receive.Event) and recMsg.MsgType == 'event':
  243. logger.info("[wechatmp] Event {} from {}".format(recMsg.Event, recMsg.FromUserName))
  244. content = textwrap.dedent("""\
  245. 感谢您的关注!
  246. 这里是ChatGPT,可以自由对话。
  247. 资源有限,回复较慢,请勿着急。
  248. 支持通用表情输入。
  249. 暂时不支持图片输入。
  250. 支持图片输出,画字开头的问题将回复图片链接。
  251. 支持角色扮演和文字冒险两种定制模式对话。
  252. 输入'#帮助' 查看详细指令。""")
  253. replyMsg = reply.TextMsg(recMsg.FromUserName, recMsg.ToUserName, content)
  254. return replyMsg.send()
  255. else:
  256. logger.info("暂且不处理")
  257. return "success"
  258. except Exception as exc:
  259. logger.exception(exc)
  260. return exc
  261. def check_prefix(content, prefix_list):
  262. for prefix in prefix_list:
  263. if content.startswith(prefix):
  264. return prefix
  265. return None