Browse Source

调整

develop
H Vs 5 days ago
parent
commit
1d641ebf20
1 changed files with 174 additions and 165 deletions
  1. +174
    -165
      tasks.py

+ 174
- 165
tasks.py View File

@@ -554,187 +554,196 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config,
'''
async def process_login_key(redis_service:RedisService, gewe_service: GeWeService, kafka_service:KafkaService, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total, semaphore):
async with semaphore: # 使用 Semaphore 控制并发
r = await redis_service.get_hash(k)
app_id = r.get("appId")
token_id = r.get("tokenId")
wxid = r.get("wxid")
status = r.get('status')

if status == '0':
logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
return
try:
r = await redis_service.get_hash(k)
app_id = r.get("appId")
token_id = r.get("tokenId")
wxid = r.get("wxid")
status = r.get('status')

config = await gewe_service.get_wxchat_config_from_cache_async(wxid)
validated_config = AgentConfig.model_validate(config)
if not validated_config.agentEnabled:
logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加")
return
if status == '0':
logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
return

# 判断是否过于频繁
is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend")
if is_wx_expection:
logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。")
return
config = await gewe_service.get_wxchat_config_from_cache_async(wxid)
validated_config = AgentConfig.model_validate(config)
if not validated_config.agentEnabled:
logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加")
return

cache_task_run_time_wxid_logs = await gewe_service.get_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms')
if cache_task_run_time_wxid_logs:
sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True)
last_run_time = sorted_tasks[0].get("runTime")
if last_run_time > 1e12: # 毫秒级时间戳
last_run_time = last_run_time / 1000 # 转换为秒
# 判断是否过于频繁
is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend")
if is_wx_expection:
logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。")
return

# 将时间戳转换为 datetime 对象
last_run_time = datetime.fromtimestamp(last_run_time)
cache_task_run_time_wxid_logs = await gewe_service.get_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms')
if cache_task_run_time_wxid_logs:
sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True)
last_run_time = sorted_tasks[0].get("runTime")
if last_run_time > 1e12: # 毫秒级时间戳
last_run_time = last_run_time / 1000 # 转换为秒

# 获取当前时间
current_time = datetime.now()
# 将时间戳转换为 datetime 对象
last_run_time = datetime.fromtimestamp(last_run_time)

# 计算时间差
time_difference = current_time - last_run_time
# 获取当前时间
current_time = datetime.now()

# 判断是否相差2小时
if time_difference < timedelta(hours=2):
logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行")
return
# 计算时间差
time_difference = current_time - last_run_time

cache_task_run_time_wxid_logs.append({"runTime": int(time.time())})
await gewe_service.save_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms', cache_task_run_time_wxid_logs, 3600 * 2)
c:dict = await gewe_service.get_wxchat_config_from_cache_async(wxid)
contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)

contact_wxids = [c.get('userName') for c in contacts]
chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
logger.info(f'{wxid} 定时群成员定时添好友任务开始')
wixd_add_contacts_from_chatrooms_times = {wxid: 0}
for chatroom_id in chatrooms:
chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)

chatroom_nickname = chatroom.get('nickName')
chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
admin_wxids = chatroom_member.get('adminWxid', [])
admin_wxids = chatroom_member.get('adminWxid')
if admin_wxids is None:
admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表

logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
contact_wxids_set = set(contact_wxids)
if admin_wxids:
contact_wxids_set.update(set(admin_wxids))
if chatroom_owner_wxid is not None:
contact_wxids_set.add(chatroom_owner_wxid)

contact_wxids_set.add(wxid)
unavailable_wixds = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id)
if unavailable_wixds:
contact_wxids_set.update(set(unavailable_wixds))

chatroom_member_list = chatroom.get('memberList', [])
if chatroom_member_list is None:
chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
elif not isinstance(chatroom_member_list, list):
chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表

remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]

nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
if not remaining_chatroot_members:
logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群里没有好友可以邀请')
# 任务状态推送到kafka
k_message = wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, 2)
await kafka_service.send_message_async(k_message)
continue
logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
for m in remaining_chatroot_members:
# 判断本次任务是否已经邀请了30个好友
if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total:
logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请")
return

# 判断当天群成员是否已经加了90个好友
is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid, oneday_add_contacts_total)
if is_add_group_times:
logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
# 判断是否相差2小时
if time_difference < timedelta(hours=2):
logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行")
return

# 判断是否过于频繁
is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend")
if is_wx_expection:
logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}")
return
cache_task_run_time_wxid_logs.append({"runTime": int(time.time())})
await gewe_service.save_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms', cache_task_run_time_wxid_logs, 3600 * 2)
c:dict = await gewe_service.get_wxchat_config_from_cache_async(wxid)
contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)

contact_wxids = [c.get('userName') for c in contacts]
chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
logger.info(f'{wxid} 定时群成员定时添好友任务开始')
wixd_add_contacts_from_chatrooms_times = {wxid: 0}
for chatroom_id in chatrooms:
chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)

chatroom_nickname = chatroom.get('nickName')
chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
admin_wxids = chatroom_member.get('adminWxid', [])
admin_wxids = chatroom_member.get('adminWxid')
if admin_wxids is None:
admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表

logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
contact_wxids_set = set(contact_wxids)
if admin_wxids:
contact_wxids_set.update(set(admin_wxids))
if chatroom_owner_wxid is not None:
contact_wxids_set.add(chatroom_owner_wxid)

contact_wxids_set.add(wxid)
unavailable_wixds = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id)
if unavailable_wixds:
contact_wxids_set.update(set(unavailable_wixds))

chatroom_member_list = chatroom.get('memberList', [])
if chatroom_member_list is None:
chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
elif not isinstance(chatroom_member_list, list):
chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表

remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]

nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
if not remaining_chatroot_members:
logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群里没有好友可以邀请')
# 任务状态推送到kafka
k_message = wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, 2)
await kafka_service.send_message_async(k_message)
continue
contact_wxid = m.get('wxid')
member_nickname = m.get("nickName")
group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid)
logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
for m in remaining_chatroot_members:
# 判断本次任务是否已经邀请了30个好友
if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total:
logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请")
return

# 判断当天群成员是否已经加了90个好友
is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid, oneday_add_contacts_total)
if is_add_group_times:
logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
return

# 判断是否过于频繁
is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend")
if is_wx_expection:
logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}")
return
contact_wxid = m.get('wxid')
member_nickname = m.get("nickName")
group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid)

if group_add_contacts_history:
sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
if group_add_contacts_history:
sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)

# 已经邀请过两次,不再邀请
if len(sorted_history) == 2:
logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
continue
# 24小时邀请过,不再邀请
if len(sorted_history) > 0:
last_add_time = sorted_history[0].addTime

def is_add_time_more_than_one_day(addTime: int) -> bool:
"""
判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
:param addTime: Unix 时间戳
:return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
"""
# 获取当前时间的时间戳
current_time = time.time()

# 计算时间戳差值
time_difference = abs(current_time - addTime)

# 检查是否大于 3600 × 24 秒
return time_difference > 3600 * 24

is_more_than_one_day = is_add_time_more_than_one_day(last_add_time)
if not is_more_than_one_day:
logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经24小时邀请,不再邀请')
continue

ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'),
f'我是群聊"{chatroom_nickname}"群的{nickname}')
history = AddGroupContactsHistory.model_validate({
"chatroomId": chatroom_id,
"wxid": wxid,
"contactWixd": contact_wxid,
"addTime": int(time.time())
})

if ret != 200:
logger.warning(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群好友 {member_nickname}-{contact_wxid} 邀请失败原因:{ret} {msg} {data}')
if '操作过于频繁' in data.get('msg'):
await gewe_service.save_wx_expection_async(wxid, "addGroupMemberAsFriend", msg, today_seconds_remaining())
await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
logger.warning(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。当天不再处理该号群好友邀请任务')
return
# 已经邀请过两次,不再邀请
if len(sorted_history) == 2:
logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
continue
# 24小时邀请过,不再邀请
if len(sorted_history) > 0:
last_add_time = sorted_history[0].addTime

def is_add_time_more_than_one_day(addTime: int) -> bool:
"""
判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
:param addTime: Unix 时间戳
:return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
"""
# 获取当前时间的时间戳
current_time = time.time()

# 计算时间戳差值
time_difference = abs(current_time - addTime)

# 检查是否大于 3600 × 24 秒
return time_difference > 3600 * 24

is_more_than_one_day = is_add_time_more_than_one_day(last_add_time)
if not is_more_than_one_day:
logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经24小时邀请,不再邀请')
continue

ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'),
f'我是群聊"{chatroom_nickname}"群的{nickname}')
history = AddGroupContactsHistory.model_validate({
"chatroomId": chatroom_id,
"wxid": wxid,
"contactWixd": contact_wxid,
"addTime": int(time.time())
})

if ret != 200:
logger.warning(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群好友 {member_nickname}-{contact_wxid} 邀请失败原因:{ret} {msg} {data}')
if not data and '操作过于频繁' in data.get('msg', ''):
await gewe_service.save_wx_expection_async(wxid, "addGroupMemberAsFriend", msg, today_seconds_remaining())
await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
logger.warning(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。当天不再处理该号群好友邀请任务')
return

await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
wixd_add_contacts_from_chatrooms_times[wxid] += 1
logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg} \n {data} \n 当前已邀请好友数:{wixd_add_contacts_from_chatrooms_times[wxid]}')
# 推送到kafka
k_message = wx_add_contacts_from_chatroom_message(history.wxid, history.chatroomId, history.contactWixd, history.addTime)
await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
wixd_add_contacts_from_chatrooms_times[wxid] += 1
logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg} \n {data} \n 当前已邀请好友数:{wixd_add_contacts_from_chatrooms_times[wxid]}')
# 推送到kafka
k_message = wx_add_contacts_from_chatroom_message(history.wxid, history.chatroomId, history.contactWixd, history.addTime)
await kafka_service.send_message_async(k_message)
# await asyncio.sleep(random.uniform(1.5, 3))
# await asyncio.sleep(random.uniform(30, 60))
await asyncio.sleep(random.uniform(270,300))
# 任务状态推送到kafka
task_status = await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid, chatroom_id)
k_message=wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, task_status)
await kafka_service.send_message_async(k_message)
# await asyncio.sleep(random.uniform(1.5, 3))
# await asyncio.sleep(random.uniform(30, 60))
await asyncio.sleep(random.uniform(270,300))
# 任务状态推送到kafka
task_status = await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid, chatroom_id)
k_message=wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, task_status)
await kafka_service.send_message_async(k_message)
# 下一个群
await asyncio.sleep(random.uniform(1.5, 3))
# 下一个群
await asyncio.sleep(random.uniform(1.5, 3))
except Exception as e:
# 获取当前的堆栈跟踪
tb = sys.exc_info()[2]
# 为异常附加堆栈跟踪
e = e.with_traceback(tb)
# 输出详细的错误信息
logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}")
async def task():
try:
now = datetime.now()


Loading…
Cancel
Save