浏览代码

调整图片下载处理

develop
H Vs 1周前
父节点
当前提交
2c3176c8cf
共有 4 个文件被更改,包括 85 次插入13 次删除
  1. +2
    -2
      app/endpoints/agent_endpoint.py
  2. +7
    -1
      app/endpoints/pipeline_endpoint.py
  3. +42
    -2
      model/models.py
  4. +34
    -8
      services/gewe_service.py

+ 2
- 2
app/endpoints/agent_endpoint.py 查看文件

@@ -141,10 +141,10 @@ async def waitting_login_result(request: Request, token_id, app_id,region_id, ag
config=AgentConfig.model_validate({ config=AgentConfig.model_validate({
"chatroomIdWhiteList": [], "chatroomIdWhiteList": [],
"agentTokenId": agent_token_id, "agentTokenId": agent_token_id,
"agentEnabled": False,
"agentEnabled": True,
"addContactsFromChatroomIdWhiteList": [], "addContactsFromChatroomIdWhiteList": [],
"chatWaitingMsgEnabled": True, "chatWaitingMsgEnabled": True,
"privateGroupChatEnabled": True
"privateGroupChatEnabled": False


}) })
else: else:


+ 7
- 1
app/endpoints/pipeline_endpoint.py 查看文件

@@ -778,7 +778,9 @@ async def handle_image_async(request: Request,token_id,app_id, wxid,msg_data,fro
callback_to_user=from_wxid callback_to_user=from_wxid
hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}' hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
wx_img_url= await request.app.state.gewe_service.download_image_msg_async(token_id,app_id,msg_content) wx_img_url= await request.app.state.gewe_service.download_image_msg_async(token_id,app_id,msg_content)

if not wx_img_url:
logger.warning(f'{wxid} 下载 {callback_to_user} 图片失败')
return
oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5" oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5"
oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN" oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN"
oss_endpoint="http://oss-cn-shanghai.aliyuncs.com" oss_endpoint="http://oss-cn-shanghai.aliyuncs.com"
@@ -819,6 +821,10 @@ async def handle_image_group_async(request: Request,token_id,app_id, wxid,msg_da


img_xml=f'<?xml version=\"1.0\"?>\n<msg>\n\t<img aeskey=\"{aeskey}\" encryver=\"1\" cdnthumbaeskey=\"{aeskey}\" cdnthumburl=\"{cdnthumburl}\" cdnthumblength=\"{cdnthumblength}\" cdnthumbheight=\"{cdnthumbheight}\" cdnthumbwidth=\"{cdnthumbwidth}\" cdnmidheight=\"0\" cdnmidwidth=\"0\" cdnhdheight=\"0\" cdnhdwidth=\"0\" cdnmidimgurl=\"{cdnthumburl}\" length=\"{length}\" md5=\"{md5}\" />\n\t<platform_signature></platform_signature>\n\t<imgdatahash></imgdatahash>\n</msg>' img_xml=f'<?xml version=\"1.0\"?>\n<msg>\n\t<img aeskey=\"{aeskey}\" encryver=\"1\" cdnthumbaeskey=\"{aeskey}\" cdnthumburl=\"{cdnthumburl}\" cdnthumblength=\"{cdnthumblength}\" cdnthumbheight=\"{cdnthumbheight}\" cdnthumbwidth=\"{cdnthumbwidth}\" cdnmidheight=\"0\" cdnmidwidth=\"0\" cdnhdheight=\"0\" cdnhdwidth=\"0\" cdnmidimgurl=\"{cdnthumburl}\" length=\"{length}\" md5=\"{md5}\" />\n\t<platform_signature></platform_signature>\n\t<imgdatahash></imgdatahash>\n</msg>'
wx_img_url= await request.app.state.gewe_service.download_image_msg_async(token_id,app_id,img_xml) wx_img_url= await request.app.state.gewe_service.download_image_msg_async(token_id,app_id,img_xml)
if not wx_img_url:
logger.warning(f'{wxid} 下载 {callback_to_user} 图片失败')
return
oss_url=wx_img_url_to_oss_url(wx_img_url) oss_url=wx_img_url_to_oss_url(wx_img_url)


reply_content = re.sub(r'<\?xml.*', f'{oss_url}', msg_content, flags=re.DOTALL) reply_content = re.sub(r'<\?xml.*', f'{oss_url}', msg_content, flags=re.DOTALL)


+ 42
- 2
model/models.py 查看文件

@@ -12,10 +12,10 @@ import time
class AgentConfig(BaseModel): class AgentConfig(BaseModel):
chatroomIdWhiteList: List[str] = [] chatroomIdWhiteList: List[str] = []
agentTokenId: str agentTokenId: str
agentEnabled: bool
agentEnabled: bool=True
addContactsFromChatroomIdWhiteList: List[str] = [] addContactsFromChatroomIdWhiteList: List[str] = []
chatWaitingMsgEnabled: bool chatWaitingMsgEnabled: bool
privateGroupChatEnabled: bool=True
privateGroupChatEnabled: bool=False




@dataclass @dataclass
@@ -33,6 +33,46 @@ class OperationType(Enum):
ACCEPT_FRIEND = 3 ACCEPT_FRIEND = 3
REJECT_FRIEND = 4 REJECT_FRIEND = 4


# 聊天内容类型
@unique
class MessageContentType(Enum):
TEXT = 1
IMAGE = 2
VOICE = 3
VIDEO = 4
FILE = 5
LOCATION = 6
LINK = 7
SYSTEM = 8
UNKNOWN = 9
# Group_TEXT = 11
# Group_IMAGE = 12
# Group_VOICE = 13
# Group_VIDEO = 14
# Group_FILE = 15
# Group_LOCATION = 16
# Group_LINK = 17
# Group_SYSTEM = 18
# Group_UNKNOWN = 19
# Group_NOTICE = 20

# 聊天方式:私聊还是群聊
@unique
class ChatType(Enum):
PRIVATE = 1
GROUP = 2

@dataclass
class WxChatMessage(BaseModel):
contentType:int
chatType:int
belongToWxid:str
fromWxid:str
toWxid:str
content:str
Timestamp:int




def validate_wxid(func): def validate_wxid(func):
@wraps(func) @wraps(func)


+ 34
- 8
services/gewe_service.py 查看文件

@@ -10,6 +10,8 @@ import time,datetime
import uuid import uuid
from fastapi import FastAPI, Depends from fastapi import FastAPI, Depends
from common.singleton import singleton from common.singleton import singleton
from typing import Optional
from model.models import WxChatMessage


from aiohttp import ClientError from aiohttp import ClientError
from json.decoder import JSONDecodeError from json.decoder import JSONDecodeError
@@ -656,9 +658,11 @@ class GeWeService:
if data['ret'] == 200: if data['ret'] == 200:
return data['data']['fileUrl'] return data['data']['fileUrl']
else: else:
return False
logger.warning(f'下载图片失败 {data.get('ret', None),data.get("msg", None), data.get("data", None)}')
return ""
else: else:
return False
logger.warning(f'下载图片失败 {response.status} {response.reason}')
return ""
async def download_cdn_msg_async(self, token_id:str,aeskey: str, file_id: str, type: str,total_size:str,suffix:str): async def download_cdn_msg_async(self, token_id:str,aeskey: str, file_id: str, type: str,total_size:str,suffix:str):
@@ -1622,12 +1626,34 @@ class GeWeService:
data=await self.redis_service.dequeue(hash_key) data=await self.redis_service.dequeue(hash_key)
return json.loads(data) if data else {} return json.loads(data) if data else {}


# async def acquire_task_run_time_lock_async(self,task_name,run_time,expire_time=None):
# hash_key = f"__AI_OPS_WX__:{task_name}"
# if await self.redis_service.client.setnx(hash_key, run_time):
# await self.redis_service.client.expire(hash_key, expire_time)
# return True
# return False

async def enqueue_wxchat_message_async(self,msg:WxChatMessage):
"""
入列消息
"""
hash_key = f'__AI_OPS_WX__:WX_CHAT_MESSAGE:{msg.belongToWxid}'
data_str=json.dumps(msg.model_dump(),ensure_ascii=False)
await self.redis_service.enqueue(hash_key,data_str)



async def dequeue_wxchat_message_async(self,wxid)->Optional[WxChatMessage]:
"""
出列消息
"""
hash_key = f'__AI_OPS_WX__:WX_CHAT_MESSAGE:{wxid}'
data = await self.redis_service.dequeue(hash_key)
if not data:
return None
try:
# 将 JSON 数据反序列化为 WxChatMessage 对象
return WxChatMessage.model_validate(data)
except Exception as e:
# 处理反序列化错误
print(f"Failed to deserialize message: {e}")
return None



async def save_task_run_time_async(self,task_name,log:list,expire_time=None): async def save_task_run_time_async(self,task_name,log:list,expire_time=None):
''' '''


正在加载...
取消
保存