From 7d4e53b9d3a0e327de58e6d46958d1d96cf32f8e Mon Sep 17 00:00:00 2001 From: H Vs Date: Thu, 10 Apr 2025 17:48:38 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E7=BE=A4=E5=8A=A0=E5=A5=BD?= =?UTF-8?q?=E5=8F=8B=E8=A7=84=E5=88=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/endpoints/pipeline_endpoint.py | 9 ++++ services/gewe_service.py | 22 ++++++++ tasks.py | 85 +++++++++++++++++------------- 3 files changed, 80 insertions(+), 36 deletions(-) diff --git a/app/endpoints/pipeline_endpoint.py b/app/endpoints/pipeline_endpoint.py index b72047c..eff41d5 100644 --- a/app/endpoints/pipeline_endpoint.py +++ b/app/endpoints/pipeline_endpoint.py @@ -407,13 +407,22 @@ async def handle_text_async(request: Request,token_id,app_id, wxid,msg_data,from logger.warning(f"微信号 {wxid} 关闭好友和群自动回复功能,{handle_text_async.__name__} 不回复消息") return + # 判断是否转人工处理功能 + is_human_handle_msg= await request.app.state.gewe_service.is_human_handle_msg_async(wxid) + if is_human_handle_msg: + logger.warning(f'微信号 {wxid} 暂时工人接管30分钟中') + return + msg_content=msg_data["Content"]["string"] if wxid == from_wxid: #手动发送消息 logger.info("Active message sending detected") await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id,wxid,[to_wxid]) callback_to_user=msg_data["ToUserName"]["string"] + # 转人工处理功能 + await request.app.state.gewe_service.set_human_handle_msg_async(wxid,60*30) + # 推送到kafka input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] input_message=dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message) await request.app.state.kafka_service.send_message_async(input_message) diff --git a/services/gewe_service.py b/services/gewe_service.py index 03c4e0f..6392934 100644 --- a/services/gewe_service.py +++ b/services/gewe_service.py @@ -1469,6 +1469,19 @@ class GeWeService: cache= await self.redis_service.get_hash_field(hash_key,"data") return json.loads(cache) if cache else [] + async def save_task_run_time_by_wxid_async(self,wxid,task_name,log:list,expire_time=None): + ''' + 任务运行锁 + ''' + hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{wxid}:{task_name}" + await self.redis_service.set_hash(hash_key,{"data": json.dumps(log, ensure_ascii=False)}, expire_time) + + async def get_task_run_time_by_wxid_async(self,wxid,task_name)->list: + hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{wxid}:{task_name}" + cache= await self.redis_service.get_hash_field(hash_key,"data") + return json.loads(cache) if cache else [] + + async def save_wx_expection_async(self,wxid,api_name,exception:str,expire_time=None): hash_key = f"__AI_OPS_WX__:WX_EXCEPTION:{wxid}:{api_name}" await self.redis_service.set_hash(hash_key,{"data": json.dumps(exception, ensure_ascii=False)}, expire_time) @@ -1511,6 +1524,15 @@ class GeWeService: hash_key = f"__AI_OPS_WX__:GROUPS_ADD_CONTACT_HISTORY:{wxid}:{chatroom_id}" await self.redis_service.delete_hash(hash_key) + async def is_human_handle_msg_async(self,wxid)->bool: + hash_key = f"__AI_OPS_WX__:HUMAN_HANDLE_MSG:{wxid}" + cache= await self.redis_service.get_hash_field(hash_key,"data") + return True if cache else False + + async def set_human_handle_msg_async(self,wxid,expire_time=60*30): + hash_key = f"__AI_OPS_WX__:HUMAN_HANDLE_MSG:{wxid}" + await self.redis_service.set_hash(hash_key,{"data": json.dumps(int(time.time()), ensure_ascii=False)}, expire_time) + # 依赖项:获取 GeWeChatCom 单例 async def get_gewe_service(app: FastAPI = Depends()) -> GeWeService: return app.state.gewe_service \ No newline at end of file diff --git a/tasks.py b/tasks.py index c66829c..ed6022e 100644 --- a/tasks.py +++ b/tasks.py @@ -566,47 +566,31 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, once_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal',30) #oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3) - cache_task_run_time_logs= await gewe_service.get_task_run_time_async('scheduled_task_add_contacts_from_chatrooms') - # if len(cache_task_run_time_logs) == oneday_times: - # logger.info(f"今日定时群成员定时添好友任务已达上限 {oneday_times} 次!") - # return - - if cache_task_run_time_logs: - sorted_tasks = sorted(cache_task_run_time_logs, key=lambda x: x.get("runTime"), reverse=True) - last_run_time=sorted_tasks[0].get("runTime") + # cache_task_run_time_logs= await gewe_service.get_task_run_time_async('scheduled_task_add_contacts_from_chatrooms') + # if cache_task_run_time_logs: + # sorted_tasks = sorted(cache_task_run_time_logs, key=lambda x: x.get("runTime"), reverse=True) + # last_run_time=sorted_tasks[0].get("runTime") - if last_run_time > 1e12: # 毫秒级时间戳 - last_run_time = last_run_time / 1000 # 转换为秒 - - # 将时间戳转换为 datetime 对象 - last_run_time = datetime.fromtimestamp(last_run_time) + # if last_run_time > 1e12: # 毫秒级时间戳 + # last_run_time = last_run_time / 1000 # 转换为秒 - # 获取当前时间 - current_time = datetime.now() + # # 将时间戳转换为 datetime 对象 + # last_run_time = datetime.fromtimestamp(last_run_time) - # 计算时间差 - time_difference = current_time - last_run_time + # # 获取当前时间 + # current_time = datetime.now() - # 判断是否相差2小时 - if time_difference < timedelta(hours=2): - logger.info(f"上次定时群成员定时添好友任务在2小时内,不再执行") - return + # # 计算时间差 + # time_difference = current_time - last_run_time - # 获取当前时间 - # current_time = datetime.now() - - # # 计算当天的结束时间(23:59:59) - # end_of_day = datetime(current_time.year, current_time.month, current_time.day, 23, 59, 59) - - # # 计算时间差 - # time_difference = end_of_day - current_time - - # # 将时间差转换为秒数 - # time_difference_seconds = int(time_difference.total_seconds()) - - time_difference_seconds = today_seconds_remaining() - cache_task_run_time_logs.append({"runTime":int(time.time())}) - await gewe_service.save_task_run_time_async('scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_logs,time_difference_seconds) + # # 判断是否相差2小时 + # if time_difference < timedelta(hours=2): + # logger.info(f"上次定时群成员定时添好友任务在2小时内,不再执行") + # return + + # time_difference_seconds = today_seconds_remaining() + # cache_task_run_time_logs.append({"runTime":int(time.time())}) + # await gewe_service.save_task_run_time_async('scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_logs,time_difference_seconds) login_keys = [] async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): @@ -636,6 +620,35 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, if is_wx_expection: logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。") return + + + cache_task_run_time_wxid_logs= await gewe_service.get_task_run_time_by_wxid_async(wxid,'scheduled_task_add_contacts_from_chatrooms') + if cache_task_run_time_wxid_logs: + sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True) + last_run_time=sorted_tasks[0].get("runTime") + + if last_run_time > 1e12: # 毫秒级时间戳 + last_run_time = last_run_time / 1000 # 转换为秒 + + # 将时间戳转换为 datetime 对象 + last_run_time = datetime.fromtimestamp(last_run_time) + + # 获取当前时间 + current_time = datetime.now() + + # 计算时间差 + time_difference = current_time - last_run_time + + # 判断是否相差2小时 + if time_difference < timedelta(hours=2): + logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行") + return + + time_difference_seconds = today_seconds_remaining() + cache_task_run_time_wxid_logs.append({"runTime":int(time.time())}) + await gewe_service.save_task_run_time_by_wxid_async(wxid,'scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_wxid_logs,time_difference_seconds) + + c = await gewe_service.get_wxchat_config_from_cache_async(wxid) contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)