You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

414 lines
14KB

  1. import io
  2. import os
  3. import uuid
  4. import requests
  5. from urllib.parse import urlparse
  6. from PIL import Image
  7. from common.log import logger
  8. import oss2,time,json
  9. from urllib.parse import urlparse, unquote
  10. from voice.ali.ali_voice import AliVoice
  11. from voice import audio_convert
  12. import cv2
  13. import os
  14. import tempfile
  15. from moviepy import VideoFileClip
  16. from common import redis_helper
  17. from datetime import datetime
  18. def fsize(file):
  19. if isinstance(file, io.BytesIO):
  20. return file.getbuffer().nbytes
  21. elif isinstance(file, str):
  22. return os.path.getsize(file)
  23. elif hasattr(file, "seek") and hasattr(file, "tell"):
  24. pos = file.tell()
  25. file.seek(0, os.SEEK_END)
  26. size = file.tell()
  27. file.seek(pos)
  28. return size
  29. else:
  30. raise TypeError("Unsupported type")
  31. def compress_imgfile(file, max_size):
  32. if fsize(file) <= max_size:
  33. return file
  34. file.seek(0)
  35. img = Image.open(file)
  36. rgb_image = img.convert("RGB")
  37. quality = 95
  38. while True:
  39. out_buf = io.BytesIO()
  40. rgb_image.save(out_buf, "JPEG", quality=quality)
  41. if fsize(out_buf) <= max_size:
  42. return out_buf
  43. quality -= 5
  44. def split_string_by_utf8_length(string, max_length, max_split=0):
  45. encoded = string.encode("utf-8")
  46. start, end = 0, 0
  47. result = []
  48. while end < len(encoded):
  49. if max_split > 0 and len(result) >= max_split:
  50. result.append(encoded[start:].decode("utf-8"))
  51. break
  52. end = min(start + max_length, len(encoded))
  53. # 如果当前字节不是 UTF-8 编码的开始字节,则向前查找直到找到开始字节为止
  54. while end < len(encoded) and (encoded[end] & 0b11000000) == 0b10000000:
  55. end -= 1
  56. result.append(encoded[start:end].decode("utf-8"))
  57. start = end
  58. return result
  59. def get_path_suffix(path):
  60. path = urlparse(path).path
  61. return os.path.splitext(path)[-1].lstrip('.')
  62. def convert_webp_to_png(webp_image):
  63. from PIL import Image
  64. try:
  65. webp_image.seek(0)
  66. img = Image.open(webp_image).convert("RGBA")
  67. png_image = io.BytesIO()
  68. img.save(png_image, format="PNG")
  69. png_image.seek(0)
  70. return png_image
  71. except Exception as e:
  72. logger.error(f"Failed to convert WEBP to PNG: {e}")
  73. raise
  74. def generate_timestamp():
  75. # 获取当前时间
  76. now = datetime.now()
  77. # 格式化时间字符串为 'yyyyMMddHHmmssSS'
  78. timestamp = now.strftime('%Y%m%d%H%M%S%f')[:-4]
  79. return timestamp
  80. def at_extract_content(text):
  81. # 找到最后一个空格的索引
  82. last_space_index = text.rfind(" ")
  83. if last_space_index != -1:
  84. # 返回空格后面的内容
  85. return text[last_space_index + 1:]
  86. return ""
  87. def audio_extract_content(text):
  88. result = text.split('\n', 1)[1]
  89. return result
  90. def save_to_local_from_url(url):
  91. '''
  92. 从url保存到本地tmp目录
  93. '''
  94. parsed_url = urlparse(url)
  95. # 从 URL 提取文件名
  96. filename = os.path.basename(parsed_url.path)
  97. # tmp_dir = os.path(__file__) # 获取系统临时目录
  98. # print(tmp_dir)
  99. tmp_file_path = os.path.join(os.getcwd(),'tmp', filename) # 拼接完整路径
  100. # 检查是否存在同名文件
  101. if os.path.exists(tmp_file_path):
  102. logger.info(f"文件已存在,将覆盖:{tmp_file_path}")
  103. # 下载文件并保存到临时目录
  104. response = requests.get(url, stream=True)
  105. with open(tmp_file_path, 'wb') as f:
  106. for chunk in response.iter_content(chunk_size=1024):
  107. if chunk: # 检查是否有内容
  108. f.write(chunk)
  109. return tmp_file_path
  110. def upload_oss(
  111. access_key_id,
  112. access_key_secret,
  113. endpoint,
  114. bucket_name,
  115. file_source,
  116. prefix,
  117. expiration_days=7
  118. ):
  119. """
  120. 上传文件到阿里云OSS并设置生命周期规则,同时返回文件的公共访问地址。
  121. :param access_key_id: 阿里云AccessKey ID
  122. :param access_key_secret: 阿里云AccessKey Secret
  123. :param endpoint: OSS区域对应的Endpoint
  124. :param bucket_name: OSS中的Bucket名称
  125. :param file_source: 本地文件路径或HTTP链接
  126. :param prefix: 设置规则应用的前缀为文件所在目录
  127. :param expiration_days: 文件保存天数,默认7天后删除
  128. :return: 文件的公共访问地址
  129. """
  130. # 创建Bucket实例
  131. auth = oss2.Auth(access_key_id, access_key_secret)
  132. bucket = oss2.Bucket(auth, endpoint, bucket_name)
  133. ### 1. 设置生命周期规则 ###
  134. rule_id = f'delete_after_{expiration_days}_days' # 规则ID
  135. # prefix = oss_file_name.split('/')[0] + '/' # 设置规则应用的前缀为文件所在目录
  136. # 定义生命周期规则
  137. rule = oss2.models.LifecycleRule(rule_id, prefix, status=oss2.models.LifecycleRule.ENABLED,
  138. expiration=oss2.models.LifecycleExpiration(days=expiration_days))
  139. # 设置Bucket的生命周期
  140. lifecycle = oss2.models.BucketLifecycle([rule])
  141. bucket.put_bucket_lifecycle(lifecycle)
  142. print(f"已设置生命周期规则:文件将在{expiration_days}天后自动删除")
  143. ### 2. 判断文件来源并上传到OSS ###
  144. if file_source.startswith('http://') or file_source.startswith('https://'):
  145. # HTTP 链接,先下载文件
  146. try:
  147. response = requests.get(file_source, stream=True)
  148. response.raise_for_status()
  149. parsed_url = urlparse(file_source)
  150. # 提取路径部分并解码
  151. path = unquote(parsed_url.path)
  152. # 获取路径的最后一部分作为文件名
  153. filename = path.split('/')[-1]
  154. oss_file_name=prefix+'/'+ filename
  155. bucket.put_object(oss_file_name, response.content)
  156. print(f"文件从 HTTP 链接上传成功:{file_source}")
  157. except requests.exceptions.RequestException as e:
  158. print(f"从 HTTP 链接下载文件失败: {e}")
  159. return None
  160. else:
  161. # 本地文件路径
  162. try:
  163. filename=os.path.basename(file_source)
  164. oss_file_name=prefix+'/'+ filename
  165. bucket.put_object_from_file(oss_file_name, file_source)
  166. print(f"文件从本地路径上传成功:{file_source}")
  167. except oss2.exceptions.OssError as e:
  168. print(f"从本地路径上传文件失败: {e}")
  169. return None
  170. ### 3. 构建公共访问URL ###
  171. file_url = f"http://{bucket_name}.{endpoint.replace('http://', '')}/{oss_file_name}"
  172. print(f"文件上传成功,公共访问地址:{file_url}")
  173. return file_url
  174. def generate_guid_no_dashes():
  175. """
  176. 生成一个无分隔符的 GUID
  177. :return: 返回生成的无分隔符 GUID 字符串
  178. """
  179. return str(uuid.uuid4()).replace('-', '')
  180. def dialogue_message(wxid_from:str,wxid_to:str,wx_content:list,is_ai:bool=False):
  181. """
  182. 构造消息的 JSON 数据
  183. :param contents: list,包含多个消息内容,每个内容为字典,如:
  184. [{"type": "text", "text": "AAAAAAA"},
  185. {"type": "image_url", "image_url": {"url": "https://AAAAA.jpg"}},
  186. {"type":"file","file_url":{"url":"https://AAAAA.pdf"}}
  187. ]
  188. :return: JSON 字符串
  189. """
  190. # 获取当前时间戳,精确到毫秒
  191. current_timestamp = int(time.time() * 1000)
  192. # 获取当前时间,格式化为 "YYYY-MM-DD HH:MM:SS"
  193. current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  194. # 构造 JSON 数据
  195. data = {
  196. "message_id": str(current_timestamp),
  197. "topic": "topic.ai.ops.wx",
  198. "time": current_time,
  199. "data": {
  200. "msg_type": "dialogue",
  201. "is_ai":is_ai,
  202. "content": {
  203. "wxid_from": wxid_from,
  204. "wxid_to": wxid_to,
  205. "wx_content":wx_content
  206. }
  207. }
  208. }
  209. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  210. def kafka_base_message(content: dict)->dict:
  211. # 获取当前时间戳,精确到毫秒
  212. current_timestamp = int(time.time() * 1000)
  213. # 获取当前时间,格式化为 "YYYY-MM-DD HH:MM:SS"
  214. current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  215. # 构造 JSON 数据
  216. data = {
  217. "message_id": str(current_timestamp),
  218. "topic": "topic.ai.ops.wx",
  219. "time": current_time,
  220. "data": {
  221. "msg_type": "login-qrcode",
  222. "content": content
  223. }
  224. }
  225. return data
  226. def login_qrcode_message(token_id: str,agent_tel:str,qr_code_img_base64:str,qr_code_url:list)->str:
  227. """
  228. 构造消息的 JSON 数据
  229. :param contents: list,包含多个消息内容,每个内容为字典,如:
  230. {
  231. "tel":"18029274615",
  232. "token_id":"f828cb3c-1039-489f-b9ae-7494d1778a15",
  233. "qr_code_urls":["url1","url2","url3","url4",],
  234. "qr_code_img_base64":"aaaaaaaaaaaaaa"
  235. }
  236. :return: JSON 字符串
  237. """
  238. content = {
  239. "tel":agent_tel,
  240. "token_id":token_id,
  241. "qr_code_urls":qr_code_url,
  242. "qr_code_img_base64":qr_code_img_base64
  243. }
  244. data=kafka_base_message(content)
  245. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  246. def wx_voice(text: str):
  247. try:
  248. # 将文本转换为语音
  249. reply_text_voice = AliVoice().textToVoice(text)
  250. reply_text_voice_path = os.path.join(os.getcwd(), reply_text_voice)
  251. # 转换为 Silk 格式
  252. reply_silk_path = os.path.splitext(reply_text_voice_path)[0] + ".silk"
  253. reply_silk_during = audio_convert.any_to_sil(reply_text_voice_path, reply_silk_path)
  254. # OSS 配置(建议将凭证存储在安全的地方)
  255. oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5"
  256. oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN"
  257. oss_endpoint="http://oss-cn-shanghai.aliyuncs.com"
  258. oss_bucket_name="cow-agent"
  259. oss_prefix="cow"
  260. # 上传文件到 OSS
  261. file_path = reply_silk_path
  262. file_url = upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, file_path, oss_prefix)
  263. # 删除临时文件
  264. try:
  265. os.remove(reply_text_voice_path)
  266. except FileNotFoundError:
  267. pass # 如果文件未找到,跳过删除
  268. try:
  269. os.remove(reply_silk_path)
  270. except FileNotFoundError:
  271. pass # 如果文件未找到,跳过删除
  272. return int(reply_silk_during), file_url
  273. except Exception as e:
  274. print(f"发生错误:{e}")
  275. return None, None # 发生错误时返回 None
  276. def get_login_info_by_wxid(wxid: str) ->dict:
  277. # 使用 SCAN 避免一次性返回所有的匹配键,逐步扫描
  278. cursor = 0
  279. while True:
  280. cursor, login_keys = redis_helper.redis_helper.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*')
  281. # 批量获取所有键的 hash 数据
  282. for k in login_keys:
  283. r = redis_helper.redis_helper.get_hash(k)
  284. if r.get("wxid") == wxid:
  285. return k,r
  286. # 如果游标为 0,则表示扫描完成
  287. if cursor == 0:
  288. break
  289. return None,None
  290. def download_video_and_get_thumbnail(url, thumbnail_path):
  291. """
  292. 从指定URL下载MP4视频,提取首帧作为缩略图,并返回缩略图路径及视频时长。
  293. 参数:
  294. url (str): 视频的URL地址。
  295. thumbnail_path (str): 缩略图的保存路径。
  296. 返回:
  297. tuple: (缩略图路径, 视频时长(秒))
  298. 异常:
  299. 可能抛出requests.exceptions.RequestException,cv2.error,IOError等异常。
  300. """
  301. # 创建临时目录以下载视频
  302. with tempfile.TemporaryDirectory() as tmp_dir:
  303. # 下载视频到临时文件
  304. video_path = os.path.join(tmp_dir, 'temp_video.mp4')
  305. response = requests.get(url, stream=True)
  306. response.raise_for_status() # 确保请求成功
  307. with open(video_path, 'wb') as f:
  308. for chunk in response.iter_content(chunk_size=8192):
  309. if chunk: # 过滤掉保持连接的空白块
  310. f.write(chunk)
  311. # 提取视频首帧作为缩略图
  312. vidcap = cv2.VideoCapture(video_path)
  313. success, image = vidcap.read()
  314. vidcap.release()
  315. if not success:
  316. raise RuntimeError("无法读取视频的首帧,请检查视频文件是否有效。")
  317. # 确保缩略图的目录存在
  318. thumbnail_dir = os.path.dirname(thumbnail_path)
  319. if thumbnail_dir:
  320. os.makedirs(thumbnail_dir, exist_ok=True)
  321. # 保存缩略图
  322. cv2.imwrite(thumbnail_path, image)
  323. # 使用moviepy计算视频时长
  324. clip = VideoFileClip(video_path)
  325. duration = clip.duration
  326. clip.close()
  327. # OSS 配置(建议将凭证存储在安全的地方)
  328. oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5"
  329. oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN"
  330. oss_endpoint="http://oss-cn-shanghai.aliyuncs.com"
  331. oss_bucket_name="cow-agent"
  332. oss_prefix="cow"
  333. # 上传文件到 OSS
  334. file_path = thumbnail_path
  335. file_url = upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, file_path, oss_prefix)
  336. # 删除临时文件
  337. try:
  338. os.remove(thumbnail_path)
  339. except FileNotFoundError:
  340. pass # 如果文件未找到,跳过删除
  341. return file_url, duration