You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

233 lines
11KB

  1. import time
  2. import web
  3. import channel.wechatmp.receive as receive
  4. import channel.wechatmp.reply as reply
  5. from bridge.context import *
  6. from channel.wechatmp.common import *
  7. from channel.wechatmp.wechatmp_channel import WechatMPChannel
  8. from common.log import logger
  9. from config import conf
  10. # This class is instantiated once per query
  11. class Query:
  12. def GET(self):
  13. return verify_server(web.input())
  14. def POST(self):
  15. # Make sure to return the instance that first created, @singleton will do that.
  16. channel = WechatMPChannel()
  17. try:
  18. query_time = time.time()
  19. webData = web.data()
  20. logger.debug("[wechatmp] Receive request:\n" + webData.decode("utf-8"))
  21. wechatmp_msg = receive.parse_xml(webData)
  22. if wechatmp_msg.msg_type == "text" or wechatmp_msg.msg_type == "voice":
  23. from_user = wechatmp_msg.from_user_id
  24. to_user = wechatmp_msg.to_user_id
  25. message = wechatmp_msg.content.decode("utf-8")
  26. message_id = wechatmp_msg.msg_id
  27. logger.info(
  28. "[wechatmp] {}:{} Receive post query {} {}: {}".format(
  29. web.ctx.env.get("REMOTE_ADDR"),
  30. web.ctx.env.get("REMOTE_PORT"),
  31. from_user,
  32. message_id,
  33. message,
  34. )
  35. )
  36. supported = True
  37. if "【收到不支持的消息类型,暂无法显示】" in message:
  38. supported = False # not supported, used to refresh
  39. cache_key = from_user
  40. reply_text = ""
  41. # New request
  42. if (
  43. cache_key not in channel.cache_dict
  44. and cache_key not in channel.running
  45. ):
  46. # The first query begin, reset the cache
  47. context = channel._compose_context(
  48. ContextType.TEXT, message, isgroup=False, msg=wechatmp_msg
  49. )
  50. logger.debug(
  51. "[wechatmp] context: {} {}".format(context, wechatmp_msg)
  52. )
  53. if message_id in channel.received_msgs: # received and finished
  54. # no return because of bandwords or other reasons
  55. return "success"
  56. if supported and context:
  57. # set private openai_api_key
  58. # if from_user is not changed in itchat, this can be placed at chat_channel
  59. user_data = conf().get_user_data(from_user)
  60. context["openai_api_key"] = user_data.get(
  61. "openai_api_key"
  62. ) # None or user openai_api_key
  63. channel.received_msgs[message_id] = wechatmp_msg
  64. channel.running.add(cache_key)
  65. channel.produce(context)
  66. else:
  67. trigger_prefix = conf().get("single_chat_prefix", [""])[0]
  68. if trigger_prefix or not supported:
  69. if trigger_prefix:
  70. content = textwrap.dedent(
  71. f"""\
  72. 请输入'{trigger_prefix}'接你想说的话跟我说话。
  73. 例如:
  74. {trigger_prefix}你好,很高兴见到你。"""
  75. )
  76. else:
  77. content = textwrap.dedent(
  78. """\
  79. 你好,很高兴见到你。
  80. 请跟我说话吧。"""
  81. )
  82. else:
  83. logger.error(f"[wechatmp] unknown error")
  84. content = textwrap.dedent(
  85. """\
  86. 未知错误,请稍后再试"""
  87. )
  88. replyMsg = reply.TextMsg(
  89. wechatmp_msg.from_user_id, wechatmp_msg.to_user_id, content
  90. )
  91. return replyMsg.send()
  92. channel.query1[cache_key] = False
  93. channel.query2[cache_key] = False
  94. channel.query3[cache_key] = False
  95. # User request again, and the answer is not ready
  96. elif (
  97. cache_key in channel.running
  98. and channel.query1.get(cache_key) == True
  99. and channel.query2.get(cache_key) == True
  100. and channel.query3.get(cache_key) == True
  101. ):
  102. channel.query1[
  103. cache_key
  104. ] = False # To improve waiting experience, this can be set to True.
  105. channel.query2[
  106. cache_key
  107. ] = False # To improve waiting experience, this can be set to True.
  108. channel.query3[cache_key] = False
  109. # User request again, and the answer is ready
  110. elif cache_key in channel.cache_dict:
  111. # Skip the waiting phase
  112. channel.query1[cache_key] = True
  113. channel.query2[cache_key] = True
  114. channel.query3[cache_key] = True
  115. assert not (
  116. cache_key in channel.cache_dict and cache_key in channel.running
  117. )
  118. if channel.query1.get(cache_key) == False:
  119. # The first query from wechat official server
  120. logger.debug("[wechatmp] query1 {}".format(cache_key))
  121. channel.query1[cache_key] = True
  122. cnt = 0
  123. while cache_key in channel.running and cnt < 45:
  124. cnt = cnt + 1
  125. time.sleep(0.1)
  126. if cnt == 45:
  127. # waiting for timeout (the POST query will be closed by wechat official server)
  128. time.sleep(1)
  129. # and do nothing
  130. return
  131. else:
  132. pass
  133. elif channel.query2.get(cache_key) == False:
  134. # The second query from wechat official server
  135. logger.debug("[wechatmp] query2 {}".format(cache_key))
  136. channel.query2[cache_key] = True
  137. cnt = 0
  138. while cache_key in channel.running and cnt < 45:
  139. cnt = cnt + 1
  140. time.sleep(0.1)
  141. if cnt == 45:
  142. # waiting for timeout (the POST query will be closed by wechat official server)
  143. time.sleep(1)
  144. # and do nothing
  145. return
  146. else:
  147. pass
  148. elif channel.query3.get(cache_key) == False:
  149. # The third query from wechat official server
  150. logger.debug("[wechatmp] query3 {}".format(cache_key))
  151. channel.query3[cache_key] = True
  152. cnt = 0
  153. while cache_key in channel.running and cnt < 40:
  154. cnt = cnt + 1
  155. time.sleep(0.1)
  156. if cnt == 40:
  157. # Have waiting for 3x5 seconds
  158. # return timeout message
  159. reply_text = "【正在思考中,回复任意文字尝试获取回复】"
  160. logger.info(
  161. "[wechatmp] Three queries has finished For {}: {}".format(
  162. from_user, message_id
  163. )
  164. )
  165. replyPost = reply.TextMsg(from_user, to_user, reply_text).send()
  166. return replyPost
  167. else:
  168. pass
  169. if (
  170. cache_key not in channel.cache_dict
  171. and cache_key not in channel.running
  172. ):
  173. # no return because of bandwords or other reasons
  174. return "success"
  175. # if float(time.time()) - float(query_time) > 4.8:
  176. # reply_text = "【正在思考中,回复任意文字尝试获取回复】"
  177. # logger.info("[wechatmp] Timeout for {} {}, return".format(from_user, message_id))
  178. # replyPost = reply.TextMsg(from_user, to_user, reply_text).send()
  179. # return replyPost
  180. if cache_key in channel.cache_dict:
  181. content = channel.cache_dict[cache_key]
  182. if len(content.encode("utf8")) <= MAX_UTF8_LEN:
  183. reply_text = channel.cache_dict[cache_key]
  184. channel.cache_dict.pop(cache_key)
  185. else:
  186. continue_text = "\n【未完待续,回复任意文字以继续】"
  187. splits = split_string_by_utf8_length(
  188. content,
  189. MAX_UTF8_LEN - len(continue_text.encode("utf-8")),
  190. max_split=1,
  191. )
  192. reply_text = splits[0] + continue_text
  193. channel.cache_dict[cache_key] = splits[1]
  194. logger.info(
  195. "[wechatmp] {}:{} Do send {}".format(
  196. web.ctx.env.get("REMOTE_ADDR"),
  197. web.ctx.env.get("REMOTE_PORT"),
  198. reply_text,
  199. )
  200. )
  201. replyPost = reply.TextMsg(from_user, to_user, reply_text).send()
  202. return replyPost
  203. elif wechatmp_msg.msg_type == "event":
  204. logger.info(
  205. "[wechatmp] Event {} from {}".format(
  206. wechatmp_msg.content, wechatmp_msg.from_user_id
  207. )
  208. )
  209. content = subscribe_msg()
  210. replyMsg = reply.TextMsg(
  211. wechatmp_msg.from_user_id, wechatmp_msg.to_user_id, content
  212. )
  213. return replyMsg.send()
  214. else:
  215. logger.info("暂且不处理")
  216. return "success"
  217. except Exception as exc:
  218. logger.exception(exc)
  219. return exc