Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

603 lignes
22KB

  1. import io
  2. import os
  3. import uuid
  4. import requests
  5. from urllib.parse import urlparse
  6. from PIL import Image
  7. from common.log import logger
  8. import oss2,time,json
  9. from urllib.parse import urlparse, unquote
  10. from voice.ali.ali_voice import AliVoice
  11. from voice import audio_convert
  12. import aiohttp,aiofiles
  13. import cv2,re
  14. import os
  15. import tempfile
  16. from moviepy.editor import VideoFileClip
  17. from datetime import datetime
  18. def clean_json_string(json_str):
  19. # 删除所有控制字符(非打印字符),包括换行符、回车符等
  20. return re.sub(r'[\x00-\x1f\x7f]', '', json_str)
  21. def dialogue_message(wxid_from:str,wxid_to:str,wx_content:list,is_ai:bool=False):
  22. """
  23. 构造消息的 JSON 数据
  24. :param contents: list,包含多个消息内容,每个内容为字典,如:
  25. [{"type": "text", "text": "AAAAAAA"},
  26. {"type": "image_url", "image_url": {"url": "https://AAAAA.jpg"}},
  27. {"type":"file","file_url":{"url":"https://AAAAA.pdf"}}
  28. ]
  29. :return: JSON 字符串
  30. """
  31. # 获取当前时间戳,精确到毫秒
  32. current_timestamp = int(time.time() * 1000)
  33. # 获取当前时间,格式化为 "YYYY-MM-DD HH:MM:SS"
  34. current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  35. # 构造 JSON 数据
  36. data = {
  37. "message_id": str(current_timestamp),
  38. "topic": "topic.ai.ops.wx",
  39. "time": current_time,
  40. "data": {
  41. "msg_type": "dialogue",
  42. "is_ai":is_ai,
  43. "content": {
  44. "wxid_from": wxid_from,
  45. "wxid_to": wxid_to,
  46. "wx_content":wx_content
  47. }
  48. }
  49. }
  50. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  51. def kafka_base_message(msg_type:str,content: dict|list)->dict:
  52. """
  53. 构造消息的 JSON 数据
  54. :param wxid: 微信ID
  55. :param data: 一个包含了所有联系人的数据,格式为list,
  56. 每个元素为字典,包含wxid、alias、remark、sex、city、province、country,
  57. headimgurl、signature、skey、uin、nickname这10个字段
  58. :return: JSON 字符串
  59. """
  60. # 获取当前时间戳,精确到毫秒
  61. current_timestamp = int(time.time() * 1000)
  62. # 获取当前时间,格式化为 "YYYY-MM-DD HH:MM:SS"
  63. current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  64. # 构造 JSON 数据
  65. data = {
  66. "message_id": str(current_timestamp),
  67. "topic": "topic.ai.ops.wx",
  68. "time": current_time,
  69. "data": {
  70. #"msg_type": "login-qrcode",
  71. "msg_type": msg_type,
  72. "content": content
  73. }
  74. }
  75. return data
  76. def wx_offline_message(appid:str,wxid:str)->str:
  77. content = {"appid": appid,"wxid":wxid}
  78. data=kafka_base_message("wx-offline",content)
  79. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  80. def wx_del_contact_message(wxid:str,contact_wixd:str)->str:
  81. content = {"wxid": wxid,"contact_wixd":contact_wixd}
  82. data=kafka_base_message("del-contact",content)
  83. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  84. def wx_mod_contact_message(wxid:str,contact_data:dict)->str:
  85. content = {"wxid": wxid,"contact_data":contact_data}
  86. data=kafka_base_message("mod-contact",content)
  87. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  88. def wx_all_contacts_message(wxid:str,data:dict|list)->str:
  89. content = {"wxid": wxid,"contacts_data":data}
  90. data=kafka_base_message("all-contacts",content)
  91. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  92. def wx_groups_info_members_message(wxid:str,data:dict|list)->str:
  93. content = {"wxid": wxid,"groups_info":data}
  94. data=kafka_base_message("all-groups",content)
  95. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  96. def wx_all_contacts_key_message(wxid:str)->str:
  97. content = {"wxid": wxid,"key":f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}" }
  98. data=kafka_base_message("all-contacts-key",content)
  99. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  100. def wx_groups_info_members_key_message(wxid:str)->str:
  101. content = {
  102. "wxid": wxid,
  103. "info_key" :f"__AI_OPS_WX__:GROUPS_INFO:{wxid}",
  104. "members_key":f"__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}"
  105. }
  106. data=kafka_base_message("all-groups-key",content)
  107. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  108. def wx_mod_group_info_members_message(wxid:str,data:dict|list)->str:
  109. content = {"wxid": wxid,"group_info":data}
  110. data=kafka_base_message("mod-group",content)
  111. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  112. def wx_add_contacts_from_chatroom_message(wxid:str,chatroom_id:str,contact_wixd:str,add_time:int)->str:
  113. content = {"wxid": wxid,"chatroomId":chatroom_id,"contactWixd":contact_wixd,"addTime":add_time}
  114. data=kafka_base_message("add-contacts-from-chatroom",content)
  115. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  116. def wx_add_contacts_from_chatroom_task_status_message(wxid:str,chatroom_id:str,status:int):
  117. content = {"wxid": wxid,"chatroomId":chatroom_id,"status":status}
  118. data=kafka_base_message("add-contacts-from-chatroom-task-status",content)
  119. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  120. def wx_del_group_message(wxid:str,chatroom_id:str)->str:
  121. content = {"wxid": wxid,"chatroom_id":chatroom_id}
  122. data=kafka_base_message("del-group",content)
  123. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  124. def login_qrcode_message(token_id: str,agent_tel:str,qr_code_img_base64:str,qr_code_url:list)->str:
  125. """
  126. 构造消息的 JSON 数据
  127. :param contents: list,包含多个消息内容,每个内容为字典,如:
  128. {
  129. "tel":"18029274615",
  130. "token_id":"f828cb3c-1039-489f-b9ae-7494d1778a15",
  131. "qr_code_urls":["url1","url2","url3","url4",],
  132. "qr_code_img_base64":"aaaaaaaaaaaaaa"
  133. }
  134. :return: JSON 字符串
  135. """
  136. content = {
  137. "tel":agent_tel,
  138. "token_id":token_id,
  139. "qr_code_urls":qr_code_url,
  140. "qr_code_img_base64":qr_code_img_base64
  141. }
  142. data=kafka_base_message("login-qrcode",content)
  143. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  144. def login_result_message(token_id: str,agent_tel:str,region_id:str,agent_token_id:str,wxid:str)->str:
  145. content = {
  146. "tel":agent_tel,
  147. "token_id":token_id,
  148. "region_id":region_id,
  149. "agent_token_id":agent_token_id,
  150. "wxid":wxid
  151. }
  152. data=kafka_base_message("login-result",content)
  153. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  154. def wx_voice(text: str):
  155. try:
  156. # 将文本转换为语音
  157. reply_text_voice = AliVoice().textToVoice(text)
  158. reply_text_voice_path = os.path.join(os.getcwd(), reply_text_voice)
  159. # 转换为 Silk 格式
  160. reply_silk_path = os.path.splitext(reply_text_voice_path)[0] + ".silk"
  161. reply_silk_during = audio_convert.any_to_sil(reply_text_voice_path, reply_silk_path)
  162. # OSS 配置(建议将凭证存储在安全的地方)
  163. oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5"
  164. oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN"
  165. oss_endpoint="http://oss-cn-shanghai.aliyuncs.com"
  166. oss_bucket_name="cow-agent"
  167. oss_prefix="cow"
  168. # 上传文件到 OSS
  169. file_path = reply_silk_path
  170. file_url = upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, file_path, oss_prefix)
  171. # 删除临时文件
  172. try:
  173. os.remove(reply_text_voice_path)
  174. except FileNotFoundError:
  175. pass # 如果文件未找到,跳过删除
  176. try:
  177. os.remove(reply_silk_path)
  178. except FileNotFoundError:
  179. pass # 如果文件未找到,跳过删除
  180. return int(reply_silk_during), file_url
  181. except Exception as e:
  182. print(f"发生错误:{e}")
  183. return None, None # 发生错误时返回 None
  184. def wx_img_url_to_oss_url(img_url: str)->str:
  185. try:
  186. # OSS 配置(建议将凭证存储在安全的地方)
  187. oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5"
  188. oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN"
  189. oss_endpoint="http://oss-cn-shanghai.aliyuncs.com"
  190. oss_bucket_name="cow-agent"
  191. oss_prefix="cow"
  192. file_url = upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, img_url, oss_prefix)
  193. return file_url
  194. except Exception as e:
  195. print(f"发生错误:{e}")
  196. return None # 发生错误时返回 None
  197. def upload_oss(
  198. access_key_id,
  199. access_key_secret,
  200. endpoint,
  201. bucket_name,
  202. file_source,
  203. prefix,
  204. expiration_days=7
  205. ):
  206. """
  207. 上传文件到阿里云OSS并设置生命周期规则,同时返回文件的公共访问地址。
  208. :param access_key_id: 阿里云AccessKey ID
  209. :param access_key_secret: 阿里云AccessKey Secret
  210. :param endpoint: OSS区域对应的Endpoint
  211. :param bucket_name: OSS中的Bucket名称
  212. :param file_source: 本地文件路径或HTTP链接
  213. :param prefix: 设置规则应用的前缀为文件所在目录
  214. :param expiration_days: 文件保存天数,默认7天后删除
  215. :return: 文件的公共访问地址
  216. """
  217. # 创建Bucket实例
  218. auth = oss2.Auth(access_key_id, access_key_secret)
  219. bucket = oss2.Bucket(auth, endpoint, bucket_name)
  220. ### 1. 设置生命周期规则 ###
  221. rule_id = f'delete_after_{expiration_days}_days' # 规则ID
  222. # prefix = oss_file_name.split('/')[0] + '/' # 设置规则应用的前缀为文件所在目录
  223. # 定义生命周期规则
  224. rule = oss2.models.LifecycleRule(rule_id, prefix, status=oss2.models.LifecycleRule.ENABLED,
  225. expiration=oss2.models.LifecycleExpiration(days=expiration_days))
  226. # 设置Bucket的生命周期
  227. # lifecycle = oss2.models.BucketLifecycle([rule])
  228. # bucket.put_bucket_lifecycle(lifecycle)
  229. # print(f"已设置生命周期规则:文件将在{expiration_days}天后自动删除")
  230. ### 2. 判断文件来源并上传到OSS ###
  231. if file_source.startswith('http://') or file_source.startswith('https://'):
  232. # HTTP 链接,先下载文件
  233. try:
  234. response = requests.get(file_source, stream=True)
  235. response.raise_for_status()
  236. parsed_url = urlparse(file_source)
  237. # 提取路径部分并解码
  238. path = unquote(parsed_url.path)
  239. # 获取路径的最后一部分作为文件名
  240. filename = path.split('/')[-1]
  241. oss_file_name=prefix+'/'+ filename
  242. bucket.put_object(oss_file_name, response.content)
  243. print(f"文件从 HTTP 链接上传成功:{file_source}")
  244. except requests.exceptions.RequestException as e:
  245. print(f"从 HTTP 链接下载文件失败: {e}")
  246. return None
  247. else:
  248. # 本地文件路径
  249. try:
  250. filename=os.path.basename(file_source)
  251. oss_file_name=prefix+'/'+ filename
  252. bucket.put_object_from_file(oss_file_name, file_source)
  253. print(f"文件从本地路径上传成功:{file_source}")
  254. except oss2.exceptions.OssError as e:
  255. print(f"从本地路径上传文件失败: {e}")
  256. return None
  257. ### 3. 构建公共访问URL ###
  258. file_url = f"http://{bucket_name}.{endpoint.replace('http://', '')}/{oss_file_name}"
  259. print(f"文件上传成功,公共访问地址:{file_url}")
  260. return file_url
  261. def download_video_and_get_thumbnail(url, thumbnail_path):
  262. """
  263. 从指定URL下载MP4视频,提取首帧作为缩略图,并返回缩略图路径及视频时长。
  264. 参数:
  265. url (str): 视频的URL地址。
  266. thumbnail_path (str): 缩略图的保存路径。
  267. 返回:
  268. tuple: (缩略图路径, 视频时长(秒))
  269. 异常:
  270. 可能抛出requests.exceptions.RequestException,cv2.error,IOError等异常。
  271. """
  272. logger.info("处理视频开始")
  273. # 创建临时目录以下载视频
  274. with tempfile.TemporaryDirectory() as tmp_dir:
  275. # 下载视频到临时文件
  276. video_path = os.path.join(tmp_dir, 'temp_video.mp4')
  277. response = requests.get(url, stream=True)
  278. response.raise_for_status() # 确保请求成功
  279. with open(video_path, 'wb') as f:
  280. for chunk in response.iter_content(chunk_size=8192):
  281. if chunk: # 过滤掉保持连接的空白块
  282. f.write(chunk)
  283. # 提取视频首帧作为缩略图
  284. vidcap = cv2.VideoCapture(video_path)
  285. success, image = vidcap.read()
  286. vidcap.release()
  287. if not success:
  288. raise RuntimeError("无法读取视频的首帧,请检查视频文件是否有效。")
  289. # 确保缩略图的目录存在
  290. thumbnail_dir = os.path.dirname(thumbnail_path)
  291. if thumbnail_dir:
  292. os.makedirs(thumbnail_dir, exist_ok=True)
  293. # 保存缩略图
  294. cv2.imwrite(thumbnail_path, image)
  295. # 使用moviepy计算视频时长
  296. clip = VideoFileClip(video_path)
  297. duration = clip.duration
  298. clip.close()
  299. logger.info("处理视频完成")
  300. # OSS 配置(建议将凭证存储在安全的地方)
  301. oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5"
  302. oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN"
  303. oss_endpoint="http://oss-cn-shanghai.aliyuncs.com"
  304. oss_bucket_name="cow-agent"
  305. oss_prefix="cow"
  306. # 上传文件到 OSS
  307. file_path = thumbnail_path
  308. file_url = upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, file_path, oss_prefix)
  309. logger.info("上传缩略图")
  310. # 删除临时文件
  311. try:
  312. os.remove(thumbnail_path)
  313. except FileNotFoundError:
  314. pass # 如果文件未找到,跳过删除
  315. return file_url, duration
  316. def contains_url(text):
  317. # 定义检测网址的正则表达式
  318. url_pattern = re.compile(
  319. r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
  320. )
  321. # 检查字符串是否包含网址
  322. return bool(url_pattern.search(text))
  323. def get_first_char_if_digit(s):
  324. if s and s[0].isdigit(): # 判断字符串是否非空且首字符为数字
  325. return int(s[0]) # 返回数字形式
  326. return None # 如果不是数字则返回 None
  327. def remove_at_mention_regex(text):
  328. # 使用正则表达式去掉“在群聊中@了你”
  329. return re.sub(r"在群聊中@了你", "", text)
  330. def extract_nickname(text)->str:
  331. if "在群聊中@了你" in text:
  332. # 如果包含 "在群聊中@了你",提取其前面的名字
  333. match = re.search(r"^(.*?)在群聊中@了你", text)
  334. if match:
  335. return match.group(1).strip()
  336. elif ": @" in text:
  337. # 如果包含 ": @",提取其前面的名字
  338. return text.split(": @")[0].strip()
  339. return ''
  340. def check_chatroom(userName):
  341. pattern = r'^\d+@chatroom$'
  342. if re.match(pattern, userName):
  343. return True
  344. return False
  345. # def remove_markdown_symbol(text: str):
  346. # # 移除markdown格式,目前先移除**
  347. # if not text or not isinstance(text, str):
  348. # return text
  349. # # 去除加粗、斜体等格式
  350. # #text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text) # 去除加粗
  351. # text=re.sub(r'\*\*(.*?)\*\*', r'\1', text)
  352. # text = re.sub(r'\*([^*]+)\*', r'\1', text) # 去除斜体
  353. # text = re.sub(r'__([^_]+)__', r'\1', text) # 去除加粗(下划线)
  354. # text = re.sub(r'_(.*?)_', r'\1', text) # 去除斜体(下划线)
  355. # # 去除行内代码块
  356. # text = re.sub(r'`([^`]+)`', r'\1', text)
  357. # # 去除换行符\n,或者多余的空格
  358. # #text = re.sub(r'\n+', ' ', text)
  359. # # 去除列表编号等
  360. # #text = re.sub(r'^\d+\.\s*', '', text, flags=re.MULTILINE)
  361. # #text = re.sub('[\\\`\*\_\[\]\#\+\-\!\>]', '', text)
  362. # text = re.sub('[\\\`\*\_\[\]\#\+\!\>]', '', text)
  363. # print(text)
  364. # return text
  365. # def remove_markdown_symbol(text: str):
  366. # if not text or not isinstance(text, str):
  367. # return text
  368. # # 去除加粗、斜体等格式
  369. # text = re.sub(r'\*\*(.*?)\*\*', r'\1', text) # 去除加粗
  370. # text = re.sub(r'\*([^*]+)\*', r'\1', text) # 去除斜体
  371. # text = re.sub(r'__([^_]+)__', r'\1', text) # 去除加粗(下划线)
  372. # text = re.sub(r'_(.*?)_', r'\1', text) # 去除斜体(下划线)
  373. # # 去除行内代码块
  374. # text = re.sub(r'`([^`]+)`', r'\1', text)
  375. # # 去除其他 Markdown 符号
  376. # text = re.sub('[\\\`\*\_\[\]\#\+\!\>]', '', text)
  377. # return text
  378. # def remove_markdown_symbol(text: str):
  379. # # 去除标题
  380. # text = re.sub(r'#+\s*', '', text)
  381. # # 去除粗体和斜体
  382. # text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text)
  383. # text = re.sub(r'__([^_]+)__', r'\1', text)
  384. # text = re.sub(r'\*([^*]+)\*', r'\1', text)
  385. # text = re.sub(r'_([^_]+)_', r'\1', text)
  386. # # 保留链接地址
  387. # text = re.sub(r'\[([^\]]+)\]\(([^\)]+)\)', r'\2', text)
  388. # # 保留图片地址
  389. # text = re.sub(r'!\[([^\]]+)\]\(([^\)]+)\)', r'\2', text)
  390. # # 去除列表
  391. # text = re.sub(r'^\s*[\*\+\-]\s+', '', text, flags=re.MULTILINE)
  392. # text = re.sub(r'^\s*\d+\.\s+', '', text, flags=re.MULTILINE)
  393. # # 去除代码块和内联代码
  394. # text = re.sub(r'```[^`]*```', '', text, flags=re.DOTALL)
  395. # text = re.sub(r'`([^`]+)`', r'\1', text)
  396. # # 去除引用
  397. # text = re.sub(r'^>\s*', '', text, flags=re.MULTILINE)
  398. # # 去除水平线
  399. # text = re.sub(r'^\s*[-*_]{3,}\s*$', '', text, flags=re.MULTILINE)
  400. # return text.strip()
  401. def remove_markdown_symbol(text: str)->str:
  402. url_placeholders = []
  403. def url_replacer(url):
  404. url_placeholders.append(url)
  405. return f"[[URL{len(url_placeholders)-1}]]"
  406. # 先处理 Markdown 图片语法: ![alt](url)
  407. text = re.sub(r'!\[[^\]]*?\]\((https?://[^\s)]+)\)', lambda m: url_replacer(m.group(1)), text)
  408. # 处理 Markdown 链接语法: [text](url)
  409. text = re.sub(r'\[[^\]]*?\]\((https?://[^\s)]+)\)', lambda m: url_replacer(m.group(1)), text)
  410. # 再处理裸链接
  411. text = re.sub(r'https?://[^\s)]+', lambda m: url_replacer(m.group(0)), text)
  412. # 去除标题
  413. text = re.sub(r'^\s*#+\s*', '', text, flags=re.MULTILINE)
  414. # 去除粗体、斜体(现在不会破坏 URL 中的 _ 了)
  415. text = re.sub(r'\*\*([^\*]+)\*\*', r'\1', text)
  416. text = re.sub(r'__([^_]+)__', r'\1', text)
  417. text = re.sub(r'\*([^\*]+)\*', r'\1', text)
  418. text = re.sub(r'_([^_]+)_', r'\1', text)
  419. # 去除列表
  420. text = re.sub(r'^\s*[\*\+\-]\s+', '', text, flags=re.MULTILINE)
  421. text = re.sub(r'^\s*\d+\.\s+', '', text, flags=re.MULTILINE)
  422. # 去除代码块和内联代码
  423. text = re.sub(r'```.*?```', '', text, flags=re.DOTALL)
  424. text = re.sub(r'`([^`]+)`', r'\1', text)
  425. # 去除引用
  426. text = re.sub(r'^>\s*', '', text, flags=re.MULTILINE)
  427. # 去除水平线
  428. text = re.sub(r'^\s*[-*_]{3,}\s*$', '', text, flags=re.MULTILINE)
  429. # 恢复 URL
  430. for i, url in enumerate(url_placeholders):
  431. text = text.replace(f"[[URL{i}]]", url)
  432. return text.strip()
  433. async def save_to_local_from_url_async(url):
  434. '''
  435. 从url保存到本地tmp目录
  436. '''
  437. parsed_url = urlparse(url)
  438. # 从 URL 提取文件名
  439. filename = os.path.basename(parsed_url.path)
  440. # 拼接完整路径
  441. tmp_file_path = os.path.join(os.getcwd(), 'tmp', filename)
  442. # 检查是否存在同名文件
  443. if os.path.exists(tmp_file_path):
  444. logger.info(f"文件已存在,将覆盖:{tmp_file_path}")
  445. # 异步下载文件并保存到临时目录
  446. async with aiohttp.ClientSession() as session:
  447. async with session.get(url) as response:
  448. if response.status == 200:
  449. async with aiofiles.open(tmp_file_path, 'wb') as f:
  450. async for chunk in response.content.iter_chunked(1024):
  451. await f.write(chunk)
  452. else:
  453. logger.error(f"无法下载文件,HTTP状态码:{response.status}")
  454. return None
  455. return tmp_file_path
  456. def extract_and_replace_image_url(text):
  457. # 正则表达式匹配图片地址(png、jpg、jpeg)
  458. pattern = r'https?://\S+\.(?:png|jpg|jpeg)'
  459. # 查找匹配的图片地址
  460. match = re.search(pattern, text)
  461. if match:
  462. image_url = match.group() # 获取图片地址
  463. updated_text = text.replace(image_url, "如下图") # 替换图片地址
  464. return image_url, updated_text
  465. else:
  466. return None, text # 没有匹配到图片时,返回原文本
  467. def extract_and_replace_image_urls(text):
  468. # 正则表达式匹配所有图片地址(png、jpg、jpeg)
  469. pattern = r'https?://\S+\.(?:png|jpg|jpeg)'
  470. # 查找所有匹配的图片地址
  471. image_urls = re.findall(pattern, text)
  472. # 用 "如下图" 替换所有匹配的图片地址
  473. updated_text = re.sub(pattern, "如下图", text)
  474. return image_urls, updated_text
  475. def extract_and_replace_video_urls(text):
  476. pattern = r'https?://\S+\.(?:mp4)'
  477. # 使用正则表达式提取所有视频链接
  478. video_links = re.findall(pattern, text)
  479. # 将包含视频链接的部分替换为 "如下视频"
  480. updated_text = re.sub(pattern, '如下视频', text)
  481. return video_links, updated_text
  482. def replace_placeholders(reply_content, replacements:dict)->str:
  483. for placeholder, value in replacements.items():
  484. reply_content:str = reply_content.replace(placeholder, value)
  485. return reply_content