passive_reply.py 9.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. import asyncio
  2. import time
  3. import web
  4. from wechatpy import parse_message
  5. from wechatpy.replies import ImageReply, VoiceReply, create_reply
  6. import textwrap
  7. from bridge.context import *
  8. from bridge.reply import *
  9. from channel.wechatmp.common import *
  10. from channel.wechatmp.wechatmp_channel import WechatMPChannel
  11. from channel.wechatmp.wechatmp_message import WeChatMPMessage
  12. from common.log import logger
  13. from common.utils import split_string_by_utf8_length
  14. from config import conf, subscribe_msg
  15. # This class is instantiated once per query
  16. class Query:
  17. def GET(self):
  18. return verify_server(web.input())
  19. def POST(self):
  20. try:
  21. args = web.input()
  22. verify_server(args)
  23. request_time = time.time()
  24. channel = WechatMPChannel()
  25. message = web.data()
  26. encrypt_func = lambda x: x
  27. if args.get("encrypt_type") == "aes":
  28. logger.debug("[wechatmp] Receive encrypted post data:\n" + message.decode("utf-8"))
  29. if not channel.crypto:
  30. raise Exception("Crypto not initialized, Please set wechatmp_aes_key in config.json")
  31. message = channel.crypto.decrypt_message(message, args.msg_signature, args.timestamp, args.nonce)
  32. encrypt_func = lambda x: channel.crypto.encrypt_message(x, args.nonce, args.timestamp)
  33. else:
  34. logger.debug("[wechatmp] Receive post data:\n" + message.decode("utf-8"))
  35. msg = parse_message(message)
  36. if msg.type in ["text", "voice", "image"]:
  37. wechatmp_msg = WeChatMPMessage(msg, client=channel.client)
  38. from_user = wechatmp_msg.from_user_id
  39. content = wechatmp_msg.content
  40. message_id = wechatmp_msg.msg_id
  41. supported = True
  42. if "【收到不支持的消息类型,暂无法显示】" in content:
  43. supported = False # not supported, used to refresh
  44. # New request
  45. if (
  46. channel.cache_dict.get(from_user) is None
  47. and from_user not in channel.running
  48. or content.startswith("#")
  49. and message_id not in channel.request_cnt # insert the godcmd
  50. ):
  51. # The first query begin
  52. if msg.type == "voice" and wechatmp_msg.ctype == ContextType.TEXT and conf().get("voice_reply_voice", False):
  53. context = channel._compose_context(wechatmp_msg.ctype, content, isgroup=False, desire_rtype=ReplyType.VOICE, msg=wechatmp_msg)
  54. else:
  55. context = channel._compose_context(wechatmp_msg.ctype, content, isgroup=False, msg=wechatmp_msg)
  56. logger.debug("[wechatmp] context: {} {} {}".format(context, wechatmp_msg, supported))
  57. if supported and context:
  58. channel.running.add(from_user)
  59. channel.produce(context)
  60. else:
  61. trigger_prefix = conf().get("single_chat_prefix", [""])[0]
  62. if trigger_prefix or not supported:
  63. if trigger_prefix:
  64. reply_text = textwrap.dedent(
  65. f"""\
  66. 请输入'{trigger_prefix}'接你想说的话跟我说话。
  67. 例如:
  68. {trigger_prefix}你好,很高兴见到你。"""
  69. )
  70. else:
  71. reply_text = textwrap.dedent(
  72. """\
  73. 你好,很高兴见到你。
  74. 请跟我说话吧。"""
  75. )
  76. else:
  77. logger.error(f"[wechatmp] unknown error")
  78. reply_text = textwrap.dedent(
  79. """\
  80. 未知错误,请稍后再试"""
  81. )
  82. replyPost = create_reply(reply_text, msg)
  83. return encrypt_func(replyPost.render())
  84. # Wechat official server will request 3 times (5 seconds each), with the same message_id.
  85. # Because the interval is 5 seconds, here assumed that do not have multithreading problems.
  86. request_cnt = channel.request_cnt.get(message_id, 0) + 1
  87. channel.request_cnt[message_id] = request_cnt
  88. logger.info(
  89. "[wechatmp] Request {} from {} {} {}:{}\n{}".format(
  90. request_cnt, from_user, message_id, web.ctx.env.get("REMOTE_ADDR"), web.ctx.env.get("REMOTE_PORT"), content
  91. )
  92. )
  93. task_running = True
  94. waiting_until = request_time + 4
  95. while time.time() < waiting_until:
  96. if from_user in channel.running:
  97. time.sleep(0.1)
  98. else:
  99. task_running = False
  100. break
  101. reply_text = ""
  102. if task_running:
  103. if request_cnt < 3:
  104. # waiting for timeout (the POST request will be closed by Wechat official server)
  105. time.sleep(2)
  106. # and do nothing, waiting for the next request
  107. return "success"
  108. else: # request_cnt == 3:
  109. # return timeout message
  110. reply_text = "【正在思考中,回复任意文字尝试获取回复】"
  111. replyPost = create_reply(reply_text, msg)
  112. return encrypt_func(replyPost.render())
  113. # reply is ready
  114. channel.request_cnt.pop(message_id)
  115. # no return because of bandwords or other reasons
  116. if from_user not in channel.cache_dict and from_user not in channel.running:
  117. return "success"
  118. # Only one request can access to the cached data
  119. try:
  120. (reply_type, reply_content) = channel.cache_dict[from_user].pop(0)
  121. if not channel.cache_dict[from_user]: # If popping the message makes the list empty, delete the user entry from cache
  122. del channel.cache_dict[from_user]
  123. except IndexError:
  124. return "success"
  125. if reply_type == "text":
  126. if len(reply_content.encode("utf8")) <= MAX_UTF8_LEN:
  127. reply_text = reply_content
  128. else:
  129. continue_text = "\n【未完待续,回复任意文字以继续】"
  130. splits = split_string_by_utf8_length(
  131. reply_content,
  132. MAX_UTF8_LEN - len(continue_text.encode("utf-8")),
  133. max_split=1,
  134. )
  135. reply_text = splits[0] + continue_text
  136. channel.cache_dict[from_user].append(("text", splits[1]))
  137. logger.info(
  138. "[wechatmp] Request {} do send to {} {}: {}\n{}".format(
  139. request_cnt,
  140. from_user,
  141. message_id,
  142. content,
  143. reply_text,
  144. )
  145. )
  146. replyPost = create_reply(reply_text, msg)
  147. return encrypt_func(replyPost.render())
  148. elif reply_type == "voice":
  149. media_id = reply_content
  150. asyncio.run_coroutine_threadsafe(channel.delete_media(media_id), channel.delete_media_loop)
  151. logger.info(
  152. "[wechatmp] Request {} do send to {} {}: {} voice media_id {}".format(
  153. request_cnt,
  154. from_user,
  155. message_id,
  156. content,
  157. media_id,
  158. )
  159. )
  160. replyPost = VoiceReply(message=msg)
  161. replyPost.media_id = media_id
  162. return encrypt_func(replyPost.render())
  163. elif reply_type == "image":
  164. media_id = reply_content
  165. asyncio.run_coroutine_threadsafe(channel.delete_media(media_id), channel.delete_media_loop)
  166. logger.info(
  167. "[wechatmp] Request {} do send to {} {}: {} image media_id {}".format(
  168. request_cnt,
  169. from_user,
  170. message_id,
  171. content,
  172. media_id,
  173. )
  174. )
  175. replyPost = ImageReply(message=msg)
  176. replyPost.media_id = media_id
  177. return encrypt_func(replyPost.render())
  178. elif msg.type == "event":
  179. logger.info("[wechatmp] Event {} from {}".format(msg.event, msg.source))
  180. if msg.event in ["subscribe", "subscribe_scan"]:
  181. reply_text = subscribe_msg()
  182. if reply_text:
  183. replyPost = create_reply(reply_text, msg)
  184. return encrypt_func(replyPost.render())
  185. else:
  186. return "success"
  187. else:
  188. logger.info("暂且不处理")
  189. return "success"
  190. except Exception as exc:
  191. logger.exception(exc)
  192. return exc