Nelze vybrat více než 25 témat. Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

451 lines
16KB

  1. import io
  2. import os
  3. import uuid
  4. import requests
  5. from urllib.parse import urlparse
  6. from PIL import Image
  7. from common.log import logger
  8. import oss2,time,json
  9. from urllib.parse import urlparse, unquote
  10. from voice.ali.ali_voice import AliVoice
  11. from voice import audio_convert
  12. import aiohttp,aiofiles
  13. import cv2,re
  14. import os
  15. import tempfile
  16. from moviepy.editor import VideoFileClip
  17. from datetime import datetime
  18. def clean_json_string(json_str):
  19. # 删除所有控制字符(非打印字符),包括换行符、回车符等
  20. return re.sub(r'[\x00-\x1f\x7f]', '', json_str)
  21. def dialogue_message(wxid_from:str,wxid_to:str,wx_content:list,is_ai:bool=False):
  22. """
  23. 构造消息的 JSON 数据
  24. :param contents: list,包含多个消息内容,每个内容为字典,如:
  25. [{"type": "text", "text": "AAAAAAA"},
  26. {"type": "image_url", "image_url": {"url": "https://AAAAA.jpg"}},
  27. {"type":"file","file_url":{"url":"https://AAAAA.pdf"}}
  28. ]
  29. :return: JSON 字符串
  30. """
  31. # 获取当前时间戳,精确到毫秒
  32. current_timestamp = int(time.time() * 1000)
  33. # 获取当前时间,格式化为 "YYYY-MM-DD HH:MM:SS"
  34. current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  35. # 构造 JSON 数据
  36. data = {
  37. "message_id": str(current_timestamp),
  38. "topic": "topic.ai.ops.wx",
  39. "time": current_time,
  40. "data": {
  41. "msg_type": "dialogue",
  42. "is_ai":is_ai,
  43. "content": {
  44. "wxid_from": wxid_from,
  45. "wxid_to": wxid_to,
  46. "wx_content":wx_content
  47. }
  48. }
  49. }
  50. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  51. def kafka_base_message(msg_type:str,content: dict|list)->dict:
  52. """
  53. 构造消息的 JSON 数据
  54. :param wxid: 微信ID
  55. :param data: 一个包含了所有联系人的数据,格式为list,
  56. 每个元素为字典,包含wxid、alias、remark、sex、city、province、country,
  57. headimgurl、signature、skey、uin、nickname这10个字段
  58. :return: JSON 字符串
  59. """
  60. # 获取当前时间戳,精确到毫秒
  61. current_timestamp = int(time.time() * 1000)
  62. # 获取当前时间,格式化为 "YYYY-MM-DD HH:MM:SS"
  63. current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  64. # 构造 JSON 数据
  65. data = {
  66. "message_id": str(current_timestamp),
  67. "topic": "topic.ai.ops.wx",
  68. "time": current_time,
  69. "data": {
  70. #"msg_type": "login-qrcode",
  71. "msg_type": msg_type,
  72. "content": content
  73. }
  74. }
  75. return data
  76. def wx_offline_message(appid:str,wxid:str)->str:
  77. content = {"appid": appid,"wxid":wxid}
  78. data=kafka_base_message("wx-offline",content)
  79. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  80. def wx_del_contact_message(wxid:str,contact_wixd:str)->str:
  81. content = {"wxid": wxid,"contact_wixd":contact_wixd}
  82. data=kafka_base_message("del-contact",content)
  83. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  84. def wx_mod_contact_message(wxid:str,contact_data:dict)->str:
  85. content = {"wxid": wxid,"contact_data":contact_data}
  86. data=kafka_base_message("mod-contact",content)
  87. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  88. def wx_all_contacts_message(wxid:str,data:dict|list)->str:
  89. content = {"wxid": wxid,"contacts_data":data}
  90. data=kafka_base_message("all-contacts",content)
  91. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  92. def wx_groups_info_members_message(wxid:str,data:dict|list)->str:
  93. content = {"wxid": wxid,"groups_info":data}
  94. data=kafka_base_message("all-groups",content)
  95. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  96. def wx_mod_group_info_members_message(wxid:str,data:dict|list)->str:
  97. content = {"wxid": wxid,"group_info":data}
  98. data=kafka_base_message("mod-group",content)
  99. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  100. def wx_del_group_message(wxid:str,chatroom_id:str)->str:
  101. content = {"wxid": wxid,"chatroom_id":chatroom_id}
  102. data=kafka_base_message("del-group",content)
  103. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  104. def login_qrcode_message(token_id: str,agent_tel:str,qr_code_img_base64:str,qr_code_url:list)->str:
  105. """
  106. 构造消息的 JSON 数据
  107. :param contents: list,包含多个消息内容,每个内容为字典,如:
  108. {
  109. "tel":"18029274615",
  110. "token_id":"f828cb3c-1039-489f-b9ae-7494d1778a15",
  111. "qr_code_urls":["url1","url2","url3","url4",],
  112. "qr_code_img_base64":"aaaaaaaaaaaaaa"
  113. }
  114. :return: JSON 字符串
  115. """
  116. content = {
  117. "tel":agent_tel,
  118. "token_id":token_id,
  119. "qr_code_urls":qr_code_url,
  120. "qr_code_img_base64":qr_code_img_base64
  121. }
  122. data=kafka_base_message("login-qrcode",content)
  123. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  124. def login_result_message(token_id: str,agent_tel:str,region_id:str,agent_token_id:str,wxid:str)->str:
  125. content = {
  126. "tel":agent_tel,
  127. "token_id":token_id,
  128. "region_id":region_id,
  129. "agent_token_id":agent_token_id,
  130. "wxid":wxid
  131. }
  132. data=kafka_base_message("login-result",content)
  133. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  134. def wx_voice(text: str):
  135. try:
  136. # 将文本转换为语音
  137. reply_text_voice = AliVoice().textToVoice(text)
  138. reply_text_voice_path = os.path.join(os.getcwd(), reply_text_voice)
  139. # 转换为 Silk 格式
  140. reply_silk_path = os.path.splitext(reply_text_voice_path)[0] + ".silk"
  141. reply_silk_during = audio_convert.any_to_sil(reply_text_voice_path, reply_silk_path)
  142. # OSS 配置(建议将凭证存储在安全的地方)
  143. oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5"
  144. oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN"
  145. oss_endpoint="http://oss-cn-shanghai.aliyuncs.com"
  146. oss_bucket_name="cow-agent"
  147. oss_prefix="cow"
  148. # 上传文件到 OSS
  149. file_path = reply_silk_path
  150. file_url = upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, file_path, oss_prefix)
  151. # 删除临时文件
  152. try:
  153. os.remove(reply_text_voice_path)
  154. except FileNotFoundError:
  155. pass # 如果文件未找到,跳过删除
  156. try:
  157. os.remove(reply_silk_path)
  158. except FileNotFoundError:
  159. pass # 如果文件未找到,跳过删除
  160. return int(reply_silk_during), file_url
  161. except Exception as e:
  162. print(f"发生错误:{e}")
  163. return None, None # 发生错误时返回 None
  164. def upload_oss(
  165. access_key_id,
  166. access_key_secret,
  167. endpoint,
  168. bucket_name,
  169. file_source,
  170. prefix,
  171. expiration_days=7
  172. ):
  173. """
  174. 上传文件到阿里云OSS并设置生命周期规则,同时返回文件的公共访问地址。
  175. :param access_key_id: 阿里云AccessKey ID
  176. :param access_key_secret: 阿里云AccessKey Secret
  177. :param endpoint: OSS区域对应的Endpoint
  178. :param bucket_name: OSS中的Bucket名称
  179. :param file_source: 本地文件路径或HTTP链接
  180. :param prefix: 设置规则应用的前缀为文件所在目录
  181. :param expiration_days: 文件保存天数,默认7天后删除
  182. :return: 文件的公共访问地址
  183. """
  184. # 创建Bucket实例
  185. auth = oss2.Auth(access_key_id, access_key_secret)
  186. bucket = oss2.Bucket(auth, endpoint, bucket_name)
  187. ### 1. 设置生命周期规则 ###
  188. rule_id = f'delete_after_{expiration_days}_days' # 规则ID
  189. # prefix = oss_file_name.split('/')[0] + '/' # 设置规则应用的前缀为文件所在目录
  190. # 定义生命周期规则
  191. rule = oss2.models.LifecycleRule(rule_id, prefix, status=oss2.models.LifecycleRule.ENABLED,
  192. expiration=oss2.models.LifecycleExpiration(days=expiration_days))
  193. # 设置Bucket的生命周期
  194. lifecycle = oss2.models.BucketLifecycle([rule])
  195. bucket.put_bucket_lifecycle(lifecycle)
  196. print(f"已设置生命周期规则:文件将在{expiration_days}天后自动删除")
  197. ### 2. 判断文件来源并上传到OSS ###
  198. if file_source.startswith('http://') or file_source.startswith('https://'):
  199. # HTTP 链接,先下载文件
  200. try:
  201. response = requests.get(file_source, stream=True)
  202. response.raise_for_status()
  203. parsed_url = urlparse(file_source)
  204. # 提取路径部分并解码
  205. path = unquote(parsed_url.path)
  206. # 获取路径的最后一部分作为文件名
  207. filename = path.split('/')[-1]
  208. oss_file_name=prefix+'/'+ filename
  209. bucket.put_object(oss_file_name, response.content)
  210. print(f"文件从 HTTP 链接上传成功:{file_source}")
  211. except requests.exceptions.RequestException as e:
  212. print(f"从 HTTP 链接下载文件失败: {e}")
  213. return None
  214. else:
  215. # 本地文件路径
  216. try:
  217. filename=os.path.basename(file_source)
  218. oss_file_name=prefix+'/'+ filename
  219. bucket.put_object_from_file(oss_file_name, file_source)
  220. print(f"文件从本地路径上传成功:{file_source}")
  221. except oss2.exceptions.OssError as e:
  222. print(f"从本地路径上传文件失败: {e}")
  223. return None
  224. ### 3. 构建公共访问URL ###
  225. file_url = f"http://{bucket_name}.{endpoint.replace('http://', '')}/{oss_file_name}"
  226. print(f"文件上传成功,公共访问地址:{file_url}")
  227. return file_url
  228. def download_video_and_get_thumbnail(url, thumbnail_path):
  229. """
  230. 从指定URL下载MP4视频,提取首帧作为缩略图,并返回缩略图路径及视频时长。
  231. 参数:
  232. url (str): 视频的URL地址。
  233. thumbnail_path (str): 缩略图的保存路径。
  234. 返回:
  235. tuple: (缩略图路径, 视频时长(秒))
  236. 异常:
  237. 可能抛出requests.exceptions.RequestException,cv2.error,IOError等异常。
  238. """
  239. logger.info("处理视频开始")
  240. # 创建临时目录以下载视频
  241. with tempfile.TemporaryDirectory() as tmp_dir:
  242. # 下载视频到临时文件
  243. video_path = os.path.join(tmp_dir, 'temp_video.mp4')
  244. response = requests.get(url, stream=True)
  245. response.raise_for_status() # 确保请求成功
  246. with open(video_path, 'wb') as f:
  247. for chunk in response.iter_content(chunk_size=8192):
  248. if chunk: # 过滤掉保持连接的空白块
  249. f.write(chunk)
  250. # 提取视频首帧作为缩略图
  251. vidcap = cv2.VideoCapture(video_path)
  252. success, image = vidcap.read()
  253. vidcap.release()
  254. if not success:
  255. raise RuntimeError("无法读取视频的首帧,请检查视频文件是否有效。")
  256. # 确保缩略图的目录存在
  257. thumbnail_dir = os.path.dirname(thumbnail_path)
  258. if thumbnail_dir:
  259. os.makedirs(thumbnail_dir, exist_ok=True)
  260. # 保存缩略图
  261. cv2.imwrite(thumbnail_path, image)
  262. # 使用moviepy计算视频时长
  263. clip = VideoFileClip(video_path)
  264. duration = clip.duration
  265. clip.close()
  266. logger.info("处理视频完成")
  267. # OSS 配置(建议将凭证存储在安全的地方)
  268. oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5"
  269. oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN"
  270. oss_endpoint="http://oss-cn-shanghai.aliyuncs.com"
  271. oss_bucket_name="cow-agent"
  272. oss_prefix="cow"
  273. # 上传文件到 OSS
  274. file_path = thumbnail_path
  275. file_url = upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, file_path, oss_prefix)
  276. logger.info("上传缩略图")
  277. # 删除临时文件
  278. try:
  279. os.remove(thumbnail_path)
  280. except FileNotFoundError:
  281. pass # 如果文件未找到,跳过删除
  282. return file_url, duration
  283. def contains_url(text):
  284. # 定义检测网址的正则表达式
  285. url_pattern = re.compile(
  286. r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
  287. )
  288. # 检查字符串是否包含网址
  289. return bool(url_pattern.search(text))
  290. def get_first_char_if_digit(s):
  291. if s and s[0].isdigit(): # 判断字符串是否非空且首字符为数字
  292. return int(s[0]) # 返回数字形式
  293. return None # 如果不是数字则返回 None
  294. def remove_at_mention_regex(text):
  295. # 使用正则表达式去掉“在群聊中@了你”
  296. return re.sub(r"在群聊中@了你", "", text)
  297. def extract_nickname(text)->str:
  298. if "在群聊中@了你" in text:
  299. # 如果包含 "在群聊中@了你",提取其前面的名字
  300. match = re.search(r"^(.*?)在群聊中@了你", text)
  301. if match:
  302. return match.group(1).strip()
  303. elif ": @" in text:
  304. # 如果包含 ": @",提取其前面的名字
  305. return text.split(": @")[0].strip()
  306. return ''
  307. def check_chatroom(userName):
  308. pattern = r'^\d+@chatroom$'
  309. if re.match(pattern, userName):
  310. return True
  311. return False
  312. def remove_markdown_symbol(text: str):
  313. # 移除markdown格式,目前先移除**
  314. if not text or not isinstance(text, str):
  315. return text
  316. # 去除加粗、斜体等格式
  317. #text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text) # 去除加粗
  318. text=re.sub(r'\*\*(.*?)\*\*', r'\1', text)
  319. text = re.sub(r'\*([^*]+)\*', r'\1', text) # 去除斜体
  320. text = re.sub(r'__([^_]+)__', r'\1', text) # 去除加粗(下划线)
  321. text = re.sub(r'_(.*?)_', r'\1', text) # 去除斜体(下划线)
  322. # 去除行内代码块
  323. text = re.sub(r'`([^`]+)`', r'\1', text)
  324. # 去除换行符\n,或者多余的空格
  325. #text = re.sub(r'\n+', ' ', text)
  326. # 去除列表编号等
  327. #text = re.sub(r'^\d+\.\s*', '', text, flags=re.MULTILINE)
  328. #text = re.sub('[\\\`\*\_\[\]\#\+\-\!\>]', '', text)
  329. text = re.sub('[\\\`\*\_\[\]\#\+\!\>]', '', text)
  330. print(text)
  331. return text
  332. async def save_to_local_from_url_async(url):
  333. '''
  334. 从url保存到本地tmp目录
  335. '''
  336. parsed_url = urlparse(url)
  337. # 从 URL 提取文件名
  338. filename = os.path.basename(parsed_url.path)
  339. # 拼接完整路径
  340. tmp_file_path = os.path.join(os.getcwd(), 'tmp', filename)
  341. # 检查是否存在同名文件
  342. if os.path.exists(tmp_file_path):
  343. logger.info(f"文件已存在,将覆盖:{tmp_file_path}")
  344. # 异步下载文件并保存到临时目录
  345. async with aiohttp.ClientSession() as session:
  346. async with session.get(url) as response:
  347. if response.status == 200:
  348. async with aiofiles.open(tmp_file_path, 'wb') as f:
  349. async for chunk in response.content.iter_chunked(1024):
  350. await f.write(chunk)
  351. else:
  352. logger.error(f"无法下载文件,HTTP状态码:{response.status}")
  353. return None
  354. return tmp_file_path
  355. def extract_and_replace_image_url(text):
  356. # 正则表达式匹配图片地址(png、jpg、jpeg)
  357. pattern = r'https?://\S+\.(?:png|jpg|jpeg)'
  358. # 查找匹配的图片地址
  359. match = re.search(pattern, text)
  360. if match:
  361. image_url = match.group() # 获取图片地址
  362. updated_text = text.replace(image_url, "如下图") # 替换图片地址
  363. return image_url, updated_text
  364. else:
  365. return None, text # 没有匹配到图片时,返回原文本
  366. def extract_and_replace_image_urls(text):
  367. # 正则表达式匹配所有图片地址(png、jpg、jpeg)
  368. pattern = r'https?://\S+\.(?:png|jpg|jpeg)'
  369. # 查找所有匹配的图片地址
  370. image_urls = re.findall(pattern, text)
  371. # 用 "如下图" 替换所有匹配的图片地址
  372. updated_text = re.sub(pattern, "如下图", text)
  373. return image_urls, updated_text