from celery_app import celery_app from fastapi import Request,FastAPI import time,datetime from celery import Celery import celery.schedules from redbeat import RedBeatSchedulerEntry from datetime import timedelta from services.redis_service import RedisService from services.kafka_service import KafkaService from services.gewe_service import GeWeService # from common.log import logger from common.utils import * import asyncio,random from model.models import AddGroupContactsHistory import logging from model.models import AgentConfig import logging import sys,traceback logger = logging.getLogger('redbeat') @celery_app.task(name='tasks.add_task', bind=True, acks_late=True) def add_task(self, x, y): time.sleep(5) # 模拟长时间计算 logger.info('add') return x + y @celery_app.task(name='tasks.mul_task', bind=True, acks_late=True) def mul_task(self, x, y): time.sleep(5) # 模拟长时间计算 return x * y # @celery.task(name='app.tasks.sync_contacts', bind=True, acks_late=True) # async def sync_contacts_task(self,app): # login_keys = list(await app.state.redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')) # return login_keys # # for k in login_keys: # # print(k) @celery_app.task(name='tasks.sync_contacts', bind=True, acks_late=True) async def sync_contacts_task(self, redis_service): # Use the redis_service passed as an argument login_keys = list(await redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')) return login_keys @celery_app.task(name='tasks.background_worker_task', bind=True, acks_late=True) def background_worker_task(self, redis_config, kafka_config, gewe_config): async def task(): redis_service = RedisService() await redis_service.init(**redis_config) login_keys = [] async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): login_keys.append(key) print(login_keys) asyncio.run(task()) # @celery.task(name='tasks.background_worker_task', bind=True, acks_late=True) # async def background_worker_task(self, redis_config, kafka_config, gewe_config): # # Initialize services inside the task # redis_service = RedisService() # await redis_service.init(**redis_config) # login_keys = [] # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # 使用 async for 遍历异步生成器 # login_keys.append(key) # print(login_keys) # kafka_service = KafkaService(**kafka_config) # await kafka_service.start() # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url']) # # Task logic # lock_name = "background_wxchat_thread_lock" # lock_identifier = str(time.time()) # while True: # if await redis_service.acquire_lock(lock_name, timeout=10): # try: # logger.info("分布式锁已成功获取") # # Perform task logic # finally: # await redis_service.release_lock(lock_name, lock_identifier) # break # else: # logger.info("获取分布式锁失败,等待10秒后重试...") # await asyncio.sleep(10) # @celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True) # def scheduled_task(self): # print("定时任务执行成功!~~~~~~~~~~~~~~~~~") # return "Hello from Celery Beat + RedBeat!" # @celery_app.task(name='tasks.scheduled_task_sync_wx', bind=True, acks_late=True) # def scheduled_task_sync_wx(self,redis_service,kafka_service,gewe_service): # print("scheduled_task_sync_wx 定时任务执行成功!") # return "Hello from Celery Beat + RedBeat!" # @celery_app.task(name='tasks.scheduled_task_sync_wx_info_1', bind=True, acks_late=True) # def scheduled_task_sync_wx_info_1(self,redis_config, kafka_config, gewe_config): # ''' # 定时获取微信号资料 # ''' # loop = asyncio.new_event_loop() # asyncio.set_event_loop(loop) # async def task(): # try: # redis_service = RedisService() # await redis_service.init(**redis_config) # # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url']) # login_keys = [] # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # login_keys.append(key) # print(login_keys) # # for k in login_keys: # # r = await redis_service.get_hash(k) # # app_id = r.get("appId") # # token_id = r.get("tokenId") # # wxid = r.get("wxid") # # status = r.get('status') # # if status == '0': # # continue # # ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id) # # if ret != 200: # # logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}") # # continue # # nickname=profile.get("nickName") # # head_img_url=profile.get("smallHeadImgUrl") # # r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())}) # # cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()} # # await redis_service.set_hash(k, cleaned_login_info) # # logger.info(f"同步微信号 {wxid} 资料 成功") # # redis_service.update_hash_field(k,"nickName",nickname) # # redis_service.update_hash_field(k,"headImgUrl",head_img_url) # # redis_service.update_hash_field(k,"modify_at",int(time.time())) # except Exception as e: # logger.error(f"任务执行过程中发生异常: {e}") # print("scheduled_task_sync_wx_info 定时任务执行成功!") # return "Hello from Celery Beat + RedBeat!" # loop.run_until_complete(task()) # loop.close() # @celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True) # def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config): # ''' # 定时获取微信号资料 # ''' # async def task(): # try: # redis_service = RedisService() # await redis_service.init(**redis_config) # gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url']) # login_keys = [] # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # login_keys.append(key) # #print(login_keys) # for k in login_keys: # r = await redis_service.get_hash(k) # app_id = r.get("appId") # token_id = r.get("tokenId") # wxid = r.get("wxid") # status = r.get('status') # if status == '0': # logger.warning(f"微信号 {wxid} 已经离线") # continue # ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id) # if ret != 200: # logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}") # continue # nickname=profile.get("nickName") # head_img_url=profile.get("smallHeadImgUrl") # # print(nickname) # nickname=profile.get("nickName") # head_img_url=profile.get("smallHeadImgUrl") # r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())}) # cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()} # await redis_service.set_hash(k, cleaned_login_info) # logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功") # except Exception as e: # logger.error(f"任务执行过程中发生异常: {e}") # loop = asyncio.get_event_loop() # if loop.is_closed(): # loop = asyncio.new_event_loop() # asyncio.set_event_loop(loop) # loop.run_until_complete(task()) # 在现有事件循环中运行任务 @celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True) def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config): ''' 定时获取微信号资料 ''' async def process_key(redis_service, gewe_service, semaphore, key): async with semaphore: # 使用 Semaphore 控制并发 try: r = await redis_service.get_hash(key) app_id = r.get("appId") token_id = r.get("tokenId") wxid = r.get("wxid") status = r.get('status') if status == '0': logger.warning(f"微信号 {wxid} 已经离线") return ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id) if ret != 200: logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}") return nickname = profile.get("nickName") head_img_url = profile.get("smallHeadImgUrl") r.update({"nickName": nickname, "headImgUrl": head_img_url, "modify_at": int(time.time())}) cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()} await redis_service.set_hash(key, cleaned_login_info) logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功") except Exception as e: logger.error(f"处理键 {key} 时发生异常: {e}") async def task(): try: redis_service = RedisService() await redis_service.init(**redis_config) gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url']) login_keys = [] async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): login_keys.append(key) # 设置 Semaphore,限制并发数为 10 semaphore = asyncio.Semaphore(10) # 使用 asyncio.gather 并发处理所有键 await asyncio.gather(*[process_key(redis_service, gewe_service, semaphore, key) for key in login_keys]) except Exception as e: logger.error(f"任务执行过程中发生异常: {e}") loop = asyncio.get_event_loop() if loop.is_closed(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(task()) # 在现有事件循环中运行任务 # @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True) # def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config): # ''' # 关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。 # 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。 # ''' # async def task(): # try: # now = datetime.now() # if now.hour < 8: # logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点") # return # logger.info('定时群成员定时添好友任务开始') # redis_service = RedisService() # await redis_service.init(**redis_config) # gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url']) # KAFKA_BOOTSTRAP_SERVERS=kafka_config['bootstrap_servers'] # KAFKA_TOPIC=kafka_config['topic'] # KAFKA_GROUP_ID=kafka_config['group_id'] # kafka_service= KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC,KAFKA_GROUP_ID) # await kafka_service.start_producer() # global_config=await gewe_service.get_global_config_from_cache_async() # scheduled_task_add_contacts_from_chatrooms_config=global_config.get('scheduledTaskAddContactsFromChatrooms',{}) # oneday_add_contacts_total=90 # once_add_contacts_total=30 # #oneday_times=3 # if scheduled_task_add_contacts_from_chatrooms_config: # oneday_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal',90) # once_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal',30) # #oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3) # # cache_task_run_time_logs= await gewe_service.get_task_run_time_async('scheduled_task_add_contacts_from_chatrooms') # # if cache_task_run_time_logs: # # sorted_tasks = sorted(cache_task_run_time_logs, key=lambda x: x.get("runTime"), reverse=True) # # last_run_time=sorted_tasks[0].get("runTime") # # if last_run_time > 1e12: # 毫秒级时间戳 # # last_run_time = last_run_time / 1000 # 转换为秒 # # # 将时间戳转换为 datetime 对象 # # last_run_time = datetime.fromtimestamp(last_run_time) # # # 获取当前时间 # # current_time = datetime.now() # # # 计算时间差 # # time_difference = current_time - last_run_time # # # 判断是否相差2小时 # # if time_difference < timedelta(hours=2): # # logger.info(f"上次定时群成员定时添好友任务在2小时内,不再执行") # # return # # time_difference_seconds = today_seconds_remaining() # # cache_task_run_time_logs.append({"runTime":int(time.time())}) # # await gewe_service.save_task_run_time_async('scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_logs,time_difference_seconds) # login_keys = [] # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # login_keys.append(key) # wixd_add_contacts_from_chatrooms_times = {} # for k in login_keys: # r = await redis_service.get_hash(k) # app_id = r.get("appId") # token_id = r.get("tokenId") # wxid = r.get("wxid") # status = r.get('status') # if status == '0': # logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加") # continue # config=await gewe_service.get_wxchat_config_from_cache_async(wxid) # validated_config = AgentConfig.model_validate(config) # if not validated_config.agentEnabled: # logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加") # continue # # 判断是否过于频繁 # is_wx_expection = await gewe_service.get_wx_expection_async(wxid,"addGroupMemberAsFriend") # if is_wx_expection: # logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。") # continue # cache_task_run_time_wxid_logs= await gewe_service.get_task_run_time_by_wxid_async(wxid,'scheduled_task_add_contacts_from_chatrooms') # if cache_task_run_time_wxid_logs: # sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True) # last_run_time=sorted_tasks[0].get("runTime") # if last_run_time > 1e12: # 毫秒级时间戳 # last_run_time = last_run_time / 1000 # 转换为秒 # # 将时间戳转换为 datetime 对象 # last_run_time = datetime.fromtimestamp(last_run_time) # # 获取当前时间 # current_time = datetime.now() # # 计算时间差 # time_difference = current_time - last_run_time # # 判断是否相差2小时 # if time_difference < timedelta(hours=2): # logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行") # continue # cache_task_run_time_wxid_logs.append({"runTime":int(time.time())}) # await gewe_service.save_task_run_time_by_wxid_async(wxid,'scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_wxid_logs,3600*2) # c = await gewe_service.get_wxchat_config_from_cache_async(wxid) # contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid) # contact_wxids = [c.get('userName') for c in contacts] # chatrooms = c.get('addContactsFromChatroomIdWhiteList', []) # logger.info(f'{wxid} 定时群成员定时添好友任务开始') # wixd_add_contacts_from_chatrooms_times[wxid] = 0 # for chatroom_id in chatrooms: # chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id) # chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id) # chatroom_nickname = chatroom.get('nickName') # chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None) # admin_wxids = chatroom_member.get('adminWxid', []) # admin_wxids = chatroom_member.get('adminWxid') # if admin_wxids is None: # admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表 # logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}') # contact_wxids_set = set(contact_wxids) # # for admin_wxid in admin_wxids: # # contact_wxids_set.add(admin_wxid) # if admin_wxids: # contact_wxids_set.update(set(admin_wxids)) # if chatroom_owner_wxid is not None: # contact_wxids_set.add(chatroom_owner_wxid) # contact_wxids_set.add(wxid) # # unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id) # # contact_wxids_set.update(set(unavailable_wixds)) # # chatroom_member_list = chatroom.get('memberList', []) # unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id) # if unavailable_wixds: # contact_wxids_set.update(set(unavailable_wixds)) # chatroom_member_list = chatroom.get('memberList', []) # if chatroom_member_list is None: # chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表 # elif not isinstance(chatroom_member_list, list): # chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表 # remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set] # nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None) # if not remaining_chatroot_members: # logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里没有好友可以邀请') # # 任务状态推送到kafka # k_message=wx_add_contacts_from_chatroom_task_status_message(wxid,chatroom_id,2) # await kafka_service.send_message_async(k_message) # continue # logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}') # for m in remaining_chatroot_members: # # 判断本次任务是否已经邀请了30个好友 # if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total: # logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请") # continue # # 判断当天群成员是否已经加了90个好友 # is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid,oneday_add_contacts_total) # if is_add_group_times: # logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加") # continue # # 判断是否过于频繁 # is_wx_expection = await gewe_service.get_wx_expection_async(wxid,"addGroupMemberAsFriend") # if is_wx_expection: # logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}") # continue # contact_wxid= m.get('wxid') # member_nickname=m.get("nickName") # group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid) # if group_add_contacts_history: # sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True) # # 已经邀请过两次,不再邀请 # if len(sorted_history)==2: # logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请') # continue # # 当天邀请过,不再邀请 # if len(sorted_history) > 0: # last_add_time = sorted_history[0].addTime # def is_add_time_more_than_one_day(addTime: int) -> bool: # """ # 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒 # :param addTime: Unix 时间戳 # :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False # """ # # 获取当前时间的时间戳 # current_time = time.time() # # 计算时间戳差值 # time_difference = abs(current_time - addTime) # # 检查是否大于 3600 × 24 秒 # return time_difference > 3600 * 24 # is_more_than_one_day= is_add_time_more_than_one_day(last_add_time) # if not is_more_than_one_day: # logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经当天邀请,不再邀请') # continue # ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}') # if ret!=200: # logger.warning(f'群好友邀请失败原因:{ret} {data}') # if msg in '操作过于频繁,请稍后再试。': # await gewe_service.save_wx_expection_async(wxid,"addGroupMemberAsFriend",msg,today_seconds_remaining()) # logger.warning(f'{nickname}-{wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。') # continue # history=AddGroupContactsHistory.model_validate({ # "chatroomId":chatroom_id, # "wxid":wxid, # "contactWixd":contact_wxid, # "addTime":int(time.time()) # }) # await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history) # wixd_add_contacts_from_chatrooms_times[wxid]+=1 # logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}') # # 推送到kafka # k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime) # await kafka_service.send_message_async(k_message) # #await asyncio.sleep(random.uniform(1.5, 3)) # await asyncio.sleep(random.uniform(30,60)) # # 任务状态推送到kafka # task_status=await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid,chatroom_id) # k_message=wx_add_contacts_from_chatroom_task_status_message(wxid,chatroom_id,task_status) # await kafka_service.send_message_async(k_message) # # 下一个群 # await asyncio.sleep(random.uniform(1.5, 3)) # except Exception as e: # # 获取当前的堆栈跟踪 # tb = sys.exc_info()[2] # # 为异常附加堆栈跟踪 # e = e.with_traceback(tb) # # 输出详细的错误信息 # logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}") # finally: # await kafka_service.stop_producer() # loop = asyncio.get_event_loop() # if loop.is_closed(): # loop = asyncio.new_event_loop() # asyncio.set_event_loop(loop) # loop.run_until_complete(task()) # 在现有事件循环中运行任务 @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True) def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config): ''' 关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。 ''' async def process_login_key(redis_service:RedisService, gewe_service: GeWeService, kafka_service:KafkaService, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total, semaphore): async with semaphore: # 使用 Semaphore 控制并发 r = await redis_service.get_hash(k) app_id = r.get("appId") token_id = r.get("tokenId") wxid = r.get("wxid") status = r.get('status') if status == '0': logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加") return config = await gewe_service.get_wxchat_config_from_cache_async(wxid) validated_config = AgentConfig.model_validate(config) if not validated_config.agentEnabled: logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加") return # 判断是否过于频繁 is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend") if is_wx_expection: logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。") return cache_task_run_time_wxid_logs = await gewe_service.get_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms') if cache_task_run_time_wxid_logs: sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True) last_run_time = sorted_tasks[0].get("runTime") if last_run_time > 1e12: # 毫秒级时间戳 last_run_time = last_run_time / 1000 # 转换为秒 # 将时间戳转换为 datetime 对象 last_run_time = datetime.fromtimestamp(last_run_time) # 获取当前时间 current_time = datetime.now() # 计算时间差 time_difference = current_time - last_run_time # 判断是否相差2小时 if time_difference < timedelta(hours=2): logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行") return cache_task_run_time_wxid_logs.append({"runTime": int(time.time())}) await gewe_service.save_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms', cache_task_run_time_wxid_logs, 3600 * 2) c:dict = await gewe_service.get_wxchat_config_from_cache_async(wxid) contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid) contact_wxids = [c.get('userName') for c in contacts] chatrooms = c.get('addContactsFromChatroomIdWhiteList', []) logger.info(f'{wxid} 定时群成员定时添好友任务开始') wixd_add_contacts_from_chatrooms_times = {wxid: 0} for chatroom_id in chatrooms: chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id) chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id) chatroom_nickname = chatroom.get('nickName') chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None) admin_wxids = chatroom_member.get('adminWxid', []) admin_wxids = chatroom_member.get('adminWxid') if admin_wxids is None: admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表 logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}') contact_wxids_set = set(contact_wxids) if admin_wxids: contact_wxids_set.update(set(admin_wxids)) if chatroom_owner_wxid is not None: contact_wxids_set.add(chatroom_owner_wxid) contact_wxids_set.add(wxid) unavailable_wixds = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id) if unavailable_wixds: contact_wxids_set.update(set(unavailable_wixds)) chatroom_member_list = chatroom.get('memberList', []) if chatroom_member_list is None: chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表 elif not isinstance(chatroom_member_list, list): chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表 remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set] nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None) if not remaining_chatroot_members: logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群里没有好友可以邀请') # 任务状态推送到kafka k_message = wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, 2) await kafka_service.send_message_async(k_message) continue logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}') for m in remaining_chatroot_members: # 判断本次任务是否已经邀请了30个好友 if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total: logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请") return # 判断当天群成员是否已经加了90个好友 is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid, oneday_add_contacts_total) if is_add_group_times: logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加") return # 判断是否过于频繁 is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend") if is_wx_expection: logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}") return contact_wxid = m.get('wxid') member_nickname = m.get("nickName") group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid) if group_add_contacts_history: sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True) # 已经邀请过两次,不再邀请 if len(sorted_history) == 2: logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请') continue # 24小时邀请过,不再邀请 if len(sorted_history) > 0: last_add_time = sorted_history[0].addTime def is_add_time_more_than_one_day(addTime: int) -> bool: """ 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒 :param addTime: Unix 时间戳 :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False """ # 获取当前时间的时间戳 current_time = time.time() # 计算时间戳差值 time_difference = abs(current_time - addTime) # 检查是否大于 3600 × 24 秒 return time_difference > 3600 * 24 is_more_than_one_day = is_add_time_more_than_one_day(last_add_time) if not is_more_than_one_day: logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经24小时邀请,不再邀请') continue ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}') history = AddGroupContactsHistory.model_validate({ "chatroomId": chatroom_id, "wxid": wxid, "contactWixd": contact_wxid, "addTime": int(time.time()) }) if ret != 200: logger.warning(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群好友 {contact_wxid} 邀请失败原因:{ret} {msg} {data}') if '操作过于频繁' in data.get('msg'): await gewe_service.save_wx_expection_async(wxid, "addGroupMemberAsFriend", msg, today_seconds_remaining()) await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history) logger.warning(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。当天不再处理该号群好友邀请任务') return await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history) wixd_add_contacts_from_chatrooms_times[wxid] += 1 logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}') # 推送到kafka k_message = wx_add_contacts_from_chatroom_message(history.wxid, history.chatroomId, history.contactWixd, history.addTime) await kafka_service.send_message_async(k_message) # await asyncio.sleep(random.uniform(1.5, 3)) # await asyncio.sleep(random.uniform(30, 60)) await asyncio.sleep(random.uniform(270,300)) # 任务状态推送到kafka task_status = await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid, chatroom_id) k_message=wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, task_status) await kafka_service.send_message_async(k_message) # 下一个群 await asyncio.sleep(random.uniform(1.5, 3)) async def task(): try: now = datetime.now() # if 10> now.hour < 8: # logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点") # return if now.hour < 8 or now.hour > 22: logger.info(f"定时群成员定时添好友任务不启动, 当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},不在8点到22点之间") return logger.info('定时群成员定时添好友任务开始') redis_service = RedisService() await redis_service.init(**redis_config) gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url']) KAFKA_BOOTSTRAP_SERVERS = kafka_config['bootstrap_servers'] KAFKA_TOPIC = kafka_config['topic'] KAFKA_GROUP_ID = kafka_config['group_id'] kafka_service = KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC, KAFKA_GROUP_ID) await kafka_service.start_producer() global_config = await gewe_service.get_global_config_from_cache_async() scheduled_task_add_contacts_from_chatrooms_config = global_config.get('scheduledTaskAddContactsFromChatrooms', {}) oneday_add_contacts_total = 90 once_add_contacts_total = 30 # oneday_times=3 if scheduled_task_add_contacts_from_chatrooms_config: oneday_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal', 90) once_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal', 30) # oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3) login_keys = [] async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): login_keys.append(key) # 设置 Semaphore,限制并发任务数量 semaphore = asyncio.Semaphore(10) # 例如,限制同时运行的任务数量为 10 # 使用 asyncio.gather 并发处理每个 login_key await asyncio.gather(*[process_login_key(redis_service, gewe_service, kafka_service, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total, semaphore) for k in login_keys]) except Exception as e: # 获取当前的堆栈跟踪 tb = sys.exc_info()[2] # 为异常附加堆栈跟踪 e = e.with_traceback(tb) # 输出详细的错误信息 logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}") finally: await kafka_service.stop_producer() loop = asyncio.get_event_loop() if loop.is_closed(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(task()) # 在现有事件循环中运行任务 # @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True) # def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config): # ''' # 关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。 # 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。 # ''' # async def process_login_key(redis_service:RedisService, gewe_service: GeWeService, kafka_service:KafkaService, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total): # r = await redis_service.get_hash(k) # app_id = r.get("appId") # token_id = r.get("tokenId") # wxid = r.get("wxid") # status = r.get('status') # if status == '0': # logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加") # return # config = await gewe_service.get_wxchat_config_from_cache_async(wxid) # validated_config = AgentConfig.model_validate(config) # if not validated_config.agentEnabled: # logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加") # return # # 判断是否过于频繁 # is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend") # if is_wx_expection: # logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。") # return # cache_task_run_time_wxid_logs = await gewe_service.get_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms') # if cache_task_run_time_wxid_logs: # sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True) # last_run_time = sorted_tasks[0].get("runTime") # if last_run_time > 1e12: # 毫秒级时间戳 # last_run_time = last_run_time / 1000 # 转换为秒 # # 将时间戳转换为 datetime 对象 # last_run_time = datetime.fromtimestamp(last_run_time) # # 获取当前时间 # current_time = datetime.now() # # 计算时间差 # time_difference = current_time - last_run_time # # 判断是否相差2小时 # if time_difference < timedelta(hours=2): # logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行") # return # cache_task_run_time_wxid_logs.append({"runTime": int(time.time())}) # await gewe_service.save_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms', cache_task_run_time_wxid_logs, 3600 * 2) # c:dict = await gewe_service.get_wxchat_config_from_cache_async(wxid) # contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid) # contact_wxids = [c.get('userName') for c in contacts] # chatrooms = c.get('addContactsFromChatroomIdWhiteList', []) # logger.info(f'{wxid} 定时群成员定时添好友任务开始') # wixd_add_contacts_from_chatrooms_times = {wxid: 0} # for chatroom_id in chatrooms: # chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id) # chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id) # chatroom_nickname = chatroom.get('nickName') # chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None) # admin_wxids = chatroom_member.get('adminWxid', []) # admin_wxids = chatroom_member.get('adminWxid') # if admin_wxids is None: # admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表 # logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}') # contact_wxids_set = set(contact_wxids) # if admin_wxids: # contact_wxids_set.update(set(admin_wxids)) # if chatroom_owner_wxid is not None: # contact_wxids_set.add(chatroom_owner_wxid) # contact_wxids_set.add(wxid) # unavailable_wixds = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id) # if unavailable_wixds: # contact_wxids_set.update(set(unavailable_wixds)) # chatroom_member_list = chatroom.get('memberList', []) # if chatroom_member_list is None: # chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表 # elif not isinstance(chatroom_member_list, list): # chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表 # remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set] # nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None) # if not remaining_chatroot_members: # logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里没有好友可以邀请') # # 任务状态推送到kafka # k_message = wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, 2) # await kafka_service.send_message_async(k_message) # return # logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}') # for m in remaining_chatroot_members: # # 判断本次任务是否已经邀请了30个好友 # if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total: # logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请") # return # # 判断当天群成员是否已经加了90个好友 # is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid, oneday_add_contacts_total) # if is_add_group_times: # logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加") # return # # 判断是否过于频繁 # is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend") # if is_wx_expection: # logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}") # return # contact_wxid = m.get('wxid') # member_nickname = m.get("nickName") # group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid) # if group_add_contacts_history: # sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True) # # 已经邀请过两次,不再邀请 # if len(sorted_history) == 2: # logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请') # return # # 当天邀请过,不再邀请 # if len(sorted_history) > 0: # last_add_time = sorted_history[0].addTime # def is_add_time_more_than_one_day(addTime: int) -> bool: # """ # 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒 # :param addTime: Unix 时间戳 # :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False # """ # # 获取当前时间的时间戳 # current_time = time.time() # # 计算时间戳差值 # time_difference = abs(current_time - addTime) # # 检查是否大于 3600 × 24 秒 # return time_difference > 3600 * 24 # is_more_than_one_day = is_add_time_more_than_one_day(last_add_time) # if not is_more_than_one_day: # logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经当天邀请,不再邀请') # return # ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), # f'我是群聊"{chatroom_nickname}"群的{nickname}') # if ret != 200: # logger.warning(f'群好友邀请失败原因:{ret} {msg} {data}') # if '操作过于频繁' in data.get('msg'): # await gewe_service.save_wx_expection_async(wxid, "addGroupMemberAsFriend", msg, today_seconds_remaining()) # logger.warning(f'{nickname}-{wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。') # return # history = AddGroupContactsHistory.model_validate({ # "chatroomId": chatroom_id, # "wxid": wxid, # "contactWixd": contact_wxid, # "addTime": int(time.time()) # }) # await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history) # wixd_add_contacts_from_chatrooms_times[wxid] += 1 # logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}') # # 推送到kafka # k_message = wx_add_contacts_from_chatroom_message(history.wxid, history.chatroomId, history.contactWixd, history.addTime) # await kafka_service.send_message_async(k_message) # # await asyncio.sleep(random.uniform(1.5, 3)) # await asyncio.sleep(random.uniform(30, 60)) # # 任务状态推送到kafka # task_status = await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid, chatroom_id) # wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, task_status) # await kafka_service.send_message_async(k_message) # # 下一个群 # await asyncio.sleep(random.uniform(1.5, 3)) # async def task(): # try: # now = datetime.now() # if now.hour < 8: # logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点") # return # logger.info('定时群成员定时添好友任务开始') # redis_service = RedisService() # await redis_service.init(**redis_config) # gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url']) # KAFKA_BOOTSTRAP_SERVERS = kafka_config['bootstrap_servers'] # KAFKA_TOPIC = kafka_config['topic'] # KAFKA_GROUP_ID = kafka_config['group_id'] # kafka_service = KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC, KAFKA_GROUP_ID) # await kafka_service.start_producer() # global_config = await gewe_service.get_global_config_from_cache_async() # scheduled_task_add_contacts_from_chatrooms_config = global_config.get('scheduledTaskAddContactsFromChatrooms', {}) # oneday_add_contacts_total = 90 # once_add_contacts_total = 30 # # oneday_times=3 # if scheduled_task_add_contacts_from_chatrooms_config: # oneday_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal', 90) # once_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal', 30) # # oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3) # login_keys = [] # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # login_keys.append(key) # # 使用 asyncio.gather 并发处理每个 login_key # await asyncio.gather(*[process_login_key(redis_service, gewe_service, kafka_service, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total) for k in login_keys]) # except Exception as e: # # 获取当前的堆栈跟踪 # tb = sys.exc_info()[2] # # 为异常附加堆栈跟踪 # e = e.with_traceback(tb) # # 输出详细的错误信息 # logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}") # finally: # await kafka_service.stop_producer() # loop = asyncio.get_event_loop() # if loop.is_closed(): # loop = asyncio.new_event_loop() # asyncio.set_event_loop(loop) # loop.run_until_complete(task()) # 在现有事件循环中运行任务 REDIS_KEY_PATTERN = "friend_add_limit:{date}" REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task" @celery_app.task(name='tasks.add_friends_task', bind=True, acks_late=True) def add_friends_task(self,redis_config): """ 限制每天最多 15 个,每 2 小时最多 8 个 """ async def task(): redis_service = RedisService() await redis_service.init(**redis_config) today_str = datetime.now().strftime("%Y%m%d") redis_key = REDIS_KEY_PATTERN.format(date=today_str) # 获取当前总添加数量 total_added = await redis_service.get_hash_field(redis_key, "total") or 0 last_2h_added =await redis_service.get_hash_field(redis_key, "last_2h") or 0 total_added = int(total_added) last_2h_added = int(last_2h_added) logger.info(f"当前添加好友总数: {total_added}, 过去2小时添加: {last_2h_added}") # 判断是否超过限制 if total_added >= 15: logger.warning("今日好友添加已达上限!") return if last_2h_added >= 8: logger.warning("过去2小时添加已达上限!") return # 计算本次要添加的好友数量 (控制每天 5-15 个) max_add = min(15 - total_added, 8 - last_2h_added) if max_add <= 0: return num_to_add = min(max_add, 1) # 每次最多加 1 个 logger.info(f"本次添加 {num_to_add} 位好友") # TODO: 调用好友添加逻辑 (接口 or 业务逻辑) # success = add_friends(num_to_add) success = num_to_add # 假设成功添加 num_to_add 个 # 更新 Redis 计数 if success > 0: await redis_service.increment_hash_field(redis_key, "total", success) await redis_service.increment_hash_field(redis_key, "last_2h", success) # 设置 Redis 过期时间 (每日记录存 1 天, 2 小时记录存 2 小时) await redis_service.expire(redis_key, 86400) # 24小时 await redis_service.expire_field(redis_key, "last_2h", 7200) # 2小时 logger.info(f"成功添加 {success} 位好友, 今日总数 {total_added + success}") # 生成一个新的随机时间(5-15 分钟之间) # next_interval = random.randint(10, 20) # # 计算新的执行时间 # next_run_time = datetime.datetime.now() + timedelta(seconds=next_interval) # # 重新注册 RedBeat 任务,确保下次执行时间不同 # redbeat_entry = RedBeatSchedulerEntry( # name="redbeat:add_friends_task", # task="tasks.add_friends_task", # schedule=celery.schedules.schedule(timedelta(seconds=next_interval)), # args=[redis_config], # app=celery_app # ) # # 设置任务的下次执行时间 # redbeat_entry.last_run_at = next_run_time # redbeat_entry.save() # logger.info(f"下次任务将在 {next_run_time} 执行(间隔 {next_interval} 秒)") loop = asyncio.get_event_loop() if loop.is_closed(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(task()) # 在现有事件循环中运行任务 def today_seconds_remaining()->int: current_time = datetime.now() # 计算当天的结束时间(23:59:59) end_of_day = datetime(current_time.year, current_time.month, current_time.day, 23, 59, 59) # 计算时间差 time_difference = end_of_day - current_time # 将时间差转换为秒数 time_difference_seconds = int(time_difference.total_seconds()) return time_difference_seconds @celery_app.task(name='tasks.random_scheduled_task', bind=True, acks_late=True) def random_scheduled_task(self,): print(f"Task executed at {datetime.now()}") # 随机生成下次执行时间(例如:10-60秒内的随机时间) next_run_in = random.randint(10, 60) print(f"Next execution will be in {next_run_in} seconds") # 设置下次执行时间 entry = RedBeatSchedulerEntry( name='random-task', task='tasks.random_scheduled_task', schedule=timedelta(seconds=next_run_in), app=celery_app ) entry.save() return f"Scheduled next run in {next_run_in} seconds"