|
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- import requests
- from config import conf
- from common.log import logger
- import os
-
-
- class LinkSummary:
- def __init__(self):
- pass
-
- def summary_file(self, file_path: str):
- file_body = {
- "file": open(file_path, "rb"),
- "name": file_path.split("/")[-1],
- }
- url = self.base_url() + "/v1/summary/file"
- res = requests.post(url, headers=self.headers(), files=file_body, timeout=(5, 300))
- return self._parse_summary_res(res)
-
- def summary_url(self, url: str):
- body = {
- "url": url
- }
- res = requests.post(url=self.base_url() + "/v1/summary/url", headers=self.headers(), json=body, timeout=(5, 180))
- return self._parse_summary_res(res)
-
- def summary_chat(self, summary_id: str):
- body = {
- "summary_id": summary_id
- }
- res = requests.post(url=self.base_url() + "/v1/summary/chat", headers=self.headers(), json=body, timeout=(5, 180))
- if res.status_code == 200:
- res = res.json()
- logger.debug(f"[LinkSum] chat open, res={res}")
- if res.get("code") == 200:
- data = res.get("data")
- return {
- "questions": data.get("questions"),
- "file_id": data.get("file_id")
- }
- else:
- res_json = res.json()
- logger.error(f"[LinkSum] summary error, status_code={res.status_code}, msg={res_json.get('message')}")
- return None
-
- def _parse_summary_res(self, res):
- if res.status_code == 200:
- res = res.json()
- logger.debug(f"[LinkSum] url summary, res={res}")
- if res.get("code") == 200:
- data = res.get("data")
- return {
- "summary": data.get("summary"),
- "summary_id": data.get("summary_id")
- }
- else:
- res_json = res.json()
- logger.error(f"[LinkSum] summary error, status_code={res.status_code}, msg={res_json.get('message')}")
- return None
-
- def base_url(self):
- return conf().get("linkai_api_base", "https://api.link-ai.chat")
-
- def headers(self):
- return {"Authorization": "Bearer " + conf().get("linkai_api_key")}
-
- def check_file(self, file_path: str, sum_config: dict) -> bool:
- file_size = os.path.getsize(file_path) // 1000
-
- if (sum_config.get("max_file_size") and file_size > sum_config.get("max_file_size")) or file_size > 15000:
- logger.warn(f"[LinkSum] file size exceeds limit, No processing, file_size={file_size}KB")
- return False
-
- suffix = file_path.split(".")[-1]
- support_list = ["txt", "csv", "docx", "pdf", "md", "jpg", "jpeg", "png"]
- if suffix not in support_list:
- logger.warn(f"[LinkSum] unsupported file, suffix={suffix}, support_list={support_list}")
- return False
-
- return True
-
- def check_url(self, url: str):
- if not url:
- return False
- support_list = ["http://mp.weixin.qq.com", "https://mp.weixin.qq.com"]
- black_support_list = ["https://mp.weixin.qq.com/mp/waerrpage"]
- for black_url_prefix in black_support_list:
- if url.strip().startswith(black_url_prefix):
- logger.warn(f"[LinkSum] unsupported url, no need to process, url={url}")
- return False
- for support_url in support_list:
- if url.strip().startswith(support_url):
- return True
- return False
|