diff --git a/app/endpoints/pipeline_endpoint.py b/app/endpoints/pipeline_endpoint.py index 808ca10..943732e 100644 --- a/app/endpoints/pipeline_endpoint.py +++ b/app/endpoints/pipeline_endpoint.py @@ -17,7 +17,7 @@ from common.memory import * import traceback import sys -timeout_duration = 8.0 +timeout_duration = 2.0 messages_router = APIRouter() @@ -454,6 +454,8 @@ async def handle_text_async(request: Request,token_id,app_id, wxid,msg_data,from return + request.app.state.message_lock[app_id]= asyncio.Lock() + # 创建并启动任务协程,将参数传递给 ai_chat_text 函数 task = asyncio.create_task( ai_chat_text_async(request,token_id, app_id, wxid, msg_data, msg_content) @@ -469,11 +471,12 @@ async def handle_text_async(request: Request,token_id,app_id, wxid,msg_data,from # 取消定时器 timeout_timer.cancel() -message_lock = asyncio.Lock() +#message_lock = asyncio.Lock() async def check_timeout_async(task: asyncio.Task, request: Request,token_id, wxid, app_id, callback_to_user): await asyncio.sleep(timeout_duration) # 等待超时时间 if not task.done(): + message_lock=request.app.state.message_lock[app_id] async with message_lock: print(f"任务运行时间超过{timeout_duration}秒,token_id={token_id}, app_id={app_id}, callback_to_user={callback_to_user}") wx_config = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid) @@ -588,6 +591,7 @@ async def ai_chat_text_async(request: Request,token_id, app_id, wxid, msg_data, await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content}) return + message_lock=request.app.state.message_lock[app_id] # 昵称替换 replacements = { '{昵称}': wxid_nickname, @@ -598,6 +602,7 @@ async def ai_chat_text_async(request: Request,token_id, app_id, wxid, msg_data, # 判断图片url img_urls,reply_content=extract_and_replace_image_urls(reply_content) if img_urls: + #await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, reply_content) async with message_lock: await ai_post_text_split_async_async(request, token_id, app_id, callback_to_user, reply_content) @@ -872,6 +877,8 @@ async def handle_voice_async(request: Request,token_id,app_id, wxid,msg_data,fro msg_content=msg_data["Content"]["string"] msg_id=msg_data["MsgId"] + request.app.state.message_lock[app_id]=asyncio.Lock() + config=await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid) validated_config = AgentConfig.model_validate(config) if not validated_config.privateGroupChatEnabled: @@ -918,6 +925,7 @@ async def handle_voice_async(request: Request,token_id,app_id, wxid,msg_data,fro # 判断图片url img_urls,reply_content=extract_and_replace_image_urls(reply_content) if img_urls: + message_lock=request.app.state.message_lock[app_id] #await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, reply_content) async with message_lock: await ai_post_text_split_async_async(request, token_id, app_id, callback_to_user, reply_content) diff --git a/app/main.py b/app/main.py index 3258f7d..946a19b 100644 --- a/app/main.py +++ b/app/main.py @@ -161,6 +161,8 @@ async def lifespan(app: FastAPI): # # 初始化 GeWeChatCom #app.state.gwechat_service = GeWeService(app) + app.state.message_lock={} + # 初始化业务服务 biz_service = BizService(app) app.state.biz_service = biz_service