From 4a9819254d2ffa57d05fc399a4dc45466c085c89 Mon Sep 17 00:00:00 2001 From: H Vs Date: Thu, 10 Apr 2025 10:52:36 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E7=BE=A4=E5=8A=A0=E5=A5=BD?= =?UTF-8?q?=E5=8F=8B=E8=A7=84=E5=88=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/gewe_service.py | 12 ++++++++++ tasks.py | 52 +++++++++++++++++++++++++++++++++------- 2 files changed, 55 insertions(+), 9 deletions(-) diff --git a/services/gewe_service.py b/services/gewe_service.py index 4038b76..5fa7915 100644 --- a/services/gewe_service.py +++ b/services/gewe_service.py @@ -1431,6 +1431,9 @@ class GeWeService: # return False async def save_task_run_time_async(self,task_name,log:list,expire_time=None): + ''' + 任务运行锁 + ''' hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{task_name}" await self.redis_service.set_hash(hash_key,{"data": json.dumps(log, ensure_ascii=False)}, expire_time) @@ -1438,6 +1441,15 @@ class GeWeService: hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{task_name}" cache= await self.redis_service.get_hash_field(hash_key,"data") return json.loads(cache) if cache else [] + + async def save_wx_expection_async(self,wxid,api_name,exception:str,expire_time=None): + hash_key = f"__AI_OPS_WX__:WX_EXCEPTION:{wxid}:{api_name}" + await self.redis_service.set_hash(hash_key,{"data": json.dumps(exception, ensure_ascii=False)}, expire_time) + + async def get_wx_expection_async(self,wxid,api_name:str)->str: + hash_key = f"__AI_OPS_WX__:WX_EXCEPTION:{wxid}:{api_name}" + cache= await self.redis_service.get_hash_field(hash_key,"data") + return json.loads(cache) if cache else "" async def wx_add_contacts_from_chatroom_task_status_async(self,wxid,chatroom_id)->int: history_hash_key = f'__AI_OPS_WX__:GROUPS_ADD_CONTACT_HISTORY:{wxid}:{chatroom_id}' diff --git a/tasks.py b/tasks.py index 838b98f..d912ede 100644 --- a/tasks.py +++ b/tasks.py @@ -593,16 +593,18 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, return # 获取当前时间 - current_time = datetime.now() + # current_time = datetime.now() - # 计算当天的结束时间(23:59:59) - end_of_day = datetime(current_time.year, current_time.month, current_time.day, 23, 59, 59) + # # 计算当天的结束时间(23:59:59) + # end_of_day = datetime(current_time.year, current_time.month, current_time.day, 23, 59, 59) - # 计算时间差 - time_difference = end_of_day - current_time + # # 计算时间差 + # time_difference = end_of_day - current_time - # 将时间差转换为秒数 - time_difference_seconds = int(time_difference.total_seconds()) + # # 将时间差转换为秒数 + # time_difference_seconds = int(time_difference.total_seconds()) + + time_difference_seconds = today_seconds_remaining() cache_task_run_time_logs.append({"runTime":int(time.time())}) await gewe_service.save_task_run_time_async('scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_logs,time_difference_seconds) @@ -628,6 +630,13 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加") continue + + # 判断是否过于频繁 + is_wx_expection = await gewe_service.get_wx_expection_async(wxid,"addGroupMemberAsFriend") + if is_wx_expection: + logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。") + return + c = await gewe_service.get_wxchat_config_from_cache_async(wxid) contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid) @@ -699,6 +708,12 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, if is_add_group_times: logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加") return + + # 判断是否过于频繁 + is_wx_expection = await gewe_service.get_wx_expection_async(wxid,"addGroupMemberAsFriend") + if is_wx_expection: + logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}") + return contact_wxid= m.get('wxid') member_nickname=m.get("nickName") @@ -731,7 +746,7 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, is_more_than_one_day= is_add_time_more_than_one_day(last_add_time) if not is_more_than_one_day: - logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的{member_nickname}-{contact_wxid}已经当天邀请,不再邀请') + logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经当天邀请,不再邀请') continue @@ -739,6 +754,10 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}') if ret!=200: logger.warning(f'群好友邀请失败原因:{ret} {data}') + if msg in '操作过于频繁,请稍后再试。': + await gewe_service.save_wx_expection_async(wxid,"addGroupMemberAsFriend",msg,today_seconds_remaining()) + logger.warning(f'{nickname}-{wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。') + continue history=AddGroupContactsHistory.model_validate({ "chatroomId":chatroom_id, @@ -753,7 +772,8 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime) await kafka_service.send_message_async(k_message) - await asyncio.sleep(random.uniform(1.5, 3)) + #await asyncio.sleep(random.uniform(1.5, 3)) + await asyncio.sleep(random.uniform(10,60)) # 任务状态推送到kafka task_status=await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid,chatroom_id) @@ -863,6 +883,20 @@ def add_friends_task(self,redis_config): loop.run_until_complete(task()) # 在现有事件循环中运行任务 +def today_seconds_remaining()->int: + current_time = datetime.now() + + # 计算当天的结束时间(23:59:59) + end_of_day = datetime(current_time.year, current_time.month, current_time.day, 23, 59, 59) + + # 计算时间差 + time_difference = end_of_day - current_time + + # 将时间差转换为秒数 + time_difference_seconds = int(time_difference.total_seconds()) + + return time_difference_seconds + @celery_app.task(name='tasks.random_scheduled_task', bind=True, acks_late=True) def random_scheduled_task(self,):