Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

308 lines
13KB

  1. # -*- coding: utf-8 -*-
  2. import web
  3. import time
  4. import math
  5. import hashlib
  6. import textwrap
  7. from channel.channel import Channel
  8. import channel.wechatmp.reply as reply
  9. import channel.wechatmp.receive as receive
  10. from common.log import logger
  11. from config import conf
  12. from bridge.reply import *
  13. from bridge.context import *
  14. from plugins import *
  15. import traceback
  16. import redis
  17. # If using SSL, uncomment the following lines, and modify the certificate path.
  18. # from cheroot.server import HTTPServer
  19. # from cheroot.ssl.builtin import BuiltinSSLAdapter
  20. # HTTPServer.ssl_adapter = BuiltinSSLAdapter(
  21. # certificate='/ssl/cert.pem',
  22. # private_key='/ssl/cert.key')
  23. class WechatMPServer():
  24. def __init__(self):
  25. pass
  26. def startup(self):
  27. urls = (
  28. '/wx', 'WechatMPChannel',
  29. )
  30. app = web.application(urls, globals())
  31. app.run()
  32. cache_dict = dict()
  33. query1 = dict()
  34. query2 = dict()
  35. query3 = dict()
  36. from concurrent.futures import ThreadPoolExecutor
  37. thread_pool = ThreadPoolExecutor(max_workers=8)
  38. class WechatMPChannel(Channel):
  39. def GET(self):
  40. try:
  41. data = web.input()
  42. if len(data) == 0:
  43. return "hello, this is handle view"
  44. signature = data.signature
  45. timestamp = data.timestamp
  46. nonce = data.nonce
  47. echostr = data.echostr
  48. token = conf().get('wechatmp_token') #请按照公众平台官网\基本配置中信息填写
  49. data_list = [token, timestamp, nonce]
  50. data_list.sort()
  51. sha1 = hashlib.sha1()
  52. # map(sha1.update, data_list) #python2
  53. sha1.update("".join(data_list).encode('utf-8'))
  54. hashcode = sha1.hexdigest()
  55. print("handle/GET func: hashcode, signature: ", hashcode, signature)
  56. if hashcode == signature:
  57. return echostr
  58. else:
  59. return ""
  60. except Exception as Argument:
  61. return Argument
  62. def _do_build_reply(self, cache_key, fromUser, message):
  63. context = dict()
  64. context['session_id'] = fromUser
  65. reply_text = super().build_reply_content(message, context)
  66. # The query is done, record the cache
  67. logger.info("[threaded] Get reply for {}: {} \nA: {}".format(fromUser, message, reply_text))
  68. global cache_dict
  69. reply_cnt = math.ceil(len(reply_text) / 600)
  70. cache_dict[cache_key] = (reply_cnt, reply_text)
  71. def send(self, reply : Reply, cache_key):
  72. global cache_dict
  73. reply_cnt = math.ceil(len(reply.content) / 600)
  74. cache_dict[cache_key] = (reply_cnt, reply.content)
  75. def handle(self, context):
  76. global cache_dict
  77. try:
  78. reply = Reply()
  79. logger.debug('[wechatmp] ready to handle context: {}'.format(context))
  80. # reply的构建步骤
  81. e_context = PluginManager().emit_event(EventContext(Event.ON_HANDLE_CONTEXT, {'channel' : self, 'context': context, 'reply': reply}))
  82. reply = e_context['reply']
  83. if not e_context.is_pass():
  84. logger.debug('[wechatmp] ready to handle context: type={}, content={}'.format(context.type, context.content))
  85. if context.type == ContextType.TEXT or context.type == ContextType.IMAGE_CREATE:
  86. reply = super().build_reply_content(context.content, context)
  87. # elif context.type == ContextType.VOICE:
  88. # msg = context['msg']
  89. # file_name = TmpDir().path() + context.content
  90. # msg.download(file_name)
  91. # reply = super().build_voice_to_text(file_name)
  92. # if reply.type != ReplyType.ERROR and reply.type != ReplyType.INFO:
  93. # context.content = reply.content # 语音转文字后,将文字内容作为新的context
  94. # context.type = ContextType.TEXT
  95. # reply = super().build_reply_content(context.content, context)
  96. # if reply.type == ReplyType.TEXT:
  97. # if conf().get('voice_reply_voice'):
  98. # reply = super().build_text_to_voice(reply.content)
  99. else:
  100. logger.error('[wechatmp] unknown context type: {}'.format(context.type))
  101. return
  102. logger.debug('[wechatmp] ready to decorate reply: {}'.format(reply))
  103. # reply的包装步骤
  104. if reply and reply.type:
  105. e_context = PluginManager().emit_event(EventContext(Event.ON_DECORATE_REPLY, {'channel' : self, 'context': context, 'reply': reply}))
  106. reply=e_context['reply']
  107. if not e_context.is_pass() and reply and reply.type:
  108. if reply.type == ReplyType.TEXT:
  109. pass
  110. elif reply.type == ReplyType.ERROR or reply.type == ReplyType.INFO:
  111. reply.content = str(reply.type)+":\n" + reply.content
  112. elif reply.type == ReplyType.IMAGE_URL or reply.type == ReplyType.VOICE or reply.type == ReplyType.IMAGE:
  113. pass
  114. else:
  115. logger.error('[wechatmp] unknown reply type: {}'.format(reply.type))
  116. return
  117. # reply的发送步骤
  118. if reply and reply.type:
  119. e_context = PluginManager().emit_event(EventContext(Event.ON_SEND_REPLY, {'channel' : self, 'context': context, 'reply': reply}))
  120. reply=e_context['reply']
  121. if not e_context.is_pass() and reply and reply.type:
  122. logger.debug('[wechatmp] ready to send reply: {} to {}'.format(reply, context['receiver']))
  123. self.send(reply, context['receiver'])
  124. else:
  125. cache_dict[context['receiver']] = (1, "No reply")
  126. logger.info("[threaded] Get reply for {}: {} \nA: {}".format(context['receiver'], context.content, reply.content))
  127. except Exception as exc:
  128. print(traceback.format_exc())
  129. cache_dict[context['receiver']] = (1, "ERROR")
  130. def POST(self):
  131. try:
  132. queryTime = time.time()
  133. webData = web.data()
  134. # logger.debug("[wechatmp] Receive request:\n" + webData.decode("utf-8"))
  135. recMsg = receive.parse_xml(webData)
  136. if isinstance(recMsg, receive.Msg) and recMsg.MsgType == 'text':
  137. fromUser = recMsg.FromUserName
  138. toUser = recMsg.ToUserName
  139. createTime = recMsg.CreateTime
  140. message = recMsg.Content.decode("utf-8")
  141. message_id = recMsg.MsgId
  142. logger.info("[wechatmp] {}:{} Receive post query {} {}: {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), fromUser, message_id, message))
  143. global cache_dict
  144. global query1
  145. global query2
  146. global query3
  147. cache_key = fromUser
  148. cache = cache_dict.get(cache_key)
  149. reply_text = ""
  150. # New request
  151. if cache == None:
  152. # The first query begin, reset the cache
  153. cache_dict[cache_key] = (0, "")
  154. # thread_pool.submit(self._do_build_reply, cache_key, fromUser, message)
  155. context = Context()
  156. context.kwargs = {'isgroup': False, 'receiver': fromUser, 'session_id': fromUser}
  157. R = redis.Redis(host='localhost', port=6379, db=0)
  158. user_openai_api_key = "openai_api_key_" + fromUser
  159. api_key = R.get(user_openai_api_key)
  160. if api_key != None:
  161. api_key = api_key.decode("utf-8")
  162. context['openai_api_key'] = api_key # None or user openai_api_key
  163. img_match_prefix = check_prefix(message, conf().get('image_create_prefix'))
  164. if img_match_prefix:
  165. message = message.replace(img_match_prefix, '', 1).strip()
  166. context.type = ContextType.IMAGE_CREATE
  167. else:
  168. context.type = ContextType.TEXT
  169. context.content = message
  170. thread_pool.submit(self.handle, context)
  171. query1[cache_key] = False
  172. query2[cache_key] = False
  173. query3[cache_key] = False
  174. # Request again
  175. elif cache[0] == 0 and query1.get(cache_key) == True and query2.get(cache_key) == True and query3.get(cache_key) == True:
  176. query1[cache_key] = False #To improve waiting experience, this can be set to True.
  177. query2[cache_key] = False #To improve waiting experience, this can be set to True.
  178. query3[cache_key] = False
  179. elif cache[0] >= 1:
  180. # Skip the waiting phase
  181. query1[cache_key] = True
  182. query2[cache_key] = True
  183. query3[cache_key] = True
  184. cache = cache_dict.get(cache_key)
  185. if query1.get(cache_key) == False:
  186. # The first query from wechat official server
  187. logger.debug("[wechatmp] query1 {}".format(cache_key))
  188. query1[cache_key] = True
  189. cnt = 0
  190. while cache[0] == 0 and cnt < 45:
  191. cnt = cnt + 1
  192. time.sleep(0.1)
  193. cache = cache_dict.get(cache_key)
  194. if cnt == 45:
  195. # waiting for timeout (the POST query will be closed by wechat official server)
  196. time.sleep(5)
  197. # and do nothing
  198. return
  199. else:
  200. pass
  201. elif query2.get(cache_key) == False:
  202. # The second query from wechat official server
  203. logger.debug("[wechatmp] query2 {}".format(cache_key))
  204. query2[cache_key] = True
  205. cnt = 0
  206. while cache[0] == 0 and cnt < 45:
  207. cnt = cnt + 1
  208. time.sleep(0.1)
  209. cache = cache_dict.get(cache_key)
  210. if cnt == 45:
  211. # waiting for timeout (the POST query will be closed by wechat official server)
  212. time.sleep(5)
  213. # and do nothing
  214. return
  215. else:
  216. pass
  217. elif query3.get(cache_key) == False:
  218. # The third query from wechat official server
  219. logger.debug("[wechatmp] query3 {}".format(cache_key))
  220. query3[cache_key] = True
  221. cnt = 0
  222. while cache[0] == 0 and cnt < 45:
  223. cnt = cnt + 1
  224. time.sleep(0.1)
  225. cache = cache_dict.get(cache_key)
  226. if cnt == 45:
  227. # Have waiting for 3x5 seconds
  228. # return timeout message
  229. reply_text = "【正在响应中,回复任意文字尝试获取回复】"
  230. logger.info("[wechatmp] Three queries has finished For {}: {}".format(fromUser, message_id))
  231. replyPost = reply.TextMsg(fromUser, toUser, reply_text).send()
  232. return replyPost
  233. else:
  234. pass
  235. if float(time.time()) - float(queryTime) > 4.8:
  236. logger.info("[wechatmp] Timeout for {} {}".format(fromUser, message_id))
  237. return
  238. if cache[0] > 1:
  239. reply_text = cache[1][:600] + " 【未完待续,回复任意文字以继续】" #wechatmp auto_reply length limit
  240. cache_dict[cache_key] = (cache[0] - 1, cache[1][600:])
  241. elif cache[0] == 1:
  242. reply_text = cache[1]
  243. cache_dict.pop(cache_key)
  244. logger.info("[wechatmp] {}:{} Do send {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), reply_text))
  245. replyPost = reply.TextMsg(fromUser, toUser, reply_text).send()
  246. return replyPost
  247. elif isinstance(recMsg, receive.Event) and recMsg.MsgType == 'event':
  248. logger.info("[wechatmp] Event {} from {}".format(recMsg.Event, recMsg.FromUserName))
  249. content = textwrap.dedent("""\
  250. 感谢您的关注!
  251. 这里是ChatGPT,可以自由对话。
  252. 资源有限,回复较慢,请勿着急。
  253. 支持通用表情输入。
  254. 暂时不支持图片输入。
  255. 支持图片输出,画字开头的问题将回复图片链接。
  256. 支持角色扮演和文字冒险两种定制模式对话。
  257. 输入'#帮助' 查看详细指令。""")
  258. replyMsg = reply.TextMsg(recMsg.FromUserName, recMsg.ToUserName, content)
  259. return replyMsg.send()
  260. else:
  261. print("暂且不处理")
  262. return "success"
  263. except Exception as exc:
  264. print(exc)
  265. return exc
  266. def check_prefix(content, prefix_list):
  267. for prefix in prefix_list:
  268. if content.startswith(prefix):
  269. return prefix
  270. return None