|
- import io
- import os
- import uuid
- import requests
- from urllib.parse import urlparse
- from PIL import Image
- from common.log import logger
- import oss2,time,json
- from urllib.parse import urlparse, unquote
- from voice.ali.ali_voice import AliVoice
- from voice import audio_convert
-
- import aiohttp,aiofiles
- import cv2,re
- import os
- import tempfile
- from moviepy.editor import VideoFileClip
-
- from datetime import datetime
-
-
# ASCII control characters (0x00-0x1F) plus DEL (0x7F); compiled once at import time.
_JSON_CONTROL_CHARS = re.compile(r'[\x00-\x1f\x7f]')


def clean_json_string(json_str):
    """
    Strip every ASCII control character from ``json_str``.

    Newlines, carriage returns, tabs and DEL are removed along with the other
    non-printable characters in the 0x00-0x1F range.

    :param json_str: the (JSON) text to clean.
    :return: ``json_str`` with those characters deleted.
    """
    return _JSON_CONTROL_CHARS.sub('', json_str)
-
def dialogue_message(wxid_from: str, wxid_to: str, wx_content: list, is_ai: bool = False):
    """
    Build the compact JSON string for a "dialogue" message on topic.ai.ops.wx.

    :param wxid_from: sender wxid, sent as ``data.content.wxid_from``.
    :param wxid_to: receiver wxid, sent as ``data.content.wxid_to``.
    :param wx_content: list of content items, sent as ``data.content.wx_content``, e.g.::

            [{"type": "text", "text": "AAAAAAA"},
             {"type": "image_url", "image_url": {"url": "https://AAAAA.jpg"}},
             {"type": "file", "file_url": {"url": "https://AAAAA.pdf"}}]

    :param is_ai: value of ``data.is_ai`` (defaults to False).
    :return: JSON string with no extra whitespace and non-ASCII characters kept
        as-is. ``message_id`` is the current time in milliseconds (as a string);
        ``time`` is the same instant formatted as local "YYYY-MM-DD HH:MM:SS".
    """
    # Read the clock once so message_id and time describe the same instant;
    # two separate reads could fall on different sides of a second boundary.
    now = time.time()

    data = {
        "message_id": str(int(now * 1000)),
        "topic": "topic.ai.ops.wx",
        "time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now)),
        "data": {
            "msg_type": "dialogue",
            "is_ai": is_ai,
            "content": {
                "wxid_from": wxid_from,
                "wxid_to": wxid_to,
                "wx_content": wx_content,
            },
        },
    }

    return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
-
- def kafka_base_message(msg_type:str,content: dict|list)->dict:
- """
- 构造消息的 JSON 数据
- :param wxid: 微信ID
- :param data: 一个包含了所有联系人的数据,格式为list,
- 每个元素为字典,包含wxid、alias、remark、sex、city、province、country,
- headimgurl、signature、skey、uin、nickname这10个字段
- :return: JSON 字符串
- """
- # 获取当前时间戳,精确到毫秒
- current_timestamp = int(time.time() * 1000)
-
- # 获取当前时间,格式化为 "YYYY-MM-DD HH:MM:SS"
- current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-
- # 构造 JSON 数据
- data = {
- "message_id": str(current_timestamp),
- "topic": "topic.ai.ops.wx",
- "time": current_time,
- "data": {
- #"msg_type": "login-qrcode",
- "msg_type": msg_type,
- "content": content
- }
-
- }
- return data
-
def wx_offline_message(appid: str, wxid: str) -> str:
    """
    Serialize a "wx-offline" message.

    :param appid: sent as ``content.appid``.
    :param wxid: sent as ``content.wxid``.
    :return: compact JSON string (non-ASCII kept) of the kafka_base_message envelope.
    """
    envelope = kafka_base_message("wx-offline", {"appid": appid, "wxid": wxid})
    return json.dumps(envelope, separators=(',', ':'), ensure_ascii=False)
-
def wx_del_contact_message(wxid: str, contact_wixd: str) -> str:
    """
    Serialize a "del-contact" message.

    :param wxid: sent as ``content.wxid``.
    :param contact_wixd: sent as ``content.contact_wixd``. The key is spelled
        "wixd" on the wire; consumers presumably rely on that spelling, so do
        not rename it without coordinating.
    :return: compact JSON string (non-ASCII kept) of the kafka_base_message envelope.
    """
    payload = dict(wxid=wxid, contact_wixd=contact_wixd)
    return json.dumps(
        kafka_base_message("del-contact", payload),
        separators=(',', ':'),
        ensure_ascii=False,
    )
-
def wx_mod_contact_message(wxid: str, contact_data: dict) -> str:
    """
    Serialize a "mod-contact" message.

    :param wxid: sent as ``content.wxid``.
    :param contact_data: sent unchanged as ``content.contact_data``.
    :return: compact JSON string (non-ASCII kept) of the kafka_base_message envelope.
    """
    envelope = kafka_base_message("mod-contact", {"wxid": wxid, "contact_data": contact_data})
    serialized = json.dumps(envelope, separators=(',', ':'), ensure_ascii=False)
    return serialized
-
def wx_all_contacts_message(wxid: str, data: dict | list) -> str:
    """
    Serialize an "all-contacts" message.

    :param wxid: sent as ``content.wxid``.
    :param data: contact data, sent unchanged as ``content.contacts_data``.
    :return: compact JSON string (non-ASCII kept) of the kafka_base_message envelope.
    """
    # Keep the caller's `data` untouched; the envelope gets its own name.
    envelope = kafka_base_message("all-contacts", {"wxid": wxid, "contacts_data": data})
    return json.dumps(envelope, separators=(',', ':'), ensure_ascii=False)
-
def wx_groups_info_members_message(wxid: str, data: dict | list) -> str:
    """
    Serialize an "all-groups" message.

    :param wxid: sent as ``content.wxid``.
    :param data: group data, sent unchanged as ``content.groups_info``.
    :return: compact JSON string (non-ASCII kept) of the kafka_base_message envelope.
    """
    groups_payload = {"wxid": wxid, "groups_info": data}
    envelope = kafka_base_message("all-groups", groups_payload)
    return json.dumps(envelope, separators=(',', ':'), ensure_ascii=False)
-
def wx_all_contacts_key_message(wxid: str) -> str:
    """
    Serialize an "all-contacts-key" message.

    The message carries the storage key under which this account's contact
    summary is expected to be found (``__AI_OPS_WX__:CONTACTS_BRIEF:<wxid>``).

    :param wxid: sent as ``content.wxid`` and embedded in ``content.key``.
    :return: compact JSON string (non-ASCII kept) of the kafka_base_message envelope.
    """
    contacts_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}"
    return json.dumps(
        kafka_base_message("all-contacts-key", {"wxid": wxid, "key": contacts_key}),
        separators=(',', ':'),
        ensure_ascii=False,
    )
-
def wx_groups_info_members_key_message(wxid: str) -> str:
    """
    Serialize an "all-groups-key" message.

    The message carries the storage keys for this account's group info and
    group members (``__AI_OPS_WX__:GROUPS_INFO:<wxid>`` and
    ``__AI_OPS_WX__:GROUPS_MEMBERS:<wxid>``).

    :param wxid: sent as ``content.wxid`` and embedded in both keys.
    :return: compact JSON string (non-ASCII kept) of the kafka_base_message envelope.
    """
    group_keys = dict(
        wxid=wxid,
        info_key=f"__AI_OPS_WX__:GROUPS_INFO:{wxid}",
        members_key=f"__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}",
    )
    envelope = kafka_base_message("all-groups-key", group_keys)
    return json.dumps(envelope, separators=(',', ':'), ensure_ascii=False)
-
def wx_mod_group_info_members_message(wxid: str, data: dict | list) -> str:
    """
    Serialize a "mod-group" message.

    :param wxid: sent as ``content.wxid``.
    :param data: group data, sent unchanged as ``content.group_info``.
    :return: compact JSON string (non-ASCII kept) of the kafka_base_message envelope.
    """
    envelope = kafka_base_message("mod-group", {"wxid": wxid, "group_info": data})
    return json.dumps(envelope, separators=(',', ':'), ensure_ascii=False)
-
-
def wx_add_contacts_from_chatroom_message(wxid: str, chatroom_id: str, contact_wixd: str, add_time: int) -> str:
    """
    Serialize an "add-contacts-from-chatroom" message.

    Unlike most builders in this module, the content keys are camelCase
    (``chatroomId``, ``contactWixd``, ``addTime``).

    :param wxid: sent as ``content.wxid``.
    :param chatroom_id: sent as ``content.chatroomId``.
    :param contact_wixd: sent as ``content.contactWixd`` (wire spelling).
    :param add_time: sent as ``content.addTime``.
    :return: compact JSON string (non-ASCII kept) of the kafka_base_message envelope.
    """
    payload = {
        "wxid": wxid,
        "chatroomId": chatroom_id,
        "contactWixd": contact_wixd,
        "addTime": add_time,
    }
    envelope = kafka_base_message("add-contacts-from-chatroom", payload)
    return json.dumps(envelope, separators=(',', ':'), ensure_ascii=False)
-
-
def wx_add_contacts_from_chatroom_task_status_message(wxid: str, chatroom_id: str, status: int) -> str:
    """
    Serialize an "add-contacts-from-chatroom-task-status" message.

    :param wxid: sent as ``content.wxid``.
    :param chatroom_id: sent as ``content.chatroomId``.
    :param status: sent as ``content.status``. Its meaning is defined by the
        caller and consumer; it is not interpreted here.
    :return: compact JSON string (non-ASCII kept) of the kafka_base_message envelope.
    """
    content = {"wxid": wxid, "chatroomId": chatroom_id, "status": status}
    data = kafka_base_message("add-contacts-from-chatroom-task-status", content)
    return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
-
def wx_del_group_message(wxid: str, chatroom_id: str) -> str:
    """
    Serialize a "del-group" message.

    :param wxid: sent as ``content.wxid``.
    :param chatroom_id: sent as ``content.chatroom_id``.
    :return: compact JSON string (non-ASCII kept) of the kafka_base_message envelope.
    """
    return json.dumps(
        kafka_base_message("del-group", dict(wxid=wxid, chatroom_id=chatroom_id)),
        separators=(',', ':'),
        ensure_ascii=False,
    )
-
def login_qrcode_message(token_id: str, agent_tel: str, qr_code_img_base64: str, qr_code_url: list) -> str:
    """
    Serialize a "login-qrcode" message.

    :param token_id: sent as ``content.token_id``.
    :param agent_tel: sent as ``content.tel`` (presumably the agent's phone number).
    :param qr_code_img_base64: QR-code image as base64 text, sent as
        ``content.qr_code_img_base64``.
    :param qr_code_url: list of QR-code URLs, sent as ``content.qr_code_urls``.
    :return: compact JSON string (non-ASCII kept) of the kafka_base_message
        envelope, whose ``data.content`` looks like::

            {
                "tel": "<agent_tel>",
                "token_id": "<token_id>",
                "qr_code_urls": ["url1", "url2"],
                "qr_code_img_base64": "<base64 image>"
            }
    """
    content = {
        "tel": agent_tel,
        "token_id": token_id,
        "qr_code_urls": qr_code_url,
        "qr_code_img_base64": qr_code_img_base64,
    }
    data = kafka_base_message("login-qrcode", content)
    return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
-
def login_result_message(token_id: str, agent_tel: str, region_id: str, agent_token_id: str, wxid: str) -> str:
    """
    Serialize a "login-result" message.

    :param token_id: sent as ``content.token_id``.
    :param agent_tel: sent as ``content.tel``.
    :param region_id: sent as ``content.region_id``.
    :param agent_token_id: sent as ``content.agent_token_id``.
    :param wxid: sent as ``content.wxid``.
    :return: compact JSON string (non-ASCII kept) of the kafka_base_message envelope.
    """
    login_result = dict(
        tel=agent_tel,
        token_id=token_id,
        region_id=region_id,
        agent_token_id=agent_token_id,
        wxid=wxid,
    )
    envelope = kafka_base_message("login-result", login_result)
    return json.dumps(envelope, separators=(',', ':'), ensure_ascii=False)
-
def wx_voice(text: str):
    """
    Synthesize ``text`` to speech, convert it to SILK and upload the result to OSS.

    The intermediate audio files are always deleted, including when a step fails.

    :param text: the text to speak.
    :return: ``(duration, file_url)``.
        - ``duration`` is ``int()`` of the value returned by
          ``audio_convert.any_to_sil``.
        - ``file_url`` is what ``upload_oss`` returned; that may be ``None``
          if the upload failed.
        Returns ``(None, None)`` if any step raises.
    """
    reply_text_voice_path = None
    reply_silk_path = None
    try:
        # Text -> speech; the returned path is resolved against the current
        # working directory.
        reply_text_voice = AliVoice().textToVoice(text)
        reply_text_voice_path = os.path.join(os.getcwd(), reply_text_voice)

        # Convert to SILK next to the source file (same stem, ".silk" suffix).
        reply_silk_path = os.path.splitext(reply_text_voice_path)[0] + ".silk"
        reply_silk_during = audio_convert.any_to_sil(reply_text_voice_path, reply_silk_path)

        # NOTE(security): these fallback credentials are committed to source
        # control and should be rotated. Supply them via the environment instead.
        oss_access_key_id = os.environ.get("OSS_ACCESS_KEY_ID", "LTAI5tRTG6pLhTpKACJYoPR5")
        oss_access_key_secret = os.environ.get("OSS_ACCESS_KEY_SECRET", "E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN")
        oss_endpoint = os.environ.get("OSS_ENDPOINT", "http://oss-cn-shanghai.aliyuncs.com")
        oss_bucket_name = os.environ.get("OSS_BUCKET_NAME", "cow-agent")
        oss_prefix = os.environ.get("OSS_PREFIX", "cow")

        file_url = upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint,
                              oss_bucket_name, reply_silk_path, oss_prefix)
        return int(reply_silk_during), file_url
    except Exception as e:
        logger.exception(f"发生错误:{e}")
        return None, None
    finally:
        # Clean up in `finally` so a failed conversion/upload does not leak files.
        for path in (reply_text_voice_path, reply_silk_path):
            if not path:
                continue
            try:
                os.remove(path)
            except FileNotFoundError:
                pass  # The file was never created, or is already gone.
            except OSError as err:
                # Don't let a cleanup failure mask the real result.
                logger.warning(f"删除临时文件失败:{path}, {err}")
-
def wx_img_url_to_oss_url(img_url: str) -> str | None:
    """
    Re-upload an image (HTTP(S) URL or local path) to OSS and return its OSS URL.

    :param img_url: image URL or local file path, passed to ``upload_oss`` as-is.
    :return: the URL returned by ``upload_oss``, or ``None`` if the upload
        failed or raised.
    """
    try:
        # NOTE(security): these fallback credentials are committed to source
        # control and should be rotated. Supply them via the environment instead.
        oss_access_key_id = os.environ.get("OSS_ACCESS_KEY_ID", "LTAI5tRTG6pLhTpKACJYoPR5")
        oss_access_key_secret = os.environ.get("OSS_ACCESS_KEY_SECRET", "E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN")
        oss_endpoint = os.environ.get("OSS_ENDPOINT", "http://oss-cn-shanghai.aliyuncs.com")
        oss_bucket_name = os.environ.get("OSS_BUCKET_NAME", "cow-agent")
        oss_prefix = os.environ.get("OSS_PREFIX", "cow")
        return upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint,
                          oss_bucket_name, img_url, oss_prefix)
    except Exception as e:
        # Use the module logger (with traceback) rather than print, so failures
        # show up in the service logs.
        logger.exception(f"发生错误:{e}")
        return None
-
def upload_oss(
    access_key_id,
    access_key_secret,
    endpoint,
    bucket_name,
    file_source,
    prefix,
    expiration_days=7
):
    """
    Upload a local file or an HTTP(S) resource to Aliyun OSS and return its public URL.

    The object key is ``f"{prefix}/{filename}"``.
    - For HTTP(S) sources, ``filename`` is the URL-decoded last path segment of the URL.
    - For local paths, it is ``os.path.basename(file_source)``.
    - If that name is empty (e.g. a URL path ending in '/'), a random hex name
      is used, so the object is not written to the bare ``"<prefix>/"`` key.

    :param access_key_id: Aliyun AccessKey ID.
    :param access_key_secret: Aliyun AccessKey Secret.
    :param endpoint: OSS endpoint, e.g. "http://oss-cn-shanghai.aliyuncs.com".
        A bare host without a scheme is treated as http.
    :param bucket_name: OSS bucket name.
    :param file_source: local file path, or a URL starting with http:// or https://.
    :param prefix: key prefix ("directory") the object is stored under.
    :param expiration_days: kept for backward compatibility; currently NOT applied.
        No lifecycle rule is installed, so retention depends on whatever
        lifecycle configuration the bucket already has.
    :return: ``"{scheme}://{bucket_name}.{endpoint_host}/{percent-encoded key}"``,
        or ``None`` if the HTTP download fails or OSS reports an error.
        Other errors (e.g. an unreadable local file) are not caught here.
    """
    from urllib.parse import quote

    auth = oss2.Auth(access_key_id, access_key_secret)
    bucket = oss2.Bucket(auth, endpoint, bucket_name)

    # NOTE(review): a LifecycleRule for `expiration_days` used to be built here,
    # but the call that would apply it was commented out, so it had no effect.
    # Configure object expiration on the bucket itself if it is needed.

    if file_source.startswith(('http://', 'https://')):
        try:
            # (connect, read) timeout so a stalled remote host cannot block forever.
            response = requests.get(file_source, timeout=(10, 60))
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.error(f"从 HTTP 链接下载文件失败: {e}")
            return None
        filename = unquote(urlparse(file_source).path).split('/')[-1] or uuid.uuid4().hex
        oss_file_name = prefix + '/' + filename
        try:
            bucket.put_object(oss_file_name, response.content)
        except oss2.exceptions.OssError as e:
            # Same contract as the local branch: an OSS failure yields None.
            logger.error(f"从 HTTP 链接上传文件失败: {e}")
            return None
        logger.info(f"文件从 HTTP 链接上传成功:{file_source}")
    else:
        filename = os.path.basename(file_source) or uuid.uuid4().hex
        oss_file_name = prefix + '/' + filename
        try:
            bucket.put_object_from_file(oss_file_name, file_source)
        except oss2.exceptions.OssError as e:
            logger.error(f"从本地路径上传文件失败: {e}")
            return None
        logger.info(f"文件从本地路径上传成功:{file_source}")

    # Derive scheme and host from the endpoint. Stripping only "http://" would
    # produce a broken URL for https:// endpoints or endpoints with a trailing
    # slash. Percent-encode the key so non-ASCII or space-containing names
    # still form a valid URL.
    parsed_endpoint = urlparse(endpoint if '://' in endpoint else 'http://' + endpoint)
    file_url = f"{parsed_endpoint.scheme}://{bucket_name}.{parsed_endpoint.netloc}/{quote(oss_file_name)}"

    logger.info(f"文件上传成功,公共访问地址:{file_url}")
    return file_url
-
-
def download_video_and_get_thumbnail(url, thumbnail_path):
    """
    Download a video, upload its first frame as a thumbnail and return the thumbnail URL and duration.

    The video is downloaded into a temporary directory that is removed
    afterwards. Its first frame is written to ``thumbnail_path`` and uploaded
    to OSS, and then the local thumbnail is deleted.

    :param url: URL of the video (saved locally as an .mp4 file).
    :param thumbnail_path: local path the thumbnail is written to before
        upload; its directory is created if needed. The file is removed afterwards.
    :return: ``(thumbnail_url, duration)``.
        - ``thumbnail_url`` is what ``upload_oss`` returned (``None`` if the
          upload failed).
        - ``duration`` is the clip duration reported by moviepy (seconds).
    :raises requests.exceptions.RequestException: if the download fails.
    :raises RuntimeError: if the first frame cannot be read or the thumbnail
        cannot be written.
    """
    logger.info("处理视频开始")
    with tempfile.TemporaryDirectory() as tmp_dir:
        video_path = os.path.join(tmp_dir, 'temp_video.mp4')
        # (connect, read) timeout so a stalled server cannot hang the worker.
        with requests.get(url, stream=True, timeout=(10, 60)) as response:
            response.raise_for_status()
            with open(video_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)

        # Grab the first frame; release the capture even if read() raises.
        vidcap = cv2.VideoCapture(video_path)
        try:
            success, image = vidcap.read()
        finally:
            vidcap.release()
        if not success:
            raise RuntimeError("无法读取视频的首帧,请检查视频文件是否有效。")

        thumbnail_dir = os.path.dirname(thumbnail_path)
        if thumbnail_dir:
            os.makedirs(thumbnail_dir, exist_ok=True)

        # cv2.imwrite signals failure through its return value, not an
        # exception. Without this check a stale or missing file would be uploaded.
        if not cv2.imwrite(thumbnail_path, image):
            raise RuntimeError(f"无法保存缩略图:{thumbnail_path}")

        # Read the duration while the temp video still exists; always close the clip.
        clip = VideoFileClip(video_path)
        try:
            duration = clip.duration
        finally:
            clip.close()
    logger.info("处理视频完成")

    # NOTE(security): these fallback credentials are committed to source
    # control and should be rotated. Supply them via the environment instead.
    oss_access_key_id = os.environ.get("OSS_ACCESS_KEY_ID", "LTAI5tRTG6pLhTpKACJYoPR5")
    oss_access_key_secret = os.environ.get("OSS_ACCESS_KEY_SECRET", "E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN")
    oss_endpoint = os.environ.get("OSS_ENDPOINT", "http://oss-cn-shanghai.aliyuncs.com")
    oss_bucket_name = os.environ.get("OSS_BUCKET_NAME", "cow-agent")
    oss_prefix = os.environ.get("OSS_PREFIX", "cow")

    try:
        file_url = upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint,
                              oss_bucket_name, thumbnail_path, oss_prefix)
        logger.info("上传缩略图")
    finally:
        # Remove the local thumbnail even if the upload raised.
        try:
            os.remove(thumbnail_path)
        except FileNotFoundError:
            pass

    return file_url, duration
-
# "http://" or "https://" followed by at least one URL-ish character; compiled once.
_URL_PATTERN = re.compile(
    r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
)


def contains_url(text):
    """
    Report whether ``text`` contains an http:// or https:// link anywhere.

    :param text: string to scan.
    :return: True if a link is found, otherwise False.
    """
    return _URL_PATTERN.search(text) is not None
-
def get_first_char_if_digit(s):
    """
    Return the first character of ``s`` as an int when it is a decimal digit.

    Uses ``str.isdecimal`` rather than ``str.isdigit``. Characters such as
    superscript '²' satisfy ``isdigit()`` but make ``int()`` raise ValueError.

    :param s: string to inspect; ``None`` or ``""`` yields ``None``.
    :return: the digit's value, or ``None`` if ``s`` is empty or does not start
        with a decimal digit.
    """
    if s and s[0].isdecimal():
        return int(s[0])
    return None
-
# The group-chat "@-mentioned you" marker removed by remove_at_mention_regex.
_AT_MENTION_PATTERN = re.compile(r"在群聊中@了你")


def remove_at_mention_regex(text):
    """
    Delete every occurrence of the "在群聊中@了你" marker from ``text``.

    :param text: message text.
    :return: ``text`` without the marker. Other whitespace is left untouched.
    """
    return _AT_MENTION_PATTERN.sub("", text)
-
def extract_nickname(text) -> str:
    """
    Extract the sender nickname from a group-mention notification.

    There are two recognised forms:
    - ``"<name>在群聊中@了你..."``: returns the stripped text before the marker.
      It is matched from the start of the string; the name part may not
      contain a newline.
    - ``"<name>: @..."``: returns the stripped text before the first ``": @"``.

    :param text: notification text.
    :return: the nickname, or ``''`` when neither form matches.
    """
    if "在群聊中@了你" in text:
        found = re.search(r"^(.*?)在群聊中@了你", text)
        return found.group(1).strip() if found else ''
    if ": @" in text:
        name, _, _ = text.partition(": @")
        return name.strip()
    return ''
-
def check_chatroom(userName):
    """
    Tell whether ``userName`` looks like a chatroom id: digits followed by "@chatroom".

    :param userName: WeChat user name / id.
    :return: True for ids such as "123456@chatroom", otherwise False.
    """
    return re.match(r'^\d+@chatroom$', userName) is not None
-
- # def remove_markdown_symbol(text: str):
- # # 移除markdown格式,目前先移除**
- # if not text or not isinstance(text, str):
- # return text
- # # 去除加粗、斜体等格式
- # #text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text) # 去除加粗
- # text=re.sub(r'\*\*(.*?)\*\*', r'\1', text)
- # text = re.sub(r'\*([^*]+)\*', r'\1', text) # 去除斜体
- # text = re.sub(r'__([^_]+)__', r'\1', text) # 去除加粗(下划线)
- # text = re.sub(r'_(.*?)_', r'\1', text) # 去除斜体(下划线)
-
- # # 去除行内代码块
- # text = re.sub(r'`([^`]+)`', r'\1', text)
-
- # # 去除换行符\n,或者多余的空格
- # #text = re.sub(r'\n+', ' ', text)
-
- # # 去除列表编号等
- # #text = re.sub(r'^\d+\.\s*', '', text, flags=re.MULTILINE)
-
-
- # #text = re.sub('[\\\`\*\_\[\]\#\+\-\!\>]', '', text)
- # text = re.sub('[\\\`\*\_\[\]\#\+\!\>]', '', text)
- # print(text)
- # return text
-
- # def remove_markdown_symbol(text: str):
- # if not text or not isinstance(text, str):
- # return text
-
- # # 去除加粗、斜体等格式
- # text = re.sub(r'\*\*(.*?)\*\*', r'\1', text) # 去除加粗
- # text = re.sub(r'\*([^*]+)\*', r'\1', text) # 去除斜体
- # text = re.sub(r'__([^_]+)__', r'\1', text) # 去除加粗(下划线)
- # text = re.sub(r'_(.*?)_', r'\1', text) # 去除斜体(下划线)
-
- # # 去除行内代码块
- # text = re.sub(r'`([^`]+)`', r'\1', text)
-
- # # 去除其他 Markdown 符号
- # text = re.sub('[\\\`\*\_\[\]\#\+\!\>]', '', text)
-
- # return text
-
- # def remove_markdown_symbol(text: str):
- # # 去除标题
- # text = re.sub(r'#+\s*', '', text)
-
- # # 去除粗体和斜体
- # text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text)
- # text = re.sub(r'__([^_]+)__', r'\1', text)
- # text = re.sub(r'\*([^*]+)\*', r'\1', text)
- # text = re.sub(r'_([^_]+)_', r'\1', text)
-
- # # 保留链接地址
- # text = re.sub(r'\[([^\]]+)\]\(([^\)]+)\)', r'\2', text)
-
- # # 保留图片地址
- # text = re.sub(r'!\[([^\]]+)\]\(([^\)]+)\)', r'\2', text)
-
- # # 去除列表
- # text = re.sub(r'^\s*[\*\+\-]\s+', '', text, flags=re.MULTILINE)
- # text = re.sub(r'^\s*\d+\.\s+', '', text, flags=re.MULTILINE)
-
- # # 去除代码块和内联代码
- # text = re.sub(r'```[^`]*```', '', text, flags=re.DOTALL)
- # text = re.sub(r'`([^`]+)`', r'\1', text)
-
- # # 去除引用
- # text = re.sub(r'^>\s*', '', text, flags=re.MULTILINE)
-
- # # 去除水平线
- # text = re.sub(r'^\s*[-*_]{3,}\s*$', '', text, flags=re.MULTILINE)
-
- # return text.strip()
-
def remove_markdown_symbol(text: str) -> str:
    """
    Strip common Markdown syntax from ``text`` while keeping URLs intact.

    URLs are first swapped for ``[[URLn]]`` placeholders and restored at the
    end, so the emphasis rules cannot mangle ``_`` or ``*`` inside them. The
    URLs come from three sources:
    - image syntax ``![alt](url)``
    - link syntax ``[text](url)``
    - bare ``http(s)://`` links

    For image and link syntax only the URL is kept; the alt/link text is dropped.

    Then the following are removed:
    - headings
    - bold/italic markers
    - list bullets and numbers
    - fenced code blocks (entirely)
    - inline-code backticks
    - blockquote markers
    - horizontal rules

    :param text: Markdown text.
    :return: plain text with leading/trailing whitespace stripped.
    """
    url_placeholders = []

    def url_replacer(url):
        url_placeholders.append(url)
        return f"[[URL{len(url_placeholders)-1}]]"

    # Images first, so "![...](...)" is not half-consumed by the link rule.
    text = re.sub(r'!\[[^\]]*?\]\((https?://[^\s)]+)\)', lambda m: url_replacer(m.group(1)), text)
    # Links: [text](url)
    text = re.sub(r'\[[^\]]*?\]\((https?://[^\s)]+)\)', lambda m: url_replacer(m.group(1)), text)
    # Remaining bare links
    text = re.sub(r'https?://[^\s)]+', lambda m: url_replacer(m.group(0)), text)

    # Line-anchored rules use [^\S\n] ("whitespace except newline") rather
    # than \s. With re.MULTILINE, \s* also matches '\n', so e.g. '^\s*#'
    # swallowed the blank lines before a heading.
    text = re.sub(r'^[^\S\n]*#+[^\S\n]*', '', text, flags=re.MULTILINE)

    # Bold / italic. Underscore emphasis only applies when it is not inside a
    # word (as in CommonMark), so identifiers like snake_case_name survive.
    text = re.sub(r'\*\*([^\*]+)\*\*', r'\1', text)
    text = re.sub(r'(?<!\w)__([^_]+)__(?!\w)', r'\1', text)
    text = re.sub(r'\*([^\*]+)\*', r'\1', text)
    text = re.sub(r'(?<!\w)_([^_]+)_(?!\w)', r'\1', text)

    # List bullets and ordered-list numbers
    text = re.sub(r'^[^\S\n]*[\*\+\-][^\S\n]+', '', text, flags=re.MULTILINE)
    text = re.sub(r'^[^\S\n]*\d+\.[^\S\n]+', '', text, flags=re.MULTILINE)

    # Fenced code blocks (removed entirely) and inline code (backticks only)
    text = re.sub(r'```.*?```', '', text, flags=re.DOTALL)
    text = re.sub(r'`([^`]+)`', r'\1', text)

    # Blockquote markers
    text = re.sub(r'^>[^\S\n]*', '', text, flags=re.MULTILINE)

    # Horizontal rules
    text = re.sub(r'^[^\S\n]*[-*_]{3,}[^\S\n]*$', '', text, flags=re.MULTILINE)

    # Restore the protected URLs.
    for i, url in enumerate(url_placeholders):
        text = text.replace(f"[[URL{i}]]", url)

    return text.strip()
-
-
async def save_to_local_from_url_async(url):
    '''
    Download ``url`` into ``<cwd>/tmp`` and return the local file path.

    The file is named after the last segment of the URL path; an existing file
    with that name is overwritten. If the URL path has no last segment (e.g. it
    ends with '/'), a random hex name is used. The ``tmp`` directory is created
    on first use.

    :param url: HTTP(S) URL to download.
    :return: absolute path of the saved file, or ``None`` if the server replied
        with a status other than 200.
    '''
    parsed_url = urlparse(url)
    # Without a fallback, an empty basename would make the target path the tmp
    # directory itself, and opening it for writing would fail.
    filename = os.path.basename(parsed_url.path) or uuid.uuid4().hex
    tmp_dir = os.path.join(os.getcwd(), 'tmp')
    # Opening a file inside a missing directory would raise, so create it first.
    os.makedirs(tmp_dir, exist_ok=True)
    tmp_file_path = os.path.join(tmp_dir, filename)

    if os.path.exists(tmp_file_path):
        logger.info(f"文件已存在,将覆盖:{tmp_file_path}")

    # Stream the body to disk in chunks instead of buffering it in memory.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status != 200:
                logger.error(f"无法下载文件,HTTP状态码:{response.status}")
                return None
            async with aiofiles.open(tmp_file_path, 'wb') as f:
                async for chunk in response.content.iter_chunked(1024):
                    await f.write(chunk)

    return tmp_file_path
-
-
def extract_and_replace_image_url(text):
    """
    Find the first .png/.jpg/.jpeg URL in ``text`` and replace it with "如下图".

    The match is non-greedy. In CJK text, URLs are often followed by
    punctuation or other text with no whitespace between them (e.g.
    "…/1.png，和https://…/2.jpg"). A greedy ``\\S+`` would swallow everything
    up to the last image extension in that run.

    :param text: message text.
    :return: ``(image_url, updated_text)``. Every occurrence of that URL is
        replaced in ``updated_text``. Returns ``(None, text)`` when there is no
        image URL.
    """
    pattern = r'https?://\S+?\.(?:png|jpg|jpeg)'

    match = re.search(pattern, text)
    if match:
        image_url = match.group()
        updated_text = text.replace(image_url, "如下图")
        return image_url, updated_text
    return None, text
-
def extract_and_replace_image_urls(text):
    """
    Extract all .png/.jpg/.jpeg URLs from ``text`` and replace each with "如下图".

    The match is non-greedy, so several URLs that are not separated by
    whitespace (common in CJK text) are extracted individually instead of as
    one long span ending at the last image extension.

    :param text: message text.
    :return: ``(image_urls, updated_text)``. ``image_urls`` lists the URLs in
        order of appearance (possibly empty).
    """
    pattern = r'https?://\S+?\.(?:png|jpg|jpeg)'

    image_urls = re.findall(pattern, text)
    updated_text = re.sub(pattern, "如下图", text)

    return image_urls, updated_text
-
def extract_and_replace_video_urls(text):
    """
    Extract all .mp4 URLs from ``text`` and replace each with "如下视频".

    The match is non-greedy, so several URLs that are not separated by
    whitespace (common in CJK text) are extracted individually instead of as
    one long span ending at the last ".mp4".

    :param text: message text.
    :return: ``(video_links, updated_text)``. ``video_links`` lists the URLs in
        order of appearance (possibly empty).
    """
    pattern = r'https?://\S+?\.(?:mp4)'

    video_links = re.findall(pattern, text)
    updated_text = re.sub(pattern, '如下视频', text)

    return video_links, updated_text
-
-
def replace_placeholders(reply_content, replacements: dict) -> str:
    """
    Substitute every placeholder key in ``replacements`` with its value.

    Substitutions are applied one after another in the dict's iteration order.
    Text produced by an earlier substitution can therefore be matched by a
    later key.

    :param reply_content: template text.
    :param replacements: mapping of placeholder -> replacement string.
    :return: the text after all substitutions.
    """
    result = reply_content
    for key, val in replacements.items():
        result = result.replace(key, val)
    return result
|