Nie możesz wybrać więcej niż 25 tematów. Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

218 lines
10KB

  1. # -*- coding: utf-8 -*-
  2. import io
  3. import os
  4. import time
  5. import imghdr
  6. import requests
  7. from bridge.context import *
  8. from bridge.reply import *
  9. from channel.chat_channel import ChatChannel
  10. from channel.wechatmp.wechatmp_client import WechatMPClient
  11. from channel.wechatmp.common import *
  12. from common.log import logger
  13. from common.singleton import singleton
  14. from config import conf
  15. from wechatpy.exceptions import WeChatClientException
  16. import asyncio
  17. from threading import Thread
  18. import web
  19. from voice.audio_convert import any_to_mp3
  20. # If using SSL, uncomment the following lines, and modify the certificate path.
  21. # from cheroot.server import HTTPServer
  22. # from cheroot.ssl.builtin import BuiltinSSLAdapter
  23. # HTTPServer.ssl_adapter = BuiltinSSLAdapter(
  24. # certificate='/ssl/cert.pem',
  25. # private_key='/ssl/cert.key')
  26. @singleton
  27. class WechatMPChannel(ChatChannel):
  28. def __init__(self, passive_reply=True):
  29. super().__init__()
  30. self.passive_reply = passive_reply
  31. self.NOT_SUPPORT_REPLYTYPE = []
  32. appid = conf().get("wechatmp_app_id")
  33. secret = conf().get("wechatmp_app_secret")
  34. self.client = WechatMPClient(appid, secret)
  35. if self.passive_reply:
  36. # Cache the reply to the user's first message
  37. self.cache_dict = dict()
  38. # Record whether the current message is being processed
  39. self.running = set()
  40. # Count the request from wechat official server by message_id
  41. self.request_cnt = dict()
  42. # The permanent media need to be deleted to avoid media number limit
  43. self.delete_media_loop = asyncio.new_event_loop()
  44. t = Thread(target=self.start_loop, args=(self.delete_media_loop,))
  45. t.setDaemon(True)
  46. t.start()
  47. def startup(self):
  48. if self.passive_reply:
  49. urls = ("/wx", "channel.wechatmp.passive_reply.Query")
  50. else:
  51. urls = ("/wx", "channel.wechatmp.active_reply.Query")
  52. app = web.application(urls, globals(), autoreload=False)
  53. port = conf().get("wechatmp_port", 8080)
  54. web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port))
  55. def start_loop(self, loop):
  56. asyncio.set_event_loop(loop)
  57. loop.run_forever()
  58. async def delete_media(self, media_id):
  59. logger.debug("[wechatmp] permanent media {} will be deleted in 10s".format(media_id))
  60. await asyncio.sleep(10)
  61. self.client.material.delete(media_id)
  62. logger.info("[wechatmp] permanent media {} has been deleted".format(media_id))
  63. def send(self, reply: Reply, context: Context):
  64. receiver = context["receiver"]
  65. if self.passive_reply:
  66. if reply.type == ReplyType.TEXT or reply.type == ReplyType.INFO or reply.type == ReplyType.ERROR:
  67. reply_text = reply.content
  68. logger.info("[wechatmp] reply to {} cached:\n{}".format(receiver, reply_text))
  69. self.cache_dict[receiver] = ("text", reply_text)
  70. elif reply.type == ReplyType.VOICE:
  71. try:
  72. file_path = reply.content
  73. response = self.client.material.add("voice", open(file_path, "rb"))
  74. logger.debug("[wechatmp] upload voice response: {}".format(response))
  75. except WeChatClientException as e:
  76. logger.error("[wechatmp] upload voice failed: {}".format(e))
  77. return
  78. time.sleep(3) # todo check media_id
  79. media_id = response["media_id"]
  80. logger.info("[wechatmp] voice reply to {} uploaded: {}".format(receiver, media_id))
  81. self.cache_dict[receiver] = ("voice", media_id)
  82. elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片
  83. img_url = reply.content
  84. pic_res = requests.get(img_url, stream=True)
  85. image_storage = io.BytesIO()
  86. for block in pic_res.iter_content(1024):
  87. image_storage.write(block)
  88. image_storage.seek(0)
  89. image_type = imghdr.what(image_storage)
  90. filename = receiver + "-" + str(context['msg'].msg_id) + "." + image_type
  91. content_type = "image/" + image_type
  92. try:
  93. response = self.client.material.add("image", (filename, image_storage, content_type))
  94. logger.debug("[wechatmp] upload image response: {}".format(response))
  95. except WeChatClientException as e:
  96. logger.error("[wechatmp] upload image failed: {}".format(e))
  97. return
  98. media_id = response["media_id"]
  99. logger.info(
  100. "[wechatmp] image reply url={}, receiver={}".format(img_url, receiver)
  101. )
  102. self.cache_dict[receiver] = ("image", media_id)
  103. elif reply.type == ReplyType.IMAGE: # 从文件读取图片
  104. image_storage = reply.content
  105. image_storage.seek(0)
  106. image_type = imghdr.what(image_storage)
  107. filename = receiver + "-" + str(context['msg'].msg_id) + "." + image_type
  108. content_type = "image/" + image_type
  109. try:
  110. response = self.client.material.add("image", (filename, image_storage, content_type))
  111. logger.debug("[wechatmp] upload image response: {}".format(response))
  112. except WeChatClientException as e:
  113. logger.error("[wechatmp] upload image failed: {}".format(e))
  114. return
  115. media_id = response["media_id"]
  116. logger.info(
  117. "[wechatmp] image reply url={}, receiver={}".format(img_url, receiver)
  118. )
  119. self.cache_dict[receiver] = ("image", media_id)
  120. else:
  121. if reply.type == ReplyType.TEXT or reply.type == ReplyType.INFO or reply.type == ReplyType.ERROR:
  122. reply_text = reply.content
  123. self.client.message.send_text(receiver, reply_text)
  124. logger.info("[wechatmp] Do send to {}: {}".format(receiver, reply_text))
  125. elif reply.type == ReplyType.VOICE:
  126. try:
  127. file_path = reply.content
  128. file_name = os.path.basename(file_path)
  129. file_type = os.path.splitext(file_name)[1]
  130. if file_type == ".mp3":
  131. file_type = "audio/mpeg"
  132. elif file_type == ".amr":
  133. file_type = "audio/amr"
  134. else:
  135. mp3_file = os.path.splitext(file_path)[0] + ".mp3"
  136. any_to_mp3(file_path, mp3_file)
  137. file_path = mp3_file
  138. file_name = os.path.basename(file_path)
  139. file_type = "audio/mpeg"
  140. logger.info("[wechatmp] file_name: {}, file_type: {} ".format(file_name, file_type))
  141. response = self.client.media.upload("voice", (file_name, open(file_path, "rb"), file_type))
  142. logger.debug("[wechatmp] upload voice response: {}".format(response))
  143. except WeChatClientException as e:
  144. logger.error("[wechatmp] upload voice failed: {}".format(e))
  145. return
  146. self.client.message.send_voice(receiver, response["media_id"])
  147. logger.info("[wechatmp] Do send voice to {}".format(receiver))
  148. elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片
  149. img_url = reply.content
  150. pic_res = requests.get(img_url, stream=True)
  151. image_storage = io.BytesIO()
  152. for block in pic_res.iter_content(1024):
  153. image_storage.write(block)
  154. image_storage.seek(0)
  155. image_type = imghdr.what(image_storage)
  156. filename = receiver + "-" + str(context['msg'].msg_id) + "." + image_type
  157. content_type = "image/" + image_type
  158. try:
  159. response = self.client.media.upload("image", (filename, image_storage, content_type))
  160. logger.debug("[wechatmp] upload image response: {}".format(response))
  161. except WeChatClientException as e:
  162. logger.error("[wechatmp] upload image failed: {}".format(e))
  163. return
  164. self.client.message.send_image(
  165. receiver, response["media_id"]
  166. )
  167. logger.info(
  168. "[wechatmp] sendImage url={}, receiver={}".format(img_url, receiver)
  169. )
  170. elif reply.type == ReplyType.IMAGE: # 从文件读取图片
  171. image_storage = reply.content
  172. image_storage.seek(0)
  173. image_type = imghdr.what(image_storage)
  174. filename = receiver + "-" + str(context['msg'].msg_id) + "." + image_type
  175. content_type = "image/" + image_type
  176. try:
  177. response = self.client.media.upload("image", (filename, image_storage, content_type))
  178. logger.debug("[wechatmp] upload image response: {}".format(response))
  179. except WeChatClientException as e:
  180. logger.error("[wechatmp] upload image failed: {}".format(e))
  181. return
  182. self.client.message.send_image(
  183. receiver, response["media_id"]
  184. )
  185. logger.info(
  186. "[wechatmp] sendImage, receiver={}".format(receiver)
  187. )
  188. return
  189. def _success_callback(self, session_id, context, **kwargs): # 线程异常结束时的回调函数
  190. logger.debug(
  191. "[wechatmp] Success to generate reply, msgId={}".format(
  192. context["msg"].msg_id
  193. )
  194. )
  195. if self.passive_reply:
  196. self.running.remove(session_id)
  197. def _fail_callback(self, session_id, exception, context, **kwargs): # 线程异常结束时的回调函数
  198. logger.exception(
  199. "[wechatmp] Fail to generate reply to user, msgId={}, exception={}".format(
  200. context["msg"].msg_id, exception
  201. )
  202. )
  203. if self.passive_reply:
  204. assert session_id not in self.cache_dict
  205. self.running.remove(session_id)