"""Celery task definitions: demo arithmetic tasks and Redis login-key scans."""

import asyncio
import time

from fastapi import Request, FastAPI

from celery_app import celery_app
from common.log import logger
from services.gewe_service import GeWeService
from services.kafka_service import KafkaService
from services.redis_service import RedisService

# Redis key pattern shared by every task that scans for login-info entries.
_LOGIN_KEY_PATTERN = '__AI_OPS_WX__:LOGININFO:*'


@celery_app.task(name='tasks.add_task', bind=True, acks_late=True)
def add_task(self, x, y):
    """Return ``x + y`` after a 5-second pause.

    Args:
        self: Bound task instance (``bind=True``); not used here.
        x: Left operand.
        y: Right operand.

    Returns:
        The result of ``x + y``.
    """
    time.sleep(5)  # simulate a long-running computation
    logger.info('add')
    return x + y


@celery_app.task(name='tasks.mul_task', bind=True, acks_late=True)
def mul_task(self, x, y):
    """Return ``x * y`` after a 5-second pause.

    Args:
        self: Bound task instance (``bind=True``); not used here.
        x: Left operand.
        y: Right operand.

    Returns:
        The result of ``x * y``.
    """
    time.sleep(5)  # simulate a long-running computation
    return x * y


# Earlier draft of sync_contacts_task that read the Redis service from a
# FastAPI ``app.state``; kept for reference only.
# @celery.task(name='app.tasks.sync_contacts', bind=True, acks_late=True)
# async def sync_contacts_task(self,app):
#     login_keys = list(await app.state.redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
#     return login_keys
#     # for k in login_keys:
#     #     print(k)


@celery_app.task(name='tasks.sync_contacts', bind=True, acks_late=True)
async def sync_contacts_task(self, redis_service):
    """Collect every Redis key that matches the login-info pattern.

    NOTE(review): this is a coroutine function registered as a Celery task.
    Celery workers are not known to await coroutine tasks natively -- confirm
    how this task is actually invoked (direct ``await`` vs. ``.delay()``).

    Args:
        self: Bound task instance (``bind=True``); not used here.
        redis_service: Object exposing ``client.scan_iter(match=...)`` as an
            async iterator of keys.

    Returns:
        A list of the matching keys, in the order the scan yields them.
    """
    # scan_iter is consumed with ``async for`` (as background_worker_task
    # does); awaiting its return value directly would raise TypeError.
    return [
        key
        async for key in redis_service.client.scan_iter(match=_LOGIN_KEY_PATTERN)
    ]


@celery_app.task(name='tasks.background_worker_task', bind=True, acks_late=True)
def background_worker_task(self, redis_config, kafka_config, gewe_config):
    """Scan Redis for login-info keys and print them.

    The async work runs in a fresh event loop via ``asyncio.run`` so that the
    task itself stays a plain synchronous function.

    Args:
        self: Bound task instance (``bind=True``); not used here.
        redis_config: Keyword arguments forwarded to ``RedisService.init``.
        kafka_config: Currently unused; see the commented-out draft below.
        gewe_config: Currently unused; see the commented-out draft below.

    Returns:
        None. The collected keys are only printed.
    """

    async def _scan_login_keys():
        # The service is created inside the task from plain config values.
        redis_service = RedisService()
        await redis_service.init(**redis_config)
        login_keys = [
            key
            async for key in redis_service.client.scan_iter(match=_LOGIN_KEY_PATTERN)
        ]
        print(login_keys)

    asyncio.run(_scan_login_keys())


# Fuller draft of background_worker_task (Kafka, GeWe and a distributed lock);
# kept for reference only.
# @celery.task(name='tasks.background_worker_task', bind=True, acks_late=True)
# async def background_worker_task(self, redis_config, kafka_config, gewe_config):
#     # Initialize services inside the task
#     redis_service = RedisService()
#     await redis_service.init(**redis_config)
#     login_keys = []
#     async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):  # iterate the async generator with async for
#         login_keys.append(key)
#     print(login_keys)
#     kafka_service = KafkaService(**kafka_config)
#     await kafka_service.start()
#     gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
#     # Task logic
#     lock_name = "background_wxchat_thread_lock"
#     lock_identifier = str(time.time())
#     while True:
#         if await redis_service.acquire_lock(lock_name, timeout=10):
#             try:
#                 logger.info("分布式锁已成功获取")
#                 # Perform task logic
#             finally:
#                 await redis_service.release_lock(lock_name, lock_identifier)
#             break
#         else:
#             logger.info("获取分布式锁失败,等待10秒后重试...")
#             await asyncio.sleep(10)


@celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True)
def scheduled_task(self):
    """Print a success message and return a fixed greeting string.

    Presumably triggered on a schedule by Celery Beat / RedBeat, judging by
    the returned message -- the schedule itself is not defined in this file.

    Args:
        self: Bound task instance (``bind=True``); not used here.

    Returns:
        The string ``"Hello from Celery Beat + RedBeat!"``.
    """
    print("🚀 定时任务执行成功!")
    return "Hello from Celery Beat + RedBeat!"