From 98b31e861f208565fda45ebd8231afa56f06d096 Mon Sep 17 00:00:00 2001 From: H Vs Date: Fri, 11 Apr 2025 11:36:12 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- celery_app.py | 6 +++--- tasks.py | 50 +++++++++++++++++++++++++++----------------------- 2 files changed, 30 insertions(+), 26 deletions(-) diff --git a/celery_app.py b/celery_app.py index 480504f..014cce9 100644 --- a/celery_app.py +++ b/celery_app.py @@ -33,13 +33,13 @@ environment = os.environ.get('environment', 'default') if environment == 'production': - scheduled_task_sync_wx_info_interval = 60*11 + scheduled_task_sync_wx_info_interval = 60*10 scheduled_task_add_contacts_from_chatrooms_interval = 60*11 elif environment == 'test': - scheduled_task_sync_wx_info_interval = 60*11 + scheduled_task_sync_wx_info_interval = 60*10 scheduled_task_add_contacts_from_chatrooms_interval = 60*11 else: - scheduled_task_sync_wx_info_interval = 6000 + scheduled_task_sync_wx_info_interval = 10 scheduled_task_add_contacts_from_chatrooms_interval=6 diff --git a/tasks.py b/tasks.py index c7f7f38..50e6f2b 100644 --- a/tasks.py +++ b/tasks.py @@ -212,28 +212,29 @@ def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config): ''' 定时获取微信号资料 ''' - async def process_key(redis_service, gewe_service, key): - try: - r = await redis_service.get_hash(key) - app_id = r.get("appId") - token_id = r.get("tokenId") - wxid = r.get("wxid") - status = r.get('status') - if status == '0': - logger.warning(f"微信号 {wxid} 已经离线") - return - ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id) - if ret != 200: - logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}") - return - nickname = profile.get("nickName") - head_img_url = profile.get("smallHeadImgUrl") - r.update({"nickName": nickname, "headImgUrl": head_img_url, "modify_at": int(time.time())}) - cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()} - await redis_service.set_hash(key, cleaned_login_info) - logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功") - except Exception as e: - logger.error(f"处理键 {key} 时发生异常: {e}") + async def process_key(redis_service, gewe_service, semaphore, key): + async with semaphore: # 使用 Semaphore 控制并发 + try: + r = await redis_service.get_hash(key) + app_id = r.get("appId") + token_id = r.get("tokenId") + wxid = r.get("wxid") + status = r.get('status') + if status == '0': + logger.warning(f"微信号 {wxid} 已经离线") + return + ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id) + if ret != 200: + logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}") + return + nickname = profile.get("nickName") + head_img_url = profile.get("smallHeadImgUrl") + r.update({"nickName": nickname, "headImgUrl": head_img_url, "modify_at": int(time.time())}) + cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()} + await redis_service.set_hash(key, cleaned_login_info) + logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功") + except Exception as e: + logger.error(f"处理键 {key} 时发生异常: {e}") async def task(): try: @@ -243,8 +244,11 @@ def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config): login_keys = [] async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): login_keys.append(key) + + # 设置 Semaphore,限制并发数为 10 + semaphore = asyncio.Semaphore(10) # 使用 asyncio.gather 并发处理所有键 - await asyncio.gather(*[process_key(redis_service, gewe_service, key) for key in login_keys]) + await asyncio.gather(*[process_key(redis_service, gewe_service, semaphore, key) for key in login_keys]) except Exception as e: logger.error(f"任务执行过程中发生异常: {e}")