From 17c460c7dded195f42089894d6899351a453db2e Mon Sep 17 00:00:00 2001 From: H Vs Date: Wed, 30 Apr 2025 15:37:49 +0800 Subject: [PATCH] =?UTF-8?q?#AI=20=E6=89=8B=E5=8A=A8=E6=81=A2=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/endpoints/pipeline_endpoint.py | 35 +++++++++++++++++++++++++----- services/gewe_service.py | 20 +++++++++++++++++ 2 files changed, 49 insertions(+), 6 deletions(-) diff --git a/app/endpoints/pipeline_endpoint.py b/app/endpoints/pipeline_endpoint.py index 5acb832..cb14f70 100644 --- a/app/endpoints/pipeline_endpoint.py +++ b/app/endpoints/pipeline_endpoint.py @@ -368,28 +368,52 @@ async def handle_text_async(request: Request,token_id,app_id, wxid,msg_data,from ''' 私聊文本消息 ''' + msg_content=msg_data["Content"]["string"] + config=await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid) validated_config = AgentConfig.model_validate(config) + if not validated_config.privateGroupChatEnabled: logger.warning(f"微信号 {wxid} 关闭好友和群自动回复功能,{handle_text_async.__name__} 不回复消息") return # 判断是否转人工处理功能 is_human_handle_msg= await request.app.state.gewe_service.is_human_handle_msg_with_contact_wxid_async(wxid,from_wxid) - if is_human_handle_msg: - logger.warning(f'微信号 {wxid} 发送到微信号{to_wxid} 暂时工人接管30分钟中') - return + is_human_handle_msg_with_contact_wxid_key=await request.app.state.gewe_service.is_human_handle_msg_with_contact_wxid_key_word_async(wxid,from_wxid) + if is_human_handle_msg or is_human_handle_msg_with_contact_wxid_key: + #logger.warning(f'微信号 {wxid} 发送到微信号{to_wxid} 暂时工人接管30分钟中') + if is_human_handle_msg: + logger.warning(f'微信号 {wxid} 之前收到手动发送内容到微信号 {to_wxid} 暂时工人接管30分钟中') + if is_human_handle_msg_with_contact_wxid_key: + logger.warning(f'微信号 {wxid} 的好友微信号 {to_wxid} 触发关键词, 暂时工人接管30分钟中') + human_handle_expire_time=is_human_handle_msg or is_human_handle_msg_with_contact_wxid_key - msg_content=msg_data["Content"]["string"] + expire_time=int(human_handle_expire_time)-int(time.time()) + await request.app.state.gewe_service.set_human_handle_msg_last_msg_content(wxid,from_wxid,msg_content,expire_time) + return + + + if wxid == from_wxid: #手动发送消息 logger.info(f"{wxid} 手动发送消息") await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id,wxid,[to_wxid]) callback_to_user=msg_data["ToUserName"]["string"] + human_handle_msg_last_msg_content=await request.app.state.gewe_service.get_human_handle_msg_last_msg_content(wxid,from_wxid) + # 恢复 AI 回答 + if '#AI' in msg_content and human_handle_msg_last_msg_content: + # 删除标记缓存 + await request.app.state.gewe_service.del_human_handle_msg_with_contact_wxid_async(wxid,to_wxid) + await request.app.state.gewe_service.del_human_handle_msg_with_contact_wxid_key_word_async(wxid,to_wxid) + await request.app.state.gewe_service.del_human_handle_msg_last_msg_content_async(wxid,to_wxid) + + await ai_chat_text_async(request,token_id, app_id, wxid, msg_data, msg_content) + return # 转人工处理功能 - await request.app.state.gewe_service.set_human_handle_msg_with_contact_wxid_async(wxid,to_wxid,60*30) + else: + await request.app.state.gewe_service.set_human_handle_msg_with_contact_wxid_async(wxid,to_wxid,60*30) # 推送到kafka input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] @@ -456,7 +480,6 @@ async def handle_text_async(request: Request,token_id,app_id, wxid,msg_data,from is_human_handle_msg_with_contact_wxid= await request.app.state.gewe_service.is_human_handle_msg_with_contact_wxid_key_word_async(wxid,callback_to_user) if is_human_handle_msg_with_contact_wxid: logger.warning(f'微信号 {wxid} 与 {callback_to_user} 有关键字匹配,暂时工人接管30分钟中,请查看长服AI商机提醒助理') - return request.app.state.message_lock[app_id]= asyncio.Lock() diff --git a/services/gewe_service.py b/services/gewe_service.py index 90f2893..17fa26e 100644 --- a/services/gewe_service.py +++ b/services/gewe_service.py @@ -1840,6 +1840,10 @@ class GeWeService: hash_key = f"__AI_OPS_WX__:HUMAN_HANDLE_MSG_WTIH_CONTACT:{wxid}:{contact_wxid}" await self.redis_service.set_hash(hash_key,{"data": json.dumps(int(time.time()), ensure_ascii=False)}, expire_time) + async def del_human_handle_msg_with_contact_wxid_async(self,wxid,contact_wxid): + hash_key = f"__AI_OPS_WX__:HUMAN_HANDLE_MSG_WTIH_CONTACT:{wxid}:{contact_wxid}" + await self.redis_service.delete_hash(hash_key) + async def is_human_handle_msg_with_contact_wxid_key_word_async(self,wxid,contact_wxid)->bool: hash_key = f"__AI_OPS_WX__:HUMAN_HANDLE_MSG_WTIH_CONTACT_KEY_WORD:{wxid}:{contact_wxid}" cache= await self.redis_service.get_hash_field(hash_key,"data") @@ -1850,8 +1854,24 @@ class GeWeService: await self.redis_service.set_hash(hash_key,{"data": json.dumps(int(time.time()), ensure_ascii=False)}, expire_time) + async def del_human_handle_msg_with_contact_wxid_key_word_async(self,wxid,contact_wxid): + hash_key = f"__AI_OPS_WX__:HUMAN_HANDLE_MSG_WTIH_CONTACT_KEY_WORD:{wxid}:{contact_wxid}" + await self.redis_service.delete_hash(hash_key) + async def set_human_handle_msg_last_msg_content(self,wxid,contact_wxid,content,expire_time=60*30): + hash_key = f"__AI_OPS_WX__:HUMAN_HANDLE_MSG_LAST_MSG:{wxid}:{contact_wxid}" + await self.redis_service.set_hash(hash_key,{"data": content}, expire_time) + + async def get_human_handle_msg_last_msg_content(self,wxid,contact_wxid)->str: + hash_key = f"__AI_OPS_WX__:HUMAN_HANDLE_MSG_LAST_MSG:{wxid}:{contact_wxid}" + return await self.redis_service.get_hash_field(hash_key,"data") + + async def del_human_handle_msg_last_msg_content_async(self,wxid,contact_wxid)->str: + hash_key = f"__AI_OPS_WX__:HUMAN_HANDLE_MSG_LAST_MSG:{wxid}:{contact_wxid}" + return await self.redis_service.delete_hash(hash_key) + + # 依赖项:获取 GeWeChatCom 单例 async def get_gewe_service(app: FastAPI = Depends()) -> GeWeService: return app.state.gewe_service \ No newline at end of file