You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1 年之前
1 年之前
1 年之前
1 年之前
1 年之前
1 年之前
1 年之前
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. # -*- coding: utf-8 -*-
  2. import asyncio
  3. import imghdr
  4. import io
  5. import os
  6. import threading
  7. import time
  8. import requests
  9. import web
  10. from wechatpy.crypto import WeChatCrypto
  11. from wechatpy.exceptions import WeChatClientException
  12. from collections import defaultdict
  13. from bridge.context import *
  14. from bridge.reply import *
  15. from channel.chat_channel import ChatChannel
  16. from channel.wechatmp.common import *
  17. from channel.wechatmp.wechatmp_client import WechatMPClient
  18. from common.log import logger
  19. from common.singleton import singleton
  20. from common.utils import split_string_by_utf8_length
  21. from config import conf
  22. from voice.audio_convert import any_to_mp3, split_audio
  23. # If using SSL, uncomment the following lines, and modify the certificate path.
  24. # from cheroot.server import HTTPServer
  25. # from cheroot.ssl.builtin import BuiltinSSLAdapter
  26. # HTTPServer.ssl_adapter = BuiltinSSLAdapter(
  27. # certificate='/ssl/cert.pem',
  28. # private_key='/ssl/cert.key')
  29. @singleton
  30. class WechatMPChannel(ChatChannel):
  31. def __init__(self, passive_reply=True):
  32. super().__init__()
  33. self.passive_reply = passive_reply
  34. self.NOT_SUPPORT_REPLYTYPE = []
  35. appid = conf().get("wechatmp_app_id")
  36. secret = conf().get("wechatmp_app_secret")
  37. token = conf().get("wechatmp_token")
  38. aes_key = conf().get("wechatmp_aes_key")
  39. self.client = WechatMPClient(appid, secret)
  40. self.crypto = None
  41. if aes_key:
  42. self.crypto = WeChatCrypto(token, aes_key, appid)
  43. if self.passive_reply:
  44. # Cache the reply to the user's first message
  45. self.cache_dict = defaultdict(list)
  46. # Record whether the current message is being processed
  47. self.running = set()
  48. # Count the request from wechat official server by message_id
  49. self.request_cnt = dict()
  50. # The permanent media need to be deleted to avoid media number limit
  51. self.delete_media_loop = asyncio.new_event_loop()
  52. t = threading.Thread(target=self.start_loop, args=(self.delete_media_loop,))
  53. t.setDaemon(True)
  54. t.start()
  55. def startup(self):
  56. if self.passive_reply:
  57. urls = ("/wx", "channel.wechatmp.passive_reply.Query")
  58. else:
  59. urls = ("/wx", "channel.wechatmp.active_reply.Query")
  60. app = web.application(urls, globals(), autoreload=False)
  61. port = conf().get("wechatmp_port", 8080)
  62. web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port))
  63. def start_loop(self, loop):
  64. asyncio.set_event_loop(loop)
  65. loop.run_forever()
  66. async def delete_media(self, media_id):
  67. logger.debug("[wechatmp] permanent media {} will be deleted in 10s".format(media_id))
  68. await asyncio.sleep(10)
  69. self.client.material.delete(media_id)
  70. logger.info("[wechatmp] permanent media {} has been deleted".format(media_id))
  71. def send(self, reply: Reply, context: Context):
  72. receiver = context["receiver"]
  73. if self.passive_reply:
  74. if reply.type == ReplyType.TEXT or reply.type == ReplyType.INFO or reply.type == ReplyType.ERROR:
  75. reply_text = reply.content
  76. logger.info("[wechatmp] text cached, receiver {}\n{}".format(receiver, reply_text))
  77. self.cache_dict[receiver].append(("text", reply_text))
  78. elif reply.type == ReplyType.VOICE:
  79. voice_file_path = reply.content
  80. duration, files = split_audio(voice_file_path, 60 * 1000)
  81. if len(files) > 1:
  82. logger.info("[wechatmp] voice too long {}s > 60s , split into {} parts".format(duration / 1000.0, len(files)))
  83. for path in files:
  84. # support: <2M, <60s, mp3/wma/wav/amr
  85. try:
  86. with open(path, "rb") as f:
  87. response = self.client.material.add("voice", f)
  88. logger.debug("[wechatmp] upload voice response: {}".format(response))
  89. f_size = os.fstat(f.fileno()).st_size
  90. time.sleep(1.0 + 2 * f_size / 1024 / 1024)
  91. # todo check media_id
  92. except WeChatClientException as e:
  93. logger.error("[wechatmp] upload voice failed: {}".format(e))
  94. return
  95. media_id = response["media_id"]
  96. logger.info("[wechatmp] voice uploaded, receiver {}, media_id {}".format(receiver, media_id))
  97. self.cache_dict[receiver].append(("voice", media_id))
  98. elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片
  99. img_url = reply.content
  100. pic_res = requests.get(img_url, stream=True)
  101. image_storage = io.BytesIO()
  102. for block in pic_res.iter_content(1024):
  103. image_storage.write(block)
  104. image_storage.seek(0)
  105. image_type = imghdr.what(image_storage)
  106. filename = receiver + "-" + str(context["msg"].msg_id) + "." + image_type
  107. content_type = "image/" + image_type
  108. try:
  109. response = self.client.material.add("image", (filename, image_storage, content_type))
  110. logger.debug("[wechatmp] upload image response: {}".format(response))
  111. except WeChatClientException as e:
  112. logger.error("[wechatmp] upload image failed: {}".format(e))
  113. return
  114. media_id = response["media_id"]
  115. logger.info("[wechatmp] image uploaded, receiver {}, media_id {}".format(receiver, media_id))
  116. self.cache_dict[receiver].append(("image", media_id))
  117. elif reply.type == ReplyType.IMAGE: # 从文件读取图片
  118. image_storage = reply.content
  119. image_storage.seek(0)
  120. image_type = imghdr.what(image_storage)
  121. filename = receiver + "-" + str(context["msg"].msg_id) + "." + image_type
  122. content_type = "image/" + image_type
  123. try:
  124. response = self.client.material.add("image", (filename, image_storage, content_type))
  125. logger.debug("[wechatmp] upload image response: {}".format(response))
  126. except WeChatClientException as e:
  127. logger.error("[wechatmp] upload image failed: {}".format(e))
  128. return
  129. media_id = response["media_id"]
  130. logger.info("[wechatmp] image uploaded, receiver {}, media_id {}".format(receiver, media_id))
  131. self.cache_dict[receiver].append(("image", media_id))
  132. elif reply.type == ReplyType.VIDEO_URL: # 从网络下载视频
  133. video_url = reply.content
  134. video_res = requests.get(video_url, stream=True)
  135. video_storage = io.BytesIO()
  136. for block in video_res.iter_content(1024):
  137. video_storage.write(block)
  138. video_storage.seek(0)
  139. video_type = 'mp4'
  140. filename = receiver + "-" + str(context["msg"].msg_id) + "." + video_type
  141. content_type = "video/" + video_type
  142. try:
  143. response = self.client.material.add("video", (filename, video_storage, content_type))
  144. logger.debug("[wechatmp] upload video response: {}".format(response))
  145. except WeChatClientException as e:
  146. logger.error("[wechatmp] upload video failed: {}".format(e))
  147. return
  148. media_id = response["media_id"]
  149. logger.info("[wechatmp] video uploaded, receiver {}, media_id {}".format(receiver, media_id))
  150. self.cache_dict[receiver].append(("video", media_id))
  151. elif reply.type == ReplyType.VIDEO: # 从文件读取视频
  152. video_storage = reply.content
  153. video_storage.seek(0)
  154. video_type = 'mp4'
  155. filename = receiver + "-" + str(context["msg"].msg_id) + "." + video_type
  156. content_type = "video/" + video_type
  157. try:
  158. response = self.client.material.add("video", (filename, video_storage, content_type))
  159. logger.debug("[wechatmp] upload video response: {}".format(response))
  160. except WeChatClientException as e:
  161. logger.error("[wechatmp] upload video failed: {}".format(e))
  162. return
  163. media_id = response["media_id"]
  164. logger.info("[wechatmp] video uploaded, receiver {}, media_id {}".format(receiver, media_id))
  165. self.cache_dict[receiver].append(("video", media_id))
  166. else:
  167. if reply.type == ReplyType.TEXT or reply.type == ReplyType.INFO or reply.type == ReplyType.ERROR:
  168. reply_text = reply.content
  169. texts = split_string_by_utf8_length(reply_text, MAX_UTF8_LEN)
  170. if len(texts) > 1:
  171. logger.info("[wechatmp] text too long, split into {} parts".format(len(texts)))
  172. for i, text in enumerate(texts):
  173. self.client.message.send_text(receiver, text)
  174. if i != len(texts) - 1:
  175. time.sleep(0.5) # 休眠0.5秒,防止发送过快乱序
  176. logger.info("[wechatmp] Do send text to {}: {}".format(receiver, reply_text))
  177. elif reply.type == ReplyType.VOICE:
  178. try:
  179. file_path = reply.content
  180. file_name = os.path.basename(file_path)
  181. file_type = os.path.splitext(file_name)[1]
  182. if file_type == ".mp3":
  183. file_type = "audio/mpeg"
  184. elif file_type == ".amr":
  185. file_type = "audio/amr"
  186. else:
  187. mp3_file = os.path.splitext(file_path)[0] + ".mp3"
  188. any_to_mp3(file_path, mp3_file)
  189. file_path = mp3_file
  190. file_name = os.path.basename(file_path)
  191. file_type = "audio/mpeg"
  192. logger.info("[wechatmp] file_name: {}, file_type: {} ".format(file_name, file_type))
  193. media_ids = []
  194. duration, files = split_audio(file_path, 60 * 1000)
  195. if len(files) > 1:
  196. logger.info("[wechatmp] voice too long {}s > 60s , split into {} parts".format(duration / 1000.0, len(files)))
  197. for path in files:
  198. # support: <2M, <60s, AMR\MP3
  199. response = self.client.media.upload("voice", (os.path.basename(path), open(path, "rb"), file_type))
  200. logger.debug("[wechatcom] upload voice response: {}".format(response))
  201. media_ids.append(response["media_id"])
  202. os.remove(path)
  203. except WeChatClientException as e:
  204. logger.error("[wechatmp] upload voice failed: {}".format(e))
  205. return
  206. try:
  207. os.remove(file_path)
  208. except Exception:
  209. pass
  210. for media_id in media_ids:
  211. self.client.message.send_voice(receiver, media_id)
  212. time.sleep(1)
  213. logger.info("[wechatmp] Do send voice to {}".format(receiver))
  214. elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片
  215. img_url = reply.content
  216. pic_res = requests.get(img_url, stream=True)
  217. image_storage = io.BytesIO()
  218. for block in pic_res.iter_content(1024):
  219. image_storage.write(block)
  220. image_storage.seek(0)
  221. image_type = imghdr.what(image_storage)
  222. filename = receiver + "-" + str(context["msg"].msg_id) + "." + image_type
  223. content_type = "image/" + image_type
  224. try:
  225. response = self.client.media.upload("image", (filename, image_storage, content_type))
  226. logger.debug("[wechatmp] upload image response: {}".format(response))
  227. except WeChatClientException as e:
  228. logger.error("[wechatmp] upload image failed: {}".format(e))
  229. return
  230. self.client.message.send_image(receiver, response["media_id"])
  231. logger.info("[wechatmp] Do send image to {}".format(receiver))
  232. elif reply.type == ReplyType.IMAGE: # 从文件读取图片
  233. image_storage = reply.content
  234. image_storage.seek(0)
  235. image_type = imghdr.what(image_storage)
  236. filename = receiver + "-" + str(context["msg"].msg_id) + "." + image_type
  237. content_type = "image/" + image_type
  238. try:
  239. response = self.client.media.upload("image", (filename, image_storage, content_type))
  240. logger.debug("[wechatmp] upload image response: {}".format(response))
  241. except WeChatClientException as e:
  242. logger.error("[wechatmp] upload image failed: {}".format(e))
  243. return
  244. self.client.message.send_image(receiver, response["media_id"])
  245. logger.info("[wechatmp] Do send image to {}".format(receiver))
  246. elif reply.type == ReplyType.VIDEO_URL: # 从网络下载视频
  247. video_url = reply.content
  248. video_res = requests.get(video_url, stream=True)
  249. video_storage = io.BytesIO()
  250. for block in video_res.iter_content(1024):
  251. video_storage.write(block)
  252. video_storage.seek(0)
  253. video_type = 'mp4'
  254. filename = receiver + "-" + str(context["msg"].msg_id) + "." + video_type
  255. content_type = "video/" + video_type
  256. try:
  257. response = self.client.media.upload("video", (filename, video_storage, content_type))
  258. logger.debug("[wechatmp] upload video response: {}".format(response))
  259. except WeChatClientException as e:
  260. logger.error("[wechatmp] upload video failed: {}".format(e))
  261. return
  262. self.client.message.send_video(receiver, response["media_id"])
  263. logger.info("[wechatmp] Do send video to {}".format(receiver))
  264. elif reply.type == ReplyType.VIDEO: # 从文件读取视频
  265. video_storage = reply.content
  266. video_storage.seek(0)
  267. video_type = 'mp4'
  268. filename = receiver + "-" + str(context["msg"].msg_id) + "." + video_type
  269. content_type = "video/" + video_type
  270. try:
  271. response = self.client.media.upload("video", (filename, video_storage, content_type))
  272. logger.debug("[wechatmp] upload video response: {}".format(response))
  273. except WeChatClientException as e:
  274. logger.error("[wechatmp] upload video failed: {}".format(e))
  275. return
  276. self.client.message.send_video(receiver, response["media_id"])
  277. logger.info("[wechatmp] Do send video to {}".format(receiver))
  278. return
  279. def _success_callback(self, session_id, context, **kwargs): # 线程异常结束时的回调函数
  280. logger.debug("[wechatmp] Success to generate reply, msgId={}".format(context["msg"].msg_id))
  281. if self.passive_reply:
  282. self.running.remove(session_id)
  283. def _fail_callback(self, session_id, exception, context, **kwargs): # 线程异常结束时的回调函数
  284. logger.exception("[wechatmp] Fail to generate reply to user, msgId={}, exception={}".format(context["msg"].msg_id, exception))
  285. if self.passive_reply:
  286. assert session_id not in self.cache_dict
  287. self.running.remove(session_id)