diff --git a/app/endpoints/agent_endpoint.py b/app/endpoints/agent_endpoint.py index cfdd478..4ea75b8 100644 --- a/app/endpoints/agent_endpoint.py +++ b/app/endpoints/agent_endpoint.py @@ -183,12 +183,14 @@ async def waitting_login_result(request: Request, token_id, app_id,region_id, ag await request.app.state.gewe_service.save_groups_members_to_cache_async(token_id, app_id, wxid, chatrooms) logger.info(f'{wxid} 全量好友信息推送到kafka') # 联系人推送到kafka - k_message=wx_all_contacts_message(wxid,data) + #k_message = wx_all_contacts_message(wxid, data) + k_message = wx_all_contacts_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) # 全量群信息推送到kafka logger.info(f'{wxid} 全量群信息推送到kafka') - all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) - k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) + #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) break diff --git a/app/endpoints/pipeline_endpoint.py b/app/endpoints/pipeline_endpoint.py index ebbf32c..03ee9a5 100644 --- a/app/endpoints/pipeline_endpoint.py +++ b/app/endpoints/pipeline_endpoint.py @@ -325,8 +325,9 @@ async def handle_mod_contacts_async(request: Request,token_id,msg,wxid): # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) # k_message=wx_mod_group_info_members_message(wxid,group_info_members) # 全量推送 - all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) - k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) + #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) async def handle_del_contacts_async(request: Request,token_id,msg,wxid): @@ -381,7 +382,8 @@ async def handle_mod_contacts_worker_async(request:Request,token_id,app_id,wxid, print(f'{wxid}的好友资料变更,数量 {len(friend_wxids)}') data = await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id, app_id, wxid, friend_wxids) - k_message = wx_all_contacts_message(wxid, data) + #k_message = wx_all_contacts_message(wxid, data) + k_message = wx_all_contacts_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) else: @@ -895,8 +897,9 @@ async def handle_xml_invite_group_async (request: Request,token_id,app_id, wxid, # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) # k_message=wx_mod_group_info_members_message(wxid,group_info_members) # 全量群信息推送到kafka - all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) - k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) + #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) else: @@ -999,8 +1002,9 @@ async def handle_10002_msg_async(request: Request,token_id,app_id, wxid,msg_data # await request.app.state.kafka_service.send_message_async(k_message) # 全量群信息推送到kafka - all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) - k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) + #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) if '移出了群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml : @@ -1038,8 +1042,9 @@ async def handle_10002_msg_async(request: Request,token_id,app_id, wxid,msg_data # await request.app.state.kafka_service.send_message_async(k_message)、 # 全量群信息推送到kafka - all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) - k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) + #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) if 'roomtoolstips' in msg_content_xml : @@ -1054,8 +1059,9 @@ async def handle_10002_msg_async(request: Request,token_id,app_id, wxid,msg_data # await request.app.state.kafka_service.send_message_async(k_message) # 全量群信息推送到kafka - all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) - k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) + #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) except ET.ParseError as e: @@ -1083,8 +1089,9 @@ async def handle_10000_msg_async(request: Request,token_id,app_id, wxid,msg_data # await request.app.state.kafka_service.send_message_async(k_message) # 全量群信息推送到kafka - all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) - k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) + #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) return diff --git a/app/main.py b/app/main.py index 93a8911..a817918 100644 --- a/app/main.py +++ b/app/main.py @@ -116,11 +116,13 @@ async def startup_sync_data_task_async(redis_service: RedisService, kafka_servic await gewe_service.save_groups_members_to_cache_async(token_id, app_id, wxid, chatrooms) logger.info(f'{wxid} 好友信息推送到kafka') # 联系人推送到kafka - k_message = wx_all_contacts_message(wxid, data) + #k_message = wx_all_contacts_message(wxid, data) + k_message = wx_all_contacts_key_message(wxid) await kafka_service.send_message_async(k_message) # 全量群信息推送到kafka - all_groups_info_menbers=await gewe_service.get_groups_info_members_from_cache_async(wxid) - k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + #all_groups_info_menbers=await gewe_service.get_groups_info_members_from_cache_async(wxid) + #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + k_message=wx_groups_info_members_key_message(wxid) await kafka_service.send_message_async(k_message) except Exception as e: diff --git a/common/utils.py b/common/utils.py index 1d3e415..6f52fb8 100644 --- a/common/utils.py +++ b/common/utils.py @@ -113,6 +113,20 @@ def wx_groups_info_members_message(wxid:str,data:dict|list)->str: data=kafka_base_message("all-groups",content) return json.dumps(data, separators=(',', ':'), ensure_ascii=False) +def wx_all_contacts_key_message(wxid:str)->str: + content = {"wxid": wxid,"key":f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}" } + data=kafka_base_message("all-contacts-key",content) + return json.dumps(data, separators=(',', ':'), ensure_ascii=False) + +def wx_groups_info_members_key_message(wxid:str)->str: + content = { + "wxid": wxid, + "info_key" :f"__AI_OPS_WX__:GROUPS_INFO:{wxid}", + "members_key":f"__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}" + } + data=kafka_base_message("all-groups-key",content) + return json.dumps(data, separators=(',', ':'), ensure_ascii=False) + def wx_mod_group_info_members_message(wxid:str,data:dict|list)->str: content = {"wxid": wxid,"group_info":data} data=kafka_base_message("mod-group",content)