diff --git a/app/endpoints/pipeline_endpoint.py b/app/endpoints/pipeline_endpoint.py index fcb68c5..dd54d42 100644 --- a/app/endpoints/pipeline_endpoint.py +++ b/app/endpoints/pipeline_endpoint.py @@ -252,7 +252,13 @@ async def handle_del_contacts_async(request: Request,token_id,msg,wxid): wxid = msg.get("Wxid") chatroom_id=username await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id) + await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id) + logger.info(f'清除 chatroom_id{chatroom_id} 数据') + + # 推送删除群资料到kafka + k_message=wx_del_group_message(wxid,chatroom_id) + await request.app.state.kafka_service.send_message_async(k_message) else: logger.info('删除好友通知') # 推送到kafka @@ -886,22 +892,35 @@ async def handle_10002_msg(request: Request,token_id,app_id, wxid,msg_data,from_ logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id} {msg}') await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) + + #推送群资料到kafka + group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) + k_message=wx_mod_group_info_members_message(wxid,group_info_members) + await request.app.state.kafka_service.send_message_async(k_message) if '移出了群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml : chatroom_id=msg_data["FromUserName"]["string"] ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,2) logger.info(f'踢出群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}') await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id) + await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id) logger.info(f'清除 chatroom_id{chatroom_id} 数据') + # 推送删除群资料到kafka + k_message=wx_del_group_message(wxid,chatroom_id) + await request.app.state.kafka_service.send_message_async(k_message) + if '已解散该群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml : chatroom_id=msg_data["FromUserName"]["string"] ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,2) logger.info(f'解散群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}') await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id) + await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id) logger.info(f'清除 chatroom_id{chatroom_id} 数据') + # 推送删除群资料到kafka + k_message=wx_del_group_message(wxid,chatroom_id) + await request.app.state.kafka_service.send_message_async(k_message) - print('撤回消息,拍一拍消息,地理位置') except ET.ParseError as e: logger.error(f"解析XML失败: {e}") except Exception as e: diff --git a/common/utils.py b/common/utils.py index e63be51..c8a3d99 100644 --- a/common/utils.py +++ b/common/utils.py @@ -118,6 +118,11 @@ def wx_mod_group_info_members_message(wxid:str,data:dict|list)->str: data=kafka_base_message("mod-group",content) return json.dumps(data, separators=(',', ':'), ensure_ascii=False) +def wx_del_group_message(wxid:str,chatroom_id:str)->str: + content = {"wxid": wxid,"chatroom_id":chatroom_id} + data=kafka_base_message("del-group",content) + return json.dumps(data, separators=(',', ':'), ensure_ascii=False) + def login_qrcode_message(token_id: str,agent_tel:str,qr_code_img_base64:str,qr_code_url:list)->str: """ 构造消息的 JSON 数据