diff --git a/.gitignore b/.gitignore
index 2732b7a..9f8ccd6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -36,4 +36,4 @@ client_config.json
!plugins.json
tmp/
logs/
-cmd.txt
\ No newline at end of file
+*.txt
\ No newline at end of file
diff --git a/celery_app.py b/celery_app.py
index 0589f84..7788fe4 100644
--- a/celery_app.py
+++ b/celery_app.py
@@ -1,46 +1,50 @@
-from celery import Celery
+#from celery import Celery
import celery.schedules
from redbeat import RedBeatSchedulerEntry
from datetime import timedelta
+from celery.schedules import crontab
+from celery_config import *
from services.redis_service import RedisService
from services.kafka_service import KafkaService
from services.biz_service import BizService
-from config import load_config,conf
+#from config import load_config,conf
from urllib.parse import quote
-import asyncio,os
-
-
-load_config()
-
-KAFKA_BOOTSTRAP_SERVERS = conf().get("kafka_bootstrap_servers")
-KAFKA_TOPIC = 'topic.ai.ops.wx'
-KAFKA_GROUP_ID = 'ai-ops-wx'
-
-redis_host=conf().get("redis_host")
-redis_port=conf().get("redis_port")
-redis_password=conf().get("redis_password")
-redis_db=conf().get("redis_db")
-encoded_password = quote(redis_password)
-# 配置 Celery
-celery_app = Celery(
- "worker",
- broker=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}",
- backend=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}",
- include=['tasks']
-)
-
-# 配置 redbeat 作为 Celery Beat 调度器
-celery_app.conf.update(
- timezone="Asia/Shanghai", # 设定时区
- beat_scheduler="redbeat.RedBeatScheduler", # 使用 RedBeat 作为调度器
- redbeat_redis_url=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}" # redbeat 存储任务调度信息的 Redis
- ,
- redbeat_lock_timeout=60, # 避免多个 Beat 实例冲突
- beat_max_loop_interval=5 # 让 Celery Beat 每 5 秒检查一次任务
-)
+import asyncio,os,random,sys
+#from tasks import add_friends_task
+from common.log import logger
+
+
+# load_config()
+
+# KAFKA_BOOTSTRAP_SERVERS = conf().get("kafka_bootstrap_servers")
+# KAFKA_TOPIC = 'topic.ai.ops.wx'
+# KAFKA_GROUP_ID = 'ai-ops-wx'
+
+# redis_host=conf().get("redis_host")
+# redis_port=conf().get("redis_port")
+# redis_password=conf().get("redis_password")
+# redis_db=conf().get("redis_db")
+# encoded_password = quote(redis_password)
+# # 配置 Celery
+# celery_app = Celery(
+# "worker",
+# broker=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}",
+# backend=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}",
+# include=['tasks']
+# )
+
+# # 配置 redbeat 作为 Celery Beat 调度器
+# celery_app.conf.update(
+# timezone="Asia/Shanghai", # 设定时区
+# beat_scheduler="redbeat.RedBeatScheduler", # 使用 RedBeat 作为调度器
+# redbeat_redis_url=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}" # redbeat 存储任务调度信息的 Redis
+# ,
+# redbeat_lock_timeout=60, # 避免多个 Beat 实例冲突
+# beat_max_loop_interval=5 # 让 Celery Beat 每 5 秒检查一次任务
+# )
# task_name = "tasks.scheduled_task"
# # 任务执行间隔(每 10 秒执行一次)
@@ -76,46 +80,165 @@ celery_app.conf.update(
# )
# redbeat_entry.save()
-# 获取配置文件中的 redis_config、kafka_config、gewe_config
-redis_config = {
- 'host': redis_host,
- 'port': redis_port,
- 'password': redis_password,
- 'db': redis_db,
-}
-
-
-kafka_config = {
- 'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS,
- 'topic': KAFKA_TOPIC,
- 'group_id': KAFKA_GROUP_ID,
-}
-gewe_config = {
- 'api_url': "http://api.geweapi.com/gewe",
-}
-
-scheduled_task_sync_wx_info_interval = 10
+# # 获取配置文件中的 redis_config、kafka_config、gewe_config
+# redis_config = {
+# 'host': redis_host,
+# 'port': redis_port,
+# 'password': redis_password,
+# 'db': redis_db,
+# }
+
+
+# kafka_config = {
+# 'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS,
+# 'topic': KAFKA_TOPIC,
+# 'group_id': KAFKA_GROUP_ID,
+# }
+# gewe_config = {
+# 'api_url': "http://api.geweapi.com/gewe",
+# }
+
+scheduled_task_sync_wx_info_interval = 6000
environment = os.environ.get('environment', 'default')
if environment != 'default':
scheduled_task_sync_wx_info_interval = 60*11
+
# 定义定时任务列表 (任务 ID, 任务名称, 执行间隔秒, 任务参数)
-tasks_schedule = [
- #("redbeat:scheduled_task", "tasks.scheduled_task", 3, []),
- #("redbeat:scheduled_task_sync_wx", "tasks.scheduled_task_sync_wx", 15, []),
- ("redbeat:scheduled_task_sync_wx_info", "tasks.scheduled_task_sync_wx_info", scheduled_task_sync_wx_info_interval, [redis_config, kafka_config, gewe_config]),
+# tasks_schedule = [
+# #("redbeat:scheduled_task", "tasks.scheduled_task", 3, []),
+# #("redbeat:scheduled_task_sync_wx", "tasks.scheduled_task_sync_wx", 15, []),
+# ("redbeat:scheduled_task_sync_wx_info", "tasks.scheduled_task_sync_wx_info", scheduled_task_sync_wx_info_interval, [redis_config, kafka_config, gewe_config]),
+# #("redbeat:add_friends_task", "tasks.add_friends_task", random.randint(1, 10), [redis_config]), # 10分钟执行一次
+# ]
+
+
+# # 注册 RedBeat 任务
+# for task_id, task_name, interval, task_args in tasks_schedule:
+# redbeat_entry = RedBeatSchedulerEntry(
+# name=task_id,
+# task=task_name,
+# schedule=celery.schedules.schedule(timedelta(seconds=interval)),
+# args=task_args,
+# app=celery_app
+# )
+# redbeat_entry.save()
-]
+# # 为 add_friends_task 生成随机的分钟和小时
+random_minute = random.randint(0, 59)
+random_hour = random.randint(0, 23) # 可以根据需要调整小时范围
-# 注册 RedBeat 任务
-for task_id, task_name, interval, task_args in tasks_schedule:
+# 定义定时任务列表 (任务 ID, 任务名称, 执行间隔或调度, 任务参数)
+tasks_schedule = [
+ # 其他任务保持不变
+ ("redbeat:scheduled_task_sync_wx_info", "tasks.scheduled_task_sync_wx_info",
+ celery.schedules.schedule(timedelta(seconds=scheduled_task_sync_wx_info_interval)),
+ [redis_config, kafka_config, gewe_config]),
+
+ # # add_friends_task 使用 crontab 随机时间
+ # ("redbeat:add_friends_task", "tasks.add_friends_task",
+ # crontab(minute=random_minute, hour='*'), # 每小时在随机分钟执行
+ # # 如果想每天在随机时间执行,可以用: crontab(minute=random_minute, hour=random_hour)
+ # [redis_config]),
+]
+
+# # 注册 RedBeat 任务
+for task_id, task_name, schedule_obj, task_args in tasks_schedule:
redbeat_entry = RedBeatSchedulerEntry(
name=task_id,
task=task_name,
- schedule=celery.schedules.schedule(timedelta(seconds=interval)),
- args=task_args,
+ schedule=schedule_obj, # 现在使用 schedule_obj 而不是创建新的 schedule
+ args=task_args,
app=celery_app
)
redbeat_entry.save()
+ # 如果是 add_friends_task,打印其随机调度信息
+ if task_name == "tasks.add_friends_task":
+ if isinstance(schedule_obj, crontab):
+ print(f"已注册 `{task_name}` 任务,将在每小时的第 {random_minute} 分钟执行")
+ logger.info(f"已注册 `{task_name}` 任务,将在每小时的第 {random_minute} 分钟执行")
+ else:
+ print('scheduled_task_sync_wx_info 定时任务执行成功!')
+ logger.info(f"scheduled_task_sync_wx_info 定时任务执行成功!")
+
+
+# def setup_schedule():
+# # 初始化第一次执行(例如:15秒后执行)
+# initial_run_in = 5
+# entry = RedBeatSchedulerEntry(
+# name='random-task',
+# task='tasks.random_scheduled_task',
+# schedule=timedelta(seconds=initial_run_in),
+# app=celery_app
+# )
+# entry.save()
+# print(f"Initial task scheduled in {initial_run_in} seconds")
+
+# setup_schedule()
+
+# for task_id, task_name, schedule_obj, task_args in tasks_schedule:
+# entry_key = f'redbeat:{task_id}' # RedBeat 任务的键
+# existing_entry = None
+
+
+# try:
+# existing_entry = RedBeatSchedulerEntry.from_key(entry_key, app=celery_app)
+# except KeyError:
+# # 任务不存在
+# pass
+
+# print(existing_entry)
+
+# if existing_entry:
+# print(f"任务 `{task_name}` 已存在,跳过注册")
+# logger.info(f"任务 `{task_name}` 已存在,跳过注册")
+# else:
+# redbeat_entry = RedBeatSchedulerEntry(
+# name=task_id,
+# task=task_name,
+# schedule=schedule_obj, # 现在使用 schedule_obj 而不是创建新的 schedule
+# args=task_args,
+# app=celery_app
+# )
+# redbeat_entry.save()
+
+# if task_name == "tasks.add_friends_task":
+# if isinstance(schedule_obj, crontab):
+# print(f"已注册 `{task_name}` 任务,将在每小时的第 {schedule_obj._orig_minute} 分钟执行")
+# logger.info(f"已注册 `{task_name}` 任务,将在每小时的第 {schedule_obj._orig_minute} 分钟执行")
+# else:
+# print("scheduled_task_sync_wx_info 定时任务执行成功!")
+# logger.info("scheduled_task_sync_wx_info 定时任务执行成功!")
+
+
+
+
+
+
+# # **仅在首次启动时,手动注册 RedBeat 任务**
+# def register_initial_tasks():
+# try:
+# # 生成一个随机的首次任务执行间隔(5-15 分钟)
+# initial_interval = random.randint(1, 3)
+# #initial_interval=
+
+# # RedBeat 任务注册
+# redbeat_entry = RedBeatSchedulerEntry(
+# name="redbeat:add_friends_task",
+# task="tasks.add_friends_task",
+# schedule=celery.schedules.schedule(timedelta(seconds=initial_interval)),
+# args=[redis_config],
+# app=celery_app
+# )
+# redbeat_entry.save()
+# print(f"已注册 `tasks.add_friends_task` 任务,首次将在 {initial_interval} 秒后执行")
+# except Exception as e:
+# print(f"任务注册失败: {e}")
+
+# register_initial_tasks()
+
+#add_friends_task.apply_async(args=[redis_config])
+
+#trigger_initial_task()
\ No newline at end of file
diff --git a/celery_config.py b/celery_config.py
new file mode 100644
index 0000000..e984dcd
--- /dev/null
+++ b/celery_config.py
@@ -0,0 +1,55 @@
+from celery import Celery
+
+
+from config import load_config,conf
+from urllib.parse import quote
+
+
+load_config()
+
+KAFKA_BOOTSTRAP_SERVERS = conf().get("kafka_bootstrap_servers")
+KAFKA_TOPIC = 'topic.ai.ops.wx'
+KAFKA_GROUP_ID = 'ai-ops-wx'
+
+redis_host=conf().get("redis_host")
+redis_port=conf().get("redis_port")
+redis_password=conf().get("redis_password")
+redis_db=conf().get("redis_db")
+encoded_password = quote(redis_password)
+
+# 配置 Celery
+celery_app = Celery(
+ "worker",
+ broker=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}",
+ backend=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}",
+ include=['tasks']
+)
+
+# 配置 redbeat 作为 Celery Beat 调度器
+celery_app.conf.update(
+ timezone="Asia/Shanghai", # 设定时区
+ beat_scheduler="redbeat.RedBeatScheduler", # 使用 RedBeat 作为调度器
+ redbeat_redis_url=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}" # redbeat 存储任务调度信息的 Redis
+ ,
+ redbeat_lock_timeout=60, # 避免多个 Beat 实例冲突
+ beat_max_loop_interval=5 # 让 Celery Beat 每 5 秒检查一次任务
+)
+
+
+# 获取配置文件中的 redis_config、kafka_config、gewe_config
+redis_config = {
+ 'host': redis_host,
+ 'port': redis_port,
+ 'password': redis_password,
+ 'db': redis_db,
+}
+
+
+kafka_config = {
+ 'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS,
+ 'topic': KAFKA_TOPIC,
+ 'group_id': KAFKA_GROUP_ID,
+}
+gewe_config = {
+ 'api_url': "http://api.geweapi.com/gewe",
+}
\ No newline at end of file
diff --git a/run.py b/run.py
index cefe7a1..de0e476 100644
--- a/run.py
+++ b/run.py
@@ -1,6 +1,11 @@
import subprocess
import sys
import os
+import time
+import signal
+
+processes = []
+
def start_fastapi():
""" 启动 FastAPI 服务 """
@@ -14,14 +19,17 @@ def start_fastapi():
def start_celery_worker():
""" 启动 Celery Worker """
if sys.platform == "win32":
- process = subprocess.Popen(["celery", "-A", "celery_app", "worker", "--loglevel=info", "-P", "solo"])
+ process = subprocess.Popen(["celery", "-A", "celery_app", "worker", "--loglevel=info", "-P", "solo"],
+ stdout=None, stderr=None)
else:
- process = subprocess.Popen(["celery", "-A", "celery_app", "worker", "--loglevel=info"])
+ process = subprocess.Popen(["celery", "-A", "celery_app", "worker", "--loglevel=info"],
+ stdout=None, stderr=None)
return process
def start_celery_beat():
""" 启动 Celery Beat,使用 RedBeat 作为调度器 """
- process = subprocess.Popen(["celery", "-A", "celery_app", "beat", "--scheduler", "redbeat.RedBeatScheduler", "--loglevel=info"])
+ process = subprocess.Popen(["celery", "-A", "celery_app", "beat", "--scheduler", "redbeat.RedBeatScheduler", "--loglevel=info"],
+ stdout=None, stderr=None)
return process
if __name__ == "__main__":
@@ -34,3 +42,37 @@ if __name__ == "__main__":
fastapi_process.wait()
celery_worker_process.wait()
celery_beat_process.wait()
+
+
+# def signal_handler(sig, frame):
+# """处理退出信号,确保子进程也被终止"""
+# print('正在关闭所有服务...')
+# for process in processes:
+# process.terminate() # 尝试优雅终止
+
+# # 给进程一点时间来优雅终止
+# time.sleep(2)
+
+# # 检查是否有进程仍在运行,如果有则强制终止
+# for process in processes:
+# if process.poll() is None: # 进程仍在运行
+# process.kill() # 强制终止
+
+# sys.exit(0)
+
+# if __name__ == "__main__":
+# # 注册信号处理程序
+# signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
+# signal.signal(signal.SIGTERM, signal_handler) # 终止信号
+
+# # 启动 FastAPI、Celery Worker 和 Celery Beat
+# #fastapi_process = start_fastapi()
+# celery_worker_process = start_celery_worker()
+# celery_beat_process = start_celery_beat()
+
+# # 让主进程保持运行,但不阻塞在特定子进程上
+# try:
+# while True:
+# time.sleep(1) # 简单地循环等待,让信号处理程序有机会工作
+# except KeyboardInterrupt:
+# pass # 如果收到 KeyboardInterrupt,信号处理程序会处理
diff --git a/services/biz_service.py b/services/biz_service.py
index 7c32c5e..62ab11d 100644
--- a/services/biz_service.py
+++ b/services/biz_service.py
@@ -113,6 +113,7 @@ class BizService():
self.wxchat.forward_video_aeskey = ''
self.wxchat.forward_video_cdnvideourl = ''
self.wxchat.forward_video_length = 0
+ self.wxchat.video_duration = 0
for intersection_wxid in intersection_wxids:
for wx_content in wx_content_list:
@@ -243,8 +244,9 @@ class BizService():
self.wxchat.forward_video_aeskey = res["aesKey"]
self.wxchat.forward_video_cdnvideourl = res["cdnThumbUrl"]
self.wxchat.forward_video_length = res["length"]
+ self.wxchat.video_duration=video_duration
else:
- ret,ret_msg,res = await self.wxchat.forward_video_async(token_id, app_id, t, self.wxchat.forward_video_aeskey, self.wxchat.forward_video_cdnvideourl, self.wxchat.forward_video_length)
+ ret,ret_msg,res = await self.wxchat.forward_video_async(token_id, app_id, t, self.wxchat.forward_video_aeskey, self.wxchat.forward_video_cdnvideourl, self.wxchat.forward_video_length,self.wxchat.video_duration)
print('转发视频')
if ret==200:
logger.info(f'{agent_wxid} 向 {t} 发送视频【{file_url}】{ret_msg}')
diff --git a/services/gewe_service.py b/services/gewe_service.py
index 3346034..4210ba7 100644
--- a/services/gewe_service.py
+++ b/services/gewe_service.py
@@ -377,7 +377,7 @@ class GeWeService:
response_object = await response.json()
return response_object.get('data', None), response_object.get('ret', None), response_object.get('msg', None)
- async def forward_video_async(self, token_id, app_id, to_wxid, aeskey, cdnvideourl, length):
+ async def forward_video_async(self, token_id, app_id, to_wxid, aeskey, cdnvideourl, length,video_duration):
api_url = f"{self.base_url}/v2/api/message/forwardVideo"
headers = {
'X-GEWE-TOKEN': token_id,
@@ -386,7 +386,7 @@ class GeWeService:
data = {
"appId": app_id,
"toWxid": to_wxid,
- "xml": f"\n\n\t\n"
+ "xml": f"\n\n\t\n"
}
async with aiohttp.ClientSession() as session:
async with session.post(api_url, headers=headers, json=data) as response:
diff --git a/services/redis_service.py b/services/redis_service.py
index 8d7a9fd..8940a89 100644
--- a/services/redis_service.py
+++ b/services/redis_service.py
@@ -193,6 +193,38 @@ class RedisService:
await self.client.delete(queue_name)
print(f"Cleared queue {queue_name}")
+ async def increment_hash_field(self, hash_key, field, amount=1):
+ """
+ 对哈希表中的指定字段进行递增操作(原子性)。
+
+ :param hash_key: 哈希表的 key
+ :param field: 要递增的字段
+ :param amount: 递增的数值(默认为 1)
+ :return: 递增后的值
+ """
+ return await self.client.hincrby(hash_key, field, amount)
+
+ async def expire(self, key, timeout):
+ """
+ 设置 Redis 键的过期时间(单位:秒)
+
+ :param key: Redis 键
+ :param timeout: 过期时间(秒)
+ """
+ await self.client.expire(key, timeout)
+
+ async def expire_field(self, hash_key, field, timeout):
+ """
+ 通过辅助键方式设置哈希表中某个字段的过期时间
+
+ :param hash_key: 哈希表的 key
+ :param field: 要设置过期的字段
+ :param timeout: 过期时间(秒)
+ """
+ expire_key = f"{hash_key}:{field}:expire"
+ await self.client.set(expire_key, "1")
+ await self.client.expire(expire_key, timeout)
+
# Dependency injection helper function
async def get_redis_service(request: Request) -> RedisService:
return request.app.state.redis_serive
\ No newline at end of file
diff --git a/tasks.py b/tasks.py
index 3be0b7e..b562bc3 100644
--- a/tasks.py
+++ b/tasks.py
@@ -1,12 +1,16 @@
from celery_app import celery_app
from fastapi import Request,FastAPI
-import time
+import time,datetime
+from celery import Celery
+import celery.schedules
+from redbeat import RedBeatSchedulerEntry
+from datetime import timedelta
from services.redis_service import RedisService
from services.kafka_service import KafkaService
from services.gewe_service import GeWeService
from common.log import logger
-import asyncio
+import asyncio,random
@celery_app.task(name='tasks.add_task', bind=True, acks_late=True)
@@ -190,3 +194,108 @@ def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
asyncio.set_event_loop(loop)
loop.run_until_complete(task()) # 在现有事件循环中运行任务
+
+
+
+REDIS_KEY_PATTERN = "friend_add_limit:{date}"
+REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task"
+
+@celery_app.task(name='tasks.add_friends_task', bind=True, acks_late=True)
+def add_friends_task(self,redis_config):
+ """
+ 限制每天最多 15 个,每 2 小时最多 8 个
+ """
+ async def task():
+ redis_service = RedisService()
+ await redis_service.init(**redis_config)
+ today_str = datetime.datetime.now().strftime("%Y%m%d")
+ redis_key = REDIS_KEY_PATTERN.format(date=today_str)
+
+ # 获取当前总添加数量
+ total_added = await redis_service.get_hash_field(redis_key, "total") or 0
+ last_2h_added =await redis_service.get_hash_field(redis_key, "last_2h") or 0
+
+ total_added = int(total_added)
+ last_2h_added = int(last_2h_added)
+
+ logger.info(f"当前添加好友总数: {total_added}, 过去2小时添加: {last_2h_added}")
+
+ # 判断是否超过限制
+ if total_added >= 15:
+ logger.warning("今日好友添加已达上限!")
+ return
+
+ if last_2h_added >= 8:
+ logger.warning("过去2小时添加已达上限!")
+ return
+
+ # 计算本次要添加的好友数量 (控制每天 5-15 个)
+ max_add = min(15 - total_added, 8 - last_2h_added)
+ if max_add <= 0:
+ return
+
+ num_to_add = min(max_add, 1) # 每次最多加 1 个
+ logger.info(f"本次添加 {num_to_add} 位好友")
+
+ # TODO: 调用好友添加逻辑 (接口 or 业务逻辑)
+ # success = add_friends(num_to_add)
+
+ success = num_to_add # 假设成功添加 num_to_add 个
+
+ # 更新 Redis 计数
+ if success > 0:
+ await redis_service.increment_hash_field(redis_key, "total", success)
+ await redis_service.increment_hash_field(redis_key, "last_2h", success)
+
+ # 设置 Redis 过期时间 (每日记录存 1 天, 2 小时记录存 2 小时)
+ await redis_service.expire(redis_key, 86400) # 24小时
+ await redis_service.expire_field(redis_key, "last_2h", 7200) # 2小时
+
+ logger.info(f"成功添加 {success} 位好友, 今日总数 {total_added + success}")
+
+ # 生成一个新的随机时间(5-15 分钟之间)
+ # next_interval = random.randint(10, 20)
+
+ # # 计算新的执行时间
+ # next_run_time = datetime.datetime.now() + timedelta(seconds=next_interval)
+
+ # # 重新注册 RedBeat 任务,确保下次执行时间不同
+ # redbeat_entry = RedBeatSchedulerEntry(
+ # name="redbeat:add_friends_task",
+ # task="tasks.add_friends_task",
+ # schedule=celery.schedules.schedule(timedelta(seconds=next_interval)),
+ # args=[redis_config],
+ # app=celery_app
+ # )
+
+ # # 设置任务的下次执行时间
+ # redbeat_entry.last_run_at = next_run_time
+ # redbeat_entry.save()
+
+ # logger.info(f"下次任务将在 {next_run_time} 执行(间隔 {next_interval} 秒)")
+
+ loop = asyncio.get_event_loop()
+
+ if loop.is_closed():
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+
+ loop.run_until_complete(task()) # 在现有事件循环中运行任务
+
+
+@celery_app.task(name='tasks.random_scheduled_task', bind=True, acks_late=True)
+def random_scheduled_task(self,):
+ print(f"Task executed at {datetime.datetime.now()}")
+ # 随机生成下次执行时间(例如:10-60秒内的随机时间)
+ next_run_in = random.randint(10, 60)
+ print(f"Next execution will be in {next_run_in} seconds")
+
+ # 设置下次执行时间
+ entry = RedBeatSchedulerEntry(
+ name='random-task',
+ task='tasks.random_scheduled_task',
+ schedule=timedelta(seconds=next_run_in),
+ app=celery_app
+ )
+ entry.save()
+ return f"Scheduled next run in {next_run_in} seconds"
\ No newline at end of file