You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

пре 3 недеља
пре 1 недеља
пре 3 недеља
пре 5 дана
пре 3 недеља
пре 5 дана
пре 5 дана
пре 1 недеља
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. from celery import Celery
  2. from config import load_config,conf
  3. from urllib.parse import quote
  4. import logging,os
  5. load_config()
  6. KAFKA_BOOTSTRAP_SERVERS = conf().get("kafka_bootstrap_servers")
  7. KAFKA_TOPIC = 'topic.ai.ops.wx'
  8. KAFKA_GROUP_ID = 'ai-ops-wx'
  9. redis_host=conf().get("redis_host")
  10. redis_port=conf().get("redis_port")
  11. redis_password=conf().get("redis_password")
  12. redis_db=conf().get("redis_db")
  13. encoded_password = quote(redis_password)
  14. wx_chat_api=conf().get("wx_chat_api")
  15. # 配置 Celery
  16. celery_app = Celery(
  17. "worker",
  18. broker=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}",
  19. backend=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}",
  20. include=['tasks']
  21. )
  22. # 配置 redbeat 作为 Celery Beat 调度器
  23. celery_app.conf.update(
  24. timezone="Asia/Shanghai", # 设定时区
  25. beat_scheduler="redbeat.RedBeatScheduler", # 使用 RedBeat 作为调度器
  26. redbeat_redis_url=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}" # redbeat 存储任务调度信息的 Redis
  27. ,
  28. redbeat_lock_timeout=60, # 避免多个 Beat 实例冲突
  29. beat_max_loop_interval=5 # 让 Celery Beat 每 5 秒检查一次任务
  30. )
  31. # 获取配置文件中的 redis_config、kafka_config、gewe_config
  32. redis_config = {
  33. 'host': redis_host,
  34. 'port': redis_port,
  35. 'password': redis_password,
  36. 'db': redis_db,
  37. }
  38. kafka_config = {
  39. 'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS,
  40. 'topic': KAFKA_TOPIC,
  41. 'group_id': KAFKA_GROUP_ID,
  42. }
  43. gewe_config = {
  44. #'api_url': "http://api.geweapi.com/gewe",
  45. 'api_url':wx_chat_api,
  46. }
  47. # 确保 logs 文件夹存在
  48. log_dir = 'logs'
  49. if not os.path.exists(log_dir):
  50. os.makedirs(log_dir)
  51. # 配置日志记录
  52. logging.basicConfig(
  53. level=logging.INFO,
  54. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  55. filename=os.path.join(log_dir, 'redbeat.log'), # 日志文件路径
  56. filemode='a' # 追加模式
  57. )
  58. logger = logging.getLogger('redbeat')