import requests import json import re import plugins from bridge.reply import Reply, ReplyType from bridge.context import ContextType from channel.chat_message import ChatMessage from plugins import * from common.log import logger from common.expired_dict import ExpiredDict import os from docx import Document import markdown import fitz from openpyxl import load_workbook import csv from bs4 import BeautifulSoup from pptx import Presentation from PIL import Image import base64 import html EXTENSION_TO_TYPE = { 'pdf': 'pdf', 'doc': 'docx', 'docx': 'docx', 'md': 'md', 'txt': 'txt', 'xls': 'excel', 'xlsx': 'excel', 'csv': 'csv', 'html': 'html', 'htm': 'html', 'ppt': 'ppt', 'pptx': 'ppt' } @plugins.register( name="sum4all", desire_priority=2, desc="A plugin for summarizing all things", version="0.7.10", author="fatwang2", ) class sum4all(Plugin): def __init__(self): super().__init__() try: curdir = os.path.dirname(__file__) config_path = os.path.join(curdir, "config.json") if os.path.exists(config_path): with open(config_path, "r", encoding="utf-8") as f: self.config = json.load(f) else: # 使用父类的方法来加载配置 self.config = super().load_config() if not self.config: raise Exception("config.json not found") # 设置事件处理函数 self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context self.params_cache = ExpiredDict(300) # 从配置中提取所需的设置 self.keys = self.config.get("keys", {}) self.url_sum = self.config.get("url_sum", {}) self.search_sum = self.config.get("search_sum", {}) self.file_sum = self.config.get("file_sum", {}) self.image_sum = self.config.get("image_sum", {}) self.note = self.config.get("note", {}) self.sum4all_key = self.keys.get("sum4all_key", "") self.search1api_key = self.keys.get("search1api_key", "") self.gemini_key = self.keys.get("gemini_key", "") self.bibigpt_key = self.keys.get("bibigpt_key", "") self.outputLanguage = self.keys.get("outputLanguage", "zh-CN") self.opensum_key = self.keys.get("opensum_key", "") self.open_ai_api_key = self.keys.get("open_ai_api_key", "") self.model = self.keys.get("model", "gpt-3.5-turbo") self.open_ai_api_base = self.keys.get("open_ai_api_base", "https://api.openai.com/v1") self.xunfei_app_id = self.keys.get("xunfei_app_id", "") self.xunfei_api_key = self.keys.get("xunfei_api_key", "") self.xunfei_api_secret = self.keys.get("xunfei_api_secret", "") self.perplexity_key = self.keys.get("perplexity_key", "") self.flomo_key = self.keys.get("flomo_key", "") # 提取sum服务的配置 self.url_sum_enabled = self.url_sum.get("enabled", False) self.url_sum_service = self.url_sum.get("service", "") self.url_sum_group = self.url_sum.get("group", True) self.url_sum_qa_enabled = self.url_sum.get("qa_enabled", True) self.url_sum_qa_prefix = self.url_sum.get("qa_prefix", "问") self.url_sum_prompt = self.url_sum.get("prompt", "") self.search_sum_enabled = self.search_sum.get("enabled", False) self.search_sum_service = self.search_sum.get("service", "") self.search_service = self.search_sum.get("search_service", "duckduckgo") self.search_sum_group = self.search_sum.get("group", True) self.search_sum_search_prefix = self.search_sum.get("search_prefix", "搜") self.search_sum_prompt = self.search_sum.get("prompt", "") self.file_sum_enabled = self.file_sum.get("enabled", False) self.file_sum_service = self.file_sum.get("service", "") self.max_file_size = self.file_sum.get("max_file_size", 15000) self.file_sum_group = self.file_sum.get("group", True) self.file_sum_qa_prefix = self.file_sum.get("qa_prefix", "问") self.file_sum_prompt = self.file_sum.get("prompt", "") self.image_sum_enabled = self.image_sum.get("enabled", False) self.image_sum_service = self.image_sum.get("service", "") self.image_sum_group = self.image_sum.get("group", True) self.image_sum_qa_prefix = self.image_sum.get("qa_prefix", "问") self.image_sum_prompt = self.image_sum.get("prompt", "") self.note_enabled = self.note.get("enabled", False) self.note_service = self.note.get("service", "") self.note_prefix = self.note.get("prefix", "记") # 初始化成功日志 logger.info("[sum4all] inited.") except Exception as e: # 初始化失败日志 logger.warn(f"sum4all init failed: {e}") def on_handle_context(self, e_context: EventContext): context = e_context["context"] if context.type not in [ContextType.TEXT, ContextType.SHARING,ContextType.FILE,ContextType.IMAGE]: return msg: ChatMessage = e_context["context"]["msg"] user_id = msg.from_user_id content = context.content isgroup = e_context["context"].get("isgroup", False) url_match = re.match('https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+', content) unsupported_urls = re.search(r'.*finder\.video\.qq\.com.*|.*support\.weixin\.qq\.com/update.*|.*support\.weixin\.qq\.com/security.*|.*mp\.weixin\.qq\.com/mp/waerrpage.*', content) # 检查输入是否以"搜索前缀词" 开头 if content.startswith(self.search_sum_search_prefix) and self.search_sum_enabled: # 如果消息来自一个群聊,并且你不希望在群聊中启用搜索功能,直接返回 if isgroup and not self.search_sum_group: return # Call new function to handle search operation self.call_service(content, e_context, "search") return if user_id in self.params_cache and ('last_file_content' in self.params_cache[user_id] or 'last_image_base64' in self.params_cache[user_id] or 'last_url' in self.params_cache[user_id]): # 如果存在最近一次处理的文件路径,触发文件理解函数 if 'last_file_content' in self.params_cache[user_id] and content.startswith(self.file_sum_qa_prefix): logger.info('Content starts with the file_sum_qa_prefix.') # 去除关键词和紧随其后的空格 new_content = content[len(self.file_sum_qa_prefix):] self.params_cache[user_id]['prompt'] = new_content logger.info('params_cache for user has been successfully updated.') self.handle_file(self.params_cache[user_id]['last_file_content'], e_context) # 如果存在最近一次处理的图片路径,触发图片理解函数 elif 'last_image_base64' in self.params_cache[user_id] and content.startswith(self.image_sum_qa_prefix): logger.info('Content starts with the image_sum_qa_prefix.') # 去除关键词和紧随其后的空格 new_content = content[len(self.image_sum_qa_prefix):] self.params_cache[user_id]['prompt'] = new_content logger.info('params_cache for user has been successfully updated.') self.handle_image(self.params_cache[user_id]['last_image_base64'], e_context) # 如果存在最近一次处理的URL,触发URL理解函数 elif 'last_url' in self.params_cache[user_id] and content.startswith(self.url_sum_qa_prefix): logger.info('Content starts with the url_sum_qa_prefix.') # 去除关键词和紧随其后的空格 new_content = content[len(self.url_sum_qa_prefix):] self.params_cache[user_id]['prompt'] = new_content logger.info('params_cache for user has been successfully updated.') self.call_service(self.params_cache[user_id]['last_url'], e_context ,"sum") elif 'last_url' in self.params_cache[user_id] and content.startswith(self.note_prefix) and self.note_enabled and not isgroup: logger.info('Content starts with the note_prefix.') new_content = content[len(self.note_prefix):] self.params_cache[user_id]['note'] = new_content logger.info('params_cache for user has been successfully updated.') self.call_service(self.params_cache[user_id]['last_url'], e_context, "note") if context.type == ContextType.FILE: if isgroup and not self.file_sum_group: # 群聊中忽略处理文件 logger.info("群聊消息,文件处理功能已禁用") return logger.info("on_handle_context: 处理上下文开始") context.get("msg").prepare() file_path = context.content logger.info(f"on_handle_context: 获取到文件路径 {file_path}") # 检查是否应该进行文件总结 if self.file_sum_enabled: # 更新params_cache中的last_file_content self.params_cache[user_id] = {} file_content = self.extract_content(file_path) if file_content is None: logger.info("文件内容无法提取,跳过处理") else: self.params_cache[user_id]['last_file_content'] = file_content logger.info('Updated last_file_content in params_cache for user.') self.handle_file(file_content, e_context) else: logger.info("文件总结功能已禁用,不对文件内容进行处理") # 删除文件 os.remove(file_path) logger.info(f"文件 {file_path} 已删除") elif context.type == ContextType.IMAGE: if isgroup and not self.image_sum_group: # 群聊中忽略处理图片 logger.info("群聊消息,图片处理功能已禁用") return logger.info("on_handle_context: 开始处理图片") context.get("msg").prepare() image_path = context.content logger.info(f"on_handle_context: 获取到图片路径 {image_path}") # 检查是否应该进行图片总结 if self.image_sum_enabled: # 将图片路径转换为Base64编码的字符串 base64_image = self.encode_image_to_base64(image_path) # 更新params_cache中的last_image_path self.params_cache[user_id] = {} self.params_cache[user_id]['last_image_base64'] = base64_image logger.info('Updated last_image_base64 in params_cache for user.') self.handle_image(base64_image, e_context) else: logger.info("图片总结功能已禁用,不对图片内容进行处理") # 删除文件 os.remove(image_path) logger.info(f"文件 {image_path} 已删除") elif context.type == ContextType.SHARING and self.url_sum_enabled: #匹配卡片分享 content = html.unescape(content) if unsupported_urls: #匹配不支持总结的卡片 if isgroup: ##群聊中忽略 return else: ##私聊回复不支持 logger.info("[sum4all] Unsupported URL : %s", content) reply = Reply(type=ReplyType.TEXT, content="不支持总结小程序和视频号") e_context["reply"] = reply e_context.action = EventAction.BREAK_PASS else: #匹配支持总结的卡片 if isgroup: #处理群聊总结 if self.url_sum_group: #group_sharing = True进行总结,False则忽略。 logger.info("[sum4all] Summary URL : %s", content) # 更新params_cache中的last_url self.params_cache[user_id] = {} self.params_cache[user_id]['last_url'] = content logger.info('Updated last_url in params_cache for user.') self.call_service(content, e_context, "sum") return else: return else: #处理私聊总结 logger.info("[sum4all] Summary URL : %s", content) # 更新params_cache中的last_url self.params_cache[user_id] = {} self.params_cache[user_id]['last_url'] = content logger.info('Updated last_url in params_cache for user.') self.call_service(content, e_context, "sum") return elif url_match and self.url_sum_enabled: #匹配URL链接 if unsupported_urls: #匹配不支持总结的网址 logger.info("[sum4all] Unsupported URL : %s", content) reply = Reply(type=ReplyType.TEXT, content="不支持总结小程序和视频号") e_context["reply"] = reply e_context.action = EventAction.BREAK_PASS else: logger.info("[sum4all] Summary URL : %s", content) # 更新params_cache中的last_url self.params_cache[user_id] = {} self.params_cache[user_id]['last_url'] = content logger.info('Updated last_url in params_cache for user.') self.call_service(content, e_context, "sum") return def call_service(self, content, e_context, service_type): if service_type == "search": if self.search_sum_service == "openai" or self.search_sum_service == "sum4all" or self.search_sum_service == "gemini": self.handle_search(content, e_context) elif self.search_sum_service == "perplexity": self.handle_perplexity(content, e_context) elif service_type == "sum": if self.url_sum_service == "bibigpt": self.handle_bibigpt(content, e_context) elif self.url_sum_service == "openai" or self.url_sum_service == "sum4all" or self.url_sum_service == "gemini": self.handle_url(content, e_context) elif self.url_sum_service == "opensum": self.handle_opensum(content, e_context) elif service_type == "note": if self.note_service == "flomo": self.handle_note(content, e_context) def handle_note(self,link,e_context): msg: ChatMessage = e_context["context"]["msg"] user_id = msg.from_user_id title = self.params_cache[user_id].get('title', '') content = self.params_cache[user_id].get('content', '') note = self.params_cache[user_id].get('note', '') # 将这些内容按照一定的格式整合到一起 note_content = f"#sum4all\n{title}\n📒笔记:{note}\n{content}\n{link}" payload = {"content": note_content} # 将这个字典转换为JSON格式 payload_json = json.dumps(payload) # 创建一个POST请求 url = self.flomo_key headers = {'Content-Type': 'application/json'} # 发送这个POST请求 response = requests.post(url, headers=headers, data=payload_json) reply = Reply() reply.type = ReplyType.TEXT if response.status_code == 200 and response.json()['code'] == 0: reply.content = f"已发送到{self.note_service}" else: reply.content = "发送失败,错误码:" + str(response.status_code) e_context["reply"] = reply e_context.action = EventAction.BREAK_PASS def short_url(self, long_url): url = "https://short.fatwang2.com" payload = { "url": long_url } headers = {'Content-Type': "application/json"} response = requests.request("POST", url, json=payload, headers=headers) if response.status_code == 200: res_data = response.json() # 直接从返回的 JSON 中获取短链接 short_url = res_data.get('shorturl', None) if short_url: return short_url return None def handle_url(self, content, e_context): logger.info('Handling Sum4All request...') # 根据sum_service的值选择API密钥和基础URL if self.url_sum_service == "openai": api_key = self.open_ai_api_key api_base = self.open_ai_api_base model = self.model elif self.url_sum_service == "sum4all": api_key = self.sum4all_key api_base = "https://pro.sum4all.site/v1" model = "sum4all" elif self.url_sum_service == "gemini": api_key = self.gemini_key model = "gemini" api_base = "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash-latest:generateContent" else: logger.error(f"未知的sum_service配置: {self.url_sum_service}") return msg: ChatMessage = e_context["context"]["msg"] user_id = msg.from_user_id user_params = self.params_cache.get(user_id, {}) isgroup = e_context["context"].get("isgroup", False) prompt = user_params.get('prompt', self.url_sum_prompt) headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}' } payload = json.dumps({ "link": content, "prompt": prompt, "model": model, "base": api_base }) additional_content = "" # 在 try 块之前初始化 additional_content try: logger.info('Sending request to LLM...') api_url = "https://ai.sum4all.site" response = requests.post(api_url, headers=headers, data=payload) response.raise_for_status() logger.info('Received response from LLM.') response_data = response.json() # 解析响应的 JSON 数据 if response_data.get("success"): content = response_data["content"].replace("\\n", "\n") # 替换 \\n 为 \n self.params_cache[user_id]['content'] = content # 新增加的部分,用于解析 meta 数据 meta = response_data.get("meta", {}) # 如果没有 meta 数据,则默认为空字典 title = meta.get("og:title", "") # 获取 og:title,如果没有则默认为空字符串 self.params_cache[user_id]['title'] = title # 只有当 title 非空时,才加入到回复中 if title: additional_content += f"{title}\n\n" reply_content = additional_content + content # 将内容加入回复 else: reply_content = "Content not found or error in response" except requests.exceptions.RequestException as e: # 处理可能出现的错误 logger.error(f"Error calling new combined api: {e}") reply_content = f"An error occurred" reply = Reply() reply.type = ReplyType.TEXT if not self.url_sum_qa_enabled: reply.content = remove_markdown(reply_content) elif isgroup or not self.note_enabled: # reply.content = f"{remove_markdown(reply_content)}\n\n💬5min内输入{self.url_sum_qa_prefix}+问题,可继续追问" reply.content = f"{remove_markdown(reply_content)}" elif self.note_enabled: reply.content = f"{remove_markdown(reply_content)}\n\n💬5min内输入{self.url_sum_qa_prefix}+问题,可继续追问。\n\n📒输入{self.note_prefix}+笔记,可发送当前总结&笔记到{self.note_service}" e_context["reply"] = reply e_context.action = EventAction.BREAK_PASS def handle_bibigpt(self, content, e_context): headers = { 'Content-Type': 'application/json' } payload_params = { "url": content, "includeDetail": False, "promptConfig": { "outputLanguage": self.outputLanguage } } payload = json.dumps(payload_params) try: api_url = f"https://bibigpt.co/api/open/{self.bibigpt_key}" response = requests.request("POST",api_url, headers=headers, data=payload) response.raise_for_status() data = json.loads(response.text) summary_original = data.get('summary', 'Summary not available') html_url = data.get('htmlUrl', 'HTML URL not available') # 获取短链接 short_url = self.short_url(html_url) # 如果获取短链接失败,使用 html_url if short_url is None: short_url = html_url if html_url != 'HTML URL not available' else 'URL not available' # 移除 "##摘要"、"## 亮点" 和 "-" summary = summary_original.split("详细版(支持对话追问)")[0].replace("## 摘要\n", "📌总结:").replace("## 亮点\n", "").replace("- ", "") except requests.exceptions.RequestException as e: reply = f"An error occurred" reply = Reply() reply.type = ReplyType.TEXT reply.content = f"{summary}\n\n详细链接:{short_url}" e_context["reply"] = reply e_context.action = EventAction.BREAK_PASS def handle_opensum(self, content, e_context): headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {self.opensum_key}' } payload = json.dumps({"link": content}) try: api_url = "https://read.thinkwx.com/api/v1/article/summary" response = requests.request("POST",api_url, headers=headers, data=payload) response.raise_for_status() data = json.loads(response.text) summary_data = data.get('data', {}) # 获取data字段 summary_original = summary_data.get('summary', 'Summary not available') # 使用正则表达式提取URL url_pattern = r'https:\/\/[^\s]+' match = re.search(url_pattern, summary_original) html_url = match.group(0) if match else 'HTML URL not available' # 获取短链接 short_url = self.short_url(html_url) if match else html_url # 用于移除摘要中的URL及其后的所有内容 url_pattern_remove = r'https:\/\/[^\s]+[\s\S]*' summary = re.sub(url_pattern_remove, '', summary_original).strip() except requests.exceptions.RequestException as e: summary = f"An error occurred" short_url = 'URL not available' reply = Reply() reply.type = ReplyType.TEXT reply.content = f"{summary}\n\n详细链接:{short_url}" e_context["reply"] = reply e_context.action = EventAction.BREAK_PASS def handle_search(self, content, e_context): # 根据sum_service的值选择API密钥和基础URL if self.search_sum_service == "openai": api_key = self.open_ai_api_key api_base = self.open_ai_api_base model = self.model elif self.search_sum_service == "sum4all": api_key = self.sum4all_key api_base = "https://pro.sum4all.site/v1" model = "sum4all" elif self.search_sum_service == "gemini": api_key = self.gemini_key model = "gemini" api_base = "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash-latest:generateContent" else: logger.error(f"未知的search_service配置: {self.search_sum_service}") return headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}' } content = content[len(self.search_sum_search_prefix):] payload = json.dumps({ "ur": content, "prompt": self.search_sum_prompt, "model": model, "base": api_base, "search1api_key": self.search1api_key, "search_service": self.search_service }) try: api_url = "https://ai.sum4all.site" response = requests.post(api_url, headers=headers, data=payload) response.raise_for_status() response_data = response.json() # 解析响应的 JSON 数据 if response_data.get("success"): content = response_data["content"].replace("\\n", "\n") # 替换 \\n 为 \n reply_content = content # 将内容加入回复 # 解析 meta 数据 meta = response_data.get("meta", {}) # 如果没有 meta 数据,则默认为空字典 title = meta.get("og:title", "") # 获取 og:title,如果没有则默认为空字符串 og_url = meta.get("og:url", "") # 获取 og:url,如果没有则默认为空字符串 # 打印 title 和 og_url 以调试 print("Title:", title) print("Original URL:", og_url) # 只有当 title 和 url 非空时,才加入到回复中 if title: reply_content += f"\n\n参考文章:{title}" if og_url: short_url = self.short_url(og_url) # 获取短链接 reply_content += f"\n\n参考链接:{short_url}" else: content = "Content not found or error in response" except requests.exceptions.RequestException as e: # 处理可能出现的错误 logger.error(f"Error calling new combined api: {e}") reply_content = f"An error occurred" reply = Reply() reply.type = ReplyType.TEXT reply.content = f"{remove_markdown(reply_content)}" e_context["reply"] = reply e_context.action = EventAction.BREAK_PASS def handle_perplexity(self, content, e_context): headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {self.perplexity_key}' } data = { "model": "sonar-small-online", "messages": [ {"role": "system", "content": self.search_sum_prompt}, {"role": "user", "content": content} ] } try: api_url = "https://api.perplexity.ai/chat/completions" response = requests.post(api_url, headers=headers, json=data) response.raise_for_status() # 处理响应数据 response_data = response.json() # 这里可以根据你的需要处理响应数据 # 解析 JSON 并获取 content if "choices" in response_data and len(response_data["choices"]) > 0: first_choice = response_data["choices"][0] if "message" in first_choice and "content" in first_choice["message"]: content = first_choice["message"]["content"] else: print("Content not found in the response") else: print("No choices available in the response") except requests.exceptions.RequestException as e: # 处理可能出现的错误 logger.error(f"Error calling perplexity: {e}") reply = Reply() reply.type = ReplyType.TEXT reply.content = f"{remove_markdown(content)}" e_context["reply"] = reply e_context.action = EventAction.BREAK_PASS def get_help_text(self, verbose=False, **kwargs): help_text = "Help you summarize all things\n" if not verbose: return help_text help_text += "1.Share me the link and I will summarize it for you\n" help_text += f"2.{self.search_sum_search_prefix}+query,I will search online for you\n" return help_text def handle_file(self, content, e_context): logger.info("handle_file: 向LLM发送内容总结请求") # 根据sum_service的值选择API密钥和基础URL if self.file_sum_service == "openai": api_key = self.open_ai_api_key api_base = self.open_ai_api_base model = self.model elif self.file_sum_service == "sum4all": api_key = self.sum4all_key api_base = "https://pro.sum4all.site/v1" model = "sum4all" elif self.file_sum_service == "gemini": api_key = self.gemini_key model = "gemini" api_base = "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash-latest:generateContent" else: logger.error(f"未知的sum_service配置: {self.file_sum_service}") return msg: ChatMessage = e_context["context"]["msg"] user_id = msg.from_user_id user_params = self.params_cache.get(user_id, {}) prompt = user_params.get('prompt', self.file_sum_prompt) if model == "gemini": headers = { 'Content-Type': 'application/json', 'x-goog-api-key': api_key } data = { "contents": [ {"role": "user", "parts": [{"text": prompt}]}, {"role": "model", "parts": [{"text": "okay"}]}, {"role": "user", "parts": [{"text": content}]} ], "generationConfig": { "maxOutputTokens": 800 } } api_url = api_base else: headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}' } data = { "model": model, "messages": [ {"role": "system", "content": prompt}, {"role": "user", "content": content} ] } api_url = f"{api_base}/chat/completions" try: response = requests.post(api_url, headers=headers, data=json.dumps(data)) response.raise_for_status() response_data = response.json() # 解析 JSON 并获取 content if model == "gemini": if "candidates" in response_data and len(response_data["candidates"]) > 0: first_candidate = response_data["candidates"][0] if "content" in first_candidate: if "parts" in first_candidate["content"] and len(first_candidate["content"]["parts"]) > 0: response_content = first_candidate["content"]["parts"][0]["text"].strip() # 获取响应内容 logger.info(f"Gemini API response content: {response_content}") # 记录响应内容 reply_content = response_content.replace("\\n", "\n") # 替换 \\n 为 \n else: logger.error("Parts not found in the Gemini API response content") reply_content = "Parts not found in the Gemini API response content" else: logger.error("Content not found in the Gemini API response candidate") reply_content = "Content not found in the Gemini API response candidate" else: logger.error("No candidates available in the Gemini API response") reply_content = "No candidates available in the Gemini API response" else: if "choices" in response_data and len(response_data["choices"]) > 0: first_choice = response_data["choices"][0] if "message" in first_choice and "content" in first_choice["message"]: response_content = first_choice["message"]["content"].strip() # 获取响应内容 logger.info(f"LLM API response content") # 记录响应内容 reply_content = response_content.replace("\\n", "\n") # 替换 \\n 为 \n else: logger.error("Content not found in the response") reply_content = "Content not found in the LLM API response" else: logger.error("No choices available in the response") reply_content = "No choices available in the LLM API response" except requests.exceptions.RequestException as e: logger.error(f"Error calling LLM API: {e}") reply_content = f"An error occurred while calling LLM API" reply = Reply() reply.type = ReplyType.TEXT # reply.content = f"{remove_markdown(reply_content)}\n\n💬5min内输入{self.file_sum_qa_prefix}+问题,可继续追问" reply.content = f"{remove_markdown(reply_content)}" e_context["reply"] = reply e_context.action = EventAction.BREAK_PASS def read_pdf(self, file_path): logger.info(f"开始读取PDF文件:{file_path}") doc = fitz.open(file_path) content = ' '.join([page.get_text() for page in doc]) logger.info(f"PDF文件读取完成:{file_path}") return content def read_word(self, file_path): doc = Document(file_path) return ' '.join([p.text for p in doc.paragraphs]) def read_markdown(self, file_path): with open(file_path, 'r', encoding='utf-8') as file: md_content = file.read() return markdown.markdown(md_content) def read_excel(self, file_path): workbook = load_workbook(file_path) content = '' for sheet in workbook: for row in sheet.iter_rows(): content += ' '.join([str(cell.value) for cell in row]) content += '\n' return content def read_txt(self, file_path): logger.debug(f"开始读取TXT文件: {file_path}") try: with open(file_path, 'r', encoding='utf-8') as file: content = file.read() logger.debug(f"TXT文件读取完成: {file_path}") logger.debug("TXT文件内容的前50个字符:") logger.debug(content[:50]) # 打印文件内容的前50个字符 return content except Exception as e: logger.error(f"读取TXT文件时出错: {file_path},错误信息: {str(e)}") return "" def read_csv(self, file_path): content = '' with open(file_path, 'r', encoding='utf-8') as csvfile: reader = csv.reader(csvfile) for row in reader: content += ' '.join(row) + '\n' return content def read_html(self, file_path): with open(file_path, 'r', encoding='utf-8') as file: soup = BeautifulSoup(file, 'html.parser') return soup.get_text() def read_ppt(self, file_path): presentation = Presentation(file_path) content = '' for slide in presentation.slides: for shape in slide.shapes: if hasattr(shape, "text"): content += shape.text + '\n' return content def extract_content(self, file_path): logger.info(f"extract_content: 提取文件内容,文件路径: {file_path}") file_size = os.path.getsize(file_path) // 1000 # 将文件大小转换为KB if file_size > int(self.max_file_size): logger.warning(f"文件大小超过限制({self.max_file_size}KB),不进行处理。文件大小: {file_size}KB") return None file_extension = os.path.splitext(file_path)[1][1:].lower() logger.info(f"extract_content: 文件类型为 {file_extension}") file_type = EXTENSION_TO_TYPE.get(file_extension) if not file_type: logger.error(f"不支持的文件扩展名: {file_extension}") return None read_func = { 'pdf': self.read_pdf, 'docx': self.read_word, 'md': self.read_markdown, 'txt': self.read_txt, 'excel': self.read_excel, 'csv': self.read_csv, 'html': self.read_html, 'ppt': self.read_ppt }.get(file_type) if not read_func: logger.error(f"不支持的文件类型: {file_type}") return None logger.info("extract_content: 文件内容提取完成") return read_func(file_path) def encode_image_to_base64(self, image_path): logger.info(f"开始处理图片: {image_path}") try: with Image.open(image_path) as img: logger.info(f"成功打开图片. 原始大小: {img.size}") if img.width > 1024: new_size = (1024, int(img.height*1024/img.width)) img = img.resize(new_size) img.save(image_path) # 保存调整大小后的图片 logger.info(f"调整图片大小至: {new_size}") with open(image_path, "rb") as image_file: img_byte_arr = image_file.read() logger.info(f"读取图片完成. 大小: {len(img_byte_arr)} 字节") encoded = base64.b64encode(img_byte_arr).decode('ascii') logger.info(f"Base64编码完成. 编码后长度: {len(encoded)}") return encoded except Exception as e: logger.error(f"图片编码过程中发生错误: {str(e)}", exc_info=True) raise # Function to handle OpenAI image processing def handle_image(self, base64_image, e_context): logger.info("handle_image: 解析图像处理API的响应") msg: ChatMessage = e_context["context"]["msg"] user_id = msg.from_user_id user_params = self.params_cache.get(user_id, {}) prompt = user_params.get('prompt', self.image_sum_prompt) if self.image_sum_service == "openai": api_key = self.open_ai_api_key api_base = f"{self.open_ai_api_base}/chat/completions" model = "gpt-4o-mini" elif self.image_sum_service == "xunfei": api_key = self.xunfei_api_key api_base = "https://spark.sum4all.site/v1/chat/completions" model = "spark-chat-vision" elif self.image_sum_service == "sum4all": api_key = self.sum4all_key api_base = "https://pro.sum4all.site/v1/chat/completions" model = "sum4all-vision" elif self.image_sum_service == "gemini": api_key = self.gemini_key api_base = "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash-latest:generateContent" payload = { "contents": [ { "parts": [ {"text": prompt}, { "inline_data": { "mime_type":"image/png", "data": base64_image } } ] } ] } headers = { "Content-Type": "application/json", "x-goog-api-key": api_key } logger.info(f"准备发送请求. Payload大小: {len(json.dumps(payload))} 字节") else: logger.error(f"未知的image_sum_service配置: {self.image_sum_service}") return if self.image_sum_service != "gemini": payload = { "model": model, "messages": [ { "role": "user", "content": [ { "type": "text", "text": prompt }, { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{base64_image}" } } ] } ], "max_tokens": 3000 } headers = { "Content-Type": "application/json", "Authorization": f"Bearer {api_key}" } try: response = requests.post(api_base, headers=headers, json=payload) logger.info(f"API请求已发送. 状态码: {response.status_code}") response.raise_for_status() logger.info("API响应状态码正常,开始解析JSON") response_json = response.json() logger.info("JSON解析完成") if self.image_sum_service == "gemini": reply_content = response_json.get('candidates', [{}])[0].get('content', {}).get('parts', [{}])[0].get('text', 'No text found in the response') logger.info(f"成功解析Gemini响应. 回复内容长度: {len(reply_content)}") else: if "choices" in response_json and len(response_json["choices"]) > 0: first_choice = response_json["choices"][0] if "message" in first_choice and "content" in first_choice["message"]: response_content = first_choice["message"]["content"].strip() logger.info("LLM API response content") reply_content = response_content else: logger.error("Content not found in the response") reply_content = "Content not found in the LLM API response" else: logger.error("No choices available in the response") reply_content = "No choices available in the LLM API response" except Exception as e: logger.error(f"Error processing LLM API response: {e}") reply_content = f"An error occurred while processing LLM API response" reply = Reply() reply.type = ReplyType.TEXT # reply.content = f"{remove_markdown(reply_content)}\n\n💬5min内输入{self.image_sum_qa_prefix}+问题,可继续追问" reply.content = f"{remove_markdown(reply_content)}" e_context["reply"] = reply e_context.action = EventAction.BREAK_PASS def remove_markdown(text): # 替换Markdown的粗体标记 text = text.replace("**", "") # 替换Markdown的标题标记 text = text.replace("### ", "").replace("## ", "").replace("# ", "") return text