瀏覽代碼

调整群加好友规则

1257
H Vs 1 周之前
父節點
當前提交
4a9819254d
共有 2 個檔案被更改,包括 55 行新增9 行删除
  1. +12
    -0
      services/gewe_service.py
  2. +43
    -9
      tasks.py

+ 12
- 0
services/gewe_service.py 查看文件

@@ -1431,6 +1431,9 @@ class GeWeService:
# return False # return False


async def save_task_run_time_async(self,task_name,log:list,expire_time=None): async def save_task_run_time_async(self,task_name,log:list,expire_time=None):
'''
任务运行锁
'''
hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{task_name}" hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{task_name}"
await self.redis_service.set_hash(hash_key,{"data": json.dumps(log, ensure_ascii=False)}, expire_time) await self.redis_service.set_hash(hash_key,{"data": json.dumps(log, ensure_ascii=False)}, expire_time)
@@ -1438,6 +1441,15 @@ class GeWeService:
hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{task_name}" hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{task_name}"
cache= await self.redis_service.get_hash_field(hash_key,"data") cache= await self.redis_service.get_hash_field(hash_key,"data")
return json.loads(cache) if cache else [] return json.loads(cache) if cache else []
async def save_wx_expection_async(self,wxid,api_name,exception:str,expire_time=None):
hash_key = f"__AI_OPS_WX__:WX_EXCEPTION:{wxid}:{api_name}"
await self.redis_service.set_hash(hash_key,{"data": json.dumps(exception, ensure_ascii=False)}, expire_time)

async def get_wx_expection_async(self,wxid,api_name:str)->str:
hash_key = f"__AI_OPS_WX__:WX_EXCEPTION:{wxid}:{api_name}"
cache= await self.redis_service.get_hash_field(hash_key,"data")
return json.loads(cache) if cache else ""


async def wx_add_contacts_from_chatroom_task_status_async(self,wxid,chatroom_id)->int: async def wx_add_contacts_from_chatroom_task_status_async(self,wxid,chatroom_id)->int:
history_hash_key = f'__AI_OPS_WX__:GROUPS_ADD_CONTACT_HISTORY:{wxid}:{chatroom_id}' history_hash_key = f'__AI_OPS_WX__:GROUPS_ADD_CONTACT_HISTORY:{wxid}:{chatroom_id}'


+ 43
- 9
tasks.py 查看文件

@@ -593,16 +593,18 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config,
return return
# 获取当前时间 # 获取当前时间
current_time = datetime.now()
# current_time = datetime.now()


# 计算当天的结束时间(23:59:59)
end_of_day = datetime(current_time.year, current_time.month, current_time.day, 23, 59, 59)
# # 计算当天的结束时间(23:59:59)
# end_of_day = datetime(current_time.year, current_time.month, current_time.day, 23, 59, 59)


# 计算时间差
time_difference = end_of_day - current_time
# # 计算时间差
# time_difference = end_of_day - current_time


# 将时间差转换为秒数
time_difference_seconds = int(time_difference.total_seconds())
# # 将时间差转换为秒数
# time_difference_seconds = int(time_difference.total_seconds())

time_difference_seconds = today_seconds_remaining()
cache_task_run_time_logs.append({"runTime":int(time.time())}) cache_task_run_time_logs.append({"runTime":int(time.time())})
await gewe_service.save_task_run_time_async('scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_logs,time_difference_seconds) await gewe_service.save_task_run_time_async('scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_logs,time_difference_seconds)
@@ -628,6 +630,13 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config,
logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加") logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加")
continue continue



# 判断是否过于频繁
is_wx_expection = await gewe_service.get_wx_expection_async(wxid,"addGroupMemberAsFriend")
if is_wx_expection:
logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。")
return

c = await gewe_service.get_wxchat_config_from_cache_async(wxid) c = await gewe_service.get_wxchat_config_from_cache_async(wxid)
contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid) contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
@@ -699,6 +708,12 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config,
if is_add_group_times: if is_add_group_times:
logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加") logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
return return
# 判断是否过于频繁
is_wx_expection = await gewe_service.get_wx_expection_async(wxid,"addGroupMemberAsFriend")
if is_wx_expection:
logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}")
return


contact_wxid= m.get('wxid') contact_wxid= m.get('wxid')
member_nickname=m.get("nickName") member_nickname=m.get("nickName")
@@ -731,7 +746,7 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config,
is_more_than_one_day= is_add_time_more_than_one_day(last_add_time) is_more_than_one_day= is_add_time_more_than_one_day(last_add_time)


if not is_more_than_one_day: if not is_more_than_one_day:
logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的{member_nickname}-{contact_wxid}已经当天邀请,不再邀请')
logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经当天邀请,不再邀请')
continue continue




@@ -739,6 +754,10 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config,
ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}') ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}')
if ret!=200: if ret!=200:
logger.warning(f'群好友邀请失败原因:{ret} {data}') logger.warning(f'群好友邀请失败原因:{ret} {data}')
if msg in '操作过于频繁,请稍后再试。':
await gewe_service.save_wx_expection_async(wxid,"addGroupMemberAsFriend",msg,today_seconds_remaining())
logger.warning(f'{nickname}-{wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。')
continue


history=AddGroupContactsHistory.model_validate({ history=AddGroupContactsHistory.model_validate({
"chatroomId":chatroom_id, "chatroomId":chatroom_id,
@@ -753,7 +772,8 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config,


k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime) k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime)
await kafka_service.send_message_async(k_message) await kafka_service.send_message_async(k_message)
await asyncio.sleep(random.uniform(1.5, 3))
#await asyncio.sleep(random.uniform(1.5, 3))
await asyncio.sleep(random.uniform(10,60))


# 任务状态推送到kafka # 任务状态推送到kafka
task_status=await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid,chatroom_id) task_status=await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid,chatroom_id)
@@ -863,6 +883,20 @@ def add_friends_task(self,redis_config):


loop.run_until_complete(task()) # 在现有事件循环中运行任务 loop.run_until_complete(task()) # 在现有事件循环中运行任务


def today_seconds_remaining()->int:
current_time = datetime.now()

# 计算当天的结束时间(23:59:59)
end_of_day = datetime(current_time.year, current_time.month, current_time.day, 23, 59, 59)

# 计算时间差
time_difference = end_of_day - current_time

# 将时间差转换为秒数
time_difference_seconds = int(time_difference.total_seconds())

return time_difference_seconds



@celery_app.task(name='tasks.random_scheduled_task', bind=True, acks_late=True) @celery_app.task(name='tasks.random_scheduled_task', bind=True, acks_late=True)
def random_scheduled_task(self,): def random_scheduled_task(self,):


Loading…
取消
儲存