You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

wework_message.py 10KB

9 kuukautta sitten
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. import datetime
  2. import json
  3. import os
  4. import re
  5. import time
  6. import pilk
  7. from bridge.context import ContextType
  8. from channel.chat_message import ChatMessage
  9. from common.log import logger
  10. from ntwork.const import send_type
  def get_with_retry(get_func, max_retries=5, delay=5):
      """Call ``get_func`` until it returns a truthy value or attempts run out.

      Args:
          get_func: Zero-argument callable that fetches the data.
          max_retries: Maximum number of times ``get_func`` is called.
          delay: Seconds to wait between two consecutive attempts.

      Returns:
          The first truthy result. If every attempt fails, the last (falsy)
          result is returned; if ``max_retries`` is not positive, ``get_func``
          is never called and ``None`` is returned.
      """
      attempts = 0
      result = None
      while attempts < max_retries:
          result = get_func()
          if result:
              break
          attempts += 1
          logger.warning(f"获取数据失败,重试第{attempts}次······")
          # Wait only when another attempt follows; sleeping after the final
          # failure would just delay the caller without any benefit.
          if attempts < max_retries:
              time.sleep(delay)
      return result
  def get_room_info(wework, conversation_id):
      """Return the room entry whose ``conversation_id`` matches, or ``None``.

      Args:
          wework: Client object exposing ``get_rooms()``.
          conversation_id: Conversation id of the group chat to look up.

      Returns:
          The matching element of ``rooms['room_list']``, or ``None`` when the
          room list could not be fetched or contains no such room.
      """
      logger.debug(f"传入的 conversation_id: {conversation_id}")
      room_data = wework.get_rooms()
      has_room_list = bool(room_data) and 'room_list' in room_data
      if not has_room_list:
          logger.error(f"获取群聊信息失败: {room_data}")
          return None

      # The original code pauses for one second here; the reason is not
      # visible in this file (presumably throttling the client).
      time.sleep(1)
      logger.debug(f"获取到的群聊信息: {room_data}")

      matches = (
          entry
          for entry in room_data['room_list']
          if entry['conversation_id'] == conversation_id
      )
      return next(matches, None)
  def cdn_download(wework, message, file_name):
      """Download a message's CDN attachment to ``<cwd>/tmp/<file_name>``.

      Two transports are supported, chosen from the keys of
      ``message["data"]["cdn"]``:

      * ``url`` + ``auth_key``: the "wx" CDN download.
      * ``file_id``: the c2c CDN download, for message types 11042 and 11045.

      Args:
          wework: Client object used to perform the download.
          message: Raw message dict; must contain ``data.cdn.aes_key`` and
              ``data.cdn.size``.
          file_name: Name of the file to create inside the ``tmp`` directory.

      Returns:
          None. The download result is only written to the debug log; an
          unsupported payload is reported through ``logger.error``.
      """
      data = message["data"]
      cdn = data["cdn"]
      aes_key = cdn["aes_key"]
      file_size = cdn["size"]
      # Save path = current working directory / tmp / file name.
      save_path = os.path.join(os.getcwd(), "tmp", file_name)

      if "url" in cdn and "auth_key" in cdn:
          # According to the original author, ntwork's wx_cdn_download()
          # wrapper lacks the aes_key parameter, so the request is sent
          # through the underlying (name-mangled) synchronous send instead.
          request = {
              'url': cdn["url"],
              'auth_key': cdn["auth_key"],
              'aes_key': aes_key,
              'size': file_size,
              'save_path': save_path,
          }
          result = wework._WeWork__send_sync(send_type.MT_WXCDN_DOWNLOAD_MSG, request)
      elif "file_id" in cdn:
          # Message type -> c2c file type. Any other type previously left
          # file_type unbound and crashed with UnboundLocalError.
          file_type = {11042: 2, 11045: 5}.get(message["type"])
          if file_type is None:
              logger.error(f"unsupported message type for c2c download: {message['type']}")
              return
          result = wework.c2c_cdn_download(cdn["file_id"], aes_key, file_size, file_type, save_path)
      else:
          logger.error(f"something is wrong, data: {data}")
          return

      # Log the download result.
      logger.debug(f"result: {result}")
  def c2c_download_and_convert(wework, message, file_name):
      """Download a SILK voice file via c2c CDN and convert it to WAV.

      The SILK file is saved to ``<cwd>/tmp/<file_name>``, converted to a WAV
      file with the same base name (sample rate 24000), and then removed.

      Args:
          wework: Client object exposing ``c2c_cdn_download``.
          message: Raw message dict; must contain ``data.cdn.aes_key``,
              ``data.cdn.size`` and ``data.cdn.file_id``.
          file_name: Name of the SILK file to create inside ``tmp``.

      Returns:
          None.
      """
      cdn = message["data"]["cdn"]
      aes_key = cdn["aes_key"]
      file_size = cdn["size"]
      file_type = 5  # NOTE(review): meaning of 5 is defined by ntwork — confirm
      file_id = cdn["file_id"]

      save_path = os.path.join(os.getcwd(), "tmp", file_name)
      result = wework.c2c_cdn_download(file_id, aes_key, file_size, file_type, save_path)
      logger.debug(result)

      # Convert the SILK file to WAV right after the download.
      wav_file = os.path.splitext(save_path)[0] + ".wav"
      pilk.silk_to_wav(save_path, wav_file, rate=24000)

      # Removing the SILK file is best effort: a failure must not break the
      # message flow, but it is logged instead of being silently ignored.
      try:
          os.remove(save_path)
      except OSError as e:
          logger.warning(f"failed to remove temporary SILK file {save_path}: {e}")
  class WeworkMessage(ChatMessage):
      """Chat message built from a raw WeWork (ntwork) message dict.

      Message type codes handled (as labelled by the original comments):
      11041 text, 11042 image, 11044 voice, 11045 file, 11047 link,
      11072 new-member-joined notification.
      """

      def __init__(self, wework_msg, wework, is_group=False):
          """Populate the message fields from ``wework_msg``.

          Args:
              wework_msg: Raw message dict with at least ``type`` and ``data``.
              wework: Client object used for login info, room lookups and
                  (lazily, through ``_prepare_fn``) attachment downloads.
              is_group: Whether the message comes from a group chat.

          Raises:
              NotImplementedError: If the message type is not supported.
              Exception: Any other error is logged and re-raised unchanged.
          """
          try:
              super().__init__(wework_msg)
              data = wework_msg['data']
              self.msg_id = data.get('conversation_id', data.get('room_conversation_id'))
              # .get() avoids an error when the 'send_time' key is missing.
              self.create_time = data.get("send_time")
              self.is_group = is_group
              self.wework = wework

              msg_type = wework_msg["type"]
              tmp_dir = os.path.join(os.getcwd(), "tmp")

              if msg_type == 11041:  # text message
                  if any(substring in data['content'] for substring in ("该消息类型暂不能展示", "不支持的消息类型")):
                      # Placeholder text for content the client cannot show:
                      # stop here, leaving the remaining fields unset.
                      return
                  self.ctype = ContextType.TEXT
                  self.content = data['content']
              elif msg_type == 11044:  # voice message, file is fetched lazily
                  stamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
                  file_name = stamp + ".silk"
                  self.ctype = ContextType.VOICE
                  # content points at the WAV file produced by the conversion.
                  self.content = os.path.join(tmp_dir, stamp + ".wav")
                  self._prepare_fn = lambda: c2c_download_and_convert(wework, wework_msg, file_name)
              elif msg_type == 11042:  # image message, file is fetched lazily
                  file_name = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + ".jpg"
                  self.ctype = ContextType.IMAGE
                  self.content = os.path.join(tmp_dir, file_name)
                  self._prepare_fn = lambda: cdn_download(wework, wework_msg, file_name)
              elif msg_type == 11045:  # file message
                  # Logged at debug level instead of printed to stdout.
                  logger.debug(f"文件消息: {wework_msg}")
                  file_name = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
                  file_name = file_name + data['cdn']['file_name']
                  self.ctype = ContextType.FILE
                  self.content = os.path.join(tmp_dir, file_name)
                  self._prepare_fn = lambda: cdn_download(wework, wework_msg, file_name)
              elif msg_type == 11047:  # link message
                  self.ctype = ContextType.SHARING
                  self.content = data['url']
              elif msg_type == 11072:  # new member joined the group
                  self.ctype = ContextType.JOIN_GROUP
                  member_list = data['member_list']
                  self.actual_user_nickname = member_list[0]['name']
                  self.actual_user_id = member_list[0]['user_id']
                  self.content = f"{self.actual_user_nickname}加入了群聊!"
                  rooms = get_with_retry(wework.get_rooms)
                  if not rooms:
                      logger.error("更新群信息失败···")
                  else:
                      # Rebuild the room -> members cache for every known room.
                      result = {
                          room['conversation_id']: wework.get_room_members(room['conversation_id'])
                          for room in rooms['room_list']
                      }
                      # Make sure the cache directory exists before writing.
                      os.makedirs(tmp_dir, exist_ok=True)
                      with open(os.path.join(tmp_dir, 'wework_room_members.json'), 'w', encoding='utf-8') as f:
                          json.dump(result, f, ensure_ascii=False, indent=4)
                      logger.info("有新成员加入,已自动更新群成员列表缓存!")
              else:
                  # .get() so that a message without a 'MsgType' key still
                  # raises NotImplementedError rather than KeyError.
                  raise NotImplementedError(
                      "Unsupported message type: Type:{} MsgType:{}".format(msg_type, wework_msg.get("MsgType")))

              login_info = self.wework.get_login_info()
              logger.debug(f"login_info: {login_info}")
              nickname = f"{login_info['username']}({login_info['nickname']})" if login_info['nickname'] else login_info['username']
              user_id = login_info['user_id']

              sender_id = data.get('sender')
              conversation_id = data.get('conversation_id')
              sender_name = data.get("sender_name")

              # A message sent by the logged-in account is attributed to it;
              # otherwise the conversation is treated as the sender.
              sent_by_self = sender_id == user_id
              self.from_user_id = user_id if sent_by_self else conversation_id
              self.from_user_nickname = nickname if sent_by_self else sender_name
              self.to_user_id = user_id
              self.to_user_nickname = nickname
              self.other_user_nickname = sender_name
              self.other_user_id = conversation_id

              if self.is_group:
                  conversation_id = data.get('conversation_id') or data.get('room_conversation_id')
                  self.other_user_id = conversation_id
                  if conversation_id:
                      room_info = get_room_info(wework=wework, conversation_id=conversation_id)
                      room_nickname = room_info.get('nickname', None) if room_info else None
                      self.other_user_nickname = room_nickname
                      self.from_user_nickname = room_nickname

                      at_list = [at['nickname'] for at in data.get('at_list', [])]
                      logger.debug(f"at_list: {at_list}")
                      logger.debug(f"nickname: {nickname}")
                      self.is_at = (
                          nickname in at_list
                          or login_info['nickname'] in at_list
                          or login_info['username'] in at_list
                      )
                      self.at_list = at_list

                      # Copy-pasted messages may not carry an @ notification
                      # but can still contain "@<name>" in their text, so the
                      # content is checked as well.
                      content = data.get('content', '')
                      pattern = f"@{re.escape(nickname)}(\u2005|\u0020)"
                      if re.search(pattern, content):
                          logger.debug(f"Wechaty message {self.msg_id} includes at")
                          self.is_at = True

                      if not self.actual_user_id:
                          self.actual_user_id = data.get("sender")
                      # Join notifications keep the nickname of the new member
                      # that was set while parsing the message body.
                      if self.ctype != ContextType.JOIN_GROUP:
                          self.actual_user_nickname = sender_name
                  else:
                      logger.error("群聊消息中没有找到 conversation_id 或 room_conversation_id")

              logger.debug(f"WeworkMessage has been successfully instantiated with message id: {self.msg_id}")
          except Exception as e:
              logger.error(f"在 WeworkMessage 的初始化过程中出现错误:{e}")
              # Bare raise re-raises the active exception with its traceback.
              raise