diff --git a/app/endpoints/pipeline_endpoint.py b/app/endpoints/pipeline_endpoint.py index 6ec221b..26270bd 100644 --- a/app/endpoints/pipeline_endpoint.py +++ b/app/endpoints/pipeline_endpoint.py @@ -266,11 +266,15 @@ async def handle_mod_contacts_async(request: Request,token_id,msg,wxid): logger.info('群信息变更通知') chatroom_id=user_name ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,3) + logger.info(f'保存到通讯录 chatroom_id {chatroom_id} {msg}') await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) - group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) - k_message=wx_mod_group_info_members_message(wxid,group_info_members) + # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) + # k_message=wx_mod_group_info_members_message(wxid,group_info_members) + # 全量推送 + all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) + k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) await request.app.state.kafka_service.send_message_async(k_message) async def handle_del_contacts_async(request: Request,token_id,msg,wxid): @@ -827,10 +831,14 @@ async def handle_xml_invite_group_async (request: Request,token_id,app_id, wxid, await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) - # 单个群信息推送到kafka - group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) - k_message=wx_mod_group_info_members_message(wxid,group_info_members) + # # 单个群信息推送到kafka + # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) + # k_message=wx_mod_group_info_members_message(wxid,group_info_members) + # 全量群信息推送到kafka + all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) + k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) await request.app.state.kafka_service.send_message_async(k_message) + else: logger.warning(f'群聊邀请,同意加入群聊失败 {msg} {data}') @@ -925,9 +933,14 @@ async def handle_10002_msg_async(request: Request,token_id,app_id, wxid,msg_data await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) - #推送群资料到kafka - group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) - k_message=wx_mod_group_info_members_message(wxid,group_info_members) + # #推送群资料到kafka + # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) + # k_message=wx_mod_group_info_members_message(wxid,group_info_members) + # await request.app.state.kafka_service.send_message_async(k_message) + + # 全量群信息推送到kafka + all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) + k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) await request.app.state.kafka_service.send_message_async(k_message) if '移出了群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml : @@ -959,9 +972,14 @@ async def handle_10002_msg_async(request: Request,token_id,app_id, wxid,msg_data await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) - #推送群资料到kafka - group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) - k_message=wx_mod_group_info_members_message(wxid,group_info_members) + # #推送群资料到kafka + # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) + # k_message=wx_mod_group_info_members_message(wxid,group_info_members) + # await request.app.state.kafka_service.send_message_async(k_message)、 + + # 全量群信息推送到kafka + all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) + k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) await request.app.state.kafka_service.send_message_async(k_message) if 'roomtoolstips' in msg_content_xml : @@ -970,9 +988,14 @@ async def handle_10002_msg_async(request: Request,token_id,app_id, wxid,msg_data await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) - #推送群资料到kafka - group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) - k_message=wx_mod_group_info_members_message(wxid,group_info_members) + # #推送群资料到kafka + # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) + # k_message=wx_mod_group_info_members_message(wxid,group_info_members) + # await request.app.state.kafka_service.send_message_async(k_message) + + # 全量群信息推送到kafka + all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) + k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) await request.app.state.kafka_service.send_message_async(k_message) except ET.ParseError as e: @@ -997,8 +1020,13 @@ async def handle_10000_msg_async(request: Request,token_id,app_id, wxid,msg_data await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) - group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) - k_message=wx_mod_group_info_members_message(wxid,group_info_members) + # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) + # k_message=wx_mod_group_info_members_message(wxid,group_info_members) + # await request.app.state.kafka_service.send_message_async(k_message) + + # 全量群信息推送到kafka + all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) + k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) await request.app.state.kafka_service.send_message_async(k_message) return