From 9d3db737fd9266db5f005d7729856a34728c1500 Mon Sep 17 00:00:00 2001 From: H Vs Date: Wed, 2 Apr 2025 15:16:19 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BE=A4=E5=85=A8=E9=87=8F=E5=92=8C=E8=81=94?= =?UTF-8?q?=E7=B3=BB=E4=BA=BA=E5=85=A8=E9=87=8F=E4=BD=BF=E7=94=A8key?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/endpoints/agent_endpoint.py | 8 +++++--- app/endpoints/pipeline_endpoint.py | 33 ++++++++++++++++++------------ app/main.py | 8 +++++--- common/utils.py | 14 +++++++++++++ 4 files changed, 44 insertions(+), 19 deletions(-) diff --git a/app/endpoints/agent_endpoint.py b/app/endpoints/agent_endpoint.py index cfdd478..4ea75b8 100644 --- a/app/endpoints/agent_endpoint.py +++ b/app/endpoints/agent_endpoint.py @@ -183,12 +183,14 @@ async def waitting_login_result(request: Request, token_id, app_id,region_id, ag await request.app.state.gewe_service.save_groups_members_to_cache_async(token_id, app_id, wxid, chatrooms) logger.info(f'{wxid} 全量好友信息推送到kafka') # 联系人推送到kafka - k_message=wx_all_contacts_message(wxid,data) + #k_message = wx_all_contacts_message(wxid, data) + k_message = wx_all_contacts_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) # 全量群信息推送到kafka logger.info(f'{wxid} 全量群信息推送到kafka') - all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) - k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) + #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) break diff --git a/app/endpoints/pipeline_endpoint.py b/app/endpoints/pipeline_endpoint.py index ebbf32c..03ee9a5 100644 --- a/app/endpoints/pipeline_endpoint.py +++ b/app/endpoints/pipeline_endpoint.py @@ -325,8 +325,9 @@ async def handle_mod_contacts_async(request: Request,token_id,msg,wxid): # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) # k_message=wx_mod_group_info_members_message(wxid,group_info_members) # 全量推送 - all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) - k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) + #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) async def handle_del_contacts_async(request: Request,token_id,msg,wxid): @@ -381,7 +382,8 @@ async def handle_mod_contacts_worker_async(request:Request,token_id,app_id,wxid, print(f'{wxid}的好友资料变更,数量 {len(friend_wxids)}') data = await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id, app_id, wxid, friend_wxids) - k_message = wx_all_contacts_message(wxid, data) + #k_message = wx_all_contacts_message(wxid, data) + k_message = wx_all_contacts_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) else: @@ -895,8 +897,9 @@ async def handle_xml_invite_group_async (request: Request,token_id,app_id, wxid, # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) # k_message=wx_mod_group_info_members_message(wxid,group_info_members) # 全量群信息推送到kafka - all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) - k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) + #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) else: @@ -999,8 +1002,9 @@ async def handle_10002_msg_async(request: Request,token_id,app_id, wxid,msg_data # await request.app.state.kafka_service.send_message_async(k_message) # 全量群信息推送到kafka - all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) - k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) + #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) if '移出了群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml : @@ -1038,8 +1042,9 @@ async def handle_10002_msg_async(request: Request,token_id,app_id, wxid,msg_data # await request.app.state.kafka_service.send_message_async(k_message)、 # 全量群信息推送到kafka - all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) - k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) + #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) if 'roomtoolstips' in msg_content_xml : @@ -1054,8 +1059,9 @@ async def handle_10002_msg_async(request: Request,token_id,app_id, wxid,msg_data # await request.app.state.kafka_service.send_message_async(k_message) # 全量群信息推送到kafka - all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) - k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) + #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) except ET.ParseError as e: @@ -1083,8 +1089,9 @@ async def handle_10000_msg_async(request: Request,token_id,app_id, wxid,msg_data # await request.app.state.kafka_service.send_message_async(k_message) # 全量群信息推送到kafka - all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) - k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) + #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) return diff --git a/app/main.py b/app/main.py index 93a8911..a817918 100644 --- a/app/main.py +++ b/app/main.py @@ -116,11 +116,13 @@ async def startup_sync_data_task_async(redis_service: RedisService, kafka_servic await gewe_service.save_groups_members_to_cache_async(token_id, app_id, wxid, chatrooms) logger.info(f'{wxid} 好友信息推送到kafka') # 联系人推送到kafka - k_message = wx_all_contacts_message(wxid, data) + #k_message = wx_all_contacts_message(wxid, data) + k_message = wx_all_contacts_key_message(wxid) await kafka_service.send_message_async(k_message) # 全量群信息推送到kafka - all_groups_info_menbers=await gewe_service.get_groups_info_members_from_cache_async(wxid) - k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + #all_groups_info_menbers=await gewe_service.get_groups_info_members_from_cache_async(wxid) + #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) + k_message=wx_groups_info_members_key_message(wxid) await kafka_service.send_message_async(k_message) except Exception as e: diff --git a/common/utils.py b/common/utils.py index 1d3e415..6f52fb8 100644 --- a/common/utils.py +++ b/common/utils.py @@ -113,6 +113,20 @@ def wx_groups_info_members_message(wxid:str,data:dict|list)->str: data=kafka_base_message("all-groups",content) return json.dumps(data, separators=(',', ':'), ensure_ascii=False) +def wx_all_contacts_key_message(wxid:str)->str: + content = {"wxid": wxid,"key":f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}" } + data=kafka_base_message("all-contacts-key",content) + return json.dumps(data, separators=(',', ':'), ensure_ascii=False) + +def wx_groups_info_members_key_message(wxid:str)->str: + content = { + "wxid": wxid, + "info_key" :f"__AI_OPS_WX__:GROUPS_INFO:{wxid}", + "members_key":f"__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}" + } + data=kafka_base_message("all-groups-key",content) + return json.dumps(data, separators=(',', ':'), ensure_ascii=False) + def wx_mod_group_info_members_message(wxid:str,data:dict|list)->str: content = {"wxid": wxid,"group_info":data} data=kafka_base_message("mod-group",content)