diff --git a/app/endpoints/pipeline_endpoint.py b/app/endpoints/pipeline_endpoint.py index 73d9c23..fcb68c5 100644 --- a/app/endpoints/pipeline_endpoint.py +++ b/app/endpoints/pipeline_endpoint.py @@ -788,6 +788,11 @@ async def handle_xml_invite_group_async (request: Request,token_id,app_id, wxid, logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id} {msg}') await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) + + # 单个群信息推送到kafka + group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) + k_message=wx_mod_group_info_members_message(wxid,group_info_members) + await request.app.state.kafka_service.send_message_async(k_message) else: logger.warning(f'群聊邀请,同意加入群聊失败 {msg} {data}') diff --git a/services/gewe_service.py b/services/gewe_service.py index 5fd61b3..47227e3 100644 --- a/services/gewe_service.py +++ b/services/gewe_service.py @@ -884,7 +884,6 @@ class GeWeService: await self.redis_service.update_hash_field(hash_key, chatroom_id, json.dumps(data, ensure_ascii=False)) await asyncio.sleep(0.1) - async def update_group_members_to_cache_async(self, token_id, app_id, wxid, chatroom_id: str): """ 更新将群信息保存到 Redis 缓存。 @@ -915,29 +914,25 @@ class GeWeService: return groups async def get_groups_info_members_from_cache_async(self, wxid)->list: - group_info_hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}" - group_info_cache = await self.redis_service.get_hash(group_info_hash_key) + groups_info_hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}" + groups_info_cache = await self.redis_service.get_hash(groups_info_hash_key) - group_menbers_hash_key= f"__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}" - group_menbers_cache = await self.redis_service.get_hash(group_menbers_hash_key) + groups_menbers_hash_key= f"__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}" + groups_menbers_cache = await self.redis_service.get_hash(groups_menbers_hash_key) - group_info_cache_list=[{"chatroom_id": field, "value": json.loads(value)} for field, value in group_info_cache.items()] - group_menbers_cache_list=[{"chatroom_id": field, "value": json.loads(value)} for field, value in group_menbers_cache.items()] - # print(group_info_cache_list) - # print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~') - # print(group_menbers_cache_list) - # return group_info_cache_list,group_menbers_cache_list + groups_info_cache_list=[{"chatroom_id": field, "value": json.loads(value)} for field, value in groups_info_cache.items()] + groups_menbers_cache_list=[{"chatroom_id": field, "value": json.loads(value)} for field, value in groups_menbers_cache.items()] # 合并逻辑 merged_data = [] # 遍历 group_info - for info in group_info_cache_list: + for info in groups_info_cache_list: chatroom_id = info['chatroom_id'] group_data = info['value'] # 查找对应的 group_members - members = next((m['value'] for m in group_menbers_cache_list if m['chatroom_id'] == chatroom_id), None) + members = next((m['value'] for m in groups_menbers_cache_list if m['chatroom_id'] == chatroom_id), None) if members: # 合并数据 @@ -952,7 +947,18 @@ class GeWeService: return merged_data - + async def get_group_info_members_from_cache_async(self, wxid,chatroom_id)->dict: + group_info_cache= await self.get_group_info_from_cache_async(wxid,chatroom_id) + group_menbers_cache= await self.get_group_members_from_cache_async(wxid,chatroom_id) + group_info_members = { + "nickName": group_info_cache['nickName'], + "chatroomId": group_info_cache['chatroomId'], + "memberList": group_menbers_cache['memberList'], + "chatroomOwner": group_menbers_cache['chatroomOwner'], + "adminWxid": group_menbers_cache['adminWxid'] + } + return group_info_members + async def update_group_info_to_cache_async(self, token_id, app_id, wxid, chatroom_id: str): """ 更新将群信息保存到 Redis 缓存。