diff --git a/app/endpoints/agent_endpoint.py b/app/endpoints/agent_endpoint.py index 6e1beb4..de37904 100644 --- a/app/endpoints/agent_endpoint.py +++ b/app/endpoints/agent_endpoint.py @@ -141,10 +141,10 @@ async def waitting_login_result(request: Request, token_id, app_id,region_id, ag config=AgentConfig.model_validate({ "chatroomIdWhiteList": [], "agentTokenId": agent_token_id, - "agentEnabled": False, + "agentEnabled": True, "addContactsFromChatroomIdWhiteList": [], "chatWaitingMsgEnabled": True, - "privateGroupChatEnabled": True + "privateGroupChatEnabled": False }) else: diff --git a/app/endpoints/pipeline_endpoint.py b/app/endpoints/pipeline_endpoint.py index ae55a10..746816c 100644 --- a/app/endpoints/pipeline_endpoint.py +++ b/app/endpoints/pipeline_endpoint.py @@ -778,7 +778,9 @@ async def handle_image_async(request: Request,token_id,app_id, wxid,msg_data,fro callback_to_user=from_wxid hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}' wx_img_url= await request.app.state.gewe_service.download_image_msg_async(token_id,app_id,msg_content) - + if not wx_img_url: + logger.warning(f'{wxid} 下载 {callback_to_user} 图片失败') + return oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5" oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN" oss_endpoint="http://oss-cn-shanghai.aliyuncs.com" @@ -819,6 +821,10 @@ async def handle_image_group_async(request: Request,token_id,app_id, wxid,msg_da img_xml=f'\n\n\t\n\t\n\t\n' wx_img_url= await request.app.state.gewe_service.download_image_msg_async(token_id,app_id,img_xml) + if not wx_img_url: + logger.warning(f'{wxid} 下载 {callback_to_user} 图片失败') + return + oss_url=wx_img_url_to_oss_url(wx_img_url) reply_content = re.sub(r'<\?xml.*', f'{oss_url}', msg_content, flags=re.DOTALL) diff --git a/model/models.py b/model/models.py index 9240dab..1d6b434 100644 --- a/model/models.py +++ b/model/models.py @@ -12,10 +12,10 @@ import time class AgentConfig(BaseModel): chatroomIdWhiteList: List[str] = [] agentTokenId: str - agentEnabled: bool + agentEnabled: bool=True addContactsFromChatroomIdWhiteList: List[str] = [] chatWaitingMsgEnabled: bool - privateGroupChatEnabled: bool=True + privateGroupChatEnabled: bool=False @dataclass @@ -33,6 +33,46 @@ class OperationType(Enum): ACCEPT_FRIEND = 3 REJECT_FRIEND = 4 +# 聊天内容类型 +@unique +class MessageContentType(Enum): + TEXT = 1 + IMAGE = 2 + VOICE = 3 + VIDEO = 4 + FILE = 5 + LOCATION = 6 + LINK = 7 + SYSTEM = 8 + UNKNOWN = 9 + # Group_TEXT = 11 + # Group_IMAGE = 12 + # Group_VOICE = 13 + # Group_VIDEO = 14 + # Group_FILE = 15 + # Group_LOCATION = 16 + # Group_LINK = 17 + # Group_SYSTEM = 18 + # Group_UNKNOWN = 19 + # Group_NOTICE = 20 + +# 聊天方式:私聊还是群聊 +@unique +class ChatType(Enum): + PRIVATE = 1 + GROUP = 2 + +@dataclass +class WxChatMessage(BaseModel): + contentType:int + chatType:int + belongToWxid:str + fromWxid:str + toWxid:str + content:str + Timestamp:int + + def validate_wxid(func): @wraps(func) diff --git a/services/gewe_service.py b/services/gewe_service.py index a8e5f95..029721e 100644 --- a/services/gewe_service.py +++ b/services/gewe_service.py @@ -10,6 +10,8 @@ import time,datetime import uuid from fastapi import FastAPI, Depends from common.singleton import singleton +from typing import Optional +from model.models import WxChatMessage from aiohttp import ClientError from json.decoder import JSONDecodeError @@ -656,9 +658,11 @@ class GeWeService: if data['ret'] == 200: return data['data']['fileUrl'] else: - return False + logger.warning(f'下载图片失败 {data.get('ret', None),data.get("msg", None), data.get("data", None)}') + return "" else: - return False + logger.warning(f'下载图片失败 {response.status} {response.reason}') + return "" async def download_cdn_msg_async(self, token_id:str,aeskey: str, file_id: str, type: str,total_size:str,suffix:str): @@ -1622,12 +1626,34 @@ class GeWeService: data=await self.redis_service.dequeue(hash_key) return json.loads(data) if data else {} - # async def acquire_task_run_time_lock_async(self,task_name,run_time,expire_time=None): - # hash_key = f"__AI_OPS_WX__:{task_name}" - # if await self.redis_service.client.setnx(hash_key, run_time): - # await self.redis_service.client.expire(hash_key, expire_time) - # return True - # return False + + async def enqueue_wxchat_message_async(self,msg:WxChatMessage): + """ + 入列消息 + """ + hash_key = f'__AI_OPS_WX__:WX_CHAT_MESSAGE:{msg.belongToWxid}' + data_str=json.dumps(msg.model_dump(),ensure_ascii=False) + await self.redis_service.enqueue(hash_key,data_str) + + + + async def dequeue_wxchat_message_async(self,wxid)->Optional[WxChatMessage]: + """ + 出列消息 + """ + hash_key = f'__AI_OPS_WX__:WX_CHAT_MESSAGE:{wxid}' + data = await self.redis_service.dequeue(hash_key) + + if not data: + return None + try: + # 将 JSON 数据反序列化为 WxChatMessage 对象 + return WxChatMessage.model_validate(data) + except Exception as e: + # 处理反序列化错误 + print(f"Failed to deserialize message: {e}") + return None + async def save_task_run_time_async(self,task_name,log:list,expire_time=None): '''