diff --git a/tasks.py b/tasks.py index c0a9ef6..c7f7f38 100644 --- a/tasks.py +++ b/tasks.py @@ -156,46 +156,95 @@ def background_worker_task(self, redis_config, kafka_config, gewe_config): +# @celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True) +# def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config): +# ''' +# 定时获取微信号资料 +# ''' +# async def task(): +# try: +# redis_service = RedisService() +# await redis_service.init(**redis_config) +# gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url']) + +# login_keys = [] +# async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): +# login_keys.append(key) + +# #print(login_keys) +# for k in login_keys: +# r = await redis_service.get_hash(k) +# app_id = r.get("appId") +# token_id = r.get("tokenId") +# wxid = r.get("wxid") +# status = r.get('status') +# if status == '0': +# logger.warning(f"微信号 {wxid} 已经离线") +# continue +# ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id) +# if ret != 200: +# logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}") +# continue +# nickname=profile.get("nickName") +# head_img_url=profile.get("smallHeadImgUrl") +# # print(nickname) + +# nickname=profile.get("nickName") +# head_img_url=profile.get("smallHeadImgUrl") +# r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())}) +# cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()} +# await redis_service.set_hash(k, cleaned_login_info) +# logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功") + +# except Exception as e: +# logger.error(f"任务执行过程中发生异常: {e}") + +# loop = asyncio.get_event_loop() +# if loop.is_closed(): +# loop = asyncio.new_event_loop() +# asyncio.set_event_loop(loop) + +# loop.run_until_complete(task()) # 在现有事件循环中运行任务 + + @celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True) def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config): ''' 定时获取微信号资料 ''' + async def process_key(redis_service, gewe_service, key): + try: + r = await redis_service.get_hash(key) + app_id = r.get("appId") + token_id = r.get("tokenId") + wxid = r.get("wxid") + status = r.get('status') + if status == '0': + logger.warning(f"微信号 {wxid} 已经离线") + return + ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id) + if ret != 200: + logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}") + return + nickname = profile.get("nickName") + head_img_url = profile.get("smallHeadImgUrl") + r.update({"nickName": nickname, "headImgUrl": head_img_url, "modify_at": int(time.time())}) + cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()} + await redis_service.set_hash(key, cleaned_login_info) + logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功") + except Exception as e: + logger.error(f"处理键 {key} 时发生异常: {e}") + async def task(): try: redis_service = RedisService() await redis_service.init(**redis_config) - gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url']) - + gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url']) login_keys = [] async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): login_keys.append(key) - - print(login_keys) - for k in login_keys: - r = await redis_service.get_hash(k) - app_id = r.get("appId") - token_id = r.get("tokenId") - wxid = r.get("wxid") - status = r.get('status') - if status == '0': - logger.warning(f"微信号 {wxid} 已经离线") - continue - ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id) - if ret != 200: - logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}") - continue - nickname=profile.get("nickName") - head_img_url=profile.get("smallHeadImgUrl") - # print(nickname) - - nickname=profile.get("nickName") - head_img_url=profile.get("smallHeadImgUrl") - r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())}) - cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()} - await redis_service.set_hash(k, cleaned_login_info) - logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功") - + # 使用 asyncio.gather 并发处理所有键 + await asyncio.gather(*[process_key(redis_service, gewe_service, key) for key in login_keys]) except Exception as e: logger.error(f"任务执行过程中发生异常: {e}") @@ -203,10 +252,8 @@ def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config): if loop.is_closed(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - loop.run_until_complete(task()) # 在现有事件循环中运行任务 - @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True) def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):