|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- from celery import Celery
- import celery.schedules
- from redbeat import RedBeatSchedulerEntry
- from datetime import timedelta
-
-
- from services.redis_service import RedisService
- from services.kafka_service import KafkaService
- from services.biz_service import BizService
-
- from config import load_config,conf
- from urllib.parse import quote
- import asyncio,os
-
-
# load_config() runs before any conf() lookup below (presumably it populates
# the settings that conf() returns — confirm in config.py).
load_config()

# Kafka connection settings.
KAFKA_BOOTSTRAP_SERVERS = conf().get("kafka_bootstrap_servers")
KAFKA_TOPIC = 'topic.ai.ops.wx'
KAFKA_GROUP_ID = 'ai-ops-wx'

# Redis connection settings.
redis_host = conf().get("redis_host")
redis_port = conf().get("redis_port")
redis_password = conf().get("redis_password")
redis_db = conf().get("redis_db")

# Percent-encode the password so it can be embedded in a redis:// URL.
# safe="" is required: quote() leaves "/" unescaped by default, and a raw "/"
# in the password would end the URL's authority part early and corrupt the
# host/port/db that follow. A missing password is treated as empty instead of
# raising TypeError at import time.
encoded_password = quote(redis_password or "", safe="")
-
# The same Redis URL is used as the Celery broker, the result backend and the
# RedBeat schedule store, so it is built once here.
_redis_url = f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}"

# Celery application; the "tasks" module is listed in include= so the app
# imports it.
celery_app = Celery(
    "worker",
    broker=_redis_url,
    backend=_redis_url,
    include=['tasks'],
)

# Scheduler settings: use RedBeat as the beat scheduler, backed by the same
# Redis URL as above.
celery_app.conf.update(
    timezone="Asia/Shanghai",
    beat_scheduler="redbeat.RedBeatScheduler",
    redbeat_redis_url=_redis_url,
    redbeat_lock_timeout=60,
    beat_max_loop_interval=5,
)
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
# Connection settings bundled as plain dicts. They are passed as positional
# arguments to the scheduled task declared in tasks_schedule.

# Redis: the raw (not URL-encoded) password is used here.
redis_config = dict(
    host=redis_host,
    port=redis_port,
    password=redis_password,
    db=redis_db,
)

# Kafka: bootstrap servers plus the fixed topic and consumer group id.
kafka_config = dict(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    topic=KAFKA_TOPIC,
    group_id=KAFKA_GROUP_ID,
)

# GeWe API endpoint.
gewe_config = dict(
    api_url="http://api.geweapi.com/gewe",
)
-
# Interval, in seconds, between runs of the WX info sync task: 10 seconds when
# the "environment" variable is unset or equal to 'default', otherwise 11
# minutes.
environment = os.environ.get('environment', 'default')
scheduled_task_sync_wx_info_interval = 10 if environment == 'default' else 60 * 11
-
-
def _build_redbeat_entry(entry_name, task_path, every_seconds, call_args):
    """Build (but do not save) a RedBeat entry that runs a task at a fixed interval.

    Args:
        entry_name: Name given to the RedBeat entry.
        task_path: Dotted name of the Celery task to run.
        every_seconds: Interval between runs, in seconds.
        call_args: Positional arguments passed to the task.

    Returns:
        The unsaved RedBeatSchedulerEntry bound to ``celery_app``.
    """
    run_every = celery.schedules.schedule(timedelta(seconds=every_seconds))
    return RedBeatSchedulerEntry(
        name=entry_name,
        task=task_path,
        schedule=run_every,
        args=call_args,
        app=celery_app,
    )


# Periodic tasks to register, one tuple per task:
# (entry name, task name, interval in seconds, positional task arguments).
tasks_schedule = [
    (
        "redbeat:scheduled_task_sync_wx_info",
        "tasks.scheduled_task_sync_wx_info",
        scheduled_task_sync_wx_info_interval,
        [redis_config, kafka_config, gewe_config],
    ),
]

# Save every declared entry when this module is imported.
for task_id, task_name, interval, task_args in tasks_schedule:
    redbeat_entry = _build_redbeat_entry(task_id, task_name, interval, task_args)
    redbeat_entry.save()
|