|
- from celery import Celery
-
-
- from config import load_config,conf
- from urllib.parse import quote
- import logging,os
-
-
- load_config()
-
- KAFKA_BOOTSTRAP_SERVERS = conf().get("kafka_bootstrap_servers")
- KAFKA_TOPIC = 'topic.ai.ops.wx'
- KAFKA_GROUP_ID = 'ai-ops-wx'
-
- redis_host=conf().get("redis_host")
- redis_port=conf().get("redis_port")
- redis_password=conf().get("redis_password")
- redis_db=conf().get("redis_db")
- encoded_password = quote(redis_password)
-
- # 配置 Celery
- celery_app = Celery(
- "worker",
- broker=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}",
- backend=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}",
- include=['tasks']
- )
-
- # 配置 redbeat 作为 Celery Beat 调度器
- celery_app.conf.update(
- timezone="Asia/Shanghai", # 设定时区
- beat_scheduler="redbeat.RedBeatScheduler", # 使用 RedBeat 作为调度器
- redbeat_redis_url=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}" # redbeat 存储任务调度信息的 Redis
- ,
- redbeat_lock_timeout=60, # 避免多个 Beat 实例冲突
- beat_max_loop_interval=5 # 让 Celery Beat 每 5 秒检查一次任务
- )
-
-
- # 获取配置文件中的 redis_config、kafka_config、gewe_config
- redis_config = {
- 'host': redis_host,
- 'port': redis_port,
- 'password': redis_password,
- 'db': redis_db,
- }
-
-
- kafka_config = {
- 'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS,
- 'topic': KAFKA_TOPIC,
- 'group_id': KAFKA_GROUP_ID,
- }
- gewe_config = {
- 'api_url': "http://api.geweapi.com/gewe",
- }
-
-
- # 确保 logs 文件夹存在
- log_dir = 'logs'
- if not os.path.exists(log_dir):
- os.makedirs(log_dir)
-
- # 配置日志记录
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
- filename=os.path.join(log_dir, 'redbeat.log'), # 日志文件路径
- filemode='a' # 追加模式
- )
- logger = logging.getLogger('redbeat')
|