You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

235 line
11KB

  1. # -*- coding: utf-8 -*-
  2. import web
  3. import time
  4. import math
  5. import hashlib
  6. import textwrap
  7. from channel.chat_channel import ChatChannel
  8. import channel.wechatmp.reply as reply
  9. import channel.wechatmp.receive as receive
  10. from common.singleton import singleton
  11. from common.log import logger
  12. from config import conf
  13. from bridge.reply import *
  14. from bridge.context import *
  15. from plugins import *
  16. import traceback
  17. # If using SSL, uncomment the following lines, and modify the certificate path.
  18. # from cheroot.server import HTTPServer
  19. # from cheroot.ssl.builtin import BuiltinSSLAdapter
  20. # HTTPServer.ssl_adapter = BuiltinSSLAdapter(
  21. # certificate='/ssl/cert.pem',
  22. # private_key='/ssl/cert.key')
  23. # from concurrent.futures import ThreadPoolExecutor
  24. # thread_pool = ThreadPoolExecutor(max_workers=8)
  25. @singleton
  26. class WechatMPChannel(ChatChannel):
  27. NOT_SUPPORT_REPLYTYPE = [ReplyType.IMAGE, ReplyType.VOICE]
  28. def __init__(self):
  29. super().__init__()
  30. self.cache_dict = dict()
  31. self.query1 = dict()
  32. self.query2 = dict()
  33. self.query3 = dict()
  34. def startup(self):
  35. urls = (
  36. '/wx', 'SubsribeAccountQuery',
  37. )
  38. app = web.application(urls, globals(), autoreload=False)
  39. port = conf().get('wechatmp_port', 8080)
  40. web.httpserver.runsimple(app.wsgifunc(), ('0.0.0.0', port))
  41. def send(self, reply: Reply, context: Context):
  42. reply_cnt = math.ceil(len(reply.content) / 600)
  43. receiver = context["receiver"]
  44. self.cache_dict[receiver] = (reply_cnt, reply.content)
  45. logger.debug("[send] reply to {} saved to cache: {}".format(receiver, reply))
  46. def verify_server():
  47. try:
  48. data = web.input()
  49. if len(data) == 0:
  50. return "None"
  51. signature = data.signature
  52. timestamp = data.timestamp
  53. nonce = data.nonce
  54. echostr = data.echostr
  55. token = conf().get('wechatmp_token') #请按照公众平台官网\基本配置中信息填写
  56. data_list = [token, timestamp, nonce]
  57. data_list.sort()
  58. sha1 = hashlib.sha1()
  59. # map(sha1.update, data_list) #python2
  60. sha1.update("".join(data_list).encode('utf-8'))
  61. hashcode = sha1.hexdigest()
  62. print("handle/GET func: hashcode, signature: ", hashcode, signature)
  63. if hashcode == signature:
  64. return echostr
  65. else:
  66. return ""
  67. except Exception as Argument:
  68. return Argument
  69. # This class is instantiated once per query
  70. class SubsribeAccountQuery():
  71. def GET(self):
  72. return verify_server()
  73. def POST(self):
  74. channel_instance = WechatMPChannel()
  75. try:
  76. query_time = time.time()
  77. webData = web.data()
  78. # logger.debug("[wechatmp] Receive request:\n" + webData.decode("utf-8"))
  79. wechat_msg = receive.parse_xml(webData)
  80. if wechat_msg.msg_type == 'text':
  81. from_user = wechat_msg.from_user_id
  82. to_user = wechat_msg.to_user_id
  83. message = wechat_msg.content.decode("utf-8")
  84. message_id = wechat_msg.msg_id
  85. logger.info("[wechatmp] {}:{} Receive post query {} {}: {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), from_user, message_id, message))
  86. cache_key = from_user
  87. cache = channel_instance.cache_dict.get(cache_key)
  88. reply_text = ""
  89. # New request
  90. if cache == None:
  91. # The first query begin, reset the cache
  92. context = channel_instance._compose_context(ContextType.TEXT, message, isgroup=False, msg=wechat_msg)
  93. logger.debug("[wechatmp] context: {} {}".format(context, wechat_msg))
  94. if context:
  95. # set private openai_api_key
  96. # if from_user is not changed in itchat, this can be placed at chat_channel
  97. user_data = conf().get_user_data(from_user)
  98. context['openai_api_key'] = user_data.get('openai_api_key') # None or user openai_api_key
  99. channel_instance.cache_dict[cache_key] = (0, "")
  100. channel_instance.produce(context)
  101. else:
  102. trigger_prefix = conf().get('single_chat_prefix',[''])[0]
  103. if trigger_prefix:
  104. content = textwrap.dedent(f"""\
  105. 请输入'{trigger_prefix}'接你想说的话跟我说话。
  106. 例如:
  107. {trigger_prefix}你好,很高兴见到你。""")
  108. else:
  109. logger.error(f"[wechatmp] unknown error")
  110. content = textwrap.dedent("""\
  111. 未知错误,请稍后再试""")
  112. replyMsg = reply.TextMsg(wechat_msg.from_user_id, wechat_msg.to_user_id, content)
  113. return replyMsg.send()
  114. channel_instance.query1[cache_key] = False
  115. channel_instance.query2[cache_key] = False
  116. channel_instance.query3[cache_key] = False
  117. # Request again
  118. elif cache[0] == 0 and channel_instance.query1.get(cache_key) == True and channel_instance.query2.get(cache_key) == True and channel_instance.query3.get(cache_key) == True:
  119. channel_instance.query1[cache_key] = False #To improve waiting experience, this can be set to True.
  120. channel_instance.query2[cache_key] = False #To improve waiting experience, this can be set to True.
  121. channel_instance.query3[cache_key] = False
  122. elif cache[0] >= 1:
  123. # Skip the waiting phase
  124. channel_instance.query1[cache_key] = True
  125. channel_instance.query2[cache_key] = True
  126. channel_instance.query3[cache_key] = True
  127. cache = channel_instance.cache_dict.get(cache_key)
  128. if channel_instance.query1.get(cache_key) == False:
  129. # The first query from wechat official server
  130. logger.debug("[wechatmp] query1 {}".format(cache_key))
  131. channel_instance.query1[cache_key] = True
  132. cnt = 0
  133. while cache[0] == 0 and cnt < 45:
  134. cnt = cnt + 1
  135. time.sleep(0.1)
  136. cache = channel_instance.cache_dict.get(cache_key)
  137. if cnt == 45:
  138. # waiting for timeout (the POST query will be closed by wechat official server)
  139. time.sleep(5)
  140. # and do nothing
  141. return
  142. else:
  143. pass
  144. elif channel_instance.query2.get(cache_key) == False:
  145. # The second query from wechat official server
  146. logger.debug("[wechatmp] query2 {}".format(cache_key))
  147. channel_instance.query2[cache_key] = True
  148. cnt = 0
  149. while cache[0] == 0 and cnt < 45:
  150. cnt = cnt + 1
  151. time.sleep(0.1)
  152. cache = channel_instance.cache_dict.get(cache_key)
  153. if cnt == 45:
  154. # waiting for timeout (the POST query will be closed by wechat official server)
  155. time.sleep(5)
  156. # and do nothing
  157. return
  158. else:
  159. pass
  160. elif channel_instance.query3.get(cache_key) == False:
  161. # The third query from wechat official server
  162. logger.debug("[wechatmp] query3 {}".format(cache_key))
  163. channel_instance.query3[cache_key] = True
  164. cnt = 0
  165. while cache[0] == 0 and cnt < 40:
  166. cnt = cnt + 1
  167. time.sleep(0.1)
  168. cache = channel_instance.cache_dict.get(cache_key)
  169. if cnt == 40:
  170. # Have waiting for 3x5 seconds
  171. # return timeout message
  172. reply_text = "【正在思考中,回复任意文字尝试获取回复】"
  173. logger.info("[wechatmp] Three queries has finished For {}: {}".format(from_user, message_id))
  174. replyPost = reply.TextMsg(from_user, to_user, reply_text).send()
  175. return replyPost
  176. else:
  177. pass
  178. if float(time.time()) - float(query_time) > 4.8:
  179. logger.info("[wechatmp] Timeout for {} {}".format(from_user, message_id))
  180. return
  181. if cache[0] > 1:
  182. reply_text = cache[1][:600] + "\n【未完待续,回复任意文字以继续】" #wechatmp auto_reply length limit
  183. channel_instance.cache_dict[cache_key] = (cache[0] - 1, cache[1][600:])
  184. elif cache[0] == 1:
  185. reply_text = cache[1]
  186. channel_instance.cache_dict.pop(cache_key)
  187. logger.info("[wechatmp] {}:{} Do send {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), reply_text))
  188. replyPost = reply.TextMsg(from_user, to_user, reply_text).send()
  189. return replyPost
  190. elif wechat_msg.msg_type == 'event':
  191. logger.info("[wechatmp] Event {} from {}".format(wechat_msg.Event, wechat_msg.from_user_id))
  192. trigger_prefix = conf().get('single_chat_prefix',[''])[0]
  193. content = textwrap.dedent(f"""\
  194. 感谢您的关注!
  195. 这里是ChatGPT,可以自由对话。
  196. 资源有限,回复较慢,请勿着急。
  197. 支持通用表情输入。
  198. 暂时不支持图片输入。
  199. 支持图片输出,画字开头的问题将回复图片链接。
  200. 支持角色扮演和文字冒险两种定制模式对话。
  201. 输入'{trigger_prefix}#帮助' 查看详细指令。""")
  202. replyMsg = reply.TextMsg(wechat_msg.from_user_id, wechat_msg.to_user_id, content)
  203. return replyMsg.send()
  204. else:
  205. logger.info("暂且不处理")
  206. return "success"
  207. except Exception as exc:
  208. logger.exception(exc)
  209. return exc