Przeglądaj źródła

调整微信资料同步任务

1257
H Vs 2 tygodni temu
rodzic
commit
87a949fc3d
1 zmienionych plików z 77 dodań i 30 usunięć
  1. +77
    -30
      tasks.py

+ 77
- 30
tasks.py Wyświetl plik

@@ -156,46 +156,95 @@ def background_worker_task(self, redis_config, kafka_config, gewe_config):






# @celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True)
# def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
# '''
# 定时获取微信号资料
# '''
# async def task():
# try:
# redis_service = RedisService()
# await redis_service.init(**redis_config)
# gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])

# login_keys = []
# async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
# login_keys.append(key)

# #print(login_keys)
# for k in login_keys:
# r = await redis_service.get_hash(k)
# app_id = r.get("appId")
# token_id = r.get("tokenId")
# wxid = r.get("wxid")
# status = r.get('status')
# if status == '0':
# logger.warning(f"微信号 {wxid} 已经离线")
# continue
# ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
# if ret != 200:
# logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
# continue
# nickname=profile.get("nickName")
# head_img_url=profile.get("smallHeadImgUrl")
# # print(nickname)

# nickname=profile.get("nickName")
# head_img_url=profile.get("smallHeadImgUrl")
# r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
# cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
# await redis_service.set_hash(k, cleaned_login_info)
# logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")

# except Exception as e:
# logger.error(f"任务执行过程中发生异常: {e}")

# loop = asyncio.get_event_loop()
# if loop.is_closed():
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)

# loop.run_until_complete(task()) # 在现有事件循环中运行任务


@celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True) @celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True)
def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config): def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
''' '''
定时获取微信号资料 定时获取微信号资料
''' '''
async def process_key(redis_service, gewe_service, key):
try:
r = await redis_service.get_hash(key)
app_id = r.get("appId")
token_id = r.get("tokenId")
wxid = r.get("wxid")
status = r.get('status')
if status == '0':
logger.warning(f"微信号 {wxid} 已经离线")
return
ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
if ret != 200:
logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
return
nickname = profile.get("nickName")
head_img_url = profile.get("smallHeadImgUrl")
r.update({"nickName": nickname, "headImgUrl": head_img_url, "modify_at": int(time.time())})
cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
await redis_service.set_hash(key, cleaned_login_info)
logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")
except Exception as e:
logger.error(f"处理键 {key} 时发生异常: {e}")

async def task(): async def task():
try: try:
redis_service = RedisService() redis_service = RedisService()
await redis_service.init(**redis_config) await redis_service.init(**redis_config)
gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])

gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])
login_keys = [] login_keys = []
async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
login_keys.append(key) login_keys.append(key)

print(login_keys)
for k in login_keys:
r = await redis_service.get_hash(k)
app_id = r.get("appId")
token_id = r.get("tokenId")
wxid = r.get("wxid")
status = r.get('status')
if status == '0':
logger.warning(f"微信号 {wxid} 已经离线")
continue
ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
if ret != 200:
logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
continue
nickname=profile.get("nickName")
head_img_url=profile.get("smallHeadImgUrl")
# print(nickname)

nickname=profile.get("nickName")
head_img_url=profile.get("smallHeadImgUrl")
r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
await redis_service.set_hash(k, cleaned_login_info)
logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")

# 使用 asyncio.gather 并发处理所有键
await asyncio.gather(*[process_key(redis_service, gewe_service, key) for key in login_keys])
except Exception as e: except Exception as e:
logger.error(f"任务执行过程中发生异常: {e}") logger.error(f"任务执行过程中发生异常: {e}")


@@ -203,10 +252,8 @@ def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
if loop.is_closed(): if loop.is_closed():
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)

loop.run_until_complete(task()) # 在现有事件循环中运行任务 loop.run_until_complete(task()) # 在现有事件循环中运行任务



@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True) @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config): def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):




Ładowanie…
Anuluj
Zapisz