|
- from celery_app import celery_app
- from fastapi import Request,FastAPI
- import time
-
- from services.redis_service import RedisService
- from services.kafka_service import KafkaService
- from services.gewe_service import GeWeService
- from common.log import logger
- import asyncio
-
-
- @celery_app.task(name='tasks.add_task', bind=True, acks_late=True)
- def add_task(self, x, y):
- time.sleep(5) # 模拟长时间计算
- logger.info('add')
- return x + y
-
-
- @celery_app.task(name='tasks.mul_task', bind=True, acks_late=True)
- def mul_task(self, x, y):
- time.sleep(5) # 模拟长时间计算
- return x * y
-
-
- # @celery.task(name='app.tasks.sync_contacts', bind=True, acks_late=True)
- # async def sync_contacts_task(self,app):
- # login_keys = list(await app.state.redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
- # return login_keys
- # # for k in login_keys:
- # # print(k)
-
- @celery_app.task(name='tasks.sync_contacts', bind=True, acks_late=True)
- async def sync_contacts_task(self, redis_service):
- # Use the redis_service passed as an argument
- login_keys = list(await redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
- return login_keys
-
-
- @celery_app.task(name='tasks.background_worker_task', bind=True, acks_late=True)
- def background_worker_task(self, redis_config, kafka_config, gewe_config):
- async def task():
- redis_service = RedisService()
- await redis_service.init(**redis_config)
- login_keys = []
- async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
- login_keys.append(key)
- print(login_keys)
-
- asyncio.run(task())
-
- # @celery.task(name='tasks.background_worker_task', bind=True, acks_late=True)
- # async def background_worker_task(self, redis_config, kafka_config, gewe_config):
- # # Initialize services inside the task
- # redis_service = RedisService()
- # await redis_service.init(**redis_config)
-
- # login_keys = []
- # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # 使用 async for 遍历异步生成器
- # login_keys.append(key)
-
- # print(login_keys)
-
- # kafka_service = KafkaService(**kafka_config)
- # await kafka_service.start()
-
- # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
-
- # # Task logic
- # lock_name = "background_wxchat_thread_lock"
- # lock_identifier = str(time.time())
- # while True:
- # if await redis_service.acquire_lock(lock_name, timeout=10):
- # try:
- # logger.info("分布式锁已成功获取")
- # # Perform task logic
- # finally:
- # await redis_service.release_lock(lock_name, lock_identifier)
- # break
- # else:
- # logger.info("获取分布式锁失败,等待10秒后重试...")
- # await asyncio.sleep(10)
-
-
-
- # @celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True)
- # def scheduled_task(self):
- # print("定时任务执行成功!~~~~~~~~~~~~~~~~~")
- # return "Hello from Celery Beat + RedBeat!"
-
-
- # @celery_app.task(name='tasks.scheduled_task_sync_wx', bind=True, acks_late=True)
- # def scheduled_task_sync_wx(self,redis_service,kafka_service,gewe_service):
- # print("scheduled_task_sync_wx 定时任务执行成功!")
- # return "Hello from Celery Beat + RedBeat!"
-
- # @celery_app.task(name='tasks.scheduled_task_sync_wx_info_1', bind=True, acks_late=True)
- # def scheduled_task_sync_wx_info_1(self,redis_config, kafka_config, gewe_config):
- # '''
- # 定时获取微信号资料
- # '''
- # loop = asyncio.new_event_loop()
- # asyncio.set_event_loop(loop)
- # async def task():
- # try:
- # redis_service = RedisService()
- # await redis_service.init(**redis_config)
- # # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
- # login_keys = []
- # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
- # login_keys.append(key)
-
- # print(login_keys)
- # # for k in login_keys:
- # # r = await redis_service.get_hash(k)
- # # app_id = r.get("appId")
- # # token_id = r.get("tokenId")
- # # wxid = r.get("wxid")
- # # status = r.get('status')
- # # if status == '0':
- # # continue
- # # ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
- # # if ret != 200:
- # # logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
- # # continue
- # # nickname=profile.get("nickName")
- # # head_img_url=profile.get("smallHeadImgUrl")
- # # r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
- # # cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
- # # await redis_service.set_hash(k, cleaned_login_info)
- # # logger.info(f"同步微信号 {wxid} 资料 成功")
- # # redis_service.update_hash_field(k,"nickName",nickname)
- # # redis_service.update_hash_field(k,"headImgUrl",head_img_url)
- # # redis_service.update_hash_field(k,"modify_at",int(time.time()))
- # except Exception as e:
- # logger.error(f"任务执行过程中发生异常: {e}")
-
- # print("scheduled_task_sync_wx_info 定时任务执行成功!")
- # return "Hello from Celery Beat + RedBeat!"
-
- # loop.run_until_complete(task())
- # loop.close()
-
-
-
- @celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True)
- def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
- '''
- 定时获取微信号资料
- '''
- async def task():
- try:
- redis_service = RedisService()
- await redis_service.init(**redis_config)
- gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
- login_keys = []
- async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
- login_keys.append(key)
-
- print(login_keys)
- for k in login_keys:
- r = await redis_service.get_hash(k)
- app_id = r.get("appId")
- token_id = r.get("tokenId")
- wxid = r.get("wxid")
- status = r.get('status')
- if status == '0':
- logger.warning(f"微信号 {wxid} 已经离线: {ret}-{msg}")
- continue
- ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
- if ret != 200:
- logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
- continue
- nickname=profile.get("nickName")
- head_img_url=profile.get("smallHeadImgUrl")
- # print(nickname)
-
- nickname=profile.get("nickName")
- head_img_url=profile.get("smallHeadImgUrl")
- r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
- cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
- await redis_service.set_hash(k, cleaned_login_info)
- logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")
-
- except Exception as e:
- logger.error(f"任务执行过程中发生异常: {e}")
-
- loop = asyncio.get_event_loop()
- if loop.is_closed():
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
-
- loop.run_until_complete(task()) # 在现有事件循环中运行任务
|