您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

72 行
1.9KB

  1. from celery import Celery
  2. from config import load_config,conf
  3. from urllib.parse import quote
  4. import logging,os
  5. load_config()
  6. KAFKA_BOOTSTRAP_SERVERS = conf().get("kafka_bootstrap_servers")
  7. KAFKA_TOPIC = 'topic.ai.ops.wx'
  8. KAFKA_GROUP_ID = 'ai-ops-wx'
  9. redis_host=conf().get("redis_host")
  10. redis_port=conf().get("redis_port")
  11. redis_password=conf().get("redis_password")
  12. redis_db=conf().get("redis_db")
  13. encoded_password = quote(redis_password)
  14. # 配置 Celery
  15. celery_app = Celery(
  16. "worker",
  17. broker=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}",
  18. backend=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}",
  19. include=['tasks']
  20. )
  21. # 配置 redbeat 作为 Celery Beat 调度器
  22. celery_app.conf.update(
  23. timezone="Asia/Shanghai", # 设定时区
  24. beat_scheduler="redbeat.RedBeatScheduler", # 使用 RedBeat 作为调度器
  25. redbeat_redis_url=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}" # redbeat 存储任务调度信息的 Redis
  26. ,
  27. redbeat_lock_timeout=60, # 避免多个 Beat 实例冲突
  28. beat_max_loop_interval=5 # 让 Celery Beat 每 5 秒检查一次任务
  29. )
  30. # 获取配置文件中的 redis_config、kafka_config、gewe_config
  31. redis_config = {
  32. 'host': redis_host,
  33. 'port': redis_port,
  34. 'password': redis_password,
  35. 'db': redis_db,
  36. }
  37. kafka_config = {
  38. 'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS,
  39. 'topic': KAFKA_TOPIC,
  40. 'group_id': KAFKA_GROUP_ID,
  41. }
  42. gewe_config = {
  43. 'api_url': "http://api.geweapi.com/gewe",
  44. }
  45. # 确保 logs 文件夹存在
  46. log_dir = 'logs'
  47. if not os.path.exists(log_dir):
  48. os.makedirs(log_dir)
  49. # 配置日志记录
  50. logging.basicConfig(
  51. level=logging.INFO,
  52. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  53. filename=os.path.join(log_dir, 'redbeat.log'), # 日志文件路径
  54. filemode='a' # 追加模式
  55. )
  56. logger = logging.getLogger('redbeat')