You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

183 lines
8.2KB

  1. import datetime
  2. import json
  3. import os
  4. import re
  5. import time
  6. import pilk
  7. from bridge.context import ContextType
  8. from channel.chat_message import ChatMessage
  9. from common.log import logger
  10. def get_with_retry(get_func, max_retries=5, delay=5):
  11. retries = 0
  12. result = None
  13. while retries < max_retries:
  14. result = get_func()
  15. if result:
  16. break
  17. logger.warning(f"获取数据失败,重试第{retries + 1}次······")
  18. retries += 1
  19. time.sleep(delay) # 等待一段时间后重试
  20. return result
  21. def get_room_info(wework, conversation_id):
  22. logger.debug(f"传入的 conversation_id: {conversation_id}")
  23. rooms = wework.get_rooms()
  24. if not rooms or 'room_list' not in rooms:
  25. logger.error(f"获取群聊信息失败: {rooms}")
  26. return None
  27. time.sleep(1)
  28. logger.debug(f"获取到的群聊信息: {rooms}")
  29. for room in rooms['room_list']:
  30. if room['conversation_id'] == conversation_id:
  31. return room
  32. return None
  33. def cdn_download(wework, message, file_name):
  34. data = message["data"]
  35. url = data["cdn"]["url"]
  36. auth_key = data["cdn"]["auth_key"]
  37. aes_key = data["cdn"]["aes_key"]
  38. file_size = data["cdn"]["size"]
  39. # 获取当前工作目录,然后与文件名拼接得到保存路径
  40. current_dir = os.getcwd()
  41. save_path = os.path.join(current_dir, "tmp", file_name)
  42. result = wework.wx_cdn_download(url, auth_key, aes_key, file_size, save_path)
  43. logger.debug(result)
  44. def c2c_download_and_convert(wework, message, file_name):
  45. data = message["data"]
  46. aes_key = data["cdn"]["aes_key"]
  47. file_size = data["cdn"]["size"]
  48. file_type = 5
  49. file_id = data["cdn"]["file_id"]
  50. current_dir = os.getcwd()
  51. save_path = os.path.join(current_dir, "tmp", file_name)
  52. result = wework.c2c_cdn_download(file_id, aes_key, file_size, file_type, save_path)
  53. logger.debug(result)
  54. # 在下载完SILK文件之后,立即将其转换为WAV文件
  55. base_name, _ = os.path.splitext(save_path)
  56. wav_file = base_name + ".wav"
  57. pilk.silk_to_wav(save_path, wav_file, rate=24000)
  58. class WeworkMessage(ChatMessage):
  59. def __init__(self, wework_msg, wework, is_group=False):
  60. try:
  61. super().__init__(wework_msg)
  62. self.msg_id = wework_msg['data'].get('conversation_id', wework_msg['data'].get('room_conversation_id'))
  63. # 使用.get()防止 'send_time' 键不存在时抛出错误
  64. self.create_time = wework_msg['data'].get("send_time")
  65. self.is_group = is_group
  66. self.wework = wework
  67. if wework_msg["type"] == 11041: # 文本消息类型
  68. if any(substring in wework_msg['data']['content'] for substring in ("该消息类型暂不能展示", "不支持的消息类型")):
  69. return
  70. self.ctype = ContextType.TEXT
  71. self.content = wework_msg['data']['content']
  72. elif wework_msg["type"] == 11044: # 语音消息类型,需要缓存文件
  73. file_name = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + ".silk"
  74. base_name, _ = os.path.splitext(file_name)
  75. file_name_2 = base_name + ".wav"
  76. current_dir = os.getcwd()
  77. self.ctype = ContextType.VOICE
  78. self.content = os.path.join(current_dir, "tmp", file_name_2)
  79. self._prepare_fn = lambda: c2c_download_and_convert(wework, wework_msg, file_name)
  80. elif wework_msg["type"] == 11042: # 图片消息类型,需要下载文件
  81. file_name = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + ".jpg"
  82. current_dir = os.getcwd()
  83. self.ctype = ContextType.IMAGE
  84. self.content = os.path.join(current_dir, "tmp", file_name)
  85. self._prepare_fn = lambda: cdn_download(wework, wework_msg, file_name)
  86. elif wework_msg["type"] == 11072: # 新成员入群通知
  87. self.ctype = ContextType.JOIN_GROUP
  88. member_list = wework_msg['data']['member_list']
  89. self.actual_user_nickname = member_list[0]['name']
  90. self.actual_user_id = member_list[0]['user_id']
  91. self.content = f"{self.actual_user_nickname}加入了群聊!"
  92. directory = os.path.join(os.getcwd(), "tmp")
  93. rooms = get_with_retry(wework.get_rooms)
  94. if not rooms:
  95. logger.error("更新群信息失败···")
  96. else:
  97. result = {}
  98. for room in rooms['room_list']:
  99. # 获取聊天室ID
  100. room_wxid = room['conversation_id']
  101. # 获取聊天室成员
  102. room_members = wework.get_room_members(room_wxid)
  103. # 将聊天室成员保存到结果字典中
  104. result[room_wxid] = room_members
  105. with open(os.path.join(directory, 'wework_room_members.json'), 'w', encoding='utf-8') as f:
  106. json.dump(result, f, ensure_ascii=False, indent=4)
  107. logger.info("有新成员加入,已自动更新群成员列表缓存!")
  108. else:
  109. raise NotImplementedError(
  110. "Unsupported message type: Type:{} MsgType:{}".format(wework_msg["type"], wework_msg["MsgType"]))
  111. data = wework_msg['data']
  112. login_info = self.wework.get_login_info()
  113. logger.debug(f"login_info: {login_info}")
  114. nickname = f"{login_info['username']}({login_info['nickname']})" if login_info['nickname'] else login_info['username']
  115. user_id = login_info['user_id']
  116. sender_id = data.get('sender')
  117. conversation_id = data.get('conversation_id')
  118. sender_name = data.get("sender_name")
  119. self.from_user_id = user_id if sender_id == user_id else conversation_id
  120. self.from_user_nickname = nickname if sender_id == user_id else sender_name
  121. self.to_user_id = user_id
  122. self.to_user_nickname = nickname
  123. self.other_user_nickname = sender_name
  124. self.other_user_id = conversation_id
  125. if self.is_group:
  126. conversation_id = data.get('conversation_id') or data.get('room_conversation_id')
  127. self.other_user_id = conversation_id
  128. if conversation_id:
  129. room_info = get_room_info(wework=wework, conversation_id=conversation_id)
  130. self.other_user_nickname = room_info.get('nickname', None) if room_info else None
  131. at_list = data.get('at_list', [])
  132. tmp_list = []
  133. for at in at_list:
  134. tmp_list.append(at['nickname'])
  135. at_list = tmp_list
  136. logger.debug(f"at_list: {at_list}")
  137. logger.debug(f"nickname: {nickname}")
  138. self.is_at = False
  139. if nickname in at_list or login_info['nickname'] in at_list or login_info['username'] in at_list:
  140. self.is_at = True
  141. self.at_list = at_list
  142. # 检查消息内容是否包含@用户名。处理复制粘贴的消息,这类消息可能不会触发@通知,但内容中可能包含 "@用户名"。
  143. content = data.get('content', '')
  144. name = nickname
  145. pattern = f"@{re.escape(name)}(\u2005|\u0020)"
  146. if re.search(pattern, content):
  147. logger.debug(f"Wechaty message {self.msg_id} includes at")
  148. self.is_at = True
  149. if not self.actual_user_id:
  150. self.actual_user_id = data.get("sender")
  151. self.actual_user_nickname = sender_name if self.ctype != ContextType.JOIN_GROUP else self.actual_user_nickname
  152. else:
  153. logger.error("群聊消息中没有找到 conversation_id 或 room_conversation_id")
  154. logger.debug(f"WeworkMessage has been successfully instantiated with message id: {self.msg_id}")
  155. except Exception as e:
  156. logger.error(f"在 WeworkMessage 的初始化过程中出现错误:{e}")
  157. raise e