You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

254 lines
11KB

  1. # -*- coding: utf-8 -*-
  2. import web
  3. import time
  4. import math
  5. import hashlib
  6. import textwrap
  7. from channel.chat_channel import ChatChannel
  8. import channel.wechatmp.reply as reply
  9. import channel.wechatmp.receive as receive
  10. from common.singleton import singleton
  11. from common.log import logger
  12. from config import conf
  13. from bridge.reply import *
  14. from bridge.context import *
  15. from plugins import *
  16. import traceback
  17. # If using SSL, uncomment the following lines, and modify the certificate path.
  18. # from cheroot.server import HTTPServer
  19. # from cheroot.ssl.builtin import BuiltinSSLAdapter
  20. # HTTPServer.ssl_adapter = BuiltinSSLAdapter(
  21. # certificate='/ssl/cert.pem',
  22. # private_key='/ssl/cert.key')
  23. # from concurrent.futures import ThreadPoolExecutor
  24. # thread_pool = ThreadPoolExecutor(max_workers=8)
# Largest UTF-8 byte length of a single text reply; the POST handler splits
# cached replies that encode to more bytes than this and sends them in parts.
MAX_UTF8_LEN = 2048
  26. @singleton
  27. class WechatMPChannel(ChatChannel):
  28. NOT_SUPPORT_REPLYTYPE = [ReplyType.IMAGE, ReplyType.VOICE]
  29. def __init__(self):
  30. super().__init__()
  31. self.cache_dict = dict()
  32. self.running = set()
  33. self.query1 = dict()
  34. self.query2 = dict()
  35. self.query3 = dict()
  36. def startup(self):
  37. urls = (
  38. '/wx', 'SubsribeAccountQuery',
  39. )
  40. app = web.application(urls, globals(), autoreload=False)
  41. port = conf().get('wechatmp_port', 8080)
  42. web.httpserver.runsimple(app.wsgifunc(), ('0.0.0.0', port))
  43. def send(self, reply: Reply, context: Context):
  44. receiver = context["receiver"]
  45. self.cache_dict[receiver] = reply.content
  46. self.running.remove(receiver)
  47. logger.debug("[send] reply to {} saved to cache: {}".format(receiver, reply))
  48. def _fail_callback(self, session_id, exception, context, **kwargs):
  49. logger.exception("[wechatmp] Fail to generation message to user, msgId={}, exception={}".format(context['msg'].msg_id, exception))
  50. assert session_id not in self.cache_dict
  51. self.running.remove(session_id)
  52. def verify_server():
  53. try:
  54. data = web.input()
  55. if len(data) == 0:
  56. return "None"
  57. signature = data.signature
  58. timestamp = data.timestamp
  59. nonce = data.nonce
  60. echostr = data.echostr
  61. token = conf().get('wechatmp_token') #请按照公众平台官网\基本配置中信息填写
  62. data_list = [token, timestamp, nonce]
  63. data_list.sort()
  64. sha1 = hashlib.sha1()
  65. # map(sha1.update, data_list) #python2
  66. sha1.update("".join(data_list).encode('utf-8'))
  67. hashcode = sha1.hexdigest()
  68. print("handle/GET func: hashcode, signature: ", hashcode, signature)
  69. if hashcode == signature:
  70. return echostr
  71. else:
  72. return ""
  73. except Exception as Argument:
  74. return Argument
  75. # This class is instantiated once per query
  76. class SubsribeAccountQuery():
  77. def GET(self):
  78. return verify_server()
  79. def POST(self):
  80. channel = WechatMPChannel()
  81. try:
  82. query_time = time.time()
  83. webData = web.data()
  84. logger.debug("[wechatmp] Receive request:\n" + webData.decode("utf-8"))
  85. wechat_msg = receive.parse_xml(webData)
  86. if wechat_msg.msg_type == 'text':
  87. from_user = wechat_msg.from_user_id
  88. to_user = wechat_msg.to_user_id
  89. message = wechat_msg.content.decode("utf-8")
  90. message_id = wechat_msg.msg_id
  91. logger.info("[wechatmp] {}:{} Receive post query {} {}: {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), from_user, message_id, message))
  92. cache_key = from_user
  93. reply_text = ""
  94. # New request
  95. if cache_key not in channel.cache_dict and cache_key not in channel.running:
  96. # The first query begin, reset the cache
  97. context = channel._compose_context(ContextType.TEXT, message, isgroup=False, msg=wechat_msg)
  98. logger.debug("[wechatmp] context: {} {}".format(context, wechat_msg))
  99. if context:
  100. # set private openai_api_key
  101. # if from_user is not changed in itchat, this can be placed at chat_channel
  102. user_data = conf().get_user_data(from_user)
  103. context['openai_api_key'] = user_data.get('openai_api_key') # None or user openai_api_key
  104. channel.running.add(cache_key)
  105. channel.produce(context)
  106. else:
  107. trigger_prefix = conf().get('single_chat_prefix',[''])[0]
  108. if trigger_prefix:
  109. content = textwrap.dedent(f"""\
  110. 请输入'{trigger_prefix}'接你想说的话跟我说话。
  111. 例如:
  112. {trigger_prefix}你好,很高兴见到你。""")
  113. else:
  114. logger.error(f"[wechatmp] unknown error")
  115. content = textwrap.dedent("""\
  116. 未知错误,请稍后再试""")
  117. replyMsg = reply.TextMsg(wechat_msg.from_user_id, wechat_msg.to_user_id, content)
  118. return replyMsg.send()
  119. channel.query1[cache_key] = False
  120. channel.query2[cache_key] = False
  121. channel.query3[cache_key] = False
  122. # Request again
  123. elif cache_key in channel.running and channel.query1.get(cache_key) == True and channel.query2.get(cache_key) == True and channel.query3.get(cache_key) == True:
  124. channel.query1[cache_key] = False #To improve waiting experience, this can be set to True.
  125. channel.query2[cache_key] = False #To improve waiting experience, this can be set to True.
  126. channel.query3[cache_key] = False
  127. elif cache_key in channel.cache_dict:
  128. # Skip the waiting phase
  129. channel.query1[cache_key] = True
  130. channel.query2[cache_key] = True
  131. channel.query3[cache_key] = True
  132. if channel.query1.get(cache_key) == False:
  133. # The first query from wechat official server
  134. logger.debug("[wechatmp] query1 {}".format(cache_key))
  135. channel.query1[cache_key] = True
  136. cnt = 0
  137. while cache_key not in channel.cache_dict and cnt < 45:
  138. cnt = cnt + 1
  139. time.sleep(0.1)
  140. if cnt == 45:
  141. # waiting for timeout (the POST query will be closed by wechat official server)
  142. time.sleep(5)
  143. # and do nothing
  144. return
  145. else:
  146. pass
  147. elif channel.query2.get(cache_key) == False:
  148. # The second query from wechat official server
  149. logger.debug("[wechatmp] query2 {}".format(cache_key))
  150. channel.query2[cache_key] = True
  151. cnt = 0
  152. while cache_key not in channel.cache_dict and cnt < 45:
  153. cnt = cnt + 1
  154. time.sleep(0.1)
  155. if cnt == 45:
  156. # waiting for timeout (the POST query will be closed by wechat official server)
  157. time.sleep(5)
  158. # and do nothing
  159. return
  160. else:
  161. pass
  162. elif channel.query3.get(cache_key) == False:
  163. # The third query from wechat official server
  164. logger.debug("[wechatmp] query3 {}".format(cache_key))
  165. channel.query3[cache_key] = True
  166. cnt = 0
  167. while cache_key not in channel.cache_dict and cnt < 40:
  168. cnt = cnt + 1
  169. time.sleep(0.1)
  170. if cnt == 40:
  171. # Have waiting for 3x5 seconds
  172. # return timeout message
  173. reply_text = "【正在思考中,回复任意文字尝试获取回复】"
  174. logger.info("[wechatmp] Three queries has finished For {}: {}".format(from_user, message_id))
  175. replyPost = reply.TextMsg(from_user, to_user, reply_text).send()
  176. return replyPost
  177. else:
  178. pass
  179. if float(time.time()) - float(query_time) > 4.8:
  180. logger.info("[wechatmp] Timeout for {} {}".format(from_user, message_id))
  181. time.sleep(1)
  182. return
  183. if cache_key in channel.cache_dict:
  184. content = channel.cache_dict[cache_key]
  185. if len(content.encode('utf8'))<=MAX_UTF8_LEN:
  186. reply_text = channel.cache_dict[cache_key]
  187. channel.cache_dict.pop(cache_key)
  188. else:
  189. continue_text = "\n【未完待续,回复任意文字以继续】"
  190. splits = split_string_by_utf8_length(content, MAX_UTF8_LEN - len(continue_text.encode('utf-8')))
  191. reply_text = splits[0] + continue_text
  192. channel.cache_dict[cache_key] = splits[1]
  193. logger.info("[wechatmp] {}:{} Do send {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), reply_text))
  194. replyPost = reply.TextMsg(from_user, to_user, reply_text).send()
  195. return replyPost
  196. elif wechat_msg.msg_type == 'event':
  197. logger.info("[wechatmp] Event {} from {}".format(wechat_msg.Event, wechat_msg.from_user_id))
  198. trigger_prefix = conf().get('single_chat_prefix',[''])[0]
  199. content = textwrap.dedent(f"""\
  200. 感谢您的关注!
  201. 这里是ChatGPT,可以自由对话。
  202. 资源有限,回复较慢,请勿着急。
  203. 支持通用表情输入。
  204. 暂时不支持图片输入。
  205. 支持图片输出,画字开头的问题将回复图片链接。
  206. 支持角色扮演和文字冒险两种定制模式对话。
  207. 输入'{trigger_prefix}#帮助' 查看详细指令。""")
  208. replyMsg = reply.TextMsg(wechat_msg.from_user_id, wechat_msg.to_user_id, content)
  209. return replyMsg.send()
  210. else:
  211. logger.info("暂且不处理")
  212. return "success"
  213. except Exception as exc:
  214. logger.exception(exc)
  215. return exc
  216. def split_string_by_utf8_length(string, max_length, max_split=0):
  217. encoded = string.encode('utf-8')
  218. start, end = 0, 0
  219. result = []
  220. while end < len(encoded):
  221. if max_split > 0 and len(result) >= max_split:
  222. result.append(encoded[start:].decode('utf-8'))
  223. break
  224. end = start + max_length
  225. # 如果当前字节不是 UTF-8 编码的开始字节,则向前查找直到找到开始字节为止
  226. while end < len(encoded) and (encoded[end] & 0b11000000) == 0b10000000:
  227. end -= 1
  228. result.append(encoded[start:end].decode('utf-8'))
  229. start = end
  230. return result