From fefe1f5b40c7cc0d06c67c731d75a9584c0bcd40 Mon Sep 17 00:00:00 2001 From: H Vs Date: Tue, 25 Mar 2025 18:09:28 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=B0=83=E5=BA=A6=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/main.py | 28 +++++++++- app/tasks.py | 28 ---------- app/celery_app.py => celery_app.py | 33 ++++++++++- requirements.txt | 3 +- run.py | 21 +++++-- tasks.py | 88 ++++++++++++++++++++++++++++++ 6 files changed, 160 insertions(+), 41 deletions(-) delete mode 100644 app/tasks.py rename app/celery_app.py => celery_app.py (57%) create mode 100644 tasks.py diff --git a/app/main.py b/app/main.py index 47d6291..d28fe9a 100644 --- a/app/main.py +++ b/app/main.py @@ -20,6 +20,7 @@ from app.endpoints.groups_endpoint import groups_router from app.endpoints.sns_endpoint import sns_router from app.endpoints.agent_endpoint import agent_router from app.endpoints.pipeline_endpoint import messages_router +from tasks import background_worker_task @@ -31,7 +32,7 @@ from app.middleware import http_context from celery.result import AsyncResult -from app.tasks import add_task,sync_contacts_task +from tasks import add_task,sync_contacts_task from config import load_config from config import conf from common.utils import * @@ -164,8 +165,29 @@ async def lifespan(app: FastAPI): # await kafka_service.stop() #task = asyncio.create_task(kafka_consumer()) - task=asyncio.create_task(background_worker(redis_service,kafka_service,gewe_service)) - background_tasks.add(task) + + redis_config = { + 'host': conf().get("redis_host"), + 'port': conf().get("redis_port"), + 'password': conf().get("redis_password"), + 'db': conf().get("redis_db"), + } + kafka_config = { + 'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS, + 'topic': KAFKA_TOPIC, + 'group_id': KAFKA_GROUP_ID, + } + gewe_config = { + 'api_url': "http://api.geweapi.com/gewe", + } + + # Use Celery task + worker_task = background_worker_task.delay(redis_config, kafka_config, gewe_config) + background_tasks.add(worker_task) + environment = os.environ.get('environment', 'default') + if environment != 'default': + task=asyncio.create_task(background_worker(redis_service,kafka_service,gewe_service)) + background_tasks.add(task) try: yield # 应用程序运行期间 finally: diff --git a/app/tasks.py b/app/tasks.py deleted file mode 100644 index 5f1f369..0000000 --- a/app/tasks.py +++ /dev/null @@ -1,28 +0,0 @@ -from app.celery_app import celery -from fastapi import Request,FastAPI -import time - -@celery.task(name='app.tasks.add_task', bind=True, acks_late=True) -def add_task(self, x, y): - time.sleep(5) # 模拟长时间计算 - return x + y - - -@celery.task(name='app.tasks.mul_task', bind=True, acks_late=True) -def mul_task(self, x, y): - time.sleep(5) # 模拟长时间计算 - return x * y - - -# @celery.task(name='app.tasks.sync_contacts', bind=True, acks_late=True) -# async def sync_contacts_task(self,app): -# login_keys = list(await app.state.redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')) -# return login_keys -# # for k in login_keys: -# # print(k) - -@celery.task(name='app.tasks.sync_contacts', bind=True, acks_late=True) -async def sync_contacts_task(self, redis_service): - # Use the redis_service passed as an argument - login_keys = list(await redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')) - return login_keys \ No newline at end of file diff --git a/app/celery_app.py b/celery_app.py similarity index 57% rename from app/celery_app.py rename to celery_app.py index ea7b020..48b5e91 100644 --- a/app/celery_app.py +++ b/celery_app.py @@ -41,14 +41,41 @@ # celery = make_celery(app) from celery import Celery +import celery.schedules +from redbeat import RedBeatSchedulerEntry +from datetime import timedelta # 配置 Celery -celery = Celery( +celery_app = Celery( "worker", broker="redis://:telpo%231234@192.168.2.121:8090/3", backend="redis://:telpo%231234@192.168.2.121:8090/3", - include=['app.tasks'] + include=['tasks'] ) +# 配置 redbeat 作为 Celery Beat 调度器 +celery_app.conf.update( + timezone="Asia/Shanghai", # 设定时区 + beat_scheduler="redbeat.RedBeatScheduler", # 使用 RedBeat 作为调度器 + redbeat_redis_url="redis://:telpo%231234@192.168.2.121:8090/3" # redbeat 存储任务调度信息的 Redis +) + +task_name = "tasks.scheduled_task" +# 任务执行间隔(每 10 秒执行一次) +schedule = celery.schedules.schedule(timedelta(seconds=10)) + +# RedBeat 任务唯一 ID +redbeat_entry = RedBeatSchedulerEntry( + name="redbeat:scheduled_task", # 任务 ID + task=task_name, # 任务名称 + schedule=schedule, # 任务调度时间 + args=[], + app=celery_app + +) + +# 保存任务到 Redis +redbeat_entry.save() + # 自动发现任务 -celery.autodiscover_tasks(['app.tasks']) +#celery.autodiscover_tasks(['tasks']) diff --git a/requirements.txt b/requirements.txt index dbc722f..749cc08 100644 --- a/requirements.txt +++ b/requirements.txt @@ -55,4 +55,5 @@ pydantic aioredis>=2.0.0 requests aiokafka -aiofiles \ No newline at end of file +aiofiles +celery-redbeat \ No newline at end of file diff --git a/run.py b/run.py index 9c2b445..cefe7a1 100644 --- a/run.py +++ b/run.py @@ -3,6 +3,7 @@ import sys import os def start_fastapi(): + """ 启动 FastAPI 服务 """ environment = os.environ.get('environment', 'default') if environment == 'default': process = subprocess.Popen(["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "80"]) @@ -10,18 +11,26 @@ def start_fastapi(): process = subprocess.Popen(["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "5000"]) return process -def start_celery(): +def start_celery_worker(): + """ 启动 Celery Worker """ if sys.platform == "win32": - process = subprocess.Popen(["celery", "-A", "app.celery_app", "worker", "--loglevel=info", "-P", "solo"]) + process = subprocess.Popen(["celery", "-A", "celery_app", "worker", "--loglevel=info", "-P", "solo"]) else: - process = subprocess.Popen(["celery", "-A", "app.celery_app", "worker", "--loglevel=info"]) + process = subprocess.Popen(["celery", "-A", "celery_app", "worker", "--loglevel=info"]) + return process + +def start_celery_beat(): + """ 启动 Celery Beat,使用 RedBeat 作为调度器 """ + process = subprocess.Popen(["celery", "-A", "celery_app", "beat", "--scheduler", "redbeat.RedBeatScheduler", "--loglevel=info"]) return process if __name__ == "__main__": - # 启动 FastAPI 和 Celery + # 启动 FastAPI、Celery Worker 和 Celery Beat fastapi_process = start_fastapi() - # celery_process = start_celery() + celery_worker_process = start_celery_worker() + celery_beat_process = start_celery_beat() # 等待子进程完成 fastapi_process.wait() - # celery_process.wait() \ No newline at end of file + celery_worker_process.wait() + celery_beat_process.wait() diff --git a/tasks.py b/tasks.py new file mode 100644 index 0000000..999ff41 --- /dev/null +++ b/tasks.py @@ -0,0 +1,88 @@ +from celery_app import celery_app +from fastapi import Request,FastAPI +import time + +from services.redis_service import RedisService +from services.kafka_service import KafkaService +from services.gewe_service import GeWeService +from common.log import logger +import asyncio + + +@celery_app.task(name='tasks.add_task', bind=True, acks_late=True) +def add_task(self, x, y): + time.sleep(5) # 模拟长时间计算 + logger.info('add') + return x + y + + +@celery_app.task(name='tasks.mul_task', bind=True, acks_late=True) +def mul_task(self, x, y): + time.sleep(5) # 模拟长时间计算 + return x * y + + +# @celery.task(name='app.tasks.sync_contacts', bind=True, acks_late=True) +# async def sync_contacts_task(self,app): +# login_keys = list(await app.state.redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')) +# return login_keys +# # for k in login_keys: +# # print(k) + +@celery_app.task(name='tasks.sync_contacts', bind=True, acks_late=True) +async def sync_contacts_task(self, redis_service): + # Use the redis_service passed as an argument + login_keys = list(await redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')) + return login_keys + + +@celery_app.task(name='tasks.background_worker_task', bind=True, acks_late=True) +def background_worker_task(self, redis_config, kafka_config, gewe_config): + async def task(): + redis_service = RedisService() + await redis_service.init(**redis_config) + login_keys = [] + async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): + login_keys.append(key) + print(login_keys) + + asyncio.run(task()) + +# @celery.task(name='tasks.background_worker_task', bind=True, acks_late=True) +# async def background_worker_task(self, redis_config, kafka_config, gewe_config): +# # Initialize services inside the task +# redis_service = RedisService() +# await redis_service.init(**redis_config) + +# login_keys = [] +# async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # 使用 async for 遍历异步生成器 +# login_keys.append(key) + +# print(login_keys) + + # kafka_service = KafkaService(**kafka_config) + # await kafka_service.start() + + # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url']) + + # # Task logic + # lock_name = "background_wxchat_thread_lock" + # lock_identifier = str(time.time()) + # while True: + # if await redis_service.acquire_lock(lock_name, timeout=10): + # try: + # logger.info("分布式锁已成功获取") + # # Perform task logic + # finally: + # await redis_service.release_lock(lock_name, lock_identifier) + # break + # else: + # logger.info("获取分布式锁失败,等待10秒后重试...") + # await asyncio.sleep(10) + + + +@celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True) +def scheduled_task(self): + print("🚀 定时任务执行成功!") + return "Hello from Celery Beat + RedBeat!" \ No newline at end of file