You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

212 lines
9.2KB

  1. import datetime
  2. import json
  3. import os
  4. import re
  5. import time
  6. import pilk
  7. from bridge.context import ContextType
  8. from channel.chat_message import ChatMessage
  9. from common.log import logger
  10. from ntwork.const import send_type
  11. def get_with_retry(get_func, max_retries=5, delay=5):
  12. retries = 0
  13. result = None
  14. while retries < max_retries:
  15. result = get_func()
  16. if result:
  17. break
  18. logger.warning(f"获取数据失败,重试第{retries + 1}次······")
  19. retries += 1
  20. time.sleep(delay) # 等待一段时间后重试
  21. return result
  22. def get_room_info(wework, conversation_id):
  23. logger.debug(f"传入的 conversation_id: {conversation_id}")
  24. rooms = wework.get_rooms()
  25. if not rooms or 'room_list' not in rooms:
  26. logger.error(f"获取群聊信息失败: {rooms}")
  27. return None
  28. time.sleep(1)
  29. logger.debug(f"获取到的群聊信息: {rooms}")
  30. for room in rooms['room_list']:
  31. if room['conversation_id'] == conversation_id:
  32. return room
  33. return None
  34. def cdn_download(wework, message, file_name):
  35. data = message["data"]
  36. aes_key = data["cdn"]["aes_key"]
  37. file_size = data["cdn"]["size"]
  38. # 获取当前工作目录,然后与文件名拼接得到保存路径
  39. current_dir = os.getcwd()
  40. save_path = os.path.join(current_dir, "tmp", file_name)
  41. # 下载保存图片到本地
  42. if "url" in data["cdn"].keys() and "auth_key" in data["cdn"].keys():
  43. url = data["cdn"]["url"]
  44. auth_key = data["cdn"]["auth_key"]
  45. # result = wework.wx_cdn_download(url, auth_key, aes_key, file_size, save_path) # ntwork库本身接口有问题,缺失了aes_key这个参数
  46. """
  47. 下载wx类型的cdn文件,以https开头
  48. """
  49. data = {
  50. 'url': url,
  51. 'auth_key': auth_key,
  52. 'aes_key': aes_key,
  53. 'size': file_size,
  54. 'save_path': save_path
  55. }
  56. result = wework._WeWork__send_sync(send_type.MT_WXCDN_DOWNLOAD_MSG, data) # 直接用wx_cdn_download的接口内部实现来调用
  57. elif "file_id" in data["cdn"].keys():
  58. file_type = 2
  59. file_id = data["cdn"]["file_id"]
  60. result = wework.c2c_cdn_download(file_id, aes_key, file_size, file_type, save_path)
  61. else:
  62. logger.error(f"something is wrong, data: {data}")
  63. return
  64. # 输出下载结果
  65. logger.debug(f"result: {result}")
  66. def c2c_download_and_convert(wework, message, file_name):
  67. data = message["data"]
  68. aes_key = data["cdn"]["aes_key"]
  69. file_size = data["cdn"]["size"]
  70. file_type = 5
  71. file_id = data["cdn"]["file_id"]
  72. current_dir = os.getcwd()
  73. save_path = os.path.join(current_dir, "tmp", file_name)
  74. result = wework.c2c_cdn_download(file_id, aes_key, file_size, file_type, save_path)
  75. logger.debug(result)
  76. # 在下载完SILK文件之后,立即将其转换为WAV文件
  77. base_name, _ = os.path.splitext(save_path)
  78. wav_file = base_name + ".wav"
  79. pilk.silk_to_wav(save_path, wav_file, rate=24000)
  80. # 删除SILK文件
  81. try:
  82. os.remove(save_path)
  83. except Exception as e:
  84. pass
  85. class WeworkMessage(ChatMessage):
  86. def __init__(self, wework_msg, wework, is_group=False):
  87. try:
  88. super().__init__(wework_msg)
  89. self.msg_id = wework_msg['data'].get('conversation_id', wework_msg['data'].get('room_conversation_id'))
  90. # 使用.get()防止 'send_time' 键不存在时抛出错误
  91. self.create_time = wework_msg['data'].get("send_time")
  92. self.is_group = is_group
  93. self.wework = wework
  94. if wework_msg["type"] == 11041: # 文本消息类型
  95. if any(substring in wework_msg['data']['content'] for substring in ("该消息类型暂不能展示", "不支持的消息类型")):
  96. return
  97. self.ctype = ContextType.TEXT
  98. self.content = wework_msg['data']['content']
  99. elif wework_msg["type"] == 11044: # 语音消息类型,需要缓存文件
  100. file_name = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + ".silk"
  101. base_name, _ = os.path.splitext(file_name)
  102. file_name_2 = base_name + ".wav"
  103. current_dir = os.getcwd()
  104. self.ctype = ContextType.VOICE
  105. self.content = os.path.join(current_dir, "tmp", file_name_2)
  106. self._prepare_fn = lambda: c2c_download_and_convert(wework, wework_msg, file_name)
  107. elif wework_msg["type"] == 11042: # 图片消息类型,需要下载文件
  108. file_name = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + ".jpg"
  109. current_dir = os.getcwd()
  110. self.ctype = ContextType.IMAGE
  111. self.content = os.path.join(current_dir, "tmp", file_name)
  112. self._prepare_fn = lambda: cdn_download(wework, wework_msg, file_name)
  113. elif wework_msg["type"] == 11072: # 新成员入群通知
  114. self.ctype = ContextType.JOIN_GROUP
  115. member_list = wework_msg['data']['member_list']
  116. self.actual_user_nickname = member_list[0]['name']
  117. self.actual_user_id = member_list[0]['user_id']
  118. self.content = f"{self.actual_user_nickname}加入了群聊!"
  119. directory = os.path.join(os.getcwd(), "tmp")
  120. rooms = get_with_retry(wework.get_rooms)
  121. if not rooms:
  122. logger.error("更新群信息失败···")
  123. else:
  124. result = {}
  125. for room in rooms['room_list']:
  126. # 获取聊天室ID
  127. room_wxid = room['conversation_id']
  128. # 获取聊天室成员
  129. room_members = wework.get_room_members(room_wxid)
  130. # 将聊天室成员保存到结果字典中
  131. result[room_wxid] = room_members
  132. with open(os.path.join(directory, 'wework_room_members.json'), 'w', encoding='utf-8') as f:
  133. json.dump(result, f, ensure_ascii=False, indent=4)
  134. logger.info("有新成员加入,已自动更新群成员列表缓存!")
  135. else:
  136. raise NotImplementedError(
  137. "Unsupported message type: Type:{} MsgType:{}".format(wework_msg["type"], wework_msg["MsgType"]))
  138. data = wework_msg['data']
  139. login_info = self.wework.get_login_info()
  140. logger.debug(f"login_info: {login_info}")
  141. nickname = f"{login_info['username']}({login_info['nickname']})" if login_info['nickname'] else login_info['username']
  142. user_id = login_info['user_id']
  143. sender_id = data.get('sender')
  144. conversation_id = data.get('conversation_id')
  145. sender_name = data.get("sender_name")
  146. self.from_user_id = user_id if sender_id == user_id else conversation_id
  147. self.from_user_nickname = nickname if sender_id == user_id else sender_name
  148. self.to_user_id = user_id
  149. self.to_user_nickname = nickname
  150. self.other_user_nickname = sender_name
  151. self.other_user_id = conversation_id
  152. if self.is_group:
  153. conversation_id = data.get('conversation_id') or data.get('room_conversation_id')
  154. self.other_user_id = conversation_id
  155. if conversation_id:
  156. room_info = get_room_info(wework=wework, conversation_id=conversation_id)
  157. self.other_user_nickname = room_info.get('nickname', None) if room_info else None
  158. at_list = data.get('at_list', [])
  159. tmp_list = []
  160. for at in at_list:
  161. tmp_list.append(at['nickname'])
  162. at_list = tmp_list
  163. logger.debug(f"at_list: {at_list}")
  164. logger.debug(f"nickname: {nickname}")
  165. self.is_at = False
  166. if nickname in at_list or login_info['nickname'] in at_list or login_info['username'] in at_list:
  167. self.is_at = True
  168. self.at_list = at_list
  169. # 检查消息内容是否包含@用户名。处理复制粘贴的消息,这类消息可能不会触发@通知,但内容中可能包含 "@用户名"。
  170. content = data.get('content', '')
  171. name = nickname
  172. pattern = f"@{re.escape(name)}(\u2005|\u0020)"
  173. if re.search(pattern, content):
  174. logger.debug(f"Wechaty message {self.msg_id} includes at")
  175. self.is_at = True
  176. if not self.actual_user_id:
  177. self.actual_user_id = data.get("sender")
  178. self.actual_user_nickname = sender_name if self.ctype != ContextType.JOIN_GROUP else self.actual_user_nickname
  179. else:
  180. logger.error("群聊消息中没有找到 conversation_id 或 room_conversation_id")
  181. logger.debug(f"WeworkMessage has been successfully instantiated with message id: {self.msg_id}")
  182. except Exception as e:
  183. logger.error(f"在 WeworkMessage 的初始化过程中出现错误:{e}")
  184. raise e