浏览代码

调整

1257
H Vs 1周前
父节点
当前提交
887156b7aa
共有 2 个文件被更改,包括 456 次插入226 次删除
  1. +1
    -1
      celery_app.py
  2. +455
    -225
      tasks.py

+ 1
- 1
celery_app.py 查看文件

@@ -39,7 +39,7 @@ elif environment == 'test':
scheduled_task_sync_wx_info_interval = 60*10
scheduled_task_add_contacts_from_chatrooms_interval = 60*11
else:
scheduled_task_sync_wx_info_interval = 10
scheduled_task_sync_wx_info_interval = 1000
scheduled_task_add_contacts_from_chatrooms_interval=6



+ 455
- 225
tasks.py 查看文件

@@ -258,279 +258,514 @@ def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
asyncio.set_event_loop(loop)
loop.run_until_complete(task()) # 在现有事件循环中运行任务

@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):

'''
关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。
加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。
'''

async def task():
try:
now = datetime.now()
if now.hour < 8:
logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
return

# @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
# def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):

logger.info('定时群成员定时添好友任务开始')
redis_service = RedisService()
await redis_service.init(**redis_config)
# '''
# 关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。
# 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。
# '''
# async def task():
# try:
# now = datetime.now()
# if now.hour < 8:
# logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
# return

gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
# logger.info('定时群成员定时添好友任务开始')
# redis_service = RedisService()
# await redis_service.init(**redis_config)

KAFKA_BOOTSTRAP_SERVERS=kafka_config['bootstrap_servers']
KAFKA_TOPIC=kafka_config['topic']
KAFKA_GROUP_ID=kafka_config['group_id']
kafka_service= KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC,KAFKA_GROUP_ID)
await kafka_service.start_producer()
# gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])

global_config=await gewe_service.get_global_config_from_cache_async()
scheduled_task_add_contacts_from_chatrooms_config=global_config.get('scheduledTaskAddContactsFromChatrooms',{})
# KAFKA_BOOTSTRAP_SERVERS=kafka_config['bootstrap_servers']
# KAFKA_TOPIC=kafka_config['topic']
# KAFKA_GROUP_ID=kafka_config['group_id']
oneday_add_contacts_total=90
once_add_contacts_total=30
#oneday_times=3

if scheduled_task_add_contacts_from_chatrooms_config:
oneday_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal',90)
once_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal',30)
#oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3)

# cache_task_run_time_logs= await gewe_service.get_task_run_time_async('scheduled_task_add_contacts_from_chatrooms')
# if cache_task_run_time_logs:
# sorted_tasks = sorted(cache_task_run_time_logs, key=lambda x: x.get("runTime"), reverse=True)
# last_run_time=sorted_tasks[0].get("runTime")
# kafka_service= KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC,KAFKA_GROUP_ID)
# await kafka_service.start_producer()

# if last_run_time > 1e12: # 毫秒级时间戳
# last_run_time = last_run_time / 1000 # 转换为秒
# global_config=await gewe_service.get_global_config_from_cache_async()
# scheduled_task_add_contacts_from_chatrooms_config=global_config.get('scheduledTaskAddContactsFromChatrooms',{})
# oneday_add_contacts_total=90
# once_add_contacts_total=30
# #oneday_times=3

# if scheduled_task_add_contacts_from_chatrooms_config:
# oneday_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal',90)
# once_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal',30)
# #oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3)

# # cache_task_run_time_logs= await gewe_service.get_task_run_time_async('scheduled_task_add_contacts_from_chatrooms')
# # if cache_task_run_time_logs:
# # sorted_tasks = sorted(cache_task_run_time_logs, key=lambda x: x.get("runTime"), reverse=True)
# # last_run_time=sorted_tasks[0].get("runTime")

# # if last_run_time > 1e12: # 毫秒级时间戳
# # last_run_time = last_run_time / 1000 # 转换为秒
# # 将时间戳转换为 datetime 对象
# last_run_time = datetime.fromtimestamp(last_run_time)
# # # 将时间戳转换为 datetime 对象
# # last_run_time = datetime.fromtimestamp(last_run_time)
# # 获取当前时间
# current_time = datetime.now()
# # # 获取当前时间
# # current_time = datetime.now()
# # 计算时间差
# time_difference = current_time - last_run_time
# # # 计算时间差
# # time_difference = current_time - last_run_time
# # 判断是否相差2小时
# if time_difference < timedelta(hours=2):
# logger.info(f"上次定时群成员定时添好友任务在2小时内,不再执行")
# return

# time_difference_seconds = today_seconds_remaining()
# cache_task_run_time_logs.append({"runTime":int(time.time())})
# await gewe_service.save_task_run_time_async('scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_logs,time_difference_seconds)
# # # 判断是否相差2小时
# # if time_difference < timedelta(hours=2):
# # logger.info(f"上次定时群成员定时添好友任务在2小时内,不再执行")
# # return
# # time_difference_seconds = today_seconds_remaining()
# # cache_task_run_time_logs.append({"runTime":int(time.time())})
# # await gewe_service.save_task_run_time_async('scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_logs,time_difference_seconds)
login_keys = []
async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
login_keys.append(key)
# login_keys = []
# async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
# login_keys.append(key)

wixd_add_contacts_from_chatrooms_times = {}
# wixd_add_contacts_from_chatrooms_times = {}

for k in login_keys:
r = await redis_service.get_hash(k)
app_id = r.get("appId")
token_id = r.get("tokenId")
wxid = r.get("wxid")
status = r.get('status')
if status == '0':
logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
continue
# for k in login_keys:
# r = await redis_service.get_hash(k)
# app_id = r.get("appId")
# token_id = r.get("tokenId")
# wxid = r.get("wxid")
# status = r.get('status')
# if status == '0':
# logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
# continue

config=await gewe_service.get_wxchat_config_from_cache_async(wxid)
validated_config = AgentConfig.model_validate(config)
if not validated_config.agentEnabled:
logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加")
continue
# config=await gewe_service.get_wxchat_config_from_cache_async(wxid)
# validated_config = AgentConfig.model_validate(config)
# if not validated_config.agentEnabled:
# logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加")
# continue


# 判断是否过于频繁
is_wx_expection = await gewe_service.get_wx_expection_async(wxid,"addGroupMemberAsFriend")
if is_wx_expection:
logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。")
continue
# # 判断是否过于频繁
# is_wx_expection = await gewe_service.get_wx_expection_async(wxid,"addGroupMemberAsFriend")
# if is_wx_expection:
# logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。")
# continue

cache_task_run_time_wxid_logs= await gewe_service.get_task_run_time_by_wxid_async(wxid,'scheduled_task_add_contacts_from_chatrooms')
if cache_task_run_time_wxid_logs:
sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True)
last_run_time=sorted_tasks[0].get("runTime")
# cache_task_run_time_wxid_logs= await gewe_service.get_task_run_time_by_wxid_async(wxid,'scheduled_task_add_contacts_from_chatrooms')
# if cache_task_run_time_wxid_logs:
# sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True)
# last_run_time=sorted_tasks[0].get("runTime")

if last_run_time > 1e12: # 毫秒级时间戳
last_run_time = last_run_time / 1000 # 转换为秒
# if last_run_time > 1e12: # 毫秒级时间戳
# last_run_time = last_run_time / 1000 # 转换为秒
# 将时间戳转换为 datetime 对象
last_run_time = datetime.fromtimestamp(last_run_time)
# # 将时间戳转换为 datetime 对象
# last_run_time = datetime.fromtimestamp(last_run_time)
# 获取当前时间
current_time = datetime.now()
# # 获取当前时间
# current_time = datetime.now()
# 计算时间差
time_difference = current_time - last_run_time
# # 计算时间差
# time_difference = current_time - last_run_time
# 判断是否相差2小时
if time_difference < timedelta(hours=2):
logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行")
continue
# # 判断是否相差2小时
# if time_difference < timedelta(hours=2):
# logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行")
# continue


cache_task_run_time_wxid_logs.append({"runTime":int(time.time())})
await gewe_service.save_task_run_time_by_wxid_async(wxid,'scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_wxid_logs,3600*2)
# cache_task_run_time_wxid_logs.append({"runTime":int(time.time())})
# await gewe_service.save_task_run_time_by_wxid_async(wxid,'scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_wxid_logs,3600*2)


c = await gewe_service.get_wxchat_config_from_cache_async(wxid)
contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
# c = await gewe_service.get_wxchat_config_from_cache_async(wxid)
# contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
contact_wxids = [c.get('userName') for c in contacts]
chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
logger.info(f'{wxid} 定时群成员定时添好友任务开始')
wixd_add_contacts_from_chatrooms_times[wxid] = 0
for chatroom_id in chatrooms:
chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
# contact_wxids = [c.get('userName') for c in contacts]
# chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
# logger.info(f'{wxid} 定时群成员定时添好友任务开始')
# wixd_add_contacts_from_chatrooms_times[wxid] = 0
# for chatroom_id in chatrooms:
# chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
# chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
chatroom_nickname = chatroom.get('nickName')
chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
# chatroom_nickname = chatroom.get('nickName')
# chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)

admin_wxids = chatroom_member.get('adminWxid', [])
# admin_wxids = chatroom_member.get('adminWxid', [])

admin_wxids = chatroom_member.get('adminWxid')
if admin_wxids is None:
admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
# admin_wxids = chatroom_member.get('adminWxid')
# if admin_wxids is None:
# admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
contact_wxids_set = set(contact_wxids)
# for admin_wxid in admin_wxids:
# contact_wxids_set.add(admin_wxid)
if admin_wxids:
contact_wxids_set.update(set(admin_wxids))
if chatroom_owner_wxid is not None:
contact_wxids_set.add(chatroom_owner_wxid)
# logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
# contact_wxids_set = set(contact_wxids)
# # for admin_wxid in admin_wxids:
# # contact_wxids_set.add(admin_wxid)
# if admin_wxids:
# contact_wxids_set.update(set(admin_wxids))
# if chatroom_owner_wxid is not None:
# contact_wxids_set.add(chatroom_owner_wxid)
contact_wxids_set.add(wxid)
# contact_wxids_set.add(wxid)

# unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
# contact_wxids_set.update(set(unavailable_wixds))
# # unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
# # contact_wxids_set.update(set(unavailable_wixds))

# chatroom_member_list = chatroom.get('memberList', [])
# # chatroom_member_list = chatroom.get('memberList', [])


unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
if unavailable_wixds:
contact_wxids_set.update(set(unavailable_wixds))
# unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
# if unavailable_wixds:
# contact_wxids_set.update(set(unavailable_wixds))
chatroom_member_list = chatroom.get('memberList', [])
if chatroom_member_list is None:
chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
elif not isinstance(chatroom_member_list, list):
chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表
# chatroom_member_list = chatroom.get('memberList', [])
# if chatroom_member_list is None:
# chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
# elif not isinstance(chatroom_member_list, list):
# chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表


remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]
# remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]
nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
if not remaining_chatroot_members:
logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里没有好友可以邀请')
# 任务状态推送到kafka
k_message=wx_add_contacts_from_chatroom_task_status(wxid,chatroom_id,2)
await kafka_service.send_message_async(k_message)
continue
logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
for m in remaining_chatroot_members:
# 判断本次任务是否已经邀请了30个好友
if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total:
logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请")
continue
# 判断当天群成员是否已经加了90个好友
is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid,oneday_add_contacts_total)
if is_add_group_times:
logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
continue
# nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
# if not remaining_chatroot_members:
# logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里没有好友可以邀请')
# # 任务状态推送到kafka
# k_message=wx_add_contacts_from_chatroom_task_status(wxid,chatroom_id,2)
# await kafka_service.send_message_async(k_message)
# continue
# logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
# for m in remaining_chatroot_members:
# # 判断本次任务是否已经邀请了30个好友
# if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total:
# logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请")
# continue
# # 判断当天群成员是否已经加了90个好友
# is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid,oneday_add_contacts_total)
# if is_add_group_times:
# logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
# continue
# 判断是否过于频繁
is_wx_expection = await gewe_service.get_wx_expection_async(wxid,"addGroupMemberAsFriend")
if is_wx_expection:
logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}")
continue
contact_wxid= m.get('wxid')
member_nickname=m.get("nickName")
group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid)
# # 判断是否过于频繁
# is_wx_expection = await gewe_service.get_wx_expection_async(wxid,"addGroupMemberAsFriend")
# if is_wx_expection:
# logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}")
# continue
# contact_wxid= m.get('wxid')
# member_nickname=m.get("nickName")
# group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid)
if group_add_contacts_history:
sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
# if group_add_contacts_history:
# sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
# 已经邀请过两次,不再邀请
if len(sorted_history)==2:
logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
continue
# 当天邀请过,不再邀请
if len(sorted_history) > 0:
last_add_time = sorted_history[0].addTime
def is_add_time_more_than_one_day(addTime: int) -> bool:
"""
判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
:param addTime: Unix 时间戳
:return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
"""
# 获取当前时间的时间戳
current_time = time.time()
# # 已经邀请过两次,不再邀请
# if len(sorted_history)==2:
# logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
# continue
# # 当天邀请过,不再邀请
# if len(sorted_history) > 0:
# last_add_time = sorted_history[0].addTime
# def is_add_time_more_than_one_day(addTime: int) -> bool:
# """
# 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
# :param addTime: Unix 时间戳
# :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
# """
# # 获取当前时间的时间戳
# current_time = time.time()
# 计算时间戳差值
time_difference = abs(current_time - addTime)
# # 计算时间戳差值
# time_difference = abs(current_time - addTime)
# 检查是否大于 3600 × 24 秒
return time_difference > 3600 * 24
# # 检查是否大于 3600 × 24 秒
# return time_difference > 3600 * 24
is_more_than_one_day= is_add_time_more_than_one_day(last_add_time)

if not is_more_than_one_day:
logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经当天邀请,不再邀请')
continue

ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}')
if ret!=200:
logger.warning(f'群好友邀请失败原因:{ret} {data}')
if msg in '操作过于频繁,请稍后再试。':
await gewe_service.save_wx_expection_async(wxid,"addGroupMemberAsFriend",msg,today_seconds_remaining())
logger.warning(f'{nickname}-{wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。')
continue

history=AddGroupContactsHistory.model_validate({
"chatroomId":chatroom_id,
"wxid":wxid,
"contactWixd":contact_wxid,
"addTime":int(time.time())
})
await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history)
wixd_add_contacts_from_chatrooms_times[wxid]+=1
logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}')
# 推送到kafka

k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime)
await kafka_service.send_message_async(k_message)
#await asyncio.sleep(random.uniform(1.5, 3))
await asyncio.sleep(random.uniform(30,60))

# 任务状态推送到kafka
task_status=await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid,chatroom_id)
wx_add_contacts_from_chatroom_task_status(wxid,chatroom_id,task_status)
# is_more_than_one_day= is_add_time_more_than_one_day(last_add_time)

# if not is_more_than_one_day:
# logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经当天邀请,不再邀请')
# continue

# ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}')
# if ret!=200:
# logger.warning(f'群好友邀请失败原因:{ret} {data}')
# if msg in '操作过于频繁,请稍后再试。':
# await gewe_service.save_wx_expection_async(wxid,"addGroupMemberAsFriend",msg,today_seconds_remaining())
# logger.warning(f'{nickname}-{wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。')
# continue

# history=AddGroupContactsHistory.model_validate({
# "chatroomId":chatroom_id,
# "wxid":wxid,
# "contactWixd":contact_wxid,
# "addTime":int(time.time())
# })
# await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history)
# wixd_add_contacts_from_chatrooms_times[wxid]+=1
# logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}')
# # 推送到kafka

# k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime)
# await kafka_service.send_message_async(k_message)
# #await asyncio.sleep(random.uniform(1.5, 3))
# await asyncio.sleep(random.uniform(30,60))

# # 任务状态推送到kafka
# task_status=await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid,chatroom_id)
# wx_add_contacts_from_chatroom_task_status(wxid,chatroom_id,task_status)
# await kafka_service.send_message_async(k_message)
# # 下一个群
# await asyncio.sleep(random.uniform(1.5, 3))
# except Exception as e:
# # 获取当前的堆栈跟踪
# tb = sys.exc_info()[2]
# # 为异常附加堆栈跟踪
# e = e.with_traceback(tb)
# # 输出详细的错误信息
# logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}")
# finally:
# await kafka_service.stop_producer()
# loop = asyncio.get_event_loop()
# if loop.is_closed():
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)

# loop.run_until_complete(task()) # 在现有事件循环中运行任务


@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):

'''
关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。
加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。
'''
async def process_login_key(redis_service:RedisService, gewe_service: GeWeService, kafka_service:KafkaService, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total, semaphore):
async with semaphore: # 使用 Semaphore 控制并发
r = await redis_service.get_hash(k)
app_id = r.get("appId")
token_id = r.get("tokenId")
wxid = r.get("wxid")
status = r.get('status')

if status == '0':
logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
return

config = await gewe_service.get_wxchat_config_from_cache_async(wxid)
validated_config = AgentConfig.model_validate(config)
if not validated_config.agentEnabled:
logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加")
return

# 判断是否过于频繁
is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend")
if is_wx_expection:
logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。")
return

cache_task_run_time_wxid_logs = await gewe_service.get_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms')
if cache_task_run_time_wxid_logs:
sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True)
last_run_time = sorted_tasks[0].get("runTime")
if last_run_time > 1e12: # 毫秒级时间戳
last_run_time = last_run_time / 1000 # 转换为秒

# 将时间戳转换为 datetime 对象
last_run_time = datetime.fromtimestamp(last_run_time)

# 获取当前时间
current_time = datetime.now()

# 计算时间差
time_difference = current_time - last_run_time

# 判断是否相差2小时
if time_difference < timedelta(hours=2):
logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行")
return

cache_task_run_time_wxid_logs.append({"runTime": int(time.time())})
await gewe_service.save_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms', cache_task_run_time_wxid_logs, 3600 * 2)
c:dict = await gewe_service.get_wxchat_config_from_cache_async(wxid)
contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)

contact_wxids = [c.get('userName') for c in contacts]
chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
logger.info(f'{wxid} 定时群成员定时添好友任务开始')
wixd_add_contacts_from_chatrooms_times = {wxid: 0}
for chatroom_id in chatrooms:
chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)

chatroom_nickname = chatroom.get('nickName')
chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
admin_wxids = chatroom_member.get('adminWxid', [])
admin_wxids = chatroom_member.get('adminWxid')
if admin_wxids is None:
admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表

logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
contact_wxids_set = set(contact_wxids)
if admin_wxids:
contact_wxids_set.update(set(admin_wxids))
if chatroom_owner_wxid is not None:
contact_wxids_set.add(chatroom_owner_wxid)

contact_wxids_set.add(wxid)
unavailable_wixds = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id)
if unavailable_wixds:
contact_wxids_set.update(set(unavailable_wixds))

chatroom_member_list = chatroom.get('memberList', [])
if chatroom_member_list is None:
chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
elif not isinstance(chatroom_member_list, list):
chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表

remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]

nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
if not remaining_chatroot_members:
logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里没有好友可以邀请')
# 任务状态推送到kafka
k_message = wx_add_contacts_from_chatroom_task_status(wxid, chatroom_id, 2)
await kafka_service.send_message_async(k_message)
return
logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
for m in remaining_chatroot_members:
# 判断本次任务是否已经邀请了30个好友
if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total:
logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请")
return

# 判断当天群成员是否已经加了90个好友
is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid, oneday_add_contacts_total)
if is_add_group_times:
logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
return

# 判断是否过于频繁
is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend")
if is_wx_expection:
logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}")
return
contact_wxid = m.get('wxid')
member_nickname = m.get("nickName")
group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid)

if group_add_contacts_history:
sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)

# 已经邀请过两次,不再邀请
if len(sorted_history) == 2:
logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
return
# 当天邀请过,不再邀请
if len(sorted_history) > 0:
last_add_time = sorted_history[0].addTime

def is_add_time_more_than_one_day(addTime: int) -> bool:
"""
判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
:param addTime: Unix 时间戳
:return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
"""
# 获取当前时间的时间戳
current_time = time.time()

# 计算时间戳差值
time_difference = abs(current_time - addTime)

# 检查是否大于 3600 × 24 秒
return time_difference > 3600 * 24

is_more_than_one_day = is_add_time_more_than_one_day(last_add_time)
if not is_more_than_one_day:
logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经当天邀请,不再邀请')
return

ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'),
f'我是群聊"{chatroom_nickname}"群的{nickname}')
if ret != 200:
logger.warning(f'群好友邀请失败原因:{ret} {msg} {data}')
if '操作过于频繁' in msg:
await gewe_service.save_wx_expection_async(wxid, "addGroupMemberAsFriend", msg, today_seconds_remaining())
logger.warning(f'{nickname}-{wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。')
return
history = AddGroupContactsHistory.model_validate({
"chatroomId": chatroom_id,
"wxid": wxid,
"contactWixd": contact_wxid,
"addTime": int(time.time())
})
await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
wixd_add_contacts_from_chatrooms_times[wxid] += 1
logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}')
# 推送到kafka
k_message = wx_add_contacts_from_chatroom_message(history.wxid, history.chatroomId, history.contactWixd, history.addTime)
await kafka_service.send_message_async(k_message)
# 下一个群
await asyncio.sleep(random.uniform(1.5, 3))
# await asyncio.sleep(random.uniform(1.5, 3))
await asyncio.sleep(random.uniform(30, 60))
# 任务状态推送到kafka
task_status = await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid, chatroom_id)
wx_add_contacts_from_chatroom_task_status(wxid, chatroom_id, task_status)
await kafka_service.send_message_async(k_message)
# 下一个群
await asyncio.sleep(random.uniform(1.5, 3))

async def task():
try:
now = datetime.now()
if now.hour < 8:
logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
return

# except Exception as e:
# logger.error(f"任务执行过程中发生异常: {e}")
logger.info('定时群成员定时添好友任务开始')
redis_service = RedisService()
await redis_service.init(**redis_config)
gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])
KAFKA_BOOTSTRAP_SERVERS = kafka_config['bootstrap_servers']
KAFKA_TOPIC = kafka_config['topic']
KAFKA_GROUP_ID = kafka_config['group_id']

kafka_service = KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC, KAFKA_GROUP_ID)
await kafka_service.start_producer()
global_config = await gewe_service.get_global_config_from_cache_async()
scheduled_task_add_contacts_from_chatrooms_config = global_config.get('scheduledTaskAddContactsFromChatrooms', {})

oneday_add_contacts_total = 90
once_add_contacts_total = 30
# oneday_times=3
if scheduled_task_add_contacts_from_chatrooms_config:
oneday_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal', 90)
once_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal', 30)
# oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3)

login_keys = []
async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
login_keys.append(key)

# 设置 Semaphore,限制并发任务数量
semaphore = asyncio.Semaphore(10) # 例如,限制同时运行的任务数量为 10
# 使用 asyncio.gather 并发处理每个 login_key
await asyncio.gather(*[process_login_key(redis_service, gewe_service, kafka_service, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total, semaphore) for k in login_keys])

except Exception as e:
# 获取当前的堆栈跟踪
@@ -539,18 +774,13 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config,
e = e.with_traceback(tb)
# 输出详细的错误信息
logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}")
# logger.error(f"异常类型: {type(e).__name__}")
# logger.error(f"异常信息: {str(e)}")
# logger.error(f"堆栈跟踪: {traceback.format_exc()}")
finally:
await kafka_service.stop_producer()

loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

loop.run_until_complete(task()) # 在现有事件循环中运行任务




正在加载...
取消
保存