瀏覽代碼

调整

1257
H Vs 1 周之前
父節點
當前提交
98b31e861f
共有 2 個文件被更改,包括 30 次插入26 次删除
  1. +3
    -3
      celery_app.py
  2. +27
    -23
      tasks.py

+ 3
- 3
celery_app.py 查看文件

@@ -33,13 +33,13 @@ environment = os.environ.get('environment', 'default')


if environment == 'production':
scheduled_task_sync_wx_info_interval = 60*11
scheduled_task_sync_wx_info_interval = 60*10
scheduled_task_add_contacts_from_chatrooms_interval = 60*11
elif environment == 'test':
scheduled_task_sync_wx_info_interval = 60*11
scheduled_task_sync_wx_info_interval = 60*10
scheduled_task_add_contacts_from_chatrooms_interval = 60*11
else:
scheduled_task_sync_wx_info_interval = 6000
scheduled_task_sync_wx_info_interval = 10
scheduled_task_add_contacts_from_chatrooms_interval=6



+ 27
- 23
tasks.py 查看文件

@@ -212,28 +212,29 @@ def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
'''
定时获取微信号资料
'''
async def process_key(redis_service, gewe_service, key):
try:
r = await redis_service.get_hash(key)
app_id = r.get("appId")
token_id = r.get("tokenId")
wxid = r.get("wxid")
status = r.get('status')
if status == '0':
logger.warning(f"微信号 {wxid} 已经离线")
return
ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
if ret != 200:
logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
return
nickname = profile.get("nickName")
head_img_url = profile.get("smallHeadImgUrl")
r.update({"nickName": nickname, "headImgUrl": head_img_url, "modify_at": int(time.time())})
cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
await redis_service.set_hash(key, cleaned_login_info)
logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")
except Exception as e:
logger.error(f"处理键 {key} 时发生异常: {e}")
async def process_key(redis_service, gewe_service, semaphore, key):
async with semaphore: # 使用 Semaphore 控制并发
try:
r = await redis_service.get_hash(key)
app_id = r.get("appId")
token_id = r.get("tokenId")
wxid = r.get("wxid")
status = r.get('status')
if status == '0':
logger.warning(f"微信号 {wxid} 已经离线")
return
ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
if ret != 200:
logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
return
nickname = profile.get("nickName")
head_img_url = profile.get("smallHeadImgUrl")
r.update({"nickName": nickname, "headImgUrl": head_img_url, "modify_at": int(time.time())})
cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
await redis_service.set_hash(key, cleaned_login_info)
logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")
except Exception as e:
logger.error(f"处理键 {key} 时发生异常: {e}")

async def task():
try:
@@ -243,8 +244,11 @@ def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
login_keys = []
async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
login_keys.append(key)

# 设置 Semaphore,限制并发数为 10
semaphore = asyncio.Semaphore(10)
# 使用 asyncio.gather 并发处理所有键
await asyncio.gather(*[process_key(redis_service, gewe_service, key) for key in login_keys])
await asyncio.gather(*[process_key(redis_service, gewe_service, semaphore, key) for key in login_keys])
except Exception as e:
logger.error(f"任务执行过程中发生异常: {e}")



Loading…
取消
儲存