You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

214 lines
11KB

  1. # -*- coding: utf-8 -*-
  2. import asyncio
  3. import imghdr
  4. import io
  5. import os
  6. import threading
  7. import time
  8. import requests
  9. import web
  10. from wechatpy.crypto import WeChatCrypto
  11. from wechatpy.exceptions import WeChatClientException
  12. from bridge.context import *
  13. from bridge.reply import *
  14. from channel.chat_channel import ChatChannel
  15. from channel.wechatmp.common import *
  16. from channel.wechatmp.wechatmp_client import WechatMPClient
  17. from common.log import logger
  18. from common.singleton import singleton
  19. from config import conf
  20. from voice.audio_convert import any_to_mp3
  21. # If using SSL, uncomment the following lines, and modify the certificate path.
  22. # from cheroot.server import HTTPServer
  23. # from cheroot.ssl.builtin import BuiltinSSLAdapter
  24. # HTTPServer.ssl_adapter = BuiltinSSLAdapter(
  25. # certificate='/ssl/cert.pem',
  26. # private_key='/ssl/cert.key')
  @singleton
  class WechatMPChannel(ChatChannel):
      """Chat channel for a WeChat Official Account ("wechatmp").

      The channel works in one of two modes, chosen at construction time:

      * passive reply: a generated reply is uploaded (if it is media) and
        stored in ``cache_dict`` keyed by receiver; nothing is pushed to the
        user from here (presumably the passive-reply request handler reads
        the cache -- confirm against ``channel.wechatmp.passive_reply``).
      * active reply: the reply is sent immediately through the client's
        message API.
      """

      def __init__(self, passive_reply=True):
          """Read the ``wechatmp_*`` settings and build the API client.

          Args:
              passive_reply: When true, replies are cached instead of being
                  sent actively, and the bookkeeping structures plus the
                  media-deletion event loop are created.
          """
          super().__init__()
          self.passive_reply = passive_reply
          self.NOT_SUPPORT_REPLYTYPE = []
          appid = conf().get("wechatmp_app_id")
          secret = conf().get("wechatmp_app_secret")
          token = conf().get("wechatmp_token")
          aes_key = conf().get("wechatmp_aes_key")
          self.client = WechatMPClient(appid, secret)
          # Message encryption is only set up when an AES key is configured.
          self.crypto = None
          if aes_key:
              self.crypto = WeChatCrypto(token, aes_key, appid)
          if self.passive_reply:
              # Cache the reply to the user's first message
              self.cache_dict = {}
              # Record whether the current message is being processed
              self.running = set()
              # Count the request from wechat official server by message_id
              self.request_cnt = {}
              # The permanent media need to be deleted to avoid media number limit
              self.delete_media_loop = asyncio.new_event_loop()
              # daemon=True replaces the deprecated Thread.setDaemon(True).
              t = threading.Thread(
                  target=self.start_loop,
                  args=(self.delete_media_loop,),
                  daemon=True,
              )
              t.start()

      def startup(self):
          """Serve the ``/wx`` endpoint with web.py.

          The handler class depends on the reply mode; the server binds to
          ``0.0.0.0`` on ``wechatmp_port`` (8080 when not configured).
          """
          if self.passive_reply:
              urls = ("/wx", "channel.wechatmp.passive_reply.Query")
          else:
              urls = ("/wx", "channel.wechatmp.active_reply.Query")
          app = web.application(urls, globals(), autoreload=False)
          port = conf().get("wechatmp_port", 8080)
          web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port))

      def start_loop(self, loop):
          """Make ``loop`` the current event loop of this thread and run it forever."""
          asyncio.set_event_loop(loop)
          loop.run_forever()

      async def delete_media(self, media_id):
          """Delete the permanent media ``media_id`` after a 10 second delay."""
          logger.debug("[wechatmp] permanent media {} will be deleted in 10s".format(media_id))
          await asyncio.sleep(10)
          self.client.material.delete(media_id)
          logger.info("[wechatmp] permanent media {} has been deleted".format(media_id))

      def send(self, reply: Reply, context: Context):
          """Deliver ``reply`` to ``context["receiver"]``.

          In passive mode the reply is cached in ``cache_dict``; in active mode
          it is sent right away. Upload failures (``WeChatClientException``)
          and undetectable image formats are logged and the reply is dropped.

          Args:
              reply: The reply to deliver; ``reply.type`` selects the handling
                  and ``reply.content`` holds text, a voice file path, an image
                  URL or an image file-like object respectively.
              context: Must provide ``"receiver"``; image replies also read
                  ``context["msg"].msg_id`` to build the upload filename.
          """
          receiver = context["receiver"]
          if self.passive_reply:
              self._cache_passive_reply(reply, context, receiver)
          else:
              self._send_active_reply(reply, context, receiver)

      def _is_text_reply(self, reply):
          """Return True for the reply types that are delivered as plain text."""
          return reply.type in (ReplyType.TEXT, ReplyType.INFO, ReplyType.ERROR)

      def _load_image(self, reply):
          """Return the reply's image as a seekable binary file-like object."""
          if reply.type == ReplyType.IMAGE_URL:
              # Download the image from the network into memory.
              pic_res = requests.get(reply.content, stream=True)
              image_storage = io.BytesIO()
              for block in pic_res.iter_content(1024):
                  image_storage.write(block)
              return image_storage
          # ReplyType.IMAGE: the content already is the image file object.
          return reply.content

      def _image_upload_tuple(self, receiver, context, image_storage):
          """Build the ``(filename, file object, content type)`` upload tuple.

          Returns None (after logging an error) when the image format cannot
          be detected, instead of failing with a TypeError while concatenating
          the file extension.
          """
          image_storage.seek(0)
          # NOTE: imghdr is deprecated since Python 3.11 and removed in 3.13.
          image_type = imghdr.what(image_storage)
          if image_type is None:
              logger.error("[wechatmp] unrecognized image format, receiver {}".format(receiver))
              return None
          filename = receiver + "-" + str(context["msg"].msg_id) + "." + image_type
          content_type = "image/" + image_type
          return filename, image_storage, content_type

      def _cache_passive_reply(self, reply, context, receiver):
          """Upload media as permanent material if needed and cache the reply."""
          if self._is_text_reply(reply):
              reply_text = reply.content
              logger.info("[wechatmp] text cached, receiver {}\n{}".format(receiver, reply_text))
              self.cache_dict[receiver] = ("text", reply_text)
          elif reply.type == ReplyType.VOICE:
              try:
                  voice_file_path = reply.content
                  with open(voice_file_path, "rb") as f:
                      # support: <2M, <60s, mp3/wma/wav/amr
                      response = self.client.material.add("voice", f)
                      logger.debug("[wechatmp] upload voice response: {}".format(response))
                      # Estimate WeChat's automatic review time from the file
                      # size; returning before the review finishes makes the
                      # voice unplayable. This estimate still needs verifying.
                      f_size = os.fstat(f.fileno()).st_size
                      time.sleep(1.0 + 2 * f_size / 1024 / 1024)
                      # TODO: check media_id
              except WeChatClientException as e:
                  logger.error("[wechatmp] upload voice failed: {}".format(e))
                  return
              media_id = response["media_id"]
              logger.info("[wechatmp] voice uploaded, receiver {}, media_id {}".format(receiver, media_id))
              self.cache_dict[receiver] = ("voice", media_id)
          elif reply.type in (ReplyType.IMAGE_URL, ReplyType.IMAGE):
              upload = self._image_upload_tuple(receiver, context, self._load_image(reply))
              if upload is None:
                  return
              try:
                  response = self.client.material.add("image", upload)
                  logger.debug("[wechatmp] upload image response: {}".format(response))
              except WeChatClientException as e:
                  logger.error("[wechatmp] upload image failed: {}".format(e))
                  return
              media_id = response["media_id"]
              logger.info("[wechatmp] image uploaded, receiver {}, media_id {}".format(receiver, media_id))
              self.cache_dict[receiver] = ("image", media_id)

      def _send_active_reply(self, reply, context, receiver):
          """Send the reply immediately through the client's message API."""
          if self._is_text_reply(reply):
              reply_text = reply.content
              # Long texts are split so each part fits within MAX_UTF8_LEN.
              texts = split_string_by_utf8_length(reply_text, MAX_UTF8_LEN)
              if len(texts) > 1:
                  logger.info("[wechatmp] text too long, split into {} parts".format(len(texts)))
              for text in texts:
                  self.client.message.send_text(receiver, text)
              logger.info("[wechatmp] Do send text to {}: {}".format(receiver, reply_text))
          elif reply.type == ReplyType.VOICE:
              try:
                  file_path = reply.content
                  file_name = os.path.basename(file_path)
                  extension = os.path.splitext(file_name)[1]
                  if extension == ".mp3":
                      file_type = "audio/mpeg"
                  elif extension == ".amr":
                      file_type = "audio/amr"
                  else:
                      # Any other format is converted to mp3 next to the source file.
                      mp3_file = os.path.splitext(file_path)[0] + ".mp3"
                      any_to_mp3(file_path, mp3_file)
                      file_path = mp3_file
                      file_name = os.path.basename(file_path)
                      file_type = "audio/mpeg"
                  logger.info("[wechatmp] file_name: {}, file_type: {} ".format(file_name, file_type))
                  # support: <2M, <60s, AMR\MP3
                  # The context manager closes the handle the original left open.
                  with open(file_path, "rb") as voice_file:
                      response = self.client.media.upload("voice", (file_name, voice_file, file_type))
                  logger.debug("[wechatmp] upload voice response: {}".format(response))
              except WeChatClientException as e:
                  logger.error("[wechatmp] upload voice failed: {}".format(e))
                  return
              self.client.message.send_voice(receiver, response["media_id"])
              logger.info("[wechatmp] Do send voice to {}".format(receiver))
          elif reply.type in (ReplyType.IMAGE_URL, ReplyType.IMAGE):
              upload = self._image_upload_tuple(receiver, context, self._load_image(reply))
              if upload is None:
                  return
              try:
                  response = self.client.media.upload("image", upload)
                  logger.debug("[wechatmp] upload image response: {}".format(response))
              except WeChatClientException as e:
                  logger.error("[wechatmp] upload image failed: {}".format(e))
                  return
              self.client.message.send_image(receiver, response["media_id"])
              logger.info("[wechatmp] Do send image to {}".format(receiver))

      def _success_callback(self, session_id, context, **kwargs):
          """Callback for when the reply thread finishes normally."""
          logger.debug("[wechatmp] Success to generate reply, msgId={}".format(context["msg"].msg_id))
          if self.passive_reply:
              # The session is no longer being processed.
              self.running.remove(session_id)

      def _fail_callback(self, session_id, exception, context, **kwargs):
          """Callback for when the reply thread ends abnormally."""
          logger.exception("[wechatmp] Fail to generate reply to user, msgId={}, exception={}".format(context["msg"].msg_id, exception))
          if self.passive_reply:
              # A failed session is expected to have no cached reply.
              assert session_id not in self.cache_dict
              self.running.remove(session_id)