diff --git a/celery_app.py b/celery_app.py index 7788fe4..8abd088 100644 --- a/celery_app.py +++ b/celery_app.py @@ -99,9 +99,11 @@ from common.log import logger # } scheduled_task_sync_wx_info_interval = 6000 +scheduled_task_add_contacts_from_chatrooms_interval=10 environment = os.environ.get('environment', 'default') if environment != 'default': scheduled_task_sync_wx_info_interval = 60*11 + scheduled_task_add_contacts_from_chatrooms_interval = 3600*24 # 定义定时任务列表 (任务 ID, 任务名称, 执行间隔秒, 任务参数) @@ -142,6 +144,9 @@ tasks_schedule = [ # crontab(minute=random_minute, hour='*'), # 每小时在随机分钟执行 # # 如果想每天在随机时间执行,可以用: crontab(minute=random_minute, hour=random_hour) # [redis_config]), + ("redbeat:scheduled_task_add_contacts_from_chatrooms", "tasks.scheduled_task_add_contacts_from_chatrooms", + celery.schedules.schedule(timedelta(seconds=scheduled_task_add_contacts_from_chatrooms_interval)), + [redis_config, kafka_config, gewe_config]), ] # # 注册 RedBeat 任务 @@ -164,81 +169,20 @@ for task_id, task_name, schedule_obj, task_args in tasks_schedule: logger.info(f"scheduled_task_sync_wx_info 定时任务执行成功!") -# def setup_schedule(): -# # 初始化第一次执行(例如:15秒后执行) -# initial_run_in = 5 -# entry = RedBeatSchedulerEntry( -# name='random-task', -# task='tasks.random_scheduled_task', -# schedule=timedelta(seconds=initial_run_in), -# app=celery_app -# ) -# entry.save() -# print(f"Initial task scheduled in {initial_run_in} seconds") - -# setup_schedule() - -# for task_id, task_name, schedule_obj, task_args in tasks_schedule: -# entry_key = f'redbeat:{task_id}' # RedBeat 任务的键 -# existing_entry = None - - -# try: -# existing_entry = RedBeatSchedulerEntry.from_key(entry_key, app=celery_app) -# except KeyError: -# # 任务不存在 -# pass - -# print(existing_entry) - -# if existing_entry: -# print(f"任务 `{task_name}` 已存在,跳过注册") -# logger.info(f"任务 `{task_name}` 已存在,跳过注册") -# else: -# redbeat_entry = RedBeatSchedulerEntry( -# name=task_id, -# task=task_name, -# schedule=schedule_obj, # 现在使用 schedule_obj 而不是创建新的 schedule -# args=task_args, -# app=celery_app -# ) -# redbeat_entry.save() - -# if task_name == "tasks.add_friends_task": -# if isinstance(schedule_obj, crontab): -# print(f"已注册 `{task_name}` 任务,将在每小时的第 {schedule_obj._orig_minute} 分钟执行") -# logger.info(f"已注册 `{task_name}` 任务,将在每小时的第 {schedule_obj._orig_minute} 分钟执行") -# else: -# print("scheduled_task_sync_wx_info 定时任务执行成功!") -# logger.info("scheduled_task_sync_wx_info 定时任务执行成功!") - - - -# # **仅在首次启动时,手动注册 RedBeat 任务** -# def register_initial_tasks(): -# try: -# # 生成一个随机的首次任务执行间隔(5-15 分钟) -# initial_interval = random.randint(1, 3) -# #initial_interval= -# # RedBeat 任务注册 -# redbeat_entry = RedBeatSchedulerEntry( -# name="redbeat:add_friends_task", -# task="tasks.add_friends_task", -# schedule=celery.schedules.schedule(timedelta(seconds=initial_interval)), -# args=[redis_config], -# app=celery_app -# ) -# redbeat_entry.save() -# print(f"已注册 `tasks.add_friends_task` 任务,首次将在 {initial_interval} 秒后执行") -# except Exception as e: -# print(f"任务注册失败: {e}") - -# register_initial_tasks() - -#add_friends_task.apply_async(args=[redis_config]) +def setup_schedule(): + # 初始化第一次执行(例如:15秒后执行) + initial_run_in = 5 + entry = RedBeatSchedulerEntry( + name='random-task', + task='tasks.random_scheduled_task', + schedule=timedelta(seconds=initial_run_in), + app=celery_app + ) + entry.save() + print(f"Initial task scheduled in {initial_run_in} seconds") -#trigger_initial_task() \ No newline at end of file +setup_schedule() \ No newline at end of file diff --git a/run.py b/run.py index de0e476..3cd9772 100644 --- a/run.py +++ b/run.py @@ -39,7 +39,7 @@ if __name__ == "__main__": celery_beat_process = start_celery_beat() # 等待子进程完成 - fastapi_process.wait() + #fastapi_process.wait() celery_worker_process.wait() celery_beat_process.wait() diff --git a/services/gewe_service.py b/services/gewe_service.py index 4210ba7..b766c7b 100644 --- a/services/gewe_service.py +++ b/services/gewe_service.py @@ -508,7 +508,7 @@ class GeWeService: response_object = await response.json() return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None) - async def add_group_member_as_friend(self, token_id, app_id, chatroom_id, member_wxid, content): + async def add_group_member_as_friend_async(self, token_id, app_id, chatroom_id, member_wxid, content): ''' 添加群成员为好友 ''' @@ -875,7 +875,6 @@ class GeWeService: await self.redis_service.update_hash_field(hash_key, chatroom_id, json.dumps(data, ensure_ascii=False)) await asyncio.sleep(0.1) - async def save_groups_members_to_cache_async(self, token_id, app_id, wxid, chatroom_ids: list): """ 将群成员保存到 Redis 缓存。 @@ -1128,7 +1127,8 @@ class GeWeService: cache = await self.redis_service.get_hash(hash_key) wxids = [key for key, value in cache.items() if len(json.loads(value)) == 2] return wxids - + + async def enqueue_to_add_contacts_async(self,wxid,scene:int,v3,v4): """ 入列待添加好友 diff --git a/tasks.py b/tasks.py index b562bc3..866769d 100644 --- a/tasks.py +++ b/tasks.py @@ -11,6 +11,7 @@ from services.kafka_service import KafkaService from services.gewe_service import GeWeService from common.log import logger import asyncio,random +from model.models import AddGroupContactsHistory @celery_app.task(name='tasks.add_task', bind=True, acks_late=True) @@ -168,7 +169,7 @@ def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config): wxid = r.get("wxid") status = r.get('status') if status == '0': - logger.warning(f"微信号 {wxid} 已经离线: {ret}-{msg}") + logger.warning(f"微信号 {wxid} 已经离线") continue ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id) if ret != 200: @@ -197,6 +198,85 @@ def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config): +@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True) +def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config): + async def task(): + try: + redis_service = RedisService() + await redis_service.init(**redis_config) + gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url']) + login_keys = [] + async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): + login_keys.append(key) + + print(login_keys) + for k in login_keys: + r = await redis_service.get_hash(k) + app_id = r.get("appId") + token_id = r.get("tokenId") + wxid = r.get("wxid") + status = r.get('status') + if status == '0': + logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加") + continue + c = await gewe_service.get_wxchat_config_from_cache_async(wxid) + contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid) + contact_wxids = [c.get('userName') for c in contacts] + chatrooms = c.get('addContactsFromChatroomIdWhiteList', []) + + for chatroom_id in chatrooms: + chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id) + chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id) + chatroom_nickname = chatroom.get('nickName') + chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None) + admin_wxid = chatroom_member.get('adminWxid', None) + logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxid}') + contact_wxids_set = set(contact_wxids) + if admin_wxid is not None: + contact_wxids_set.add(admin_wxid) + if chatroom_owner_wxid is not None: + contact_wxids_set.add(chatroom_owner_wxid) + + contact_wxids_set.add(wxid) + + unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id) + contact_wxids_set.update(unavailable_wixds) + + chatroot_member_list = chatroom.get('memberList', []) + remaining_chatroot_members = [x for x in chatroot_member_list if x.get('wxid') not in contact_wxids_set] + + nickname = next((member['nickName'] for member in chatroot_member_list if member['wxid'] == wxid), None) + + logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}') + for m in remaining_chatroot_members: + ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}') + if ret==200: + contact_wxids= m.get('wxid') + history=AddGroupContactsHistory.model_validate({ + "chatroomId":chatroom_id, + "wxid":wxid, + "contactWixd":contact_wxids, + "addTime":int(time.time()) + }) + await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxids,history) + else: + logger.info(f'群好友邀请失败原因:{data}') + logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}') + await asyncio.sleep(random.uniform(1.5, 3)) + await asyncio.sleep(random.uniform(1.5, 3)) + + + except Exception as e: + logger.error(f"任务执行过程中发生异常: {e}") + + loop = asyncio.get_event_loop() + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + loop.run_until_complete(task()) # 在现有事件循环中运行任务 + + REDIS_KEY_PATTERN = "friend_add_limit:{date}" REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task" @@ -298,4 +378,5 @@ def random_scheduled_task(self,): app=celery_app ) entry.save() - return f"Scheduled next run in {next_run_in} seconds" \ No newline at end of file + return f"Scheduled next run in {next_run_in} seconds" +