"""Celery + RedBeat bootstrap for the wx AI-ops worker.

Loads connection settings from the project config, builds the Celery app
with a Redis broker/result backend, configures RedBeat as the Beat
scheduler, exports redis/kafka/gewe config dicts for sibling modules,
and sets up file-based logging under ./logs.
"""
import logging
import os
from urllib.parse import quote

from celery import Celery

from config import conf, load_config

load_config()

KAFKA_BOOTSTRAP_SERVERS = conf().get("kafka_bootstrap_servers")
KAFKA_TOPIC = 'topic.ai.ops.wx'
KAFKA_GROUP_ID = 'ai-ops-wx'

redis_host = conf().get("redis_host")
redis_port = conf().get("redis_port")
redis_password = conf().get("redis_password")
redis_db = conf().get("redis_db")
# safe='' so that '/' (and any other reserved character) in the password is
# percent-encoded; quote()'s default safe='/' would leave '/' intact and
# corrupt the redis:// URL (it would be read as the db separator).
encoded_password = quote(redis_password, safe='')
wx_chat_api = conf().get("wx_chat_api")

# Broker, result backend, and RedBeat storage all share this Redis URL.
_redis_url = f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}"

# Celery application; tasks are discovered from the 'tasks' module.
celery_app = Celery(
    "worker",
    broker=_redis_url,
    backend=_redis_url,
    include=['tasks'],
)

# Configure RedBeat as the Celery Beat scheduler.
celery_app.conf.update(
    timezone="Asia/Shanghai",                   # scheduler timezone
    beat_scheduler="redbeat.RedBeatScheduler",  # use RedBeat as the scheduler
    redbeat_redis_url=_redis_url,               # Redis where RedBeat stores schedule state
    redbeat_lock_timeout=60,                    # avoid clashes between multiple Beat instances
    beat_max_loop_interval=5,                   # have Beat check for due tasks every 5 seconds
)

# Connection settings re-exported for other modules.
redis_config = {
    'host': redis_host,
    'port': redis_port,
    'password': redis_password,
    'db': redis_db,
}
kafka_config = {
    'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS,
    'topic': KAFKA_TOPIC,
    'group_id': KAFKA_GROUP_ID,
}
gewe_config = {
    'api_url': wx_chat_api,
}

# Ensure the logs directory exists before attaching the file handler
# (exist_ok avoids the check-then-create race of the exists()/makedirs pair).
log_dir = 'logs'
os.makedirs(log_dir, exist_ok=True)

# File-based logging, appended to logs/redbeat.log.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename=os.path.join(log_dir, 'redbeat.log'),
    filemode='a',
)
logger = logging.getLogger('redbeat')