diff --git a/tasks.py b/tasks.py index e7514fa..6a605ca 100644 --- a/tasks.py +++ b/tasks.py @@ -261,292 +261,9 @@ def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config): -# @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True) -# def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config): -# ''' -# 关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。 -# 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。 -# ''' -# async def task(): -# try: -# now = datetime.now() -# if now.hour < 8: -# logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点") -# return - -# logger.info('定时群成员定时添好友任务开始') -# redis_service = RedisService() -# await redis_service.init(**redis_config) - -# gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url']) - -# KAFKA_BOOTSTRAP_SERVERS=kafka_config['bootstrap_servers'] -# KAFKA_TOPIC=kafka_config['topic'] -# KAFKA_GROUP_ID=kafka_config['group_id'] - -# kafka_service= KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC,KAFKA_GROUP_ID) -# await kafka_service.start_producer() - -# global_config=await gewe_service.get_global_config_from_cache_async() -# scheduled_task_add_contacts_from_chatrooms_config=global_config.get('scheduledTaskAddContactsFromChatrooms',{}) - -# oneday_add_contacts_total=90 -# once_add_contacts_total=30 -# #oneday_times=3 - -# if scheduled_task_add_contacts_from_chatrooms_config: -# oneday_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal',90) -# once_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal',30) -# #oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3) - -# # cache_task_run_time_logs= await gewe_service.get_task_run_time_async('scheduled_task_add_contacts_from_chatrooms') -# # if cache_task_run_time_logs: -# # sorted_tasks = sorted(cache_task_run_time_logs, key=lambda x: x.get("runTime"), reverse=True) -# # last_run_time=sorted_tasks[0].get("runTime") - -# # if last_run_time > 1e12: # 毫秒级时间戳 -# # last_run_time = last_run_time / 1000 # 转换为秒 - -# # # 将时间戳转换为 datetime 对象 -# # last_run_time = datetime.fromtimestamp(last_run_time) - -# # # 获取当前时间 -# # current_time = datetime.now() - -# # # 计算时间差 -# # time_difference = current_time - last_run_time - -# # # 判断是否相差2小时 -# # if time_difference < timedelta(hours=2): -# # logger.info(f"上次定时群成员定时添好友任务在2小时内,不再执行") -# # return - -# # time_difference_seconds = today_seconds_remaining() -# # cache_task_run_time_logs.append({"runTime":int(time.time())}) -# # await gewe_service.save_task_run_time_async('scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_logs,time_difference_seconds) - -# login_keys = [] -# async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): -# login_keys.append(key) - -# wixd_add_contacts_from_chatrooms_times = {} - -# for k in login_keys: -# r = await redis_service.get_hash(k) -# app_id = r.get("appId") -# token_id = r.get("tokenId") -# wxid = r.get("wxid") -# status = r.get('status') -# if status == '0': -# logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加") -# continue - -# config=await gewe_service.get_wxchat_config_from_cache_async(wxid) -# validated_config = AgentConfig.model_validate(config) -# if not validated_config.agentEnabled: -# logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加") -# continue - - -# # 判断是否过于频繁 -# is_wx_expection = await gewe_service.get_wx_expection_async(wxid,"addGroupMemberAsFriend") -# if is_wx_expection: -# logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。") -# continue - - -# cache_task_run_time_wxid_logs= await gewe_service.get_task_run_time_by_wxid_async(wxid,'scheduled_task_add_contacts_from_chatrooms') -# if cache_task_run_time_wxid_logs: -# sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True) -# last_run_time=sorted_tasks[0].get("runTime") - -# if last_run_time > 1e12: # 毫秒级时间戳 -# last_run_time = last_run_time / 1000 # 转换为秒 - -# # 将时间戳转换为 datetime 对象 -# last_run_time = datetime.fromtimestamp(last_run_time) - -# # 获取当前时间 -# current_time = datetime.now() - -# # 计算时间差 -# time_difference = current_time - last_run_time - -# # 判断是否相差2小时 -# if time_difference < timedelta(hours=2): -# logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行") -# continue - - -# cache_task_run_time_wxid_logs.append({"runTime":int(time.time())}) -# await gewe_service.save_task_run_time_by_wxid_async(wxid,'scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_wxid_logs,3600*2) - - - -# c = await gewe_service.get_wxchat_config_from_cache_async(wxid) -# contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid) - -# contact_wxids = [c.get('userName') for c in contacts] -# chatrooms = c.get('addContactsFromChatroomIdWhiteList', []) -# logger.info(f'{wxid} 定时群成员定时添好友任务开始') -# wixd_add_contacts_from_chatrooms_times[wxid] = 0 -# for chatroom_id in chatrooms: -# chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id) -# chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id) - -# chatroom_nickname = chatroom.get('nickName') -# chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None) - -# admin_wxids = chatroom_member.get('adminWxid', []) - -# admin_wxids = chatroom_member.get('adminWxid') -# if admin_wxids is None: -# admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表 - -# logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}') -# contact_wxids_set = set(contact_wxids) -# # for admin_wxid in admin_wxids: -# # contact_wxids_set.add(admin_wxid) -# if admin_wxids: -# contact_wxids_set.update(set(admin_wxids)) -# if chatroom_owner_wxid is not None: -# contact_wxids_set.add(chatroom_owner_wxid) - -# contact_wxids_set.add(wxid) - -# # unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id) -# # contact_wxids_set.update(set(unavailable_wixds)) - -# # chatroom_member_list = chatroom.get('memberList', []) - - -# unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id) -# if unavailable_wixds: -# contact_wxids_set.update(set(unavailable_wixds)) - -# chatroom_member_list = chatroom.get('memberList', []) -# if chatroom_member_list is None: -# chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表 -# elif not isinstance(chatroom_member_list, list): -# chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表 - - -# remaining_chatroom_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set] - -# nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None) - -# if not remaining_chatroom_members: -# logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里没有好友可以邀请') -# # 任务状态推送到kafka -# k_message=wx_add_contacts_from_chatroom_task_status_message(wxid,chatroom_id,2) -# await kafka_service.send_message_async(k_message) -# continue - -# logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroom_members]}') -# for m in remaining_chatroom_members: -# # 判断本次任务是否已经邀请了30个好友 -# if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total: -# logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请") -# continue -# # 判断当天群成员是否已经加了90个好友 -# is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid,oneday_add_contacts_total) -# if is_add_group_times: -# logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加") -# continue - -# # 判断是否过于频繁 -# is_wx_expection = await gewe_service.get_wx_expection_async(wxid,"addGroupMemberAsFriend") -# if is_wx_expection: -# logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}") -# continue - -# contact_wxid= m.get('wxid') -# member_nickname=m.get("nickName") -# group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid) - -# if group_add_contacts_history: -# sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True) - -# # 已经邀请过两次,不再邀请 -# if len(sorted_history)==2: -# logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请') -# continue - -# # 当天邀请过,不再邀请 -# if len(sorted_history) > 0: -# last_add_time = sorted_history[0].addTime -# def is_add_time_more_than_one_day(addTime: int) -> bool: -# """ -# 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒 -# :param addTime: Unix 时间戳 -# :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False -# """ -# # 获取当前时间的时间戳 -# current_time = time.time() - -# # 计算时间戳差值 -# time_difference = abs(current_time - addTime) - -# # 检查是否大于 3600 × 24 秒 -# return time_difference > 3600 * 24 - -# is_more_than_one_day= is_add_time_more_than_one_day(last_add_time) - -# if not is_more_than_one_day: -# logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经当天邀请,不再邀请') -# continue - -# ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}') -# if ret!=200: -# logger.warning(f'群好友邀请失败原因:{ret} {data}') -# if msg in '操作过于频繁,请稍后再试。': -# await gewe_service.save_wx_expection_async(wxid,"addGroupMemberAsFriend",msg,today_seconds_remaining()) -# logger.warning(f'{nickname}-{wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。') -# continue - -# history=AddGroupContactsHistory.model_validate({ -# "chatroomId":chatroom_id, -# "wxid":wxid, -# "contactWixd":contact_wxid, -# "addTime":int(time.time()) -# }) -# await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history) -# wixd_add_contacts_from_chatrooms_times[wxid]+=1 -# logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}') -# # 推送到kafka - -# k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime) -# await kafka_service.send_message_async(k_message) -# #await asyncio.sleep(random.uniform(1.5, 3)) -# await asyncio.sleep(random.uniform(30,60)) - -# # 任务状态推送到kafka -# task_status=await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid,chatroom_id) -# k_message=wx_add_contacts_from_chatroom_task_status_message(wxid,chatroom_id,task_status) -# await kafka_service.send_message_async(k_message) -# # 下一个群 -# await asyncio.sleep(random.uniform(1.5, 3)) -# except Exception as e: -# # 获取当前的堆栈跟踪 -# tb = sys.exc_info()[2] -# # 为异常附加堆栈跟踪 -# e = e.with_traceback(tb) -# # 输出详细的错误信息 -# logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}") -# finally: -# await kafka_service.stop_producer() - -# loop = asyncio.get_event_loop() -# if loop.is_closed(): -# loop = asyncio.new_event_loop() -# asyncio.set_event_loop(loop) - -# loop.run_until_complete(task()) # 在现有事件循环中运行任务 - - -@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True) -def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config): +@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms_new', bind=True, acks_late=True) +def scheduled_task_add_contacts_from_chatrooms_new(self, redis_config, kafka_config, gewe_config): ''' 关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。 @@ -828,241 +545,263 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, loop.run_until_complete(task()) # 在现有事件循环中运行任务 +@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True) +def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config): -# @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True) -# def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config): + ''' + 关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。 + 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。 + ''' + async def process_login_key(redis_service:RedisService, gewe_service: GeWeService, kafka_service:KafkaService, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total, semaphore): + async with semaphore: # 使用 Semaphore 控制并发 + try: + r = await redis_service.get_hash(k) + app_id = r.get("appId") + token_id = r.get("tokenId") + wxid = r.get("wxid") + status = r.get('status') -# ''' -# 关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。 -# 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。 -# ''' -# async def process_login_key(redis_service:RedisService, gewe_service: GeWeService, kafka_service:KafkaService, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total): -# r = await redis_service.get_hash(k) -# app_id = r.get("appId") -# token_id = r.get("tokenId") -# wxid = r.get("wxid") -# status = r.get('status') - -# if status == '0': -# logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加") -# return - -# config = await gewe_service.get_wxchat_config_from_cache_async(wxid) -# validated_config = AgentConfig.model_validate(config) -# if not validated_config.agentEnabled: -# logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加") -# return - -# # 判断是否过于频繁 -# is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend") -# if is_wx_expection: -# logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。") -# return - -# cache_task_run_time_wxid_logs = await gewe_service.get_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms') -# if cache_task_run_time_wxid_logs: -# sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True) -# last_run_time = sorted_tasks[0].get("runTime") -# if last_run_time > 1e12: # 毫秒级时间戳 -# last_run_time = last_run_time / 1000 # 转换为秒 - -# # 将时间戳转换为 datetime 对象 -# last_run_time = datetime.fromtimestamp(last_run_time) - -# # 获取当前时间 -# current_time = datetime.now() - -# # 计算时间差 -# time_difference = current_time - last_run_time - -# # 判断是否相差2小时 -# if time_difference < timedelta(hours=2): -# logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行") -# return - -# cache_task_run_time_wxid_logs.append({"runTime": int(time.time())}) -# await gewe_service.save_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms', cache_task_run_time_wxid_logs, 3600 * 2) -# c:dict = await gewe_service.get_wxchat_config_from_cache_async(wxid) -# contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid) - -# contact_wxids = [c.get('userName') for c in contacts] -# chatrooms = c.get('addContactsFromChatroomIdWhiteList', []) -# logger.info(f'{wxid} 定时群成员定时添好友任务开始') -# wixd_add_contacts_from_chatrooms_times = {wxid: 0} -# for chatroom_id in chatrooms: -# chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id) -# chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id) - -# chatroom_nickname = chatroom.get('nickName') -# chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None) -# admin_wxids = chatroom_member.get('adminWxid', []) -# admin_wxids = chatroom_member.get('adminWxid') -# if admin_wxids is None: -# admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表 - -# logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}') -# contact_wxids_set = set(contact_wxids) -# if admin_wxids: -# contact_wxids_set.update(set(admin_wxids)) -# if chatroom_owner_wxid is not None: -# contact_wxids_set.add(chatroom_owner_wxid) - -# contact_wxids_set.add(wxid) -# unavailable_wixds = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id) -# if unavailable_wixds: -# contact_wxids_set.update(set(unavailable_wixds)) - -# chatroom_member_list = chatroom.get('memberList', []) -# if chatroom_member_list is None: -# chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表 -# elif not isinstance(chatroom_member_list, list): -# chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表 - -# remaining_chatroom_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set] - -# nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None) -# if not remaining_chatroom_members: -# logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里没有好友可以邀请') -# # 任务状态推送到kafka -# k_message = wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, 2) -# await kafka_service.send_message_async(k_message) -# return - -# logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroom_members]}') -# for m in remaining_chatroom_members: -# # 判断本次任务是否已经邀请了30个好友 -# if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total: -# logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请") -# return - -# # 判断当天群成员是否已经加了90个好友 -# is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid, oneday_add_contacts_total) -# if is_add_group_times: -# logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加") -# return - -# # 判断是否过于频繁 -# is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend") -# if is_wx_expection: -# logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}") -# return - -# contact_wxid = m.get('wxid') -# member_nickname = m.get("nickName") -# group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid) - -# if group_add_contacts_history: -# sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True) - -# # 已经邀请过两次,不再邀请 -# if len(sorted_history) == 2: -# logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请') -# return - -# # 当天邀请过,不再邀请 -# if len(sorted_history) > 0: -# last_add_time = sorted_history[0].addTime - -# def is_add_time_more_than_one_day(addTime: int) -> bool: -# """ -# 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒 -# :param addTime: Unix 时间戳 -# :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False -# """ -# # 获取当前时间的时间戳 -# current_time = time.time() - -# # 计算时间戳差值 -# time_difference = abs(current_time - addTime) - -# # 检查是否大于 3600 × 24 秒 -# return time_difference > 3600 * 24 - -# is_more_than_one_day = is_add_time_more_than_one_day(last_add_time) -# if not is_more_than_one_day: -# logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经当天邀请,不再邀请') -# return - -# ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), -# f'我是群聊"{chatroom_nickname}"群的{nickname}') -# if ret != 200: -# logger.warning(f'群好友邀请失败原因:{ret} {msg} {data}') -# if '操作过于频繁' in data.get('msg'): -# await gewe_service.save_wx_expection_async(wxid, "addGroupMemberAsFriend", msg, today_seconds_remaining()) -# logger.warning(f'{nickname}-{wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。') -# return + if status == '0': + logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加") + return + + config = await gewe_service.get_wxchat_config_from_cache_async(wxid) + validated_config = AgentConfig.model_validate(config) + if not validated_config.agentEnabled: + logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加") + return + + # 判断是否过于频繁 + is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend") + if is_wx_expection: + logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。") + return + + cache_task_run_time_wxid_logs = await gewe_service.get_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms') + if cache_task_run_time_wxid_logs: + sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True) + last_run_time = sorted_tasks[0].get("runTime") + if last_run_time > 1e12: # 毫秒级时间戳 + last_run_time = last_run_time / 1000 # 转换为秒 + + # 将时间戳转换为 datetime 对象 + last_run_time = datetime.fromtimestamp(last_run_time) + + # 获取当前时间 + current_time = datetime.now() + + # 计算时间差 + time_difference = current_time - last_run_time + + # 判断是否相差2小时 + if time_difference < timedelta(hours=2): + logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行") + return + + cache_task_run_time_wxid_logs.append({"runTime": int(time.time())}) + await gewe_service.save_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms', cache_task_run_time_wxid_logs, 3600 * 2) + c:dict = await gewe_service.get_wxchat_config_from_cache_async(wxid) + contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid) + + contact_wxids = [c.get('userName') for c in contacts] + chatrooms = c.get('addContactsFromChatroomIdWhiteList', []) + logger.info(f'{wxid} 定时群成员定时添好友任务开始') + wixd_add_contacts_from_chatrooms_times = {wxid: 0} + for chatroom_id in chatrooms: + chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id) + chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id) + + chatroom_nickname = chatroom.get('nickName') + chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None) + admin_wxids = chatroom_member.get('adminWxid', []) + admin_wxids = chatroom_member.get('adminWxid') + if admin_wxids is None: + admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表 + + logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}') + contact_wxids_set = set(contact_wxids) + if admin_wxids: + contact_wxids_set.update(set(admin_wxids)) + if chatroom_owner_wxid is not None: + contact_wxids_set.add(chatroom_owner_wxid) + + contact_wxids_set.add(wxid) + unavailable_wixds = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id) + if unavailable_wixds: + contact_wxids_set.update(set(unavailable_wixds)) + + chatroom_member_list = chatroom.get('memberList', []) + if chatroom_member_list is None: + chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表 + elif not isinstance(chatroom_member_list, list): + chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表 + + remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set] + + nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None) + if not remaining_chatroot_members: + logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群里没有好友可以邀请') + # 任务状态推送到kafka + k_message = wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, 2) + await kafka_service.send_message_async(k_message) + continue -# history = AddGroupContactsHistory.model_validate({ -# "chatroomId": chatroom_id, -# "wxid": wxid, -# "contactWixd": contact_wxid, -# "addTime": int(time.time()) -# }) -# await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history) -# wixd_add_contacts_from_chatrooms_times[wxid] += 1 -# logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}') -# # 推送到kafka -# k_message = wx_add_contacts_from_chatroom_message(history.wxid, history.chatroomId, history.contactWixd, history.addTime) -# await kafka_service.send_message_async(k_message) -# # await asyncio.sleep(random.uniform(1.5, 3)) -# await asyncio.sleep(random.uniform(30, 60)) -# # 任务状态推送到kafka -# task_status = await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid, chatroom_id) -# wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, task_status) -# await kafka_service.send_message_async(k_message) -# # 下一个群 -# await asyncio.sleep(random.uniform(1.5, 3)) + logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}') + for m in remaining_chatroot_members: + # 判断本次任务是否已经邀请了30个好友 + if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total: + logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请") + return -# async def task(): -# try: -# now = datetime.now() -# if now.hour < 8: -# logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点") -# return + # 判断当天群成员是否已经加了90个好友 + is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid, oneday_add_contacts_total) + if is_add_group_times: + logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加") + return -# logger.info('定时群成员定时添好友任务开始') -# redis_service = RedisService() -# await redis_service.init(**redis_config) -# gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url']) -# KAFKA_BOOTSTRAP_SERVERS = kafka_config['bootstrap_servers'] -# KAFKA_TOPIC = kafka_config['topic'] -# KAFKA_GROUP_ID = kafka_config['group_id'] - -# kafka_service = KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC, KAFKA_GROUP_ID) -# await kafka_service.start_producer() -# global_config = await gewe_service.get_global_config_from_cache_async() -# scheduled_task_add_contacts_from_chatrooms_config = global_config.get('scheduledTaskAddContactsFromChatrooms', {}) - -# oneday_add_contacts_total = 90 -# once_add_contacts_total = 30 -# # oneday_times=3 -# if scheduled_task_add_contacts_from_chatrooms_config: -# oneday_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal', 90) -# once_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal', 30) -# # oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3) + # 判断是否过于频繁 + is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend") + if is_wx_expection: + logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}") + return + + contact_wxid = m.get('wxid') + member_nickname = m.get("nickName") + group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid) -# login_keys = [] -# async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): -# login_keys.append(key) + if group_add_contacts_history: + sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True) -# # 使用 asyncio.gather 并发处理每个 login_key -# await asyncio.gather(*[process_login_key(redis_service, gewe_service, kafka_service, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total) for k in login_keys]) + # 已经邀请过两次,不再邀请 + if len(sorted_history) == 2: + logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请') + continue + + # 24小时邀请过,不再邀请 + if len(sorted_history) > 0: + last_add_time = sorted_history[0].addTime -# except Exception as e: -# # 获取当前的堆栈跟踪 -# tb = sys.exc_info()[2] -# # 为异常附加堆栈跟踪 -# e = e.with_traceback(tb) -# # 输出详细的错误信息 -# logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}") -# finally: -# await kafka_service.stop_producer() + def is_add_time_more_than_one_day(addTime: int) -> bool: + """ + 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒 + :param addTime: Unix 时间戳 + :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False + """ + # 获取当前时间的时间戳 + current_time = time.time() -# loop = asyncio.get_event_loop() -# if loop.is_closed(): -# loop = asyncio.new_event_loop() -# asyncio.set_event_loop(loop) -# loop.run_until_complete(task()) # 在现有事件循环中运行任务 + # 计算时间戳差值 + time_difference = abs(current_time - addTime) + + # 检查是否大于 3600 × 24 秒 + return time_difference > 3600 * 24 + + is_more_than_one_day = is_add_time_more_than_one_day(last_add_time) + if not is_more_than_one_day: + logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经24小时邀请,不再邀请') + continue + + ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), + f'我是群聊"{chatroom_nickname}"群的{nickname}') + + history = AddGroupContactsHistory.model_validate({ + "chatroomId": chatroom_id, + "wxid": wxid, + "contactWixd": contact_wxid, + "addTime": int(time.time()) + }) + + if ret != 200: + logger.warning(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群好友 {member_nickname}-{contact_wxid} 邀请失败原因:{ret} {msg} {data}') + + if data and '操作过于频繁' in data.get('msg', ''): + await gewe_service.save_wx_expection_async(wxid, "addGroupMemberAsFriend", msg, today_seconds_remaining()) + await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history) + logger.warning(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。当天不再处理该号群好友邀请任务') + return + + + await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history) + wixd_add_contacts_from_chatrooms_times[wxid] += 1 + logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg} \n {data} \n 当前已邀请好友数:{wixd_add_contacts_from_chatrooms_times[wxid]}') + # 推送到kafka + k_message = wx_add_contacts_from_chatroom_message(history.wxid, history.chatroomId, history.contactWixd, history.addTime) + await kafka_service.send_message_async(k_message) + # await asyncio.sleep(random.uniform(1.5, 3)) + # await asyncio.sleep(random.uniform(30, 60)) + await asyncio.sleep(random.uniform(270,300)) + # 任务状态推送到kafka + task_status = await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid, chatroom_id) + k_message=wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, task_status) + await kafka_service.send_message_async(k_message) + # 下一个群 + await asyncio.sleep(random.uniform(1.5, 3)) + + logger.info(f'{wxid} 定时群成员定时添好友任务完成,本次共邀请好友{wixd_add_contacts_from_chatrooms_times[wxid]}个') + except Exception as e: + # 获取当前的堆栈跟踪 + tb = sys.exc_info()[2] + # 为异常附加堆栈跟踪 + e = e.with_traceback(tb) + # 输出详细的错误信息 + logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}") + + async def task(): + try: + now = datetime.now() + # if 10> now.hour < 8: + # logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点") + # return + + if now.hour < 8 or now.hour > 22: + logger.info(f"定时群成员定时添好友任务不启动, 当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},不在8点到22点之间") + return + + logger.info('定时群成员定时添好友任务开始') + redis_service = RedisService() + await redis_service.init(**redis_config) + gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url']) + KAFKA_BOOTSTRAP_SERVERS = kafka_config['bootstrap_servers'] + KAFKA_TOPIC = kafka_config['topic'] + KAFKA_GROUP_ID = kafka_config['group_id'] + + kafka_service = KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC, KAFKA_GROUP_ID) + await kafka_service.start_producer() + global_config = await gewe_service.get_global_config_from_cache_async() + scheduled_task_add_contacts_from_chatrooms_config = global_config.get('scheduledTaskAddContactsFromChatrooms', {}) + + oneday_add_contacts_total = 90 + once_add_contacts_total = 30 + # oneday_times=3 + if scheduled_task_add_contacts_from_chatrooms_config: + oneday_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal', 90) + once_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal', 30) + # oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3) + + login_keys = [] + async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): + login_keys.append(key) + + # 设置 Semaphore,限制并发任务数量 + semaphore = asyncio.Semaphore(10) # 例如,限制同时运行的任务数量为 10 + # 使用 asyncio.gather 并发处理每个 login_key + await asyncio.gather(*[process_login_key(redis_service, gewe_service, kafka_service, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total, semaphore) for k in login_keys]) + + except Exception as e: + # 获取当前的堆栈跟踪 + tb = sys.exc_info()[2] + # 为异常附加堆栈跟踪 + e = e.with_traceback(tb) + # 输出详细的错误信息 + logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}") + finally: + await kafka_service.stop_producer() + + loop = asyncio.get_event_loop() + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(task()) # 在现有事件循环中运行任务