Przeglądaj źródła

群全量和联系人全量使用key

1257
H Vs 3 tygodni temu
rodzic
commit
9d3db737fd
4 zmienionych plików z 44 dodań i 19 usunięć
  1. +5
    -3
      app/endpoints/agent_endpoint.py
  2. +20
    -13
      app/endpoints/pipeline_endpoint.py
  3. +5
    -3
      app/main.py
  4. +14
    -0
      common/utils.py

+ 5
- 3
app/endpoints/agent_endpoint.py Wyświetl plik

@@ -183,12 +183,14 @@ async def waitting_login_result(request: Request, token_id, app_id,region_id, ag
await request.app.state.gewe_service.save_groups_members_to_cache_async(token_id, app_id, wxid, chatrooms) await request.app.state.gewe_service.save_groups_members_to_cache_async(token_id, app_id, wxid, chatrooms)
logger.info(f'{wxid} 全量好友信息推送到kafka') logger.info(f'{wxid} 全量好友信息推送到kafka')
# 联系人推送到kafka # 联系人推送到kafka
k_message=wx_all_contacts_message(wxid,data)
#k_message = wx_all_contacts_message(wxid, data)
k_message = wx_all_contacts_key_message(wxid)
await request.app.state.kafka_service.send_message_async(k_message) await request.app.state.kafka_service.send_message_async(k_message)
# 全量群信息推送到kafka # 全量群信息推送到kafka
logger.info(f'{wxid} 全量群信息推送到kafka') logger.info(f'{wxid} 全量群信息推送到kafka')
all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
#all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
#k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
k_message=wx_groups_info_members_key_message(wxid)
await request.app.state.kafka_service.send_message_async(k_message) await request.app.state.kafka_service.send_message_async(k_message)


break break


+ 20
- 13
app/endpoints/pipeline_endpoint.py Wyświetl plik

@@ -325,8 +325,9 @@ async def handle_mod_contacts_async(request: Request,token_id,msg,wxid):
# group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id)
# k_message=wx_mod_group_info_members_message(wxid,group_info_members) # k_message=wx_mod_group_info_members_message(wxid,group_info_members)
# 全量推送 # 全量推送
all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
#all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
#k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
k_message=wx_groups_info_members_key_message(wxid)
await request.app.state.kafka_service.send_message_async(k_message) await request.app.state.kafka_service.send_message_async(k_message)


async def handle_del_contacts_async(request: Request,token_id,msg,wxid): async def handle_del_contacts_async(request: Request,token_id,msg,wxid):
@@ -381,7 +382,8 @@ async def handle_mod_contacts_worker_async(request:Request,token_id,app_id,wxid,
print(f'{wxid}的好友资料变更,数量 {len(friend_wxids)}') print(f'{wxid}的好友资料变更,数量 {len(friend_wxids)}')
data = await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id, app_id, wxid, friend_wxids) data = await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id, app_id, wxid, friend_wxids)
k_message = wx_all_contacts_message(wxid, data)
#k_message = wx_all_contacts_message(wxid, data)
k_message = wx_all_contacts_key_message(wxid)
await request.app.state.kafka_service.send_message_async(k_message) await request.app.state.kafka_service.send_message_async(k_message)
else: else:
@@ -895,8 +897,9 @@ async def handle_xml_invite_group_async (request: Request,token_id,app_id, wxid,
# group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id)
# k_message=wx_mod_group_info_members_message(wxid,group_info_members) # k_message=wx_mod_group_info_members_message(wxid,group_info_members)
# 全量群信息推送到kafka # 全量群信息推送到kafka
all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
#all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
#k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
k_message=wx_groups_info_members_key_message(wxid)
await request.app.state.kafka_service.send_message_async(k_message) await request.app.state.kafka_service.send_message_async(k_message)


else: else:
@@ -999,8 +1002,9 @@ async def handle_10002_msg_async(request: Request,token_id,app_id, wxid,msg_data
# await request.app.state.kafka_service.send_message_async(k_message) # await request.app.state.kafka_service.send_message_async(k_message)


# 全量群信息推送到kafka # 全量群信息推送到kafka
all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
#all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
#k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
k_message=wx_groups_info_members_key_message(wxid)
await request.app.state.kafka_service.send_message_async(k_message) await request.app.state.kafka_service.send_message_async(k_message)


if '移出了群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml : if '移出了群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml :
@@ -1038,8 +1042,9 @@ async def handle_10002_msg_async(request: Request,token_id,app_id, wxid,msg_data
# await request.app.state.kafka_service.send_message_async(k_message)、 # await request.app.state.kafka_service.send_message_async(k_message)、
# 全量群信息推送到kafka # 全量群信息推送到kafka
all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
#all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
#k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
k_message=wx_groups_info_members_key_message(wxid)
await request.app.state.kafka_service.send_message_async(k_message) await request.app.state.kafka_service.send_message_async(k_message)


if 'roomtoolstips' in msg_content_xml : if 'roomtoolstips' in msg_content_xml :
@@ -1054,8 +1059,9 @@ async def handle_10002_msg_async(request: Request,token_id,app_id, wxid,msg_data
# await request.app.state.kafka_service.send_message_async(k_message) # await request.app.state.kafka_service.send_message_async(k_message)


# 全量群信息推送到kafka # 全量群信息推送到kafka
all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
#all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
#k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
k_message=wx_groups_info_members_key_message(wxid)
await request.app.state.kafka_service.send_message_async(k_message) await request.app.state.kafka_service.send_message_async(k_message)


except ET.ParseError as e: except ET.ParseError as e:
@@ -1083,8 +1089,9 @@ async def handle_10000_msg_async(request: Request,token_id,app_id, wxid,msg_data
# await request.app.state.kafka_service.send_message_async(k_message) # await request.app.state.kafka_service.send_message_async(k_message)


# 全量群信息推送到kafka # 全量群信息推送到kafka
all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
#all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
#k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
k_message=wx_groups_info_members_key_message(wxid)
await request.app.state.kafka_service.send_message_async(k_message) await request.app.state.kafka_service.send_message_async(k_message)


return return


+ 5
- 3
app/main.py Wyświetl plik

@@ -116,11 +116,13 @@ async def startup_sync_data_task_async(redis_service: RedisService, kafka_servic
await gewe_service.save_groups_members_to_cache_async(token_id, app_id, wxid, chatrooms) await gewe_service.save_groups_members_to_cache_async(token_id, app_id, wxid, chatrooms)
logger.info(f'{wxid} 好友信息推送到kafka') logger.info(f'{wxid} 好友信息推送到kafka')
# 联系人推送到kafka # 联系人推送到kafka
k_message = wx_all_contacts_message(wxid, data)
#k_message = wx_all_contacts_message(wxid, data)
k_message = wx_all_contacts_key_message(wxid)
await kafka_service.send_message_async(k_message) await kafka_service.send_message_async(k_message)
# 全量群信息推送到kafka # 全量群信息推送到kafka
all_groups_info_menbers=await gewe_service.get_groups_info_members_from_cache_async(wxid)
k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
#all_groups_info_menbers=await gewe_service.get_groups_info_members_from_cache_async(wxid)
#k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
k_message=wx_groups_info_members_key_message(wxid)
await kafka_service.send_message_async(k_message) await kafka_service.send_message_async(k_message)


except Exception as e: except Exception as e:


+ 14
- 0
common/utils.py Wyświetl plik

@@ -113,6 +113,20 @@ def wx_groups_info_members_message(wxid:str,data:dict|list)->str:
data=kafka_base_message("all-groups",content) data=kafka_base_message("all-groups",content)
return json.dumps(data, separators=(',', ':'), ensure_ascii=False) return json.dumps(data, separators=(',', ':'), ensure_ascii=False)


def wx_all_contacts_key_message(wxid:str)->str:
content = {"wxid": wxid,"key":f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}" }
data=kafka_base_message("all-contacts-key",content)
return json.dumps(data, separators=(',', ':'), ensure_ascii=False)

def wx_groups_info_members_key_message(wxid:str)->str:
content = {
"wxid": wxid,
"info_key" :f"__AI_OPS_WX__:GROUPS_INFO:{wxid}",
"members_key":f"__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}"
}
data=kafka_base_message("all-groups-key",content)
return json.dumps(data, separators=(',', ':'), ensure_ascii=False)

def wx_mod_group_info_members_message(wxid:str,data:dict|list)->str: def wx_mod_group_info_members_message(wxid:str,data:dict|list)->str:
content = {"wxid": wxid,"group_info":data} content = {"wxid": wxid,"group_info":data}
data=kafka_base_message("mod-group",content) data=kafka_base_message("mod-group",content)


Ładowanie…
Anuluj
Zapisz