From 355d06ed2d6a617394dfc9560f324a1235bf0f24 Mon Sep 17 00:00:00 2001 From: H Vs Date: Tue, 22 Apr 2025 10:27:41 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/endpoints/pipeline_endpoint.py | 18 +++++++----------- run.py | 8 ++++---- services/gewe_service.py | 11 ++++++++++- 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/app/endpoints/pipeline_endpoint.py b/app/endpoints/pipeline_endpoint.py index ffd306c..3336a2b 100644 --- a/app/endpoints/pipeline_endpoint.py +++ b/app/endpoints/pipeline_endpoint.py @@ -85,7 +85,7 @@ async def handle_self_cmd_async(request: Request,wxid,msg): to_wxid=msg_data["ToUserName"]["string"] from_wxid=msg_data["FromUserName"]["string"] msg_content=msg_data["Content"]["string"] - if from_wxid == to_wxid: + if from_wxid == to_wxid and wxid == to_wxid: commands = { '启用托管': True, '关闭托管': False @@ -98,7 +98,7 @@ async def handle_self_cmd_async(request: Request,wxid,msg): await request.app.state.gewe_service.save_wxchat_config_async(wxid, agent_config.model_dump()) logger.info(f'{wxid} {"启动" if commands[msg_content] else "关闭"}托管') - + return {"message": "收到微信回调消息"} @@ -374,7 +374,7 @@ async def handle_text_async(request: Request,token_id,app_id, wxid,msg_data,from return # 判断是否转人工处理功能 - is_human_handle_msg= await request.app.state.gewe_service.is_human_handle_msg_async(to_wxid) + is_human_handle_msg= await request.app.state.gewe_service.is_human_handle_msg_with_contact_wxid_async(wxid,from_wxid) if is_human_handle_msg: logger.warning(f'微信号 {wxid} 发送到微信号{to_wxid} 暂时工人接管30分钟中') return @@ -383,12 +383,12 @@ async def handle_text_async(request: Request,token_id,app_id, wxid,msg_data,from msg_content=msg_data["Content"]["string"] if wxid == from_wxid: #手动发送消息 - logger.info("Active message sending detected") + logger.info(f"{wxid} 手动发送消息") await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id,wxid,[to_wxid]) callback_to_user=msg_data["ToUserName"]["string"] # 转人工处理功能 - await request.app.state.gewe_service.set_human_handle_msg_async(to_wxid,60*30) + await request.app.state.gewe_service.set_human_handle_msg_with_contact_wxid_async(wxid,to_wxid,60*30) # 推送到kafka input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] @@ -402,7 +402,7 @@ async def handle_text_async(request: Request,token_id,app_id, wxid,msg_data,from keywords = ["预约", "报价", "购买", "价钱","价格", "多少钱", "下单"] found_keywords = [keyword for keyword in keywords if keyword in msg_content] if found_keywords: - await request.app.state.gewe_service.set_human_handle_msg_with_contact_wxid_async(wxid,callback_to_user,60*30) + await request.app.state.gewe_service.set_human_handle_msg_with_contact_wxid_key_word_async(wxid,callback_to_user,60*30) logger.info(f"{wxid} 收到 {callback_to_user} 私聊消息匹配到关键词:{', '.join(found_keywords)}") # 回复好友 @@ -437,15 +437,11 @@ async def handle_text_async(request: Request,token_id,app_id, wxid,msg_data,from await asyncio.sleep(random.uniform(1.5,3)) assistant_msg_content=f'收到,{assistant_msg_content}' await request.app.state.gewe_service.post_text_async(kf_token_id, kf_appid, wxid, assistant_msg_content) - - - - return # 是否在转人工处理 - is_human_handle_msg_with_contact_wxid= await request.app.state.gewe_service.is_human_handle_msg_with_contact_wxid_async(wxid,callback_to_user) + is_human_handle_msg_with_contact_wxid= await request.app.state.gewe_service.is_human_handle_msg_with_contact_wxid_key_word_async(wxid,callback_to_user) if is_human_handle_msg_with_contact_wxid: logger.warning(f'微信号 {wxid} 与 {callback_to_user} 有关键字匹配,暂时工人接管30分钟中,请查看长服AI商机提醒助理') diff --git a/run.py b/run.py index 4512bb2..f8edcd2 100644 --- a/run.py +++ b/run.py @@ -35,13 +35,13 @@ def start_celery_beat(): if __name__ == "__main__": # 启动 FastAPI、Celery Worker 和 Celery Beat fastapi_process = start_fastapi() - celery_worker_process = start_celery_worker() - celery_beat_process = start_celery_beat() + #celery_worker_process = start_celery_worker() + #celery_beat_process = start_celery_beat() # 等待子进程完成 fastapi_process.wait() - celery_worker_process.wait() - celery_beat_process.wait() + #celery_worker_process.wait() + #celery_beat_process.wait() # def signal_handler(sig, frame): diff --git a/services/gewe_service.py b/services/gewe_service.py index e5f24c4..e1f29d9 100644 --- a/services/gewe_service.py +++ b/services/gewe_service.py @@ -1747,8 +1747,17 @@ class GeWeService: async def set_human_handle_msg_with_contact_wxid_async(self,wxid,contact_wxid,expire_time=60*30)->bool: hash_key = f"__AI_OPS_WX__:HUMAN_HANDLE_MSG_WTIH_CONTACT:{wxid}:{contact_wxid}" - cache= await self.redis_service.set_hash(hash_key,{"data": json.dumps(int(time.time()), ensure_ascii=False)}, expire_time) + await self.redis_service.set_hash(hash_key,{"data": json.dumps(int(time.time()), ensure_ascii=False)}, expire_time) + + async def is_human_handle_msg_with_contact_wxid_key_word_async(self,wxid,contact_wxid)->bool: + hash_key = f"__AI_OPS_WX__:HUMAN_HANDLE_MSG_WTIH_CONTACT_KEY_WORD:{wxid}:{contact_wxid}" + cache= await self.redis_service.get_hash_field(hash_key,"data") return True if cache else False + + async def set_human_handle_msg_with_contact_wxid_key_word_async(self,wxid,contact_wxid,expire_time=60*30)->bool: + hash_key = f"__AI_OPS_WX__:HUMAN_HANDLE_MSG_WTIH_CONTACT_KEY_WORD:{wxid}:{contact_wxid}" + await self.redis_service.set_hash(hash_key,{"data": json.dumps(int(time.time()), ensure_ascii=False)}, expire_time) +