您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

136 行
5.4KB

  1. # encoding:utf-8
  2. """
  3. wechaty channel
  4. Python Wechaty - https://github.com/wechaty/python-wechaty
  5. """
  6. import base64
  7. from concurrent.futures import ThreadPoolExecutor
  8. import os
  9. import time
  10. import asyncio
  11. from bridge.context import Context
  12. from wechaty_puppet import FileBox
  13. from wechaty import Wechaty, Contact
  14. from wechaty.user import Message
  15. from bridge.reply import *
  16. from bridge.context import *
  17. from channel.chat_channel import ChatChannel
  18. from channel.wechat.wechaty_message import WechatyMessage
  19. from common.log import logger
  20. from config import conf
  21. try:
  22. from voice.audio_convert import mp3_to_sil
  23. except Exception as e:
  24. pass
  25. thread_pool = ThreadPoolExecutor(max_workers=8)
  26. def thread_pool_callback(worker):
  27. worker_exception = worker.exception()
  28. if worker_exception:
  29. logger.exception("Worker return exception: {}".format(worker_exception))
  30. class WechatyChannel(ChatChannel):
  31. def __init__(self):
  32. pass
  33. def startup(self):
  34. asyncio.run(self.main())
  35. async def main(self):
  36. config = conf()
  37. token = config.get('wechaty_puppet_service_token')
  38. os.environ['WECHATY_PUPPET_SERVICE_TOKEN'] = token
  39. os.environ['WECHATY_LOG']="warn"
  40. # os.environ['WECHATY_PUPPET_SERVICE_ENDPOINT'] = '127.0.0.1:9001'
  41. self.bot = Wechaty()
  42. self.bot.on('login', self.on_login)
  43. self.bot.on('message', self.on_message)
  44. await self.bot.start()
  45. async def on_login(self, contact: Contact):
  46. self.user_id = contact.contact_id
  47. self.name = contact.name
  48. logger.info('[WX] login user={}'.format(contact))
  49. # 统一的发送函数,每个Channel自行实现,根据reply的type字段发送不同类型的消息
  50. def send(self, reply: Reply, context: Context):
  51. receiver_id = context['receiver']
  52. loop = asyncio.get_event_loop()
  53. if context['isgroup']:
  54. receiver = asyncio.run_coroutine_threadsafe(self.bot.Room.find(receiver_id),loop).result()
  55. else:
  56. receiver = asyncio.run_coroutine_threadsafe(self.bot.Contact.find(receiver_id),loop).result()
  57. msg = None
  58. if reply.type == ReplyType.TEXT:
  59. msg = reply.content
  60. asyncio.run_coroutine_threadsafe(receiver.say(msg),loop).result()
  61. logger.info('[WX] sendMsg={}, receiver={}'.format(reply, receiver))
  62. elif reply.type == ReplyType.ERROR or reply.type == ReplyType.INFO:
  63. msg = reply.content
  64. asyncio.run_coroutine_threadsafe(receiver.say(msg),loop).result()
  65. logger.info('[WX] sendMsg={}, receiver={}'.format(reply, receiver))
  66. elif reply.type == ReplyType.VOICE:
  67. voiceLength = None
  68. if reply.content.endswith('.mp3'):
  69. mp3_file = reply.content
  70. sil_file = os.path.splitext(mp3_file)[0] + '.sil'
  71. voiceLength = mp3_to_sil(mp3_file, sil_file)
  72. try:
  73. os.remove(mp3_file)
  74. except Exception as e:
  75. pass
  76. elif reply.content.endswith('.sil'):
  77. sil_file = reply.content
  78. else:
  79. raise Exception('voice file must be mp3 or sil format')
  80. # 发送语音
  81. t = int(time.time())
  82. msg = FileBox.from_file(sil_file, name=str(t) + '.sil')
  83. if voiceLength is not None:
  84. msg.metadata['voiceLength'] = voiceLength
  85. asyncio.run_coroutine_threadsafe(receiver.say(msg),loop).result()
  86. try:
  87. os.remove(sil_file)
  88. except Exception as e:
  89. pass
  90. logger.info('[WX] sendVoice={}, receiver={}'.format(reply.content, receiver))
  91. elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片
  92. img_url = reply.content
  93. t = int(time.time())
  94. msg = FileBox.from_url(url=img_url, name=str(t) + '.png')
  95. asyncio.run_coroutine_threadsafe(receiver.say(msg),loop).result()
  96. logger.info('[WX] sendImage url={}, receiver={}'.format(img_url,receiver))
  97. elif reply.type == ReplyType.IMAGE: # 从文件读取图片
  98. image_storage = reply.content
  99. image_storage.seek(0)
  100. t = int(time.time())
  101. msg = FileBox.from_base64(base64.b64encode(image_storage.read()), str(t) + '.png')
  102. asyncio.run_coroutine_threadsafe(receiver.say(msg),loop).result()
  103. logger.info('[WX] sendImage, receiver={}'.format(receiver))
  104. async def on_message(self, msg: Message):
  105. """
  106. listen for message event
  107. """
  108. try:
  109. cmsg = await WechatyMessage(msg)
  110. except NotImplementedError as e:
  111. logger.debug('[WX] {}'.format(e))
  112. return
  113. except Exception as e:
  114. logger.exception('[WX] {}'.format(e))
  115. return
  116. logger.debug('[WX] message:{}'.format(cmsg))
  117. room = msg.room() # 获取消息来自的群聊. 如果消息不是来自群聊, 则返回None
  118. isgroup = room is not None
  119. ctype = cmsg.ctype
  120. context = self._compose_context(ctype, cmsg.content, isgroup=isgroup, msg=cmsg)
  121. if context:
  122. logger.info('[WX] receiveMsg={}, context={}'.format(cmsg, context))
  123. thread_pool.submit(self._handle_loop, context, asyncio.get_event_loop()).add_done_callback(thread_pool_callback)
  124. def _handle_loop(self,context,loop):
  125. asyncio.set_event_loop(loop)
  126. self._handle(context)