You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

378 lines
18KB

  1. # encoding:utf-8
  2. """
  3. wechat channel
  4. """
  5. import os
  6. import requests
  7. import io
  8. import time
  9. from lib import itchat
  10. import json
  11. from lib.itchat.content import *
  12. from bridge.reply import *
  13. from bridge.context import *
  14. from channel.channel import Channel
  15. from concurrent.futures import ThreadPoolExecutor
  16. from common.log import logger
  17. from common.tmp_dir import TmpDir
  18. from config import conf
  19. from common.time_check import time_checker
  20. from plugins import *
  21. try:
  22. from voice.audio_convert import mp3_to_wav
  23. except Exception as e:
  24. pass
  25. thread_pool = ThreadPoolExecutor(max_workers=8)
  26. def thread_pool_callback(worker):
  27. worker_exception = worker.exception()
  28. if worker_exception:
  29. logger.exception("Worker return exception: {}".format(worker_exception))
  30. @itchat.msg_register(TEXT)
  31. def handler_single_msg(msg):
  32. WechatChannel().handle_text(msg)
  33. return None
  34. @itchat.msg_register(TEXT, isGroupChat=True)
  35. def handler_group_msg(msg):
  36. WechatChannel().handle_group(msg)
  37. return None
  38. @itchat.msg_register(VOICE)
  39. def handler_single_voice(msg):
  40. WechatChannel().handle_voice(msg)
  41. return None
  42. @itchat.msg_register(VOICE, isGroupChat=True)
  43. def handler_group_voice(msg):
  44. WechatChannel().handle_group_voice(msg)
  45. return None
  46. class WechatChannel(Channel):
  47. def __init__(self):
  48. self.userName = None
  49. self.nickName = None
  50. def startup(self):
  51. itchat.instance.receivingRetryCount = 600 # 修改断线超时时间
  52. # login by scan QRCode
  53. hotReload = conf().get('hot_reload', False)
  54. try:
  55. itchat.auto_login(enableCmdQR=2, hotReload=hotReload)
  56. except Exception as e:
  57. if hotReload:
  58. logger.error("Hot reload failed, try to login without hot reload")
  59. itchat.logout()
  60. os.remove("itchat.pkl")
  61. itchat.auto_login(enableCmdQR=2, hotReload=hotReload)
  62. else:
  63. raise e
  64. self.userName = itchat.instance.storageClass.userName
  65. self.nickName = itchat.instance.storageClass.nickName
  66. logger.info("Wechat login success, username: {}, nickname: {}".format(self.userName, self.nickName))
  67. # start message listener
  68. itchat.run()
  69. # handle_* 系列函数处理收到的消息后构造Context,然后传入handle函数中处理Context和发送回复
  70. # Context包含了消息的所有信息,包括以下属性
  71. # type 消息类型, 包括TEXT、VOICE、IMAGE_CREATE
  72. # content 消息内容,如果是TEXT类型,content就是文本内容,如果是VOICE类型,content就是语音文件名,如果是IMAGE_CREATE类型,content就是图片生成命令
  73. # kwargs 附加参数字典,包含以下的key:
  74. # session_id: 会话id
  75. # isgroup: 是否是群聊
  76. # receiver: 需要回复的对象
  77. # msg: itchat的原始消息对象
  78. # origin_ctype: 原始消息类型,用于私聊语音消息时,避免匹配前缀
  79. # desire_rtype: 希望回复类型,TEXT类型是文本回复,VOICE类型是语音回复
  80. def handle_voice(self, msg):
  81. if conf().get('speech_recognition') != True:
  82. return
  83. logger.debug("[WX]receive voice msg: " + msg['FileName'])
  84. to_user_id = msg['ToUserName']
  85. from_user_id = msg['FromUserName']
  86. try:
  87. other_user_id = msg['User']['UserName'] # 对手方id
  88. except Exception as e:
  89. logger.warn("[WX]get other_user_id failed: " + str(e))
  90. if from_user_id == self.userName:
  91. other_user_id = to_user_id
  92. else:
  93. other_user_id = from_user_id
  94. if from_user_id == other_user_id:
  95. context = self._compose_context(ContextType.VOICE, msg['FileName'], isgroup=False, msg=msg, receiver=other_user_id, session_id=other_user_id)
  96. if context:
  97. thread_pool.submit(self.handle, context).add_done_callback(thread_pool_callback)
  98. @time_checker
  99. def handle_text(self, msg):
  100. logger.debug("[WX]receive text msg: " + json.dumps(msg, ensure_ascii=False))
  101. content = msg['Text']
  102. from_user_id = msg['FromUserName']
  103. to_user_id = msg['ToUserName'] # 接收人id
  104. try:
  105. other_user_id = msg['User']['UserName'] # 对手方id
  106. except Exception as e:
  107. logger.warn("[WX]get other_user_id failed: " + str(e))
  108. if from_user_id == self.userName:
  109. other_user_id = to_user_id
  110. else:
  111. other_user_id = from_user_id
  112. create_time = msg['CreateTime'] # 消息时间
  113. if conf().get('hot_reload') == True and int(create_time) < int(time.time()) - 60: # 跳过1分钟前的历史消息
  114. logger.debug("[WX]history message skipped")
  115. return
  116. if "」\n- - - - - - - - - - - - - - -" in content:
  117. logger.debug("[WX]reference query skipped")
  118. return
  119. context = self._compose_context(ContextType.TEXT, content, isgroup=False, msg=msg, receiver=other_user_id, session_id=other_user_id)
  120. if context:
  121. thread_pool.submit(self.handle, context).add_done_callback(thread_pool_callback)
  122. @time_checker
  123. def handle_group(self, msg):
  124. logger.debug("[WX]receive group msg: " + json.dumps(msg, ensure_ascii=False))
  125. group_name = msg['User'].get('NickName', None)
  126. group_id = msg['User'].get('UserName', None)
  127. create_time = msg['CreateTime'] # 消息时间
  128. if conf().get('hot_reload') == True and int(create_time) < int(time.time()) - 60: # 跳过1分钟前的历史消息
  129. logger.debug("[WX]history group message skipped")
  130. return
  131. if not group_name:
  132. return ""
  133. origin_content = msg['Content']
  134. content = msg['Content']
  135. content_list = content.split(' ', 1)
  136. context_special_list = content.split('\u2005', 1)
  137. if len(context_special_list) == 2:
  138. content = context_special_list[1]
  139. elif len(content_list) == 2:
  140. content = content_list[1]
  141. if "」\n- - - - - - - - - - - - - - -" in content:
  142. logger.debug("[WX]reference query skipped")
  143. return ""
  144. config = conf()
  145. group_name_white_list = config.get('group_name_white_list', [])
  146. group_name_keyword_white_list = config.get('group_name_keyword_white_list', [])
  147. if any([group_name in group_name_white_list, 'ALL_GROUP' in group_name_white_list, check_contain(group_name, group_name_keyword_white_list), msg['IsAt'] and not config.get("group_at_off", False)]):
  148. group_chat_in_one_session = conf().get('group_chat_in_one_session', [])
  149. session_id = msg['ActualUserName']
  150. if any([group_name in group_chat_in_one_session, 'ALL_GROUP' in group_chat_in_one_session]):
  151. session_id = group_id
  152. context = self._compose_context(ContextType.TEXT, content, isgroup=True, msg=msg, receiver=group_id, session_id=session_id)
  153. if context:
  154. thread_pool.submit(self.handle, context).add_done_callback(thread_pool_callback)
  155. def handle_group_voice(self, msg):
  156. if conf().get('group_speech_recognition', False) != True:
  157. return
  158. logger.debug("[WX]receive voice for group msg: " + msg['FileName'])
  159. group_name = msg['User'].get('NickName', None)
  160. group_id = msg['User'].get('UserName', None)
  161. create_time = msg['CreateTime'] # 消息时间
  162. if conf().get('hot_reload') == True and int(create_time) < int(time.time()) - 60: #跳过1分钟前的历史消息
  163. logger.debug("[WX]history group voice skipped")
  164. return
  165. # 验证群名
  166. if not group_name:
  167. return ""
  168. config = conf()
  169. group_name_white_list = config.get('group_name_white_list', [])
  170. group_name_keyword_white_list = config.get('group_name_keyword_white_list', [])
  171. if any([group_name in group_name_white_list, 'ALL_GROUP' in group_name_white_list, check_contain(group_name, group_name_keyword_white_list)]):
  172. group_chat_in_one_session = conf().get('group_chat_in_one_session', [])
  173. session_id =msg['ActualUserName']
  174. if any([group_name in group_chat_in_one_session, 'ALL_GROUP' in group_chat_in_one_session]):
  175. session_id = group_id
  176. context = self._compose_context(ContextType.VOICE, msg['FileName'], isgroup=True, msg=msg, receiver=group_id, session_id=session_id)
  177. if context:
  178. thread_pool.submit(self.handle, context).add_done_callback(thread_pool_callback)
  179. def _compose_context(self, ctype: ContextType, content, **kwargs):
  180. context = Context(ctype, content)
  181. context.kwargs = kwargs
  182. if 'origin_ctype' not in context:
  183. context['origin_ctype'] = ctype
  184. if ctype == ContextType.TEXT:
  185. if context["isgroup"]: # 群聊
  186. # 校验关键字
  187. match_prefix = check_prefix(content, conf().get('group_chat_prefix'))
  188. match_contain = check_contain(content, conf().get('group_chat_keyword'))
  189. if match_prefix is not None or match_contain is not None:
  190. # 判断如果匹配到自定义前缀,则返回过滤掉前缀+空格后的内容,用于实现类似自定义+前缀触发生成AI图片的功能
  191. if match_prefix:
  192. content = content.replace(match_prefix, '', 1).strip()
  193. elif context["origin_ctype"] == ContextType.VOICE:
  194. logger.info("[WX]receive group voice, checkprefix didn't match")
  195. return None
  196. else: # 单聊
  197. match_prefix = check_prefix(content, conf().get('single_chat_prefix'))
  198. if match_prefix: # 判断如果匹配到自定义前缀,则返回过滤掉前缀+空格后的内容
  199. content = content.replace(match_prefix, '', 1).strip()
  200. elif context["origin_ctype"] == ContextType.VOICE: # 如果源消息是私聊的语音消息,不匹配前缀,直接返回
  201. pass
  202. else:
  203. return None
  204. img_match_prefix = check_prefix(content, conf().get('image_create_prefix'))
  205. if img_match_prefix:
  206. content = content.replace(img_match_prefix, '', 1).strip()
  207. context.type = ContextType.IMAGE_CREATE
  208. else:
  209. context.type = ContextType.TEXT
  210. context.content = content
  211. elif context.type == ContextType.VOICE:
  212. if 'desire_rtype' not in context and conf().get('voice_reply_voice'):
  213. context['desire_rtype'] = ReplyType.VOICE
  214. return context
  215. # 统一的发送函数,每个Channel自行实现,根据reply的type字段发送不同类型的消息
  216. def send(self, reply: Reply, receiver, retry_cnt = 0):
  217. try:
  218. if reply.type == ReplyType.TEXT:
  219. itchat.send(reply.content, toUserName=receiver)
  220. logger.info('[WX] sendMsg={}, receiver={}'.format(reply, receiver))
  221. elif reply.type == ReplyType.ERROR or reply.type == ReplyType.INFO:
  222. itchat.send(reply.content, toUserName=receiver)
  223. logger.info('[WX] sendMsg={}, receiver={}'.format(reply, receiver))
  224. elif reply.type == ReplyType.VOICE:
  225. itchat.send_file(reply.content, toUserName=receiver)
  226. logger.info('[WX] sendFile={}, receiver={}'.format(reply.content, receiver))
  227. elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片
  228. img_url = reply.content
  229. pic_res = requests.get(img_url, stream=True)
  230. image_storage = io.BytesIO()
  231. for block in pic_res.iter_content(1024):
  232. image_storage.write(block)
  233. image_storage.seek(0)
  234. itchat.send_image(image_storage, toUserName=receiver)
  235. logger.info('[WX] sendImage url={}, receiver={}'.format(img_url,receiver))
  236. elif reply.type == ReplyType.IMAGE: # 从文件读取图片
  237. image_storage = reply.content
  238. image_storage.seek(0)
  239. itchat.send_image(image_storage, toUserName=receiver)
  240. logger.info('[WX] sendImage, receiver={}'.format(receiver))
  241. except Exception as e:
  242. logger.error('[WX] sendMsg error: {}, receiver={}'.format(e, receiver))
  243. if retry_cnt < 2:
  244. time.sleep(3+3*retry_cnt)
  245. self.send(reply, receiver, retry_cnt + 1)
  246. # 处理消息 TODO: 如果wechaty解耦,此处逻辑可以放置到父类
  247. def handle(self, context: Context):
  248. if context is None or not context.content:
  249. return
  250. logger.debug('[WX] ready to handle context: {}'.format(context))
  251. # reply的构建步骤
  252. reply = self._generate_reply(context)
  253. logger.debug('[WX] ready to decorate reply: {}'.format(reply))
  254. # reply的包装步骤
  255. reply = self._decorate_reply(context, reply)
  256. # reply的发送步骤
  257. self._send_reply(context, reply)
  258. def _generate_reply(self, context: Context, reply: Reply = Reply()) -> Reply:
  259. e_context = PluginManager().emit_event(EventContext(Event.ON_HANDLE_CONTEXT, {
  260. 'channel': self, 'context': context, 'reply': reply}))
  261. reply = e_context['reply']
  262. if not e_context.is_pass():
  263. logger.debug('[WX] ready to handle context: type={}, content={}'.format(context.type, context.content))
  264. if context.type == ContextType.TEXT or context.type == ContextType.IMAGE_CREATE: # 文字和图片消息
  265. reply = super().build_reply_content(context.content, context)
  266. elif context.type == ContextType.VOICE: # 语音消息
  267. msg = context['msg']
  268. mp3_path = TmpDir().path() + context.content
  269. msg.download(mp3_path)
  270. # mp3转wav
  271. wav_path = os.path.splitext(mp3_path)[0] + '.wav'
  272. try:
  273. mp3_to_wav(mp3_path=mp3_path, wav_path=wav_path)
  274. except Exception as e: # 转换失败,直接使用mp3,对于某些api,mp3也可以识别
  275. logger.warning("[WX]mp3 to wav error, use mp3 path. " + str(e))
  276. wav_path = mp3_path
  277. # 语音识别
  278. reply = super().build_voice_to_text(wav_path)
  279. # 删除临时文件
  280. try:
  281. os.remove(wav_path)
  282. os.remove(mp3_path)
  283. except Exception as e:
  284. logger.warning("[WX]delete temp file error: " + str(e))
  285. if reply.type == ReplyType.TEXT:
  286. new_context = self._compose_context(
  287. ContextType.TEXT, reply.content, **context.kwargs)
  288. if new_context:
  289. reply = self._generate_reply(new_context)
  290. else:
  291. logger.error('[WX] unknown context type: {}'.format(context.type))
  292. return
  293. return reply
  294. def _decorate_reply(self, context: Context, reply: Reply) -> Reply:
  295. if reply and reply.type:
  296. e_context = PluginManager().emit_event(EventContext(Event.ON_DECORATE_REPLY, {
  297. 'channel': self, 'context': context, 'reply': reply}))
  298. reply = e_context['reply']
  299. desire_rtype = context.get('desire_rtype')
  300. if not e_context.is_pass() and reply and reply.type:
  301. if reply.type == ReplyType.TEXT:
  302. reply_text = reply.content
  303. if desire_rtype == ReplyType.VOICE:
  304. reply = super().build_text_to_voice(reply.content)
  305. return self._decorate_reply(context, reply)
  306. if context['isgroup']:
  307. reply_text = '@' + context['msg']['ActualNickName'] + ' ' + reply_text.strip()
  308. reply_text = conf().get("group_chat_reply_prefix", "")+reply_text
  309. else:
  310. reply_text = conf().get("single_chat_reply_prefix", "")+reply_text
  311. reply.content = reply_text
  312. elif reply.type == ReplyType.ERROR or reply.type == ReplyType.INFO:
  313. reply.content = str(reply.type)+":\n" + reply.content
  314. elif reply.type == ReplyType.IMAGE_URL or reply.type == ReplyType.VOICE or reply.type == ReplyType.IMAGE:
  315. pass
  316. else:
  317. logger.error('[WX] unknown reply type: {}'.format(reply.type))
  318. return
  319. if desire_rtype and desire_rtype != reply.type and reply.type not in [ReplyType.ERROR, ReplyType.INFO]:
  320. logger.warning('[WX] desire_rtype: {}, but reply type: {}'.format(context.get('desire_rtype'), reply.type))
  321. return reply
  322. def _send_reply(self, context: Context, reply: Reply):
  323. if reply and reply.type:
  324. e_context = PluginManager().emit_event(EventContext(Event.ON_SEND_REPLY, {
  325. 'channel': self, 'context': context, 'reply': reply}))
  326. reply = e_context['reply']
  327. if not e_context.is_pass() and reply and reply.type:
  328. logger.debug('[WX] ready to send reply: {} to {}'.format(reply, context['receiver']))
  329. self.send(reply, context['receiver'])
  330. def check_prefix(content, prefix_list):
  331. for prefix in prefix_list:
  332. if content.startswith(prefix):
  333. return prefix
  334. return None
  335. def check_contain(content, keyword_list):
  336. if not keyword_list:
  337. return None
  338. for ky in keyword_list:
  339. if content.find(ky) != -1:
  340. return True
  341. return None