您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

96 行
2.5KB

  1. # from celery import Celery
  2. # # 创建 Celery 应用
  3. # celery_app = Celery(
  4. # 'ai_ops_wechat_app',
  5. # broker='redis://:telpo%231234@192.168.2.121:8090/3',
  6. # backend='redis://:telpo%231234@192.168.2.121:8090/3',
  7. # )
  8. # # 配置 Celery
  9. # celery_app.conf.update(
  10. # task_serializer='json',
  11. # accept_content=['json'],
  12. # result_serializer='json',
  13. # timezone='Asia/Shanghai',
  14. # enable_utc=True,
  15. # )
  16. # #celery_app.autodiscover_tasks(['app.tasks'])
  17. # from celery import Celery
  18. # def make_celery(app):
  19. # celery = Celery(
  20. # app.import_name,
  21. # backend=app.config['CELERY_RESULT_BACKEND'],
  22. # broker=app.config['CELERY_BROKER_URL']
  23. # )
  24. # celery.conf.update(app.config)
  25. # # 自动发现任务
  26. # celery.autodiscover_tasks(['app.tasks'])
  27. # return celery
  28. # # 初始化 Flask
  29. # app = Flask(__name__)
  30. # app.config.update(
  31. # CELERY_BROKER_URL='redis://:telpo%231234@192.168.2.121:8090/3',
  32. # CELERY_RESULT_BACKEND='redis://:telpo%231234@192.168.2.121:8090/3'
  33. # )
  34. # celery = make_celery(app)
  35. from celery import Celery
  36. import celery.schedules
  37. from redbeat import RedBeatSchedulerEntry
  38. from datetime import timedelta
  39. from config import load_config,conf
  40. load_config()
  41. redis_host=conf().get("redis_host")
  42. redis_port=conf().get("redis_port")
  43. redis_password=conf().get("redis_password")
  44. redis_db=conf().get("redis_db")
  45. # 配置 Celery
  46. celery_app = Celery(
  47. "worker",
  48. broker=f"redis://:telpo%231234@{redis_host}:{redis_port}/{redis_db}",
  49. backend=f"redis://:telpo%231234@{redis_host}:{redis_port}/{redis_db}",
  50. include=['tasks']
  51. )
  52. # 配置 redbeat 作为 Celery Beat 调度器
  53. celery_app.conf.update(
  54. timezone="Asia/Shanghai", # 设定时区
  55. beat_scheduler="redbeat.RedBeatScheduler", # 使用 RedBeat 作为调度器
  56. redbeat_redis_url=f"redis://:telpo%231234@{redis_host}:{redis_port}/{redis_db}" # redbeat 存储任务调度信息的 Redis
  57. ,
  58. redbeat_lock_timeout=60, # 避免多个 Beat 实例冲突
  59. beat_max_loop_interval=5 # 让 Celery Beat 每 5 秒检查一次任务
  60. )
  61. task_name = "tasks.scheduled_task"
  62. # 任务执行间隔(每 10 秒执行一次)
  63. schedule = celery.schedules.schedule(timedelta(seconds=3))
  64. # RedBeat 任务唯一 ID
  65. redbeat_entry = RedBeatSchedulerEntry(
  66. name="redbeat:scheduled_task", # 任务 ID
  67. task=task_name, # 任务名称
  68. schedule=schedule, # 任务调度时间
  69. args=[],
  70. app=celery_app
  71. )
  72. # 保存任务到 Redis
  73. redbeat_entry.save()
  74. # 自动发现任务
  75. #celery.autodiscover_tasks(['tasks'])