You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

wechat_channel.py 7.1KB

2 년 전
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. # encoding:utf-8
  2. """
  3. wechat channel
  4. """
  5. import os
  6. import requests
  7. import io
  8. import time
  9. from channel.chat_channel import ChatChannel
  10. from channel.wechat.wechat_message import *
  11. from common.singleton import singleton
  12. from lib import itchat
  13. import json
  14. from lib.itchat.content import *
  15. from bridge.reply import *
  16. from bridge.context import *
  17. from channel.channel import Channel
  18. from concurrent.futures import ThreadPoolExecutor
  19. from common.log import logger
  20. from config import conf
  21. from common.time_check import time_checker
  22. from common.expired_dict import ExpiredDict
  23. from plugins import *
  24. try:
  25. from voice.audio_convert import mp3_to_wav
  26. except Exception as e:
  27. pass
  28. thread_pool = ThreadPoolExecutor(max_workers=8)
  29. def thread_pool_callback(worker):
  30. worker_exception = worker.exception()
  31. if worker_exception:
  32. logger.exception("Worker return exception: {}".format(worker_exception))
  33. @itchat.msg_register(TEXT)
  34. def handler_single_msg(msg):
  35. WechatChannel().handle_text(WeChatMessage(msg))
  36. return None
  37. @itchat.msg_register(TEXT, isGroupChat=True)
  38. def handler_group_msg(msg):
  39. WechatChannel().handle_group(WeChatMessage(msg,True))
  40. return None
  41. @itchat.msg_register(VOICE)
  42. def handler_single_voice(msg):
  43. WechatChannel().handle_voice(WeChatMessage(msg))
  44. return None
  45. @itchat.msg_register(VOICE, isGroupChat=True)
  46. def handler_group_voice(msg):
  47. WechatChannel().handle_group_voice(WeChatMessage(msg,True))
  48. return None
  49. def _check(func):
  50. def wrapper(self, cmsg: ChatMessage):
  51. msgId = cmsg.msg_id
  52. if msgId in self.receivedMsgs:
  53. logger.info("Wechat message {} already received, ignore".format(msgId))
  54. return
  55. self.receivedMsgs[msgId] = cmsg
  56. create_time = cmsg.create_time # 消息时间戳
  57. if conf().get('hot_reload') == True and int(create_time) < int(time.time()) - 60: # 跳过1分钟前的历史消息
  58. logger.debug("[WX]history message {} skipped".format(msgId))
  59. return
  60. return func(self, cmsg)
  61. return wrapper
  62. @singleton
  63. class WechatChannel(ChatChannel):
  64. def __init__(self):
  65. super().__init__()
  66. self.receivedMsgs = ExpiredDict(60*60*24)
  67. def startup(self):
  68. itchat.instance.receivingRetryCount = 600 # 修改断线超时时间
  69. # login by scan QRCode
  70. hotReload = conf().get('hot_reload', False)
  71. try:
  72. itchat.auto_login(enableCmdQR=2, hotReload=hotReload)
  73. except Exception as e:
  74. if hotReload:
  75. logger.error("Hot reload failed, try to login without hot reload")
  76. itchat.logout()
  77. os.remove("itchat.pkl")
  78. itchat.auto_login(enableCmdQR=2, hotReload=hotReload)
  79. else:
  80. raise e
  81. self.user_id = itchat.instance.storageClass.userName
  82. self.name = itchat.instance.storageClass.nickName
  83. logger.info("Wechat login success, user_id: {}, nickname: {}".format(self.user_id, self.name))
  84. # start message listener
  85. itchat.run()
  86. # handle_* 系列函数处理收到的消息后构造Context,然后传入_handle函数中处理Context和发送回复
  87. # Context包含了消息的所有信息,包括以下属性
  88. # type 消息类型, 包括TEXT、VOICE、IMAGE_CREATE
  89. # content 消息内容,如果是TEXT类型,content就是文本内容,如果是VOICE类型,content就是语音文件名,如果是IMAGE_CREATE类型,content就是图片生成命令
  90. # kwargs 附加参数字典,包含以下的key:
  91. # session_id: 会话id
  92. # isgroup: 是否是群聊
  93. # receiver: 需要回复的对象
  94. # msg: ChatMessage消息对象
  95. # origin_ctype: 原始消息类型,语音转文字后,私聊时如果匹配前缀失败,会根据初始消息是否是语音来放宽触发规则
  96. # desire_rtype: 希望回复类型,默认是文本回复,设置为ReplyType.VOICE是语音回复
  97. @time_checker
  98. @_check
  99. def handle_voice(self, cmsg : ChatMessage):
  100. if conf().get('speech_recognition') != True:
  101. return
  102. logger.debug("[WX]receive voice msg: {}".format(cmsg.content))
  103. context = self._compose_context(ContextType.VOICE, cmsg.content, isgroup=False, msg=cmsg)
  104. if context:
  105. thread_pool.submit(self._handle, context).add_done_callback(thread_pool_callback)
  106. @time_checker
  107. @_check
  108. def handle_text(self, cmsg : ChatMessage):
  109. logger.debug("[WX]receive text msg: " + json.dumps(cmsg._rawmsg, ensure_ascii=False))
  110. context = self._compose_context(ContextType.TEXT, cmsg.content, isgroup=False, msg=cmsg)
  111. if context:
  112. thread_pool.submit(self._handle, context).add_done_callback(thread_pool_callback)
  113. @time_checker
  114. @_check
  115. def handle_group(self, cmsg : ChatMessage):
  116. logger.debug("[WX]receive group msg: " + json.dumps(cmsg._rawmsg, ensure_ascii=False))
  117. context = self._compose_context(ContextType.TEXT, cmsg.content, isgroup=True, msg=cmsg)
  118. if context:
  119. thread_pool.submit(self._handle, context).add_done_callback(thread_pool_callback)
  120. @time_checker
  121. @_check
  122. def handle_group_voice(self, cmsg : ChatMessage):
  123. if conf().get('group_speech_recognition', False) != True:
  124. return
  125. logger.debug("[WX]receive voice for group msg: {}".format(cmsg.content))
  126. context = self._compose_context(ContextType.VOICE, cmsg.content, isgroup=True, msg=cmsg)
  127. if context:
  128. thread_pool.submit(self._handle, context).add_done_callback(thread_pool_callback)
  129. # 统一的发送函数,每个Channel自行实现,根据reply的type字段发送不同类型的消息
  130. def send(self, reply: Reply, context: Context):
  131. receiver = context["receiver"]
  132. if reply.type == ReplyType.TEXT:
  133. itchat.send(reply.content, toUserName=receiver)
  134. logger.info('[WX] sendMsg={}, receiver={}'.format(reply, receiver))
  135. elif reply.type == ReplyType.ERROR or reply.type == ReplyType.INFO:
  136. itchat.send(reply.content, toUserName=receiver)
  137. logger.info('[WX] sendMsg={}, receiver={}'.format(reply, receiver))
  138. elif reply.type == ReplyType.VOICE:
  139. itchat.send_file(reply.content, toUserName=receiver)
  140. logger.info('[WX] sendFile={}, receiver={}'.format(reply.content, receiver))
  141. elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片
  142. img_url = reply.content
  143. pic_res = requests.get(img_url, stream=True)
  144. image_storage = io.BytesIO()
  145. for block in pic_res.iter_content(1024):
  146. image_storage.write(block)
  147. image_storage.seek(0)
  148. itchat.send_image(image_storage, toUserName=receiver)
  149. logger.info('[WX] sendImage url={}, receiver={}'.format(img_url,receiver))
  150. elif reply.type == ReplyType.IMAGE: # 从文件读取图片
  151. image_storage = reply.content
  152. image_storage.seek(0)
  153. itchat.send_image(image_storage, toUserName=receiver)
  154. logger.info('[WX] sendImage, receiver={}'.format(receiver))