You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

221 lines
11KB

  1. # -*- coding: utf-8 -*-
  2. import io
  3. import os
  4. import time
  5. import imghdr
  6. import requests
  7. import asyncio
  8. import threading
  9. from config import conf
  10. from bridge.context import *
  11. from bridge.reply import *
  12. from common.log import logger
  13. from common.singleton import singleton
  14. from voice.audio_convert import any_to_mp3
  15. from channel.chat_channel import ChatChannel
  16. from channel.wechatmp.common import *
  17. from channel.wechatmp.wechatmp_client import WechatMPClient
  18. from wechatpy.exceptions import WeChatClientException
  19. from wechatpy.crypto import WeChatCrypto
  20. import web
  21. # If using SSL, uncomment the following lines, and modify the certificate path.
  22. # from cheroot.server import HTTPServer
  23. # from cheroot.ssl.builtin import BuiltinSSLAdapter
  24. # HTTPServer.ssl_adapter = BuiltinSSLAdapter(
  25. # certificate='/ssl/cert.pem',
  26. # private_key='/ssl/cert.key')
  27. @singleton
  28. class WechatMPChannel(ChatChannel):
  29. def __init__(self, passive_reply=True):
  30. super().__init__()
  31. self.passive_reply = passive_reply
  32. self.NOT_SUPPORT_REPLYTYPE = []
  33. appid = conf().get("wechatmp_app_id")
  34. secret = conf().get("wechatmp_app_secret")
  35. token = conf().get("wechatmp_token")
  36. aes_key = conf().get("wechatmp_aes_key")
  37. self.client = WechatMPClient(appid, secret)
  38. self.crypto = None
  39. if aes_key:
  40. self.crypto = WeChatCrypto(token, aes_key, appid)
  41. if self.passive_reply:
  42. # Cache the reply to the user's first message
  43. self.cache_dict = dict()
  44. # Record whether the current message is being processed
  45. self.running = set()
  46. # Count the request from wechat official server by message_id
  47. self.request_cnt = dict()
  48. # The permanent media need to be deleted to avoid media number limit
  49. self.delete_media_loop = asyncio.new_event_loop()
  50. t = threading.Thread(target=self.start_loop, args=(self.delete_media_loop,))
  51. t.setDaemon(True)
  52. t.start()
  53. def startup(self):
  54. if self.passive_reply:
  55. urls = ("/wx", "channel.wechatmp.passive_reply.Query")
  56. else:
  57. urls = ("/wx", "channel.wechatmp.active_reply.Query")
  58. app = web.application(urls, globals(), autoreload=False)
  59. port = conf().get("wechatmp_port", 8080)
  60. web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port))
  61. def start_loop(self, loop):
  62. asyncio.set_event_loop(loop)
  63. loop.run_forever()
  64. async def delete_media(self, media_id):
  65. logger.debug("[wechatmp] permanent media {} will be deleted in 10s".format(media_id))
  66. await asyncio.sleep(10)
  67. self.client.material.delete(media_id)
  68. logger.info("[wechatmp] permanent media {} has been deleted".format(media_id))
  69. def send(self, reply: Reply, context: Context):
  70. receiver = context["receiver"]
  71. if self.passive_reply:
  72. if reply.type == ReplyType.TEXT or reply.type == ReplyType.INFO or reply.type == ReplyType.ERROR:
  73. reply_text = reply.content
  74. logger.info("[wechatmp] text cached, receiver {}\n{}".format(receiver, reply_text))
  75. self.cache_dict[receiver] = ("text", reply_text)
  76. elif reply.type == ReplyType.VOICE:
  77. try:
  78. voice_file_path = reply.content
  79. with open(voice_file_path, 'rb') as f:
  80. # support: <2M, <60s, mp3/wma/wav/amr
  81. response = self.client.material.add("voice", f)
  82. logger.debug("[wechatmp] upload voice response: {}".format(response))
  83. # 根据文件大小估计一个微信自动审核的时间,审核结束前返回将会导致语音无法播放,这个估计有待验证
  84. f_size = os.fstat(f.fileno()).st_size
  85. time.sleep(1.0 + 2 * f_size / 1024 / 1024)
  86. # todo check media_id
  87. except WeChatClientException as e:
  88. logger.error("[wechatmp] upload voice failed: {}".format(e))
  89. return
  90. media_id = response["media_id"]
  91. logger.info("[wechatmp] voice uploaded, receiver {}, media_id {}".format(receiver, media_id))
  92. self.cache_dict[receiver] = ("voice", media_id)
  93. elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片
  94. img_url = reply.content
  95. pic_res = requests.get(img_url, stream=True)
  96. image_storage = io.BytesIO()
  97. for block in pic_res.iter_content(1024):
  98. image_storage.write(block)
  99. image_storage.seek(0)
  100. image_type = imghdr.what(image_storage)
  101. filename = receiver + "-" + str(context['msg'].msg_id) + "." + image_type
  102. content_type = "image/" + image_type
  103. try:
  104. response = self.client.material.add("image", (filename, image_storage, content_type))
  105. logger.debug("[wechatmp] upload image response: {}".format(response))
  106. except WeChatClientException as e:
  107. logger.error("[wechatmp] upload image failed: {}".format(e))
  108. return
  109. media_id = response["media_id"]
  110. logger.info("[wechatmp] image uploaded, receiver {}, media_id {}".format(receiver, media_id))
  111. self.cache_dict[receiver] = ("image", media_id)
  112. elif reply.type == ReplyType.IMAGE: # 从文件读取图片
  113. image_storage = reply.content
  114. image_storage.seek(0)
  115. image_type = imghdr.what(image_storage)
  116. filename = receiver + "-" + str(context['msg'].msg_id) + "." + image_type
  117. content_type = "image/" + image_type
  118. try:
  119. response = self.client.material.add("image", (filename, image_storage, content_type))
  120. logger.debug("[wechatmp] upload image response: {}".format(response))
  121. except WeChatClientException as e:
  122. logger.error("[wechatmp] upload image failed: {}".format(e))
  123. return
  124. media_id = response["media_id"]
  125. logger.info("[wechatmp] image uploaded, receiver {}, media_id {}".format(receiver, media_id))
  126. self.cache_dict[receiver] = ("image", media_id)
  127. else:
  128. if reply.type == ReplyType.TEXT or reply.type == ReplyType.INFO or reply.type == ReplyType.ERROR:
  129. reply_text = reply.content
  130. texts = split_string_by_utf8_length(reply_text, MAX_UTF8_LEN)
  131. if len(texts)>1:
  132. logger.info("[wechatmp] text too long, split into {} parts".format(len(texts)))
  133. for text in texts:
  134. self.client.message.send_text(receiver, text)
  135. logger.info("[wechatmp] Do send text to {}: {}".format(receiver, reply_text))
  136. elif reply.type == ReplyType.VOICE:
  137. try:
  138. file_path = reply.content
  139. file_name = os.path.basename(file_path)
  140. file_type = os.path.splitext(file_name)[1]
  141. if file_type == ".mp3":
  142. file_type = "audio/mpeg"
  143. elif file_type == ".amr":
  144. file_type = "audio/amr"
  145. else:
  146. mp3_file = os.path.splitext(file_path)[0] + ".mp3"
  147. any_to_mp3(file_path, mp3_file)
  148. file_path = mp3_file
  149. file_name = os.path.basename(file_path)
  150. file_type = "audio/mpeg"
  151. logger.info("[wechatmp] file_name: {}, file_type: {} ".format(file_name, file_type))
  152. # support: <2M, <60s, AMR\MP3
  153. response = self.client.media.upload("voice", (file_name, open(file_path, "rb"), file_type))
  154. logger.debug("[wechatmp] upload voice response: {}".format(response))
  155. except WeChatClientException as e:
  156. logger.error("[wechatmp] upload voice failed: {}".format(e))
  157. return
  158. self.client.message.send_voice(receiver, response["media_id"])
  159. logger.info("[wechatmp] Do send voice to {}".format(receiver))
  160. elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片
  161. img_url = reply.content
  162. pic_res = requests.get(img_url, stream=True)
  163. image_storage = io.BytesIO()
  164. for block in pic_res.iter_content(1024):
  165. image_storage.write(block)
  166. image_storage.seek(0)
  167. image_type = imghdr.what(image_storage)
  168. filename = receiver + "-" + str(context['msg'].msg_id) + "." + image_type
  169. content_type = "image/" + image_type
  170. try:
  171. response = self.client.media.upload("image", (filename, image_storage, content_type))
  172. logger.debug("[wechatmp] upload image response: {}".format(response))
  173. except WeChatClientException as e:
  174. logger.error("[wechatmp] upload image failed: {}".format(e))
  175. return
  176. self.client.message.send_image(receiver, response["media_id"])
  177. logger.info("[wechatmp] Do send image to {}".format(receiver))
  178. elif reply.type == ReplyType.IMAGE: # 从文件读取图片
  179. image_storage = reply.content
  180. image_storage.seek(0)
  181. image_type = imghdr.what(image_storage)
  182. filename = receiver + "-" + str(context['msg'].msg_id) + "." + image_type
  183. content_type = "image/" + image_type
  184. try:
  185. response = self.client.media.upload("image", (filename, image_storage, content_type))
  186. logger.debug("[wechatmp] upload image response: {}".format(response))
  187. except WeChatClientException as e:
  188. logger.error("[wechatmp] upload image failed: {}".format(e))
  189. return
  190. self.client.message.send_image(receiver, response["media_id"])
  191. logger.info("[wechatmp] Do send image to {}".format(receiver))
  192. return
  193. def _success_callback(self, session_id, context, **kwargs): # 线程异常结束时的回调函数
  194. logger.debug(
  195. "[wechatmp] Success to generate reply, msgId={}".format(
  196. context["msg"].msg_id
  197. )
  198. )
  199. if self.passive_reply:
  200. self.running.remove(session_id)
  201. def _fail_callback(self, session_id, exception, context, **kwargs): # 线程异常结束时的回调函数
  202. logger.exception(
  203. "[wechatmp] Fail to generate reply to user, msgId={}, exception={}".format(
  204. context["msg"].msg_id, exception
  205. )
  206. )
  207. if self.passive_reply:
  208. assert session_id not in self.cache_dict
  209. self.running.remove(session_id)