You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

215 lines
11KB

  1. # -*- coding: utf-8 -*-
  2. import io
  3. import os
  4. import time
  5. import imghdr
  6. import requests
  7. import asyncio
  8. import threading
  9. from config import conf
  10. from bridge.context import *
  11. from bridge.reply import *
  12. from common.log import logger
  13. from common.singleton import singleton
  14. from voice.audio_convert import any_to_mp3
  15. from channel.chat_channel import ChatChannel
  16. from channel.wechatmp.common import *
  17. from channel.wechatmp.wechatmp_client import WechatMPClient
  18. from wechatpy.exceptions import WeChatClientException
  19. import web
  20. # If using SSL, uncomment the following lines, and modify the certificate path.
  21. # from cheroot.server import HTTPServer
  22. # from cheroot.ssl.builtin import BuiltinSSLAdapter
  23. # HTTPServer.ssl_adapter = BuiltinSSLAdapter(
  24. # certificate='/ssl/cert.pem',
  25. # private_key='/ssl/cert.key')
  26. @singleton
  27. class WechatMPChannel(ChatChannel):
  28. def __init__(self, passive_reply=True):
  29. super().__init__()
  30. self.passive_reply = passive_reply
  31. self.NOT_SUPPORT_REPLYTYPE = []
  32. appid = conf().get("wechatmp_app_id")
  33. secret = conf().get("wechatmp_app_secret")
  34. self.client = WechatMPClient(appid, secret)
  35. if self.passive_reply:
  36. # Cache the reply to the user's first message
  37. self.cache_dict = dict()
  38. # Record whether the current message is being processed
  39. self.running = set()
  40. # Count the request from wechat official server by message_id
  41. self.request_cnt = dict()
  42. # The permanent media need to be deleted to avoid media number limit
  43. self.delete_media_loop = asyncio.new_event_loop()
  44. t = threading.Thread(target=self.start_loop, args=(self.delete_media_loop,))
  45. t.setDaemon(True)
  46. t.start()
  47. def startup(self):
  48. if self.passive_reply:
  49. urls = ("/wx", "channel.wechatmp.passive_reply.Query")
  50. else:
  51. urls = ("/wx", "channel.wechatmp.active_reply.Query")
  52. app = web.application(urls, globals(), autoreload=False)
  53. port = conf().get("wechatmp_port", 8080)
  54. web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port))
  55. def start_loop(self, loop):
  56. asyncio.set_event_loop(loop)
  57. loop.run_forever()
  58. async def delete_media(self, media_id):
  59. logger.debug("[wechatmp] permanent media {} will be deleted in 10s".format(media_id))
  60. await asyncio.sleep(10)
  61. self.client.material.delete(media_id)
  62. logger.info("[wechatmp] permanent media {} has been deleted".format(media_id))
  63. def send(self, reply: Reply, context: Context):
  64. receiver = context["receiver"]
  65. if self.passive_reply:
  66. if reply.type == ReplyType.TEXT or reply.type == ReplyType.INFO or reply.type == ReplyType.ERROR:
  67. reply_text = reply.content
  68. logger.info("[wechatmp] text cached, receiver {}\n{}".format(receiver, reply_text))
  69. self.cache_dict[receiver] = ("text", reply_text)
  70. elif reply.type == ReplyType.VOICE:
  71. try:
  72. voice_file_path = reply.content
  73. with open(voice_file_path, 'rb') as f:
  74. # support: <2M, <60s, mp3/wma/wav/amr
  75. response = self.client.material.add("voice", f)
  76. logger.debug("[wechatmp] upload voice response: {}".format(response))
  77. # 根据文件大小估计一个微信自动审核的时间,审核结束前返回将会导致语音无法播放,这个估计有待验证
  78. f_size = os.fstat(f.fileno()).st_size
  79. time.sleep(1.0 + 2 * f_size / 1024 / 1024)
  80. # todo check media_id
  81. except WeChatClientException as e:
  82. logger.error("[wechatmp] upload voice failed: {}".format(e))
  83. return
  84. media_id = response["media_id"]
  85. logger.info("[wechatmp] voice uploaded, receiver {}, media_id {}".format(receiver, media_id))
  86. self.cache_dict[receiver] = ("voice", media_id)
  87. elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片
  88. img_url = reply.content
  89. pic_res = requests.get(img_url, stream=True)
  90. image_storage = io.BytesIO()
  91. for block in pic_res.iter_content(1024):
  92. image_storage.write(block)
  93. image_storage.seek(0)
  94. image_type = imghdr.what(image_storage)
  95. filename = receiver + "-" + str(context['msg'].msg_id) + "." + image_type
  96. content_type = "image/" + image_type
  97. try:
  98. response = self.client.material.add("image", (filename, image_storage, content_type))
  99. logger.debug("[wechatmp] upload image response: {}".format(response))
  100. except WeChatClientException as e:
  101. logger.error("[wechatmp] upload image failed: {}".format(e))
  102. return
  103. media_id = response["media_id"]
  104. logger.info("[wechatmp] image uploaded, receiver {}, media_id {}".format(receiver, media_id))
  105. self.cache_dict[receiver] = ("image", media_id)
  106. elif reply.type == ReplyType.IMAGE: # 从文件读取图片
  107. image_storage = reply.content
  108. image_storage.seek(0)
  109. image_type = imghdr.what(image_storage)
  110. filename = receiver + "-" + str(context['msg'].msg_id) + "." + image_type
  111. content_type = "image/" + image_type
  112. try:
  113. response = self.client.material.add("image", (filename, image_storage, content_type))
  114. logger.debug("[wechatmp] upload image response: {}".format(response))
  115. except WeChatClientException as e:
  116. logger.error("[wechatmp] upload image failed: {}".format(e))
  117. return
  118. media_id = response["media_id"]
  119. logger.info("[wechatmp] image uploaded, receiver {}, media_id {}".format(receiver, media_id))
  120. self.cache_dict[receiver] = ("image", media_id)
  121. else:
  122. if reply.type == ReplyType.TEXT or reply.type == ReplyType.INFO or reply.type == ReplyType.ERROR:
  123. reply_text = reply.content
  124. texts = split_string_by_utf8_length(reply_text, MAX_UTF8_LEN)
  125. if len(texts)>1:
  126. logger.info("[wechatmp] text too long, split into {} parts".format(len(texts)))
  127. for text in texts:
  128. self.client.message.send_text(receiver, text)
  129. logger.info("[wechatmp] Do send text to {}: {}".format(receiver, reply_text))
  130. elif reply.type == ReplyType.VOICE:
  131. try:
  132. file_path = reply.content
  133. file_name = os.path.basename(file_path)
  134. file_type = os.path.splitext(file_name)[1]
  135. if file_type == ".mp3":
  136. file_type = "audio/mpeg"
  137. elif file_type == ".amr":
  138. file_type = "audio/amr"
  139. else:
  140. mp3_file = os.path.splitext(file_path)[0] + ".mp3"
  141. any_to_mp3(file_path, mp3_file)
  142. file_path = mp3_file
  143. file_name = os.path.basename(file_path)
  144. file_type = "audio/mpeg"
  145. logger.info("[wechatmp] file_name: {}, file_type: {} ".format(file_name, file_type))
  146. # support: <2M, <60s, AMR\MP3
  147. response = self.client.media.upload("voice", (file_name, open(file_path, "rb"), file_type))
  148. logger.debug("[wechatmp] upload voice response: {}".format(response))
  149. except WeChatClientException as e:
  150. logger.error("[wechatmp] upload voice failed: {}".format(e))
  151. return
  152. self.client.message.send_voice(receiver, response["media_id"])
  153. logger.info("[wechatmp] Do send voice to {}".format(receiver))
  154. elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片
  155. img_url = reply.content
  156. pic_res = requests.get(img_url, stream=True)
  157. image_storage = io.BytesIO()
  158. for block in pic_res.iter_content(1024):
  159. image_storage.write(block)
  160. image_storage.seek(0)
  161. image_type = imghdr.what(image_storage)
  162. filename = receiver + "-" + str(context['msg'].msg_id) + "." + image_type
  163. content_type = "image/" + image_type
  164. try:
  165. response = self.client.media.upload("image", (filename, image_storage, content_type))
  166. logger.debug("[wechatmp] upload image response: {}".format(response))
  167. except WeChatClientException as e:
  168. logger.error("[wechatmp] upload image failed: {}".format(e))
  169. return
  170. self.client.message.send_image(receiver, response["media_id"])
  171. logger.info("[wechatmp] Do send image to {}".format(receiver))
  172. elif reply.type == ReplyType.IMAGE: # 从文件读取图片
  173. image_storage = reply.content
  174. image_storage.seek(0)
  175. image_type = imghdr.what(image_storage)
  176. filename = receiver + "-" + str(context['msg'].msg_id) + "." + image_type
  177. content_type = "image/" + image_type
  178. try:
  179. response = self.client.media.upload("image", (filename, image_storage, content_type))
  180. logger.debug("[wechatmp] upload image response: {}".format(response))
  181. except WeChatClientException as e:
  182. logger.error("[wechatmp] upload image failed: {}".format(e))
  183. return
  184. self.client.message.send_image(receiver, response["media_id"])
  185. logger.info("[wechatmp] Do send image to {}".format(receiver))
  186. return
  187. def _success_callback(self, session_id, context, **kwargs): # 线程异常结束时的回调函数
  188. logger.debug(
  189. "[wechatmp] Success to generate reply, msgId={}".format(
  190. context["msg"].msg_id
  191. )
  192. )
  193. if self.passive_reply:
  194. self.running.remove(session_id)
  195. def _fail_callback(self, session_id, exception, context, **kwargs): # 线程异常结束时的回调函数
  196. logger.exception(
  197. "[wechatmp] Fail to generate reply to user, msgId={}, exception={}".format(
  198. context["msg"].msg_id, exception
  199. )
  200. )
  201. if self.passive_reply:
  202. assert session_id not in self.cache_dict
  203. self.running.remove(session_id)