From 8b101269b63573c596c65f8ee023f225ce2c2e53 Mon Sep 17 00:00:00 2001 From: H Vs Date: Fri, 11 Apr 2025 18:23:24 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- celery_app.py | 4 +- tasks.py | 241 +++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 242 insertions(+), 3 deletions(-) diff --git a/celery_app.py b/celery_app.py index 5753358..a885ad9 100644 --- a/celery_app.py +++ b/celery_app.py @@ -102,8 +102,8 @@ for task_id, task_name, schedule_obj, task_args in tasks_schedule: print(f"已注册 `{task_name}` 任务,将在每小时的第 {random_minute} 分钟执行") logger.info(f"已注册 `{task_name}` 任务,将在每小时的第 {random_minute} 分钟执行") else: - print('scheduled_task_sync_wx_info 定时任务执行成功!') - logger.info(f"scheduled_task_sync_wx_info 定时任务执行成功!") + print('scheduled_task_sync_wx_info 定时任务载入成功!') + logger.info(f"scheduled_task_sync_wx_info 定时任务执行载入成功!") def setup_schedule(): # 初始化第一次执行(例如:5秒后执行) diff --git a/tasks.py b/tasks.py index c82f692..ff2c255 100644 --- a/tasks.py +++ b/tasks.py @@ -704,7 +704,7 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, f'我是群聊"{chatroom_nickname}"群的{nickname}') if ret != 200: logger.warning(f'群好友邀请失败原因:{ret} {msg} {data}') - if '操作过于频繁' in msg: + if '操作过于频繁' in data.get('msg'): await gewe_service.save_wx_expection_async(wxid, "addGroupMemberAsFriend", msg, today_seconds_remaining()) logger.warning(f'{nickname}-{wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。') return @@ -785,6 +785,245 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, +# @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True) +# def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config): + +# ''' +# 关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。 +# 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。 +# ''' +# async def process_login_key(redis_service:RedisService, gewe_service: GeWeService, kafka_service:KafkaService, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total): +# r = await redis_service.get_hash(k) +# app_id = r.get("appId") +# token_id = r.get("tokenId") +# wxid = r.get("wxid") +# status = r.get('status') + +# if status == '0': +# logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加") +# return + +# config = await gewe_service.get_wxchat_config_from_cache_async(wxid) +# validated_config = AgentConfig.model_validate(config) +# if not validated_config.agentEnabled: +# logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加") +# return + +# # 判断是否过于频繁 +# is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend") +# if is_wx_expection: +# logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。") +# return + +# cache_task_run_time_wxid_logs = await gewe_service.get_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms') +# if cache_task_run_time_wxid_logs: +# sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True) +# last_run_time = sorted_tasks[0].get("runTime") +# if last_run_time > 1e12: # 毫秒级时间戳 +# last_run_time = last_run_time / 1000 # 转换为秒 + +# # 将时间戳转换为 datetime 对象 +# last_run_time = datetime.fromtimestamp(last_run_time) + +# # 获取当前时间 +# current_time = datetime.now() + +# # 计算时间差 +# time_difference = current_time - last_run_time + +# # 判断是否相差2小时 +# if time_difference < timedelta(hours=2): +# logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行") +# return + +# cache_task_run_time_wxid_logs.append({"runTime": int(time.time())}) +# await gewe_service.save_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms', cache_task_run_time_wxid_logs, 3600 * 2) +# c:dict = await gewe_service.get_wxchat_config_from_cache_async(wxid) +# contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid) + +# contact_wxids = [c.get('userName') for c in contacts] +# chatrooms = c.get('addContactsFromChatroomIdWhiteList', []) +# logger.info(f'{wxid} 定时群成员定时添好友任务开始') +# wixd_add_contacts_from_chatrooms_times = {wxid: 0} +# for chatroom_id in chatrooms: +# chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id) +# chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id) + +# chatroom_nickname = chatroom.get('nickName') +# chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None) +# admin_wxids = chatroom_member.get('adminWxid', []) +# admin_wxids = chatroom_member.get('adminWxid') +# if admin_wxids is None: +# admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表 + +# logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}') +# contact_wxids_set = set(contact_wxids) +# if admin_wxids: +# contact_wxids_set.update(set(admin_wxids)) +# if chatroom_owner_wxid is not None: +# contact_wxids_set.add(chatroom_owner_wxid) + +# contact_wxids_set.add(wxid) +# unavailable_wixds = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id) +# if unavailable_wixds: +# contact_wxids_set.update(set(unavailable_wixds)) + +# chatroom_member_list = chatroom.get('memberList', []) +# if chatroom_member_list is None: +# chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表 +# elif not isinstance(chatroom_member_list, list): +# chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表 + +# remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set] + +# nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None) +# if not remaining_chatroot_members: +# logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里没有好友可以邀请') +# # 任务状态推送到kafka +# k_message = wx_add_contacts_from_chatroom_task_status(wxid, chatroom_id, 2) +# await kafka_service.send_message_async(k_message) +# return + +# logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}') +# for m in remaining_chatroot_members: +# # 判断本次任务是否已经邀请了30个好友 +# if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total: +# logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请") +# return + +# # 判断当天群成员是否已经加了90个好友 +# is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid, oneday_add_contacts_total) +# if is_add_group_times: +# logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加") +# return + +# # 判断是否过于频繁 +# is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend") +# if is_wx_expection: +# logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}") +# return + +# contact_wxid = m.get('wxid') +# member_nickname = m.get("nickName") +# group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid) + +# if group_add_contacts_history: +# sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True) + +# # 已经邀请过两次,不再邀请 +# if len(sorted_history) == 2: +# logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请') +# return + +# # 当天邀请过,不再邀请 +# if len(sorted_history) > 0: +# last_add_time = sorted_history[0].addTime + +# def is_add_time_more_than_one_day(addTime: int) -> bool: +# """ +# 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒 +# :param addTime: Unix 时间戳 +# :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False +# """ +# # 获取当前时间的时间戳 +# current_time = time.time() + +# # 计算时间戳差值 +# time_difference = abs(current_time - addTime) + +# # 检查是否大于 3600 × 24 秒 +# return time_difference > 3600 * 24 + +# is_more_than_one_day = is_add_time_more_than_one_day(last_add_time) +# if not is_more_than_one_day: +# logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经当天邀请,不再邀请') +# return + +# ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), +# f'我是群聊"{chatroom_nickname}"群的{nickname}') +# if ret != 200: +# logger.warning(f'群好友邀请失败原因:{ret} {msg} {data}') +# if '操作过于频繁' in data.get('msg'): +# await gewe_service.save_wx_expection_async(wxid, "addGroupMemberAsFriend", msg, today_seconds_remaining()) +# logger.warning(f'{nickname}-{wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。') +# return + +# history = AddGroupContactsHistory.model_validate({ +# "chatroomId": chatroom_id, +# "wxid": wxid, +# "contactWixd": contact_wxid, +# "addTime": int(time.time()) +# }) +# await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history) +# wixd_add_contacts_from_chatrooms_times[wxid] += 1 +# logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}') +# # 推送到kafka +# k_message = wx_add_contacts_from_chatroom_message(history.wxid, history.chatroomId, history.contactWixd, history.addTime) +# await kafka_service.send_message_async(k_message) +# # await asyncio.sleep(random.uniform(1.5, 3)) +# await asyncio.sleep(random.uniform(30, 60)) +# # 任务状态推送到kafka +# task_status = await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid, chatroom_id) +# wx_add_contacts_from_chatroom_task_status(wxid, chatroom_id, task_status) +# await kafka_service.send_message_async(k_message) +# # 下一个群 +# await asyncio.sleep(random.uniform(1.5, 3)) + +# async def task(): +# try: +# now = datetime.now() +# if now.hour < 8: +# logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点") +# return + +# logger.info('定时群成员定时添好友任务开始') +# redis_service = RedisService() +# await redis_service.init(**redis_config) +# gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url']) +# KAFKA_BOOTSTRAP_SERVERS = kafka_config['bootstrap_servers'] +# KAFKA_TOPIC = kafka_config['topic'] +# KAFKA_GROUP_ID = kafka_config['group_id'] + +# kafka_service = KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC, KAFKA_GROUP_ID) +# await kafka_service.start_producer() +# global_config = await gewe_service.get_global_config_from_cache_async() +# scheduled_task_add_contacts_from_chatrooms_config = global_config.get('scheduledTaskAddContactsFromChatrooms', {}) + +# oneday_add_contacts_total = 90 +# once_add_contacts_total = 30 +# # oneday_times=3 +# if scheduled_task_add_contacts_from_chatrooms_config: +# oneday_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal', 90) +# once_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal', 30) +# # oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3) + +# login_keys = [] +# async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): +# login_keys.append(key) + +# # 使用 asyncio.gather 并发处理每个 login_key +# await asyncio.gather(*[process_login_key(redis_service, gewe_service, kafka_service, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total) for k in login_keys]) + +# except Exception as e: +# # 获取当前的堆栈跟踪 +# tb = sys.exc_info()[2] +# # 为异常附加堆栈跟踪 +# e = e.with_traceback(tb) +# # 输出详细的错误信息 +# logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}") +# finally: +# await kafka_service.stop_producer() + +# loop = asyncio.get_event_loop() +# if loop.is_closed(): +# loop = asyncio.new_event_loop() +# asyncio.set_event_loop(loop) +# loop.run_until_complete(task()) # 在现有事件循环中运行任务 + + + + + REDIS_KEY_PATTERN = "friend_add_limit:{date}" REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task"