"""Celery tasks: demo arithmetic tasks and WeChat login-info maintenance jobs."""

import asyncio
import time

from fastapi import Request, FastAPI

from celery_app import celery_app
from common.log import logger
from services.gewe_service import GeWeService
from services.kafka_service import KafkaService
from services.redis_service import RedisService

# Redis key pattern matched by every task that scans the stored login-info hashes.
_LOGIN_INFO_KEY_PATTERN = '__AI_OPS_WX__:LOGININFO:*'


def _get_reusable_event_loop():
    """Return this thread's event loop, creating a fresh one if it is missing or closed.

    ``asyncio.get_event_loop()`` raises ``RuntimeError`` when the current thread
    has no loop, so that case is handled the same way as a closed loop.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop


@celery_app.task(name='tasks.add_task', bind=True, acks_late=True)
def add_task(self, x, y):
    """Return ``x + y`` after a 5 second sleep that simulates a long computation."""
    time.sleep(5)  # simulate a long-running computation
    logger.info('add')
    return x + y


@celery_app.task(name='tasks.mul_task', bind=True, acks_late=True)
def mul_task(self, x, y):
    """Return ``x * y`` after a 5 second sleep that simulates a long computation."""
    time.sleep(5)  # simulate a long-running computation
    return x * y


# Disabled draft, kept for reference:
# @celery.task(name='app.tasks.sync_contacts', bind=True, acks_late=True)
# async def sync_contacts_task(self,app):
#     login_keys = list(await app.state.redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
#     return login_keys
#     # for k in login_keys:
#     #     print(k)


@celery_app.task(name='tasks.sync_contacts', bind=True, acks_late=True)
async def sync_contacts_task(self, redis_service):
    """Collect every login-info key visible through ``redis_service``.

    Args:
        redis_service: Service object whose ``client.scan_iter`` yields keys
            asynchronously (it is consumed with ``async for`` elsewhere in
            this module).

    Returns:
        list: The keys matching the login-info pattern.

    NOTE(review): this is a coroutine function. Celery workers are not known to
    await coroutine tasks, so this presumably only works when awaited directly
    by async code - confirm how it is invoked before scheduling it on a worker.
    """
    # scan_iter returns an async iterator; it must be iterated, not awaited.
    return [
        key
        async for key in redis_service.client.scan_iter(match=_LOGIN_INFO_KEY_PATTERN)
    ]


@celery_app.task(name='tasks.background_worker_task', bind=True, acks_late=True)
def background_worker_task(self, redis_config, kafka_config, gewe_config):
    """Connect to Redis and print every login-info key.

    Args:
        redis_config: Keyword arguments forwarded to ``RedisService.init``.
        kafka_config: Accepted for interface compatibility; not used here.
        gewe_config: Accepted for interface compatibility; not used here.
    """
    async def task():
        redis_service = RedisService()
        await redis_service.init(**redis_config)
        login_keys = [
            key
            async for key in redis_service.client.scan_iter(match=_LOGIN_INFO_KEY_PATTERN)
        ]
        print(login_keys)

    asyncio.run(task())


# Disabled draft, kept for reference:
# @celery.task(name='tasks.background_worker_task', bind=True, acks_late=True)
# async def background_worker_task(self, redis_config, kafka_config, gewe_config):
#     # Initialize services inside the task
#     redis_service = RedisService()
#     await redis_service.init(**redis_config)
#     login_keys = []
#     async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):  # iterate the async generator with async for
#         login_keys.append(key)
#     print(login_keys)
#     kafka_service = KafkaService(**kafka_config)
#     await kafka_service.start()
#     gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
#     # Task logic
#     lock_name = "background_wxchat_thread_lock"
#     lock_identifier = str(time.time())
#     while True:
#         if await redis_service.acquire_lock(lock_name, timeout=10):
#             try:
#                 logger.info("分布式锁已成功获取")
#                 # Perform task logic
#             finally:
#                 await redis_service.release_lock(lock_name, lock_identifier)
#             break
#         else:
#             logger.info("获取分布式锁失败,等待10秒后重试...")
#             await asyncio.sleep(10)

# Disabled draft, kept for reference:
# @celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True)
# def scheduled_task(self):
#     print("定时任务执行成功!~~~~~~~~~~~~~~~~~")
#     return "Hello from Celery Beat + RedBeat!"

# Disabled draft, kept for reference:
# @celery_app.task(name='tasks.scheduled_task_sync_wx', bind=True, acks_late=True)
# def scheduled_task_sync_wx(self,redis_service,kafka_service,gewe_service):
#     print("scheduled_task_sync_wx 定时任务执行成功!")
#     return "Hello from Celery Beat + RedBeat!"

# Disabled draft, kept for reference:
# @celery_app.task(name='tasks.scheduled_task_sync_wx_info_1', bind=True, acks_late=True)
# def scheduled_task_sync_wx_info_1(self,redis_config, kafka_config, gewe_config):
#     '''
#     Periodically fetch WeChat account profiles.
#     '''
#     loop = asyncio.new_event_loop()
#     asyncio.set_event_loop(loop)
#     async def task():
#         try:
#             redis_service = RedisService()
#             await redis_service.init(**redis_config)
#             # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
#             login_keys = []
#             async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
#                 login_keys.append(key)
#             print(login_keys)
#             # for k in login_keys:
#             #     r = await redis_service.get_hash(k)
#             #     app_id = r.get("appId")
#             #     token_id = r.get("tokenId")
#             #     wxid = r.get("wxid")
#             #     status = r.get('status')
#             #     if status == '0':
#             #         continue
#             #     ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
#             #     if ret != 200:
#             #         logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
#             #         continue
#             #     nickname=profile.get("nickName")
#             #     head_img_url=profile.get("smallHeadImgUrl")
#             #     r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
#             #     cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
#             #     await redis_service.set_hash(k, cleaned_login_info)
#             #     logger.info(f"同步微信号 {wxid} 资料 成功")
#             #     redis_service.update_hash_field(k,"nickName",nickname)
#             #     redis_service.update_hash_field(k,"headImgUrl",head_img_url)
#             #     redis_service.update_hash_field(k,"modify_at",int(time.time()))
#         except Exception as e:
#             logger.error(f"任务执行过程中发生异常: {e}")
#         print("scheduled_task_sync_wx_info 定时任务执行成功!")
#         return "Hello from Celery Beat + RedBeat!"
#     loop.run_until_complete(task())
#     loop.close()


@celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True)
def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
    """Periodically refresh the stored profile of every logged-in WeChat account.

    For each login-info hash in Redis whose ``status`` is not ``'0'``, the
    profile is fetched through the GeWe service and the hash is rewritten with
    the fresh ``nickName``, ``headImgUrl`` and a ``modify_at`` timestamp.

    Args:
        redis_config: Keyword arguments forwarded to ``RedisService.init``.
        kafka_config: Accepted for interface compatibility; not used here.
        gewe_config: Mapping that must contain ``'api_url'``.

    Any exception raised while syncing is logged and swallowed so the
    scheduled task itself never fails.
    """
    async def task():
        try:
            redis_service = RedisService()
            await redis_service.init(**redis_config)
            gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])

            login_keys = [
                key
                async for key in redis_service.client.scan_iter(match=_LOGIN_INFO_KEY_PATTERN)
            ]
            print(login_keys)

            for login_key in login_keys:
                login_info = await redis_service.get_hash(login_key)
                if not login_info:
                    # The key vanished between the scan and the read.
                    continue

                app_id = login_info.get("appId")
                token_id = login_info.get("tokenId")
                wxid = login_info.get("wxid")
                status = login_info.get('status')

                if status == '0':
                    # No profile call has been made yet for this account, so
                    # there is no ret/msg to report here.
                    logger.warning(f"微信号 {wxid} 已经离线")
                    continue

                ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
                if ret != 200:
                    logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
                    continue

                nickname = profile.get("nickName")
                head_img_url = profile.get("smallHeadImgUrl")
                login_info.update({
                    "nickName": nickname,
                    "headImgUrl": head_img_url,
                    "modify_at": int(time.time()),
                })
                # None values are replaced with '' before the hash is written back.
                cleaned_login_info = {
                    field: ('' if value is None else value)
                    for field, value in login_info.items()
                }
                await redis_service.set_hash(login_key, cleaned_login_info)
                logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")
        except Exception as e:
            logger.error(f"任务执行过程中发生异常: {e}")

    # Run on the thread's existing event loop instead of asyncio.run(), which
    # would close the loop after every run.
    # NOTE(review): presumably the services cache loop-bound connections - confirm.
    loop = _get_reusable_event_loop()
    loop.run_until_complete(task())