Procházet zdrojové kódy

支持wework企业微信机器人

master
chenzhenkun před 1 rokem
rodič
revize
7c6ed9944e
8 změnil soubory, kde provedl 524 přidání a 5 odebrání
  1. +4
    -0
      bridge/context.py
  2. +4
    -0
      channel/channel_factory.py
  3. +5
    -1
      channel/chat_channel.py
  4. +3
    -1
      channel/chat_message.py
  5. +17
    -0
      channel/wework/run.py
  6. +306
    -0
      channel/wework/wework_channel.py
  7. +180
    -0
      channel/wework/wework_message.py
  8. +5
    -3
      config.py

+ 4
- 0
bridge/context.py Zobrazit soubor

@@ -7,9 +7,13 @@ class ContextType(Enum):
TEXT = 1 # 文本消息
VOICE = 2 # 音频消息
IMAGE = 3 # 图片消息
FILE = 4 # 文件信息
VIDEO = 5 # 视频信息

IMAGE_CREATE = 10 # 创建图片命令
JOIN_GROUP = 20 # 加入群聊
PATPAT = 21 # 拍了拍
FUNCTION = 22 # 函数调用

def __str__(self):
return self.name


+ 4
- 0
channel/channel_factory.py Zobrazit soubor

@@ -33,4 +33,8 @@ def create_channel(channel_type):
from channel.wechatcom.wechatcomapp_channel import WechatComAppChannel

return WechatComAppChannel()
elif channel_type == "wework":
from channel.wework.wework_channel import WeworkChannel

return WeworkChannel()
raise RuntimeError

+ 5
- 1
channel/chat_channel.py Zobrazit soubor

@@ -109,6 +109,9 @@ class ChatChannel(Channel):
flag = True
pattern = f"@{re.escape(self.name)}(\u2005|\u0020)"
subtract_res = re.sub(pattern, r"", content)
for at in context["msg"].at_list:
pattern = f"@{re.escape(at)}(\u2005|\u0020)"
subtract_res = re.sub(pattern, r"", subtract_res)
if subtract_res == content and context["msg"].self_display_name:
# 前缀移除后没有变化,使用群昵称再次移除
pattern = f"@{re.escape(context['msg'].self_display_name)}(\u2005|\u0020)"
@@ -197,7 +200,8 @@ class ChatChannel(Channel):
reply = self._generate_reply(new_context)
else:
return
elif context.type == ContextType.IMAGE: # 图片消息,当前无默认逻辑
elif context.type == ContextType.IMAGE or context.type == ContextType.FUNCTION \
or context.type == ContextType.FILE: # 图片/文件消息及函数调用等,当前无默认逻辑
pass
else:
logger.error("[WX] unknown context type: {}".format(context.type))


+ 3
- 1
channel/chat_message.py Zobrazit soubor

@@ -53,6 +53,7 @@ class ChatMessage(object):
is_at = False
actual_user_id = None
actual_user_nickname = None
at_list = None

_prepare_fn = None
_prepared = False
@@ -67,7 +68,7 @@ class ChatMessage(object):
self._prepare_fn()

def __str__(self):
return "ChatMessage: id={}, create_time={}, ctype={}, content={}, from_user_id={}, from_user_nickname={}, to_user_id={}, to_user_nickname={}, other_user_id={}, other_user_nickname={}, is_group={}, is_at={}, actual_user_id={}, actual_user_nickname={}".format(
return "ChatMessage: id={}, create_time={}, ctype={}, content={}, from_user_id={}, from_user_nickname={}, to_user_id={}, to_user_nickname={}, other_user_id={}, other_user_nickname={}, is_group={}, is_at={}, actual_user_id={}, actual_user_nickname={}, at_list={}".format(
self.msg_id,
self.create_time,
self.ctype,
@@ -82,4 +83,5 @@ class ChatMessage(object):
self.is_at,
self.actual_user_id,
self.actual_user_nickname,
self.at_list
)

+ 17
- 0
channel/wework/run.py Zobrazit soubor

@@ -0,0 +1,17 @@
import os
import time
os.environ['ntwork_LOG'] = "ERROR"
import ntwork

wework = ntwork.WeWork()


def forever():
try:
while True:
time.sleep(0.1)
except KeyboardInterrupt:
ntwork.exit_()
os._exit(0)



+ 306
- 0
channel/wework/wework_channel.py Zobrazit soubor

@@ -0,0 +1,306 @@
import io
import os
import random
import tempfile
import threading
os.environ['ntwork_LOG'] = "ERROR"
import ntwork
import requests
import uuid

from bridge.context import *
from bridge.reply import *
from channel.chat_channel import ChatChannel
from channel.wework.wework_message import *
from channel.wework.wework_message import WeworkMessage
from common.singleton import singleton
from common.log import logger
from common.time_check import time_checker
from config import conf
from channel.wework.run import wework
from channel.wework import run
from PIL import Image


def get_wxid_by_name(room_members, group_wxid, name):
if group_wxid in room_members:
for member in room_members[group_wxid]['member_list']:
if member['room_nickname'] == name or member['username'] == name:
return member['user_id']
return None # 如果没有找到对应的group_wxid或name,则返回None


def download_and_compress_image(url, filename, quality=30):
# 确定保存图片的目录
directory = os.path.join(os.getcwd(), "tmp")
# 如果目录不存在,则创建目录
if not os.path.exists(directory):
os.makedirs(directory)

# 下载图片
response = requests.get(url)
image = Image.open(io.BytesIO(response.content))

# 压缩图片
image_path = os.path.join(directory, f"{filename}.jpg")
image.save(image_path, "JPEG", quality=quality)

return image_path


def download_video(url, filename):
# 确定保存视频的目录
directory = os.path.join(os.getcwd(), "tmp")
# 如果目录不存在,则创建目录
if not os.path.exists(directory):
os.makedirs(directory)

# 下载视频
response = requests.get(url, stream=True)
total_size = 0

video_path = os.path.join(directory, f"{filename}.mp4")

with open(video_path, 'wb') as f:
for block in response.iter_content(1024):
total_size += len(block)

# 如果视频的总大小超过30MB (30 * 1024 * 1024 bytes),则停止下载并返回
if total_size > 30 * 1024 * 1024:
logger.info("[WX] Video is larger than 30MB, skipping...")
return None

f.write(block)

return video_path


def create_message(wework_instance, message, is_group):
logger.debug(f"正在为{'群聊' if is_group else '单聊'}创建 WeworkMessage")
cmsg = WeworkMessage(message, wework=wework_instance, is_group=is_group)
logger.debug(f"cmsg:{cmsg}")
return cmsg


def handle_message(cmsg, is_group):
logger.debug(f"准备用 WeworkChannel 处理{'群聊' if is_group else '单聊'}消息")
if is_group:
WeworkChannel().handle_group(cmsg)
else:
WeworkChannel().handle_single(cmsg)
logger.debug(f"已用 WeworkChannel 处理完{'群聊' if is_group else '单聊'}消息")


def _check(func):
def wrapper(self, cmsg: ChatMessage):
msgId = cmsg.msg_id
create_time = cmsg.create_time # 消息时间戳
if create_time is None:
return func(self, cmsg)
if int(create_time) < int(time.time()) - 60: # 跳过1分钟前的历史消息
logger.debug("[WX]history message {} skipped".format(msgId))
return
return func(self, cmsg)

return wrapper


@wework.msg_register(
[ntwork.MT_RECV_TEXT_MSG, ntwork.MT_RECV_IMAGE_MSG, 11072, ntwork.MT_RECV_VOICE_MSG])
def all_msg_handler(wework_instance: ntwork.WeWork, message):
logger.debug(f"收到消息: {message}")
if 'data' in message:
# 首先查找conversation_id,如果没有找到,则查找room_conversation_id
conversation_id = message['data'].get('conversation_id', message['data'].get('room_conversation_id'))
if conversation_id is not None:
is_group = "R:" in conversation_id
try:
cmsg = create_message(wework_instance=wework_instance, message=message, is_group=is_group)
except NotImplementedError as e:
logger.error(f"[WX]{message.get('MsgId', 'unknown')} 跳过: {e}")
return None
delay = random.randint(1, 2)
timer = threading.Timer(delay, handle_message, args=(cmsg, is_group))
timer.start()
else:
logger.debug("消息数据中无 conversation_id")
return None
return None


def accept_friend_with_retries(wework_instance, user_id, corp_id):
result = wework_instance.accept_friend(user_id, corp_id)
logger.debug(f'result:{result}')


# @wework.msg_register(ntwork.MT_RECV_FRIEND_MSG)
# def friend(wework_instance: ntwork.WeWork, message):
# data = message["data"]
# user_id = data["user_id"]
# corp_id = data["corp_id"]
# logger.info(f"接收到好友请求,消息内容:{data}")
# delay = random.randint(1, 180)
# threading.Timer(delay, accept_friend_with_retries, args=(wework_instance, user_id, corp_id)).start()
#
# return None


def get_with_retry(get_func, max_retries=5, delay=5):
retries = 0
result = None
while retries < max_retries:
result = get_func()
if result:
break
logger.warning(f"获取数据失败,重试第{retries + 1}次······")
retries += 1
time.sleep(delay) # 等待一段时间后重试
return result


@singleton
class WeworkChannel(ChatChannel):
NOT_SUPPORT_REPLYTYPE = []

def __init__(self):
super().__init__()

def startup(self):
smart = conf().get("wework_smart", True)
wework.open(smart)
logger.info("等待登录······")
wework.wait_login()
login_info = wework.get_login_info()
self.user_id = login_info['user_id']
self.name = login_info['nickname']
logger.info(f"登录信息:>>>user_id:{self.user_id}>>>>>>>>name:{self.name}")
logger.info("静默延迟60s,等待客户端刷新数据,请勿进行任何操作······")
time.sleep(60)
contacts = get_with_retry(wework.get_external_contacts)
rooms = get_with_retry(wework.get_rooms)
directory = os.path.join(os.getcwd(), "tmp")
if not contacts or not rooms:
logger.error("获取contacts或rooms失败,程序退出")
ntwork.exit_()
os.exit(0)
if not os.path.exists(directory):
os.makedirs(directory)
# 将contacts保存到json文件中
with open(os.path.join(directory, 'wework_contacts.json'), 'w', encoding='utf-8') as f:
json.dump(contacts, f, ensure_ascii=False, indent=4)
with open(os.path.join(directory, 'wework_rooms.json'), 'w', encoding='utf-8') as f:
json.dump(rooms, f, ensure_ascii=False, indent=4)
# 创建一个空字典来保存结果
result = {}

# 遍历列表中的每个字典
for room in rooms['room_list']:
# 获取聊天室ID
room_wxid = room['conversation_id']

# 获取聊天室成员
room_members = wework.get_room_members(room_wxid)

# 将聊天室成员保存到结果字典中
result[room_wxid] = room_members

# 将结果保存到json文件中
with open(os.path.join(directory, 'wework_room_members.json'), 'w', encoding='utf-8') as f:
json.dump(result, f, ensure_ascii=False, indent=4)
logger.info("wework程序初始化完成········")
run.forever()

@time_checker
@_check
def handle_single(self, cmsg: ChatMessage):
if cmsg.ctype == ContextType.VOICE:
if not conf().get("speech_recognition"):
return
logger.debug("[WX]receive voice msg: {}".format(cmsg.content))
elif cmsg.ctype == ContextType.IMAGE:
logger.debug("[WX]receive image msg: {}".format(cmsg.content))
elif cmsg.ctype == ContextType.PATPAT:
logger.debug("[WX]receive patpat msg: {}".format(cmsg.content))
elif cmsg.ctype == ContextType.TEXT:
logger.debug("[WX]receive text msg: {}, cmsg={}".format(json.dumps(cmsg._rawmsg, ensure_ascii=False), cmsg))
else:
logger.debug("[WX]receive msg: {}, cmsg={}".format(cmsg.content, cmsg))
context = self._compose_context(cmsg.ctype, cmsg.content, isgroup=False, msg=cmsg)
if context:
self.produce(context)

@time_checker
@_check
def handle_group(self, cmsg: ChatMessage):
if cmsg.ctype == ContextType.VOICE:
if not conf().get("speech_recognition"):
return
logger.debug("[WX]receive voice for group msg: {}".format(cmsg.content))
elif cmsg.ctype == ContextType.IMAGE:
logger.debug("[WX]receive image for group msg: {}".format(cmsg.content))
elif cmsg.ctype in [ContextType.JOIN_GROUP, ContextType.PATPAT]:
logger.debug("[WX]receive note msg: {}".format(cmsg.content))
elif cmsg.ctype == ContextType.TEXT:
pass
else:
logger.debug("[WX]receive group msg: {}".format(cmsg.content))
context = self._compose_context(cmsg.ctype, cmsg.content, isgroup=True, msg=cmsg)
if context:
self.produce(context)

# 统一的发送函数,每个Channel自行实现,根据reply的type字段发送不同类型的消息
def send(self, reply: Reply, context: Context):
logger.debug(f"context: {context}")
receiver = context["receiver"]
actual_user_id = context["msg"].actual_user_id
if reply.type == ReplyType.TEXT or reply.type == ReplyType.TEXT_:
match = re.search(r"^@(.*?)\n", reply.content)
logger.debug(f"match: {match}")
if match:
new_content = re.sub(r"^@(.*?)\n", "\n", reply.content)
at_list = [actual_user_id]
logger.debug(f"new_content: {new_content}")
wework.send_room_at_msg(receiver, new_content, at_list)
else:
wework.send_text(receiver, reply.content)
logger.info("[WX] sendMsg={}, receiver={}".format(reply, receiver))
elif reply.type == ReplyType.ERROR or reply.type == ReplyType.INFO:
wework.send_text(receiver, reply.content)
logger.info("[WX] sendMsg={}, receiver={}".format(reply, receiver))
elif reply.type == ReplyType.IMAGE: # 从文件读取图片
image_storage = reply.content
image_storage.seek(0)
# Read data from image_storage
data = image_storage.read()
# Create a temporary file
with tempfile.NamedTemporaryFile(delete=False) as temp:
temp_path = temp.name
temp.write(data)
# Send the image
wework.send_image(receiver, temp_path)
logger.info("[WX] sendImage, receiver={}".format(receiver))
# Remove the temporary file
os.remove(temp_path)
elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片
img_url = reply.content
filename = str(uuid.uuid4())

# 调用你的函数,下载图片并保存为本地文件
image_path = download_and_compress_image(img_url, filename)

wework.send_image(receiver, file_path=image_path)
logger.info("[WX] sendImage url={}, receiver={}".format(img_url, receiver))
elif reply.type == ReplyType.VIDEO_URL:
video_url = reply.content
filename = str(uuid.uuid4())
video_path = download_video(video_url, filename)

if video_path is None:
# 如果视频太大,下载可能会被跳过,此时 video_path 将为 None
wework.send_text(receiver, "抱歉,视频太大了!!!")
else:
wework.send_video(receiver, video_path)
logger.info("[WX] sendVideo, receiver={}".format(receiver))
elif reply.type == ReplyType.VOICE:
wework.send_file(receiver, reply.content)
logger.info("[WX] sendFile={}, receiver={}".format(reply.content, receiver))

+ 180
- 0
channel/wework/wework_message.py Zobrazit soubor

@@ -0,0 +1,180 @@
import datetime
import json
import os
import re
import time
import pilk

from bridge.context import ContextType
from channel.chat_message import ChatMessage
from common.log import logger


def get_with_retry(get_func, max_retries=5, delay=5):
retries = 0
result = None
while retries < max_retries:
result = get_func()
if result:
break
logger.warning(f"获取数据失败,重试第{retries + 1}次······")
retries += 1
time.sleep(delay) # 等待一段时间后重试
return result


def get_room_info(wework, conversation_id):
logger.debug(f"传入的 conversation_id: {conversation_id}")
rooms = wework.get_rooms()
if not rooms or 'room_list' not in rooms:
logger.error(f"获取群聊信息失败: {rooms}")
return None
time.sleep(1)
logger.debug(f"获取到的群聊信息: {rooms}")
for room in rooms['room_list']:
if room['conversation_id'] == conversation_id:
return room
return None


def cdn_download(wework, message, file_name):
data = message["data"]
url = data["cdn"]["url"]
auth_key = data["cdn"]["auth_key"]
aes_key = data["cdn"]["aes_key"]
file_size = data["cdn"]["size"]

# 获取当前工作目录,然后与文件名拼接得到保存路径
current_dir = os.getcwd()
save_path = os.path.join(current_dir, "tmp", file_name)

result = wework.wx_cdn_download(url, auth_key, aes_key, file_size, save_path)
logger.debug(result)


def c2c_download_and_convert(wework, message, file_name):
data = message["data"]
aes_key = data["cdn"]["aes_key"]
file_size = data["cdn"]["size"]
file_type = 5
file_id = data["cdn"]["file_id"]

current_dir = os.getcwd()
save_path = os.path.join(current_dir, "tmp", file_name)
result = wework.c2c_cdn_download(file_id, aes_key, file_size, file_type, save_path)
logger.debug(result)

# 在下载完SILK文件之后,立即将其转换为WAV文件
base_name, _ = os.path.splitext(save_path)
wav_file = base_name + ".wav"
pilk.silk_to_wav(save_path, wav_file, rate=24000)


class WeworkMessage(ChatMessage):
def __init__(self, wework_msg, wework, is_group=False):
try:
super().__init__(wework_msg)
self.msg_id = wework_msg['data'].get('conversation_id', wework_msg['data'].get('room_conversation_id'))
# 使用.get()防止 'send_time' 键不存在时抛出错误
self.create_time = wework_msg['data'].get("send_time")
self.is_group = is_group
self.wework = wework

if wework_msg["type"] == 11041: # 文本消息类型
if any(substring in wework_msg['data']['content'] for substring in ("该消息类型暂不能展示", "不支持的消息类型")):
return
self.ctype = ContextType.TEXT
self.content = wework_msg['data']['content']
elif wework_msg["type"] == 11044: # 语音消息类型,需要缓存文件
file_name = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + ".silk"
base_name, _ = os.path.splitext(file_name)
file_name_2 = base_name + ".wav"
current_dir = os.getcwd()
self.ctype = ContextType.VOICE
self.content = os.path.join(current_dir, "tmp", file_name_2)
self._prepare_fn = lambda: c2c_download_and_convert(wework, wework_msg, file_name)
elif wework_msg["type"] == 11042: # 图片消息类型,需要下载文件
file_name = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + ".jpg"
current_dir = os.getcwd()
self.ctype = ContextType.IMAGE
self.content = os.path.join(current_dir, "tmp", file_name)
self._prepare_fn = lambda: cdn_download(wework, wework_msg, file_name)
elif wework_msg["type"] == 11072: # 新成员入群通知
self.ctype = ContextType.JOIN_GROUP
member_list = wework_msg['data']['member_list']
self.actual_user_nickname = member_list[0]['name']
self.actual_user_id = member_list[0]['user_id']
self.content = f"{self.actual_user_nickname}加入了群聊!"
directory = os.path.join(os.getcwd(), "tmp")
rooms = get_with_retry(wework.get_rooms)
if not rooms:
logger.error("更新群信息失败···")
else:
result = {}
for room in rooms['room_list']:
# 获取聊天室ID
room_wxid = room['conversation_id']

# 获取聊天室成员
room_members = wework.get_room_members(room_wxid)

# 将聊天室成员保存到结果字典中
result[room_wxid] = room_members
with open(os.path.join(directory, 'wework_room_members.json'), 'w', encoding='utf-8') as f:
json.dump(result, f, ensure_ascii=False, indent=4)
logger.info("有新成员加入,已自动更新群成员列表缓存!")
else:
raise NotImplementedError(
"Unsupported message type: Type:{} MsgType:{}".format(wework_msg["type"], wework_msg["MsgType"]))

data = wework_msg['data']
login_info = self.wework.get_login_info()
logger.debug(f"login_info: {login_info}")
nickname = f"{login_info['username']}({login_info['nickname']})" if login_info['nickname'] else login_info['username']
user_id = login_info['user_id']

sender_id = data.get('sender')
conversation_id = data.get('conversation_id')
sender_name = data.get("sender_name")

self.from_user_id = user_id if sender_id == user_id else conversation_id
self.from_user_nickname = nickname if sender_id == user_id else sender_name
self.to_user_id = user_id
self.to_user_nickname = nickname
self.other_user_nickname = sender_name
self.other_user_id = conversation_id

if self.is_group:
conversation_id = data.get('conversation_id') or data.get('room_conversation_id')
self.other_user_id = conversation_id
if conversation_id:
room_info = get_room_info(wework=wework, conversation_id=conversation_id)
self.other_user_nickname = room_info.get('nickname', None) if room_info else None
at_list = data.get('at_list', [])
tmp_list = []
for at in at_list:
tmp_list.append(at['nickname'])
at_list = tmp_list
logger.debug(f"at_list: {at_list}")
logger.debug(f"nickname: {nickname}")
self.is_at = nickname in at_list
self.at_list = at_list

# 检查消息内容是否包含@用户名。处理复制粘贴的消息,这类消息可能不会触发@通知,但内容中可能包含 "@用户名"。
content = data.get('content', '')
name = nickname
pattern = f"@{re.escape(name)}(\u2005|\u0020)"
if re.search(pattern, content):
logger.debug(f"Wechaty message {self.msg_id} includes at")
self.is_at = True

if not self.actual_user_id:
self.actual_user_id = data.get("sender")
self.actual_user_nickname = sender_name if self.ctype != ContextType.JOIN_GROUP else self.actual_user_nickname
else:
logger.error("群聊消息中没有找到 conversation_id 或 room_conversation_id")

logger.debug(f"WeworkMessage has been successfully instantiated with message id: {self.msg_id}")
except Exception as e:
logger.error(f"在 WeworkMessage 的初始化过程中出现错误:{e}")
raise e

+ 5
- 3
config.py Zobrazit soubor

@@ -23,8 +23,8 @@ available_setting = {
# Bot触发配置
"single_chat_prefix": ["bot", "@bot"], # 私聊时文本需要包含该前缀才能触发机器人回复
"single_chat_reply_prefix": "[bot] ", # 私聊时自动回复的前缀,用于区分真人
"single_chat_reply_suffix": "", # 私聊时自动回复的后缀,\n 可以换行
"group_chat_prefix": ["@bot"], # 群聊时包含该前缀则会触发机器人回复
"single_chat_reply_suffix": "", # 私聊时自动回复的后缀,\n 可以换行
"group_chat_prefix": ["@bot"], # 群聊时包含该前缀则会触发机器人回复
"group_chat_reply_prefix": "", # 群聊时自动回复的前缀
"group_chat_reply_suffix": "", # 群聊时自动回复的后缀,\n 可以换行
"group_chat_keyword": [], # 群聊时包含该关键词则会触发机器人回复
@@ -120,7 +120,9 @@ available_setting = {
"use_linkai": False,
"linkai_api_key": "",
"linkai_app_code": "",
"linkai_api_base": "https://api.link-ai.chat" # linkAI服务地址,若国内无法访问或延迟较高可改为 https://api.link-ai.tech
"linkai_api_base": "https://api.link-ai.chat", # linkAI服务地址,若国内无法访问或延迟较高可改为 https://api.link-ai.tech
# wework的通用配置
"wework_smart": True # 配置wework是否使用已登录的企业微信,False为多开
}




Načítá se…
Zrušit
Uložit