from voice.ali.ali_voice import AliVoice
from common.log import logger
import xml.etree.ElementTree as ET
import os,json,asyncio,aiohttp,random
from aiohttp import ClientError

from voice import audio_convert

from fastapi import APIRouter,Request
from pydantic import BaseModel
from fastapi import APIRouter, Depends
from typing import Dict, Any
from model.models import AgentConfig,OperationType
from common.utils import *
from common.memory import *

import traceback
import sys

# Seconds check_timeout_async waits for the AI reply task before it may send
# a "please wait" message to the user.
timeout_duration = 8.0 

# Router on which the WeChat callback endpoint below is registered.
messages_router = APIRouter()



# Earlier version of the list, which also contained 'fmessage':
#WX_BACKLIST=['fmessage', 'medianote','weixin','weixingongzhong','tmessage']
# Sender IDs whose AddMsg callbacks get_messages ignores.
WX_BACKLIST=['medianote','weixin','weixingongzhong','tmessage']
@messages_router.post("/messages",response_model=None)
async def get_messages(request: Request, body: Dict[str, Any]):
    """Receive one WeChat callback and route it to the matching handler.

    The callback is dropped early when no login info exists for its
    ``Appid``. For ``AddMsg`` callbacks, self-sent hosting commands are
    processed first; then messages are skipped when the sender is an ID in
    ``WX_BACKLIST``, a key of the ``__AI_OPS_WX__:WXCHAT_CONFIG`` hash (other
    than the receiving account itself), or an official account.

    Args:
        request: Incoming request; services are read from ``request.app.state``.
        body: Parsed JSON payload of the callback.

    Returns:
        A dict with a single ``"message"`` key describing the outcome. Any
        exception raised while handling is logged and reported through that
        message instead of propagating.
    """
    logger.info(f"收到微信回调消息: {json.dumps(body, separators=(',', ':'),ensure_ascii=False)}")
    try:
        msg = body

        type_name = msg.get("TypeName")
        app_id = msg.get("Appid")
        login_key, login_info = await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id)
        if not login_key:
            logger.warning('找不到登录信息,不处理')
            return {"message": f"收到微信回调消息: {type_name}"}

        token_id = login_info.get('tokenId', '')
        wxid = msg.get("Wxid", '')

        if type_name == 'AddMsg':
            await handle_self_cmd_async(request, wxid, msg)
            msg_data = msg.get("Data")
            from_wxid = msg_data["FromUserName"]["string"]
            config = await request.app.state.redis_service.get_hash("__AI_OPS_WX__:WXCHAT_CONFIG")
            # Tolerate a missing/empty hash instead of failing on ``.keys()``.
            blocked_wxids = set(config or {}) | set(WX_BACKLIST)
            # Official-account IDs start with "gh_"; a prefix test avoids
            # dropping ordinary IDs that merely contain that substring.
            is_official_account = from_wxid.startswith('gh_')
            if (from_wxid in blocked_wxids and from_wxid != wxid) or is_official_account:
                logger.warning(f'来自微信ID {from_wxid} 在黑名单,发送给 {wxid} ,不处理')
                return {"message": "收到微信回调消息"}

        await handle_messages_async(request, token_id, msg)

        return {"message": "收到微信回调消息,处理完成"}
    except Exception as e:
        # Top-level boundary: log the payload, the error and the stack trace,
        # then still answer the callback.
        logger.error(f"处理微信回调消息出错: {json.dumps(body, separators=(',', ':'),ensure_ascii=False)}")
        logger.error(f"异常类型: {type(e).__name__}")
        logger.error(f"异常信息: {str(e)}")
        logger.error(f"堆栈跟踪: {traceback.format_exc()}")

    return {"message": "处理微信回调消息错误"}

    


async def handle_self_cmd_async(request: Request,wxid,msg):
    '''
    Handle hosting on/off commands typed by the logged-in account itself.

    A message counts as a self command when its sender equals ``wxid``.
    Recognised commands:
        '启用托管' -> agentEnabled = True
        '关闭托管' -> agentEnabled = False
    Nothing happens for other senders, other texts, or when no cached
    config exists for ``wxid``.
    '''
    payload = msg.get("Data")
    sender = payload["FromUserName"]["string"]
    text = payload["Content"]["string"]

    if sender != wxid:
        return

    toggle_by_command = {
        '启用托管': True,
        '关闭托管': False
    }
    if text not in toggle_by_command:
        return

    enabled = toggle_by_command[text]
    cached_config = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
    if not cached_config:
        return

    # Round-trip through the model so the saved config is a validated dump.
    agent_config = AgentConfig.model_validate(cached_config)
    agent_config.agentEnabled = enabled
    await request.app.state.gewe_service.wxchat.save_wxchat_config_async(wxid, agent_config.model_dump())
    logger.info(f'{wxid} {"启动" if enabled else "关闭"}托管')

async def handle_messages_async(request: Request,token_id,msg):
    """Route a callback to the handler that matches its ``TypeName``.

    Known types are 'AddMsg', 'ModContacts', 'DelContacts' and 'Offline';
    any other value is only logged as a warning.
    """
    callback_kind = msg.get("TypeName")
    wxid = msg.get("Wxid",'')

    if callback_kind == 'AddMsg':
        await handle_add_messages_async(request, token_id, msg, wxid)
    elif callback_kind == 'ModContacts':
        await handle_mod_contacts_async(request, token_id, msg, wxid)
    elif callback_kind == 'DelContacts':
        await handle_del_contacts_async(request, token_id, msg, wxid)
    elif callback_kind == 'Offline':
        await handle_offline_async(request, token_id, msg, wxid)
    else:
        logger.warning(f'未知消息类型:{callback_kind}')
        
# async def gpt_client_async(request,messages: list, wixd: str, friend_wxid: str):
#     c = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wixd)
#     api_key = c.get('agentTokenId', "sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH")
#     print(f'流程key:{api_key}\n')
    
#     #api_key="sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH" #测试
#     #api_key="sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI" #开发2
    
#     api_url = "http://106.15.182.218:3000/api/v1/chat/completions"
#     headers = {
#         "Content-Type": "application/json",
#         "Authorization": f"Bearer {api_key}"
#     }
    
#     session_id = f'{wixd}-{friend_wxid}'
#     data = {
#         "model": "",
#         "messages": messages,
#         "chatId": session_id,
#         "detail": True
#     }
    
#     logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False)))
    
#     async with aiohttp.ClientSession() as session:
#         try:
#             async with session.post(url=api_url, headers=headers, data=json.dumps(data), timeout=600) as response:
#                 response.raise_for_status()
#                 response_data = await response.json()
#                 logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'), ensure_ascii=False)))
#                 return response_data
#         except aiohttp.ClientError as e:
#             logger.error(f"[CHATGPT] 请求失败: {e}")
#             raise



async def gpt_client_async(request, messages: list, wixd: str, friend_wxid: str):
    """POST a chat-completion request to the agent service, with retries.

    Args:
        request: Incoming request; the cached WeChat config is read through
            ``request.app.state.gewe_service``.
        messages: Chat messages to send.
        wixd: ID of the managed WeChat account (used for the config lookup
            and as the first half of the chat ID).
        friend_wxid: ID of the conversation partner (second half of the
            chat ID).

    Returns:
        The decoded JSON response body.

    Raises:
        aiohttp.ClientError, asyncio.TimeoutError: re-raised after the last
            of ``max_retries`` failed attempts.
    """
    max_retries = 3
    retry_delay = 5  # seconds to wait between attempts

    # Everything below is identical for every attempt, so build it once.
    base_url = "http://106.15.182.218:3000"
    if os.environ.get('environment', 'default') != 'default':
        base_url = "http://172.19.42.59:3000"
    api_url = f"{base_url}/api/v1/chat/completions"
    data = {
        "model": "",
        "messages": messages,
        "chatId": f'{wixd}-{friend_wxid}',
        "detail": True
    }
    request_body = json.dumps(data)
    request_log = json.dumps(data, ensure_ascii=False)
    # aiohttp expects a ClientTimeout object; a bare number is deprecated.
    request_timeout = aiohttp.ClientTimeout(total=1200)

    for attempt in range(max_retries):
        try:
            # The config is re-read on each attempt, inside the retried section.
            c = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wixd)
            # NOTE(review): hard-coded fallback credential kept for backward
            # compatibility; it should be moved out of the source code.
            api_key = (c or {}).get('agentTokenId', "sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH")
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {api_key}"
            }
            logger.info("[CHATGPT] 请求={}".format(request_log))
            async with aiohttp.ClientSession() as session:
                async with session.post(url=api_url, headers=headers, data=request_body, timeout=request_timeout) as response:
                    response.raise_for_status()
                    return await response.json()
        except (ClientError, asyncio.TimeoutError) as e:
            logger.error(f"[CHATGPT] 请求失败(尝试 {attempt + 1}/{max_retries}): {e}")
            if attempt >= max_retries - 1:
                raise
            await asyncio.sleep(retry_delay)

async def handle_add_messages_async(request: Request,token_id,msg,wxid):
    """Dispatch an ``AddMsg`` callback to the handler for its ``MsgType``.

    Nothing is processed unless the cached config of ``wxid`` has
    ``agentEnabled`` set. When either side of the message is a chatroom, the
    chatroom is saved to the contact list, its info and member caches are
    refreshed, and the group variants of the text/image/voice handlers are
    used.

    Args:
        request: Incoming request; services are read from ``request.app.state``.
        token_id: Token of the logged-in device.
        msg: Full callback payload.
        wxid: ID of the managed WeChat account.

    Returns:
        Whatever the selected handler returns, or ``None`` when the account
        is not hosted or the message type has no handler.
    """
    gewe_service = request.app.state.gewe_service
    wx_config = await gewe_service.get_wxchat_config_from_cache_async(wxid)

    app_id = msg.get("Appid")
    msg_data = msg.get("Data")
    msg_type = msg_data.get("MsgType")
    from_wxid = msg_data["FromUserName"]["string"]
    to_wxid = msg_data["ToUserName"]["string"]

    validated_config = AgentConfig.model_validate(wx_config)
    if not validated_config.agentEnabled:
        logger.info(f'微信ID {wxid} 未托管,不处理 {msg_type}')
        return

    handlers = {
        1: handle_text_async,
        3: handle_image_async,
        34: handle_voice_async,
        42: handle_name_card_async,
        49: handle_xml_async,
        37: handle_add_friend_notice_async,
        10002: handle_10002_msg_async,
        10000: handle_10000_msg_async
    }

    # Group chat (e.g. joined by QR code): make sure the chatroom is in the
    # contact list and its caches are current.
    from_is_chatroom = check_chatroom(from_wxid)
    if from_is_chatroom or check_chatroom(to_wxid):
        logger.info('群信息')
        # For a message the account itself sent into a group, the chatroom
        # is the recipient, not the sender.
        chatroom_id = from_wxid if from_is_chatroom else to_wxid
        _, save_msg, _ = await gewe_service.save_contract_list_async(token_id, app_id, chatroom_id, 3)
        logger.info(f'保存到通讯录 chatroom_id {chatroom_id}  {save_msg}')
        await gewe_service.update_group_info_to_cache_async(token_id, app_id, wxid, chatroom_id)
        await gewe_service.update_group_members_to_cache_async(token_id, app_id, wxid, chatroom_id)
        handlers[1] = handle_text_group_async
        handlers[3] = handle_image_group_async
        handlers[34] = handle_voice_group_async

    handler = handlers.get(msg_type)
    if not handler:
        logger.warning(f"微信回调消息类型 {msg_type} 未处理")
        return

    return await handler(request, token_id, app_id, wxid, msg_data, from_wxid, to_wxid)

async def handle_mod_contacts_async(request: Request,token_id,msg,wxid):
    '''
    Handle a ``ModContacts`` callback (friend verified / contact profile
    changed, or group info changed).

    For a non-chatroom ``UserName`` the work is delegated to
    ``handle_mod_contacts_worker_async``. For a chatroom, the chatroom is
    saved to the contact list, its info and member caches are refreshed and
    a groups-info key message is sent to Kafka.
    '''
    # The account ID from the payload takes precedence over the argument.
    wxid = msg.get("Wxid")
    msg_data = msg.get("Data")
    app_id = msg.get("Appid")

    user_name = msg_data.get("UserName",{}).get("string","")

    if check_chatroom(user_name):
        logger.info('群信息变更通知')
        chatroom_id = user_name
        _, save_msg, _ = await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,3)
        logger.info(f'保存到通讯录 chatroom_id {chatroom_id}  {save_msg}')
        await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id)
        await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id)

        # Only a key message is pushed; the group data itself is not included.
        groups_key_message = wx_groups_info_members_key_message(wxid)
        await request.app.state.kafka_service.send_message_async(groups_key_message)
        return

    await handle_mod_contacts_worker_async(request,token_id,app_id,wxid,msg_data)

async def handle_del_contacts_async(request: Request,token_id,msg,wxid):
    '''
    Handle a ``DelContacts`` callback: a friend was deleted or a group was left.

    For a chatroom, its entries are removed from the GROUPS_INFO and
    GROUPS_MEMBERS hashes and a group-deleted message is sent to Kafka.
    Otherwise a contact-deleted message is sent to Kafka.
    '''
    removed_id = msg.get("Data")["UserName"]["string"]

    if not check_chatroom(removed_id):
        logger.info('删除好友通知')
        contact_message = wx_del_contact_message(wxid, removed_id)
        await request.app.state.kafka_service.send_message_async(contact_message)
        return

    logger.info('退出群聊')
    # In this branch the account ID is taken from the payload.
    owner_wxid = msg.get("Wxid")
    for hash_prefix in ('__AI_OPS_WX__:GROUPS_INFO', '__AI_OPS_WX__:GROUPS_MEMBERS'):
        await request.app.state.redis_service.delete_hash_field(f'{hash_prefix}:{owner_wxid}', removed_id)

    logger.info(f'清除 chatroom_id{removed_id} 数据')

    group_message = wx_del_group_message(owner_wxid, removed_id)
    await request.app.state.kafka_service.send_message_async(group_message)

async def handle_offline_async(request: Request,token_id,msg,wxid):
    '''
    Handle an ``Offline`` callback: the account went offline on a device.

    Sets ``status`` to 0 and ``modify_at`` to the current Unix time on the
    login-info hash of the device, then sends an offline message to Kafka.
    The hash update is skipped (with a warning) when no login info is found
    for the device, so no field is ever written under an empty key.
    '''
    # The account ID from the payload takes precedence over the argument.
    wxid = msg.get("Wxid")
    app_id = msg.get("Appid")
    logger.warning(f'微信ID {wxid}在设备{app_id}已经离线')

    login_key, _ = await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id)
    if login_key:
        logger.info(f'更新登录信息 {login_key} 为离线状态')
        await request.app.state.redis_service.update_hash_field(login_key, 'status', 0)
        await request.app.state.redis_service.update_hash_field(login_key, 'modify_at', int(time.time()))
    else:
        logger.warning(f'找不到设备{app_id}的登录信息,跳过离线状态更新')

    # Notify downstream consumers through Kafka.
    input_message = wx_offline_message(app_id, wxid)
    await request.app.state.kafka_service.send_message_async(input_message)

async def handle_mod_contacts_worker_async(request:Request,token_id,app_id,wxid,msg_data):
    '''
    Refresh the cached friend list after a friend was verified or a
    contact's profile changed.

    For a non-chatroom ``UserName``, the full contact list is fetched, a
    fixed set of built-in accounts is filtered out, brief info for the
    remaining friends is saved to the cache and an all-contacts key message
    is sent to Kafka. For a chatroom only a log line is written.
    '''
    logger.info('好友通过验证及好友资料变更的通知消息')
    if check_chatroom(msg_data["UserName"]["string"]):
        logger.info('群聊好友通过验证及好友资料变更的通知消息')
        return

    # Built-in accounts that are not treated as friends.
    excluded_wxids = ('fmessage', 'medianote', 'weixin', 'weixingongzhong', 'tmessage')

    _, _, contacts_list = await request.app.state.gewe_service.fetch_contacts_list_async(token_id, app_id)
    friend_wxids = [c for c in contacts_list['friends'] if c not in excluded_wxids]
    logger.info(f'{wxid}的好友资料变更,数量 {len(friend_wxids)}')
    await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id, app_id, wxid, friend_wxids)

    # Only a key message is pushed; the contact data itself is not included.
    k_message = wx_all_contacts_key_message(wxid)
    await request.app.state.kafka_service.send_message_async(k_message)

async def handle_text_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle a private-chat text message.

    Nothing is done when ``privateGroupChatEnabled`` is off in the cached
    config. A message sent by the account itself is only recorded (contact
    brief cached, dialogue sent to Kafka). A message from someone else is
    answered by ``ai_chat_text_async`` while ``check_timeout_async`` runs
    alongside it as a watchdog.
    '''
    config = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
    validated_config = AgentConfig.model_validate(config)
    if not validated_config.privateGroupChatEnabled:
        logger.warning(f"微信号 {wxid} 关闭好友和群自动回复功能,{handle_text_async.__name__} 不回复消息")
        return

    msg_content = msg_data["Content"]["string"]
    if wxid == from_wxid:  # message typed manually by the account owner
        logger.info("Active message sending detected")

        await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id,wxid,[to_wxid])

        input_wx_content_dialogue_message = [{"type": "text", "text": msg_content}]
        input_message = dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
        await request.app.state.kafka_service.send_message_async(input_message)
        logger.info("发送对话 %s",input_message)
        return

    callback_to_user = msg_data["FromUserName"]["string"]

    # Run the AI reply as its own task so the watchdog can observe it.
    task = asyncio.create_task(
        ai_chat_text_async(request,token_id, app_id, wxid, msg_data, msg_content)
    )

    # Watchdog: after ``timeout_duration`` seconds it checks whether the
    # reply task is still running.
    timeout_timer = asyncio.create_task(
        check_timeout_async(task, request,token_id, wxid, app_id, callback_to_user)
    )

    try:
        await task
    finally:
        # Always stop the watchdog, even when the reply task failed or this
        # coroutine was cancelled, so it never outlives the request.
        timeout_timer.cancel()

async def check_timeout_async(task: asyncio.Task, request: Request,token_id, wxid, app_id, callback_to_user):
    """Send a random "please wait" phrase if ``task`` is still running late.

    Sleeps for ``timeout_duration`` seconds. If ``task`` has not finished by
    then and ``chatWaitingMsgEnabled`` (default True) is truthy in the cached
    config of ``wxid``, one phrase is posted to ``callback_to_user``.
    """
    await asyncio.sleep(timeout_duration)
    if task.done():
        return

    print(f"任务运行时间超过{timeout_duration}秒,token_id={token_id}, app_id={app_id}, callback_to_user={callback_to_user}")
    wx_config = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
    if not wx_config.get("chatWaitingMsgEnabled", True):
        return

    waiting_phrases = ["稍等一下", "辛苦等等", "马上就好", "很快就好", "请稍后", "请等会", "稍等1分钟","请别急,在整理","就好了"]
    chosen_phrase = random.choice(waiting_phrases)
    await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, chosen_phrase)

async def ai_chat_text_async(request: Request,token_id, app_id, wxid, msg_data, msg_content):
    """Answer a private text message through the AI agent and send the reply.

    Steps:
      1. Append the user prompt to the cached session and record the
         incoming dialogue in Kafka.
      2. If the previous reply to ``wxid`` was an interactive option list and
         the message starts with a valid 1-based option number, replace the
         message with the value of the chosen option.
      3. Call the agent and turn its answer into plain text (interactive
         option list, text item, or plain content).
      4. Send the text (and any extracted image URLs) to the sender, store
         the assistant message in the session and record the reply in Kafka.
         An empty reply or the exact reply '不回复' is stored but not sent.

    Args:
        request: Incoming request; services are read from ``request.app.state``.
        token_id: Token of the logged-in device.
        app_id: Device/app ID used for sending.
        wxid: ID of the managed WeChat account.
        msg_data: ``Data`` part of the callback; the sender is read from it.
        msg_content: Text of the received message.
    """
    start_time = time.time()
    gewe_service = request.app.state.gewe_service
    kafka_service = request.app.state.kafka_service
    redis_service = request.app.state.redis_service

    callback_to_user = msg_data["FromUserName"]["string"]
    hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
    prompt = {"role": "user", "content": [{
        "type": "text",
        "text": msg_content
    }]}
    messages_to_send = await gewe_service.save_session_messages_to_cache_async(hash_key, prompt)

    # Record the received dialogue.
    input_wx_content_dialogue_message = [{"type": "text", "text": msg_content}]
    input_message = dialogue_message(callback_to_user, wxid, input_wx_content_dialogue_message)
    await kafka_service.send_message_async(input_message)
    logger.info("发送对话 %s", input_message)

    cache_data = USER_INTERACTIVE_CACHE.get(wxid)
    if cache_data and cache_data.get('interactive'):
        selected = get_first_char_if_digit(msg_content)
        previous_options = cache_data.get('options') or []
        # Options are numbered from 1: accept 1..len(options) only. This lets
        # the last option be chosen and stops 0 from wrapping to the end.
        if selected is not None and 1 <= selected <= len(previous_options):
            msg_content = previous_options[selected - 1].get("value")
        # While an option list is pending, only the current message is sent.
        messages_to_send = [{"role": "user", "content": msg_content}]

    res = await gpt_client_async(request, messages_to_send, wxid, callback_to_user)
    reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"])

    description = ''
    userSelectOptions = []
    if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
        for item in reply_content:
            if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
                params = item["interactive"]["params"]
                description = params.get("description")
                userSelectOptions = params.get("userSelectOptions", [])
        values_string = "\n".join(option["value"] for option in userSelectOptions)

        if description is not None:
            # Remember the options so the next message can pick one by number.
            USER_INTERACTIVE_CACHE[wxid] = {
                "interactive": True,
                "options": userSelectOptions,
            }
            reply_content = description + '------------------------------\n' + values_string

    elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
        USER_INTERACTIVE_CACHE[wxid] = {
            "interactive": False
        }
        text = ''
        for item in reply_content:
            if item["type"] == "text":
                text = item["text"]["content"]
        if text == '':
            # Empty answer: drop the last three cached messages, re-add the
            # prompt and ask the agent again.
            cache_messages_str = await redis_service.get_hash_field(hash_key, "data")
            cache_messages = json.loads(cache_messages_str) if cache_messages_str else []

            if len(cache_messages) >= 3:
                cache_messages = cache_messages[:-3]
            await redis_service.update_hash_field(hash_key, "data", json.dumps(cache_messages, ensure_ascii=False))
            messages_to_send = await gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
            res = await gpt_client_async(request, messages_to_send, wxid, callback_to_user)
            reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"])

            if isinstance(reply_content, list):
                reply_content = remove_markdown_symbol(reply_content[0].get('text').get("content"))
        else:
            reply_content = text
    else:
        # reply_content already holds the markdown-stripped content.
        USER_INTERACTIVE_CACHE[wxid] = {
            "interactive": False
        }

    img_urls, reply_content = extract_and_replace_image_urls(reply_content)
    if img_urls:
        await gewe_service.post_text_async(token_id, app_id, callback_to_user, reply_content)
        # Random pauses between consecutive outgoing messages.
        await asyncio.sleep(random.uniform(1.5, 3))
        for img_url in img_urls:
            await gewe_service.post_image_async(token_id, app_id, callback_to_user, img_url)
            await asyncio.sleep(random.uniform(1.5, 3))
    elif reply_content in ('', '不回复'):
        # Exact match (or empty reply); a substring test against '不回复'
        # would also swallow replies such as '不' or '回复'.
        execution_time = time.time() - start_time
        logger.warning(f"AI回答任务完成,耗时 {execution_time:.2f} 秒,AI回答<不回复>,跳过微信回复")
        await gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})
        return
    else:
        await gewe_service.post_text_async(token_id, app_id, callback_to_user, reply_content)

    await gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})

    # Record the reply dialogue.
    input_wx_content_dialogue_message = [{"type": "text", "text": reply_content}]
    input_message = dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message, True)
    await kafka_service.send_message_async(input_message)
    logger.info("发送对话 %s", input_message)

    execution_time = time.time() - start_time
    logger.info(f"AI回答任务完成,耗时 {execution_time:.2f} 秒")

async def handle_text_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle a group-chat text message.

    Nothing is done when ``privateGroupChatEnabled`` is off in the cached
    config. A message sent by the account itself is only recorded. For
    messages from others the chatroom must be listed in
    ``chatroomIdWhiteList``; if the push content shows that the account was
    @-mentioned, the AI agent is asked and its answer is posted to the group
    prefixed with ``@<nickname>``; otherwise the message is only recorded in
    Kafka.
    '''
    msg_content = msg_data["Content"]["string"]
    # PushContent may be absent; treat it as empty text for the checks below.
    msg_push_content = msg_data.get("PushContent") or ''
    gewe_service = request.app.state.gewe_service
    kafka_service = request.app.state.kafka_service

    _, login_info = await gewe_service.get_login_info_by_app_id_async(app_id)
    nickname = login_info.get("nickName")

    config = await gewe_service.get_wxchat_config_from_cache_async(wxid)
    validated_config = AgentConfig.model_validate(config)
    if not validated_config.privateGroupChatEnabled:
        logger.warning(f"微信号 {wxid} 关闭好友和群自动回复功能,{handle_text_group_async.__name__} 不回复消息")
        return

    if wxid == from_wxid:  # message typed manually by the account owner
        logger.info("Active message sending detected")

        await gewe_service.save_contacts_brief_to_cache_async(token_id,app_id,wxid,[to_wxid])

        input_wx_content_dialogue_message = [{"type": "text", "text": msg_content}]
        input_message = dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
        await kafka_service.send_message_async(input_message)
        logger.info("发送对话 %s",input_message)
        return

    # The config fetched above is reused for the whitelist lookup.
    chatroom_id_white_list = config.get("chatroomIdWhiteList", [])

    if not chatroom_id_white_list:
        logger.info('白名单为空或未定义,不处理')
        return

    if from_wxid not in chatroom_id_white_list:
        logger.info(f'群ID {from_wxid} 不在白名单中,不处理')
        return

    callback_to_user = msg_data["FromUserName"]["string"]

    # A missing nickname must not turn the second test into a bare "@" match.
    mentioned = '在群聊中@了你' in msg_push_content or bool(nickname and '@' + nickname in msg_push_content)
    if not mentioned:
        logger.info('群聊公开消息')
        group_dialogue_message = [{"type": "text", "text": msg_content}]
        input_message = dialogue_message(callback_to_user,wxid,group_dialogue_message)
        await kafka_service.send_message_async(input_message)
        logger.info("发送对话 %s",input_message)
        return

    hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'

    prompt = {"role": "user", "content": [{
        "type": "text",
        "text": msg_content
    }]}
    messages_to_send = await gewe_service.save_session_messages_to_cache_async(hash_key, prompt)

    # Record the received dialogue.
    input_wx_content_dialogue_message = [{"type": "text", "text": msg_content}]
    input_message = dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
    await kafka_service.send_message_async(input_message)
    logger.info("发送对话 %s",input_message)

    cache_data = USER_INTERACTIVE_CACHE.get(wxid)
    if cache_data and cache_data.get('interactive'):
        selected = get_first_char_if_digit(msg_content)
        previous_options = cache_data.get('options') or []
        # Options are numbered from 1: accept 1..len(options) only. This lets
        # the last option be chosen and stops 0 from wrapping to the end.
        if selected is not None and 1 <= selected <= len(previous_options):
            msg_content = previous_options[selected - 1].get("value")
        # While an option list is pending, only the current message is sent.
        messages_to_send = [{"role": "user", "content": msg_content}]

    res = await gpt_client_async(request,messages_to_send,wxid,callback_to_user)
    reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"])

    description = ''
    userSelectOptions = []

    if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
        for item in reply_content:
            if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
                params = item["interactive"]["params"]
                description = params.get("description")
                userSelectOptions = params.get("userSelectOptions", [])
        values_string = "\n".join(option["value"] for option in userSelectOptions)

        if description is not None:
            # Remember the options so the next message can pick one by number.
            USER_INTERACTIVE_CACHE[wxid] = {
                "interactive": True,
                "options": userSelectOptions,
            }
            reply_content = description + '------------------------------\n' + values_string

    elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
        USER_INTERACTIVE_CACHE[wxid] = {
            "interactive": False
        }
        text = ''
        for item in reply_content:
            if item["type"] == "text":
                text = item["text"]["content"]
        if text == '':
            # Empty answer: drop the last three cached messages, re-add the
            # prompt and ask the agent again.
            cache_messages_str = await request.app.state.redis_service.get_hash_field(hash_key,"data")
            cache_messages = json.loads(cache_messages_str) if cache_messages_str else []

            if len(cache_messages) >= 3:
                cache_messages = cache_messages[:-3]

            await request.app.state.redis_service.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
            messages_to_send = await gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
            res = await gpt_client_async(request,messages_to_send,wxid,callback_to_user)
            reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"])

            # Same unwrapping as the private-chat handler, so the string
            # concatenation below never receives a list.
            if isinstance(reply_content, list):
                reply_content = remove_markdown_symbol(reply_content[0].get('text').get("content"))
        else:
            reply_content = text
    else:
        # Keep the markdown-stripped content computed above, as the
        # private-chat handler does, instead of reverting to the raw content.
        USER_INTERACTIVE_CACHE[wxid] = {
            "interactive": False
        }

    reply_content = '@' + extract_nickname(msg_push_content) + reply_content

    await gewe_service.post_text_async(token_id,app_id,callback_to_user,reply_content)
    await gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})

    # Record the reply dialogue, prefixed with the replying account's ID.
    reply_content = f'{wxid}:\n' + reply_content
    input_wx_content_dialogue_message = [{"type": "text", "text": reply_content}]
    input_message = dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
    await kafka_service.send_message_async(input_message)
    logger.info("发送对话 %s",input_message)
    
async def handle_image_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle an image message received in a private chat.

    Steps, in order:
      1. Ask the gewe service to download the image described by the message.
      2. Re-upload the downloaded image to OSS via ``upload_oss``.
      3. Append the image as a user turn to the session cache.
      4. Send a fixed acknowledgement text back to the sender.
      5. Forward the image as a dialogue message to Kafka.

    Args:
        request: Current request; services are read from ``request.app.state``.
        token_id: Token passed through to the gewe service calls.
        app_id: App id passed through to the gewe service calls.
        wxid: WeChat id of the account that received the message.
        msg_data: Callback payload; ``msg_data["Content"]["string"]`` is read.
        from_wxid: Sender of the image; the acknowledgement is sent to it.
        to_wxid: Unused by this handler.
    '''
    msg_content=msg_data["Content"]["string"]

    callback_to_user=from_wxid
    hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
    wx_img_url= await request.app.state.gewe_service.download_image_msg_async(token_id,app_id,msg_content)

    # SECURITY(review): these credentials are committed to source control.
    # Each value can be overridden from the environment; the literals remain
    # only as a fallback so existing deployments keep working. Rotate the key
    # and delete the fallbacks once the environment variables are provisioned.
    oss_access_key_id=os.environ.get("OSS_ACCESS_KEY_ID","LTAI5tRTG6pLhTpKACJYoPR5")
    oss_access_key_secret=os.environ.get("OSS_ACCESS_KEY_SECRET","E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN")
    oss_endpoint=os.environ.get("OSS_ENDPOINT","http://oss-cn-shanghai.aliyuncs.com")
    oss_bucket_name=os.environ.get("OSS_BUCKET_NAME","cow-agent")
    oss_prefix=os.environ.get("OSS_PREFIX","cow")

    img_url=upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, wx_img_url, oss_prefix)

    prompt={
        "role": "user", 
        "content": [{
            "type": "image_url",
            "image_url": {"url": img_url}
            }]
    }

    await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
    await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,'已经上传了图片,有什么可以为您服务')
    logger.info(f"上传图片 URL: {img_url}")

    wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": img_url}}]
    # NOTE(review): sibling handlers build the message for a *received* item as
    # dialogue_message(callback_to_user, wxid, ...); the argument order here is
    # the opposite. Left unchanged -- confirm which direction is intended.
    input_message=dialogue_message(wxid,callback_to_user,wx_content_dialogue_message)
    await request.app.state.kafka_service.send_message_async(input_message)
    logger.info("发送对话 %s",input_message)

async def handle_image_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle an image message posted in a group chat.

    The image attributes are pulled out of the raw message text with regular
    expressions, a fresh image XML is built from them and handed to the gewe
    service for download, and the resulting URL (converted by
    ``wx_img_url_to_oss_url``) replaces the XML part of the original text.
    The rewritten text is forwarded to Kafka as a dialogue message.

    Args:
        request: Current request; services are read from ``request.app.state``.
        token_id: Token passed through to the gewe service call.
        app_id: App id passed through to the gewe service call.
        wxid: WeChat id of the account that received the message.
        msg_data: Callback payload; ``Content.string`` and
            ``FromUserName.string`` are read.
        from_wxid: Unused; the sender is taken from ``msg_data``.
        to_wxid: Unused by this handler.

    Returns:
        None. If a required image attribute is missing, a warning is logged
        and nothing is downloaded or forwarded.
    '''
    logger.info('群聊图片消息')
    msg_content=msg_data["Content"]["string"]
    callback_to_user=msg_data["FromUserName"]["string"]

    attr_names=('aeskey','cdnthumburl','md5','cdnthumblength','cdnthumbheight','cdnthumbwidth','length')
    attr_values={}
    for attr_name in attr_names:
        # \b anchors the attribute name: without it, 'length="..."' also
        # matches inside 'cdnthumblength="..."' and the thumbnail size would
        # be reported as the full image length.
        found=re.search(rf'\b{attr_name}="([^"]+)"', msg_content)
        if found is None:
            logger.warning(f'群聊图片消息缺少 {attr_name} 属性,不处理')
            return
        attr_values[attr_name]=found.group(1)

    aeskey=attr_values['aeskey']
    cdnthumburl=attr_values['cdnthumburl']
    md5=attr_values['md5']
    cdnthumblength=attr_values['cdnthumblength']
    cdnthumbheight=attr_values['cdnthumbheight']
    cdnthumbwidth=attr_values['cdnthumbwidth']
    length=attr_values['length']

    img_xml=f'<?xml version=\"1.0\"?>\n<msg>\n\t<img aeskey=\"{aeskey}\" encryver=\"1\" cdnthumbaeskey=\"{aeskey}\" cdnthumburl=\"{cdnthumburl}\" cdnthumblength=\"{cdnthumblength}\" cdnthumbheight=\"{cdnthumbheight}\" cdnthumbwidth=\"{cdnthumbwidth}\" cdnmidheight=\"0\" cdnmidwidth=\"0\" cdnhdheight=\"0\" cdnhdwidth=\"0\" cdnmidimgurl=\"{cdnthumburl}\" length=\"{length}\" md5=\"{md5}\" />\n\t<platform_signature></platform_signature>\n\t<imgdatahash></imgdatahash>\n</msg>'
    wx_img_url= await request.app.state.gewe_service.download_image_msg_async(token_id,app_id,img_xml)
    oss_url=wx_img_url_to_oss_url(wx_img_url)

    # Replace everything from the XML declaration onwards with the URL. A
    # callable replacement inserts the URL literally, so backslashes or group
    # references in it are never interpreted as a regex template.
    reply_content = re.sub(r'<\?xml.*', lambda _match: f'{oss_url}', msg_content, flags=re.DOTALL)
    input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
    input_message=dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message,False)
    await request.app.state.kafka_service.send_message_async(input_message)
    logger.info("发送对话 %s",input_message)


async def handle_voice_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle a voice message received in a private chat.

    Steps, in order:
      1. Return early if the account's config has ``privateGroupChatEnabled``
         switched off.
      2. Download the audio, convert it to wav and transcribe it with AliVoice.
      3. Append the transcript to the session cache and ask the GPT client for
         a reply.
      4. If the reply contains no URL, synthesise it with ``wx_voice`` and send
         it as voice when shorter than 60 seconds, otherwise as text. When the
         send does not return 200, the reply is sent again as plain text.
         If the reply contains a URL it is always sent as text.
      5. Append the reply to the session cache and forward it to Kafka.

    Args:
        request: Current request; services are read from ``request.app.state``.
        token_id: Token passed through to the gewe service calls.
        app_id: App id passed through to the gewe service calls.
        wxid: WeChat id of the account that received the message.
        msg_data: Callback payload; ``Content.string`` and ``MsgId`` are read.
        from_wxid: Sender of the voice message; the reply is sent to it.
        to_wxid: Unused by this handler.
    '''
    callback_to_user=from_wxid
    msg_content=msg_data["Content"]["string"]
    msg_id=msg_data["MsgId"]

    # Was `request.aoo.state`, which raised AttributeError on every call.
    config=await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
    validated_config = AgentConfig.model_validate(config)
    if not validated_config.privateGroupChatEnabled:
        logger.warning(f"微信号 {wxid} 关闭好友和群自动回复功能,{handle_voice_async.__name__} 不回复消息")
        return

    file_url=await request.app.state.gewe_service.download_audio_msg_async(token_id,app_id,msg_id,msg_content)
    react_silk_path=await save_to_local_from_url_async(file_url)
    react_wav_path = os.path.splitext(react_silk_path)[0] + ".wav"
    try:
        audio_convert.any_to_wav(react_silk_path,react_wav_path)
        react_voice_text=AliVoice().voiceToText(react_wav_path)
    finally:
        # Always remove the temporary audio files, even when conversion or
        # recognition fails; skip paths that were never created.
        for tmp_path in (react_silk_path, react_wav_path):
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

    hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
    messages=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "user", "content": react_voice_text})
    ai_res=await gpt_client_async(request,messages,wxid,callback_to_user)
    ai_res_content=remove_markdown_symbol(ai_res["choices"][0]["message"]["content"])
    has_url=contains_url(ai_res_content)
    if not has_url:
        voice_during,voice_url=wx_voice(ai_res_content)

        # voice_during is compared against 60 * 1000 and logged as /1000
        # seconds, i.e. it is handled as milliseconds.
        if voice_during < 60 * 1000:
            ret,ret_msg,res=await request.app.state.gewe_service.post_voice_async(token_id,app_id,callback_to_user,voice_url,voice_during)
        else:
            ret,ret_msg,res=await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,ai_res_content)
            logger.warning(f'回应语音消息长度 {voice_during/1000}秒,超过60秒,转为文本回复')

        if ret==200:
            logger.info(f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}')
        else:
            logger.warning(f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}')
            # Fall back to plain text. This was written with `==`, which
            # compared instead of assigning, so the log line below reported
            # the stale ret_msg of the failed attempt.
            ret,ret_msg,res=await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,ai_res_content)
            logger.info(f'{wxid} 向 {callback_to_user} 发送文本【{ai_res_content}】{ret_msg}')
    else:
        logger.info(f"回复内容包含网址,不发送语音,回复文字内容:{ai_res_content}")
        ret,ret_msg,res=await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,ai_res_content)

    await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": ai_res_content})
    # Build the dialogue message for the reply and publish it to Kafka.
    input_wx_content_dialogue_message = [{"type": "text", "text": ai_res_content}]
    input_message = dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message,True)
    await request.app.state.kafka_service.send_message_async(input_message)
    logger.info("发送对话 %s", input_message)

async def handle_voice_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle a voice message posted in a group chat.

    The handler only records that such a message arrived; none of its
    arguments are used.
    '''
    logger.info('群聊语音消息')

async def handle_name_card_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle a name-card (contact card) message by sending a friend request.

    The card XML's root attributes ``scene``, ``username`` and
    ``antispamticket`` are read and passed to ``add_contacts_async`` together
    with a greeting built from the account's nickname. The request is only
    sent when the account's ``create_at`` timestamp is at least three days
    old; a missing ``create_at`` is treated as "created just now".

    Args:
        request: Current request; services are read from ``request.app.state``.
        token_id: Token passed through to the gewe service call.
        app_id: App id passed through to the gewe service call.
        wxid: WeChat id of the account that received the card.
        msg_data: Callback payload; ``Content.string`` holds the card XML.
        from_wxid: Unused by this handler.
        to_wxid: Unused by this handler.

    Returns:
        None. XML, key and any other errors are logged and swallowed.
    '''
    logger.info('名片消息')

    try:
        msg_content_xml=msg_data["Content"]["string"]
        root = ET.fromstring(msg_content_xml)

        alias_value = root.get("alias")
        # Data required for the friend request.
        scene = int(root.get("scene"))
        v3 = root.get("username")
        v4 = root.get("antispamticket")

        logger.info(f"alias_value: {alias_value}, scene: {scene}, v3: {v3}, v4: {v4}")

        # The login info is fetched once and reused for both the account-age
        # check and the nickname (it used to be fetched twice).
        _, login_info = await request.app.state.gewe_service.get_login_info_by_wxid_async(wxid)
        if not login_info:
            logger.warning(f'名片添加好友功能,找不到 {wxid} 的登录信息,不处理')
            return

        # Only accounts created at least three days ago may use this feature.
        creation_timestamp=int(login_info.get('create_at',time.time()))
        current_timestamp = time.time()
        three_days_seconds = 3 * 24 * 60 * 60
        if (current_timestamp - creation_timestamp) < three_days_seconds:
            log_content=f'名片添加好友功能,{wxid} 用户创建不够三天,不能使用该功能'
            logger.warning(log_content)
            return

        nickname=login_info.get('nickName')
        add_contact_content=f'您好,我是{nickname}'
        await request.app.state.gewe_service.add_contacts_async(token_id,app_id,scene,OperationType.ADD_FRIEND.value,v3,v4,add_contact_content)

    except ET.ParseError as e:
        logger.error(f"XML解析错误: {e}")
    except KeyError as e:
        logger.error(f"字典键错误: {e}")
    except Exception as e:
        logger.error(f"未知错误: {e}")

async def handle_xml_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Dispatch an XML (app) message to the handler for its ``appmsg/type``.

    Supported types:
      * 57 -> ``handle_xml_reference_async`` (quoted/reference message)
      * 5  -> ``handle_xml_invite_group_async`` (group invitation)

    Other types are logged and ignored.

    Args:
        request: Current request, forwarded to the selected handler.
        token_id: Forwarded to the selected handler.
        app_id: Forwarded to the selected handler.
        wxid: Forwarded to the selected handler.
        msg_data: Callback payload; ``Content.string`` holds the XML.
        from_wxid: Forwarded to the selected handler.
        to_wxid: Forwarded to the selected handler.

    Returns:
        Whatever the selected handler returns, otherwise None. Parse errors
        and any exception raised by the handler are logged and swallowed.
    '''
    try:
        msg_content_xml=msg_data["Content"]["string"]
        root = ET.fromstring(msg_content_xml)

        # Report a missing <type> explicitly instead of letting `.text` on
        # None surface as an opaque AttributeError in the generic handler.
        type_node = root.find(".//appmsg/type")
        if type_node is None or type_node.text is None:
            logger.warning("xml消息缺少 appmsg/type 未解析")
            return
        type_value = int(type_node.text)

        handlers = {
            57: handle_xml_reference_async,
            5: handle_xml_invite_group_async,
        }
        handler = handlers.get(type_value)
        if handler:
            return await handler(request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid)

        # Goes through the logger like every other message in this module
        # (this was a bare print).
        logger.info(f"xml消息 {type_value} 未解析")
    except ET.ParseError as e:
        logger.error(f"解析XML失败: {e}")
    except Exception as e:
        logger.error(f"未知错误: {e}")
    return
    
async def handle_xml_reference_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle a quoted (reference) message.

    Such a message is identified by $.Data.MsgType=49 with
    msg.appmsg.type=57 in the XML at $.Data.Content.string.

    The ``PushContent`` text is stored as a user turn and forwarded to Kafka,
    the GPT client is asked for a reply, and the reply is forwarded to Kafka,
    stored in the session cache and sent back to the sender.

    Args:
        request: Current request; services are read from ``request.app.state``.
        token_id: Token passed through to the gewe service call.
        app_id: App id passed through to the gewe service call.
        wxid: WeChat id of the account that received the message.
        msg_data: Callback payload; ``PushContent`` is read.
        from_wxid: Sender of the message; the reply is sent to it.
        to_wxid: Unused by this handler.
    '''
    callback_to_user=from_wxid
    hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'

    msg_content= msg_data["PushContent"]

    prompt={"role": "user", "content": [{
        "type": "text",
        "text": msg_content
    }]}

    # Incoming turn: cache it and publish it to Kafka.
    messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
    input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
    input_message=dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
    await request.app.state.kafka_service.send_message_async(input_message)
    logger.info("发送对话 %s",input_message)

    # Reply turn.
    res=await gpt_client_async(request,messages_to_send,wxid,callback_to_user)
    reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
    input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
    input_message=dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
    # Publish through the same kafka_service.send_message_async entry point as
    # every other handler; this one call went through
    # kafka_service.kafka_client.produce_message instead.
    await request.app.state.kafka_service.send_message_async(input_message)
    logger.info("发送对话 %s",input_message)
    await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})
    await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,reply_content)

async def handle_xml_invite_group_async (request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Accept a group-chat invitation.

    Such a message is identified by $.Data.MsgType=49 with the XML at
    $.Data.Content.string carrying an msg.appmsg.title that contains
    "邀请你加入群聊" (the title wording follows the phone's system language,
    so other languages use a different keyword).

    When the title matches, the invitation URL is accepted; on a 200 result
    the group is saved to the contact list, its info and member caches are
    refreshed and a groups-info key message is published to Kafka. Any other
    result is logged as a failure.
    '''
    logger.info(f'{wxid} 群聊邀请')
    invite_root = ET.fromstring(msg_data["Content"]["string"])
    invite_title = invite_root.find(".//appmsg/title").text

    # Not an invitation title: nothing to do.
    if '邀请你加入群聊' not in invite_title:
        return

    join_url = invite_root.find('.//url').text
    gewe = request.app.state.gewe_service
    ret,msg,data=await gewe.agree_join_room_async(token_id,app_id,join_url)
    if ret!=200:
        logger.warning(f'群聊邀请,同意加入群聊失败 {msg} {data}')
        return

    logger.info(f'群聊邀请,同意加入群聊 {msg} {data}')
    chatroom_id=data.get('chatroomId','')

    # Save the group to the contact list, then refresh both group caches.
    _,save_msg,_=await gewe.save_contract_list_async(token_id,app_id,chatroom_id,3)
    logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id}  {save_msg}')
    await gewe.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id)
    await gewe.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id)

    # Publish the groups-info key message to Kafka.
    k_message=wx_groups_info_members_key_message(wxid)
    await request.app.state.kafka_service.send_message_async(k_message)

async def handle_add_friend_notice_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle a friend-request notification by accepting it and replying.

    The request XML's root attributes (``content``, ``encryptusername``,
    ``ticket``, ``scene``, ``fromusername``) are read and the request is
    accepted with operation type 3. On a 200 result the requester's greeting
    is cached and published to Kafka, a GPT reply is generated, the new
    contact's brief info is cached, and the reply is cached, sent to the
    contact and published to Kafka.

    Args:
        request: Current request; services are read from ``request.app.state``.
        token_id: Token passed through to the gewe service calls.
        app_id: App id passed through to the gewe service calls.
        wxid: Overwritten with ``msg_data["ToUserName"]["string"]``.
        msg_data: Callback payload; ``Content.string`` and
            ``ToUserName.string`` are read.
        from_wxid: Unused by this handler.
        to_wxid: Unused by this handler.

    Returns:
        None. XML errors and any other exception are logged and swallowed.
    '''
    logger.info('好友添加请求通知')
    try:
        msg_content_xml=msg_data["Content"]["string"]

        root = ET.fromstring(msg_content_xml)

        msg_content = root.attrib.get('content', None)
        v3= root.attrib.get('encryptusername', None)
        v4= root.attrib.get('ticket', None)
        scene=root.attrib.get('scene', None)

        to_contact_wxid=root.attrib.get('fromusername', None)
        wxid=msg_data["ToUserName"]["string"]

        # Operation type: 2 = add friend, 3 = accept friend, 4 = reject friend.
        option=3
        reply_add_contact_contact="亲,我是你的好友"
        add_result=await request.app.state.gewe_service.add_contacts_async(token_id,app_id,scene,option,v3,v4,reply_add_contact_contact)
        # Only the status code is needed. Indexing avoids depending on the
        # tuple length: sibling gewe calls return (ret, msg, data).
        ret=add_result[0]
        if ret==200:
            logger.info('自动添加好友成功')

            # Greeting text sent along with the friend request.
            hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{to_contact_wxid}'
            prompt={"role": "user", "content": [{"type": "text","text": msg_content}]}
            messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)

            input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
            input_message=dialogue_message(to_contact_wxid,wxid,input_wx_content_dialogue_message)
            # Dialogue messages are published through kafka_service, as in
            # every other handler (this call targeted gewe_service).
            await request.app.state.kafka_service.send_message_async(input_message)
            logger.info("发送对话 %s",input_message)

            callback_to_user=to_contact_wxid
            # `request` is the first argument at every other call site of
            # gpt_client_async; it was missing here.
            res=await gpt_client_async(request,messages_to_send,wxid,callback_to_user)
            reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])

            # Cache the new contact's brief info.
            await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id, wxid,[to_contact_wxid])

            # Cache the reply as the assistant turn.
            await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})

            # Send the reply to the new contact.
            await request.app.state.gewe_service.post_text_async(token_id,app_id, to_contact_wxid,reply_content)

            # Publish the reply to Kafka. The call must be awaited, otherwise
            # the coroutine is created but never run.
            input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
            input_message=dialogue_message(wxid,to_contact_wxid,input_wx_content_dialogue_message,True)
            await request.app.state.kafka_service.send_message_async(input_message)
            logger.info("发送对话 %s",input_message)

        else:
            logger.warning("添加好友失败")
    except ET.ParseError as e:
        logger.error(f"解析XML失败: {e}")
    except Exception as e:
        logger.error(f"未知错误: {e}")
    return

async def handle_10002_msg_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle MsgType 10002 system notifications.

    Notification kinds listed for this message type:
      group invitation, message recall, "pat" message, location,
      removed-from-group notice, group-dissolved notice, group announcement.

    The raw text at ``msg_data["Content"]["string"]`` is tested against a set
    of marker substrings. The checks are independent, so one message may
    trigger several actions:

      * invited into a group (and the sender id passes ``check_chatroom``):
        save the group to the contact list, refresh its caches, publish the
        groups-info key message;
      * removed from a group / group dissolved (both require
        ``sysmsgtemplate``): remove the group from the contact list, delete
        its cached info and members, publish a delete-group message;
      * group announcement or ``roomtoolstips``: refresh the group caches and
        publish the groups-info key message.

    Any exception is logged and swallowed; the coroutine returns None.
    '''

    async def _update_contact_list(group_id, oper_type):
        # Calls save_contract_list_async and hands back the middle element of
        # its (ret, msg, data) result for logging.
        _, status_text, _ = await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,group_id,oper_type)
        return status_text

    async def _refresh_and_publish(group_id):
        # Refresh the cached group info and members, then publish the
        # groups-info key message to Kafka.
        gewe = request.app.state.gewe_service
        await gewe.update_group_info_to_cache_async(token_id,app_id,wxid,group_id)
        await gewe.update_group_members_to_cache_async(token_id,app_id,wxid,group_id)
        key_message = wx_groups_info_members_key_message(wxid)
        await request.app.state.kafka_service.send_message_async(key_message)

    async def _forget_group(group_id, reason):
        # Remove the group from the contact list, drop its cached data and
        # publish a delete-group message to Kafka.
        status_text = await _update_contact_list(group_id, 2)
        logger.info(f'{reason},移除从通讯录 chatroom_id {group_id}  {status_text}')
        redis = request.app.state.redis_service
        await redis.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',group_id)
        await redis.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',group_id)
        logger.info(f'清除 chatroom_id{group_id} 数据')
        del_message = wx_del_group_message(wxid,group_id)
        await request.app.state.kafka_service.send_message_async(del_message)

    try:
        notice_text=msg_data["Content"]["string"]

        # Group invitation.
        if '邀请你加入了群聊' in notice_text and check_chatroom(msg_data["FromUserName"]["string"]):
            group_id=msg_data["FromUserName"]["string"]
            status_text=await _update_contact_list(group_id,3)
            logger.info(f'群聊邀请,保存到通讯录 chatroom_id {group_id}  {status_text}')
            await _refresh_and_publish(group_id)

        # Removed from the group, then group dissolved: same clean-up, with a
        # different label in the log line.
        for marker, reason in (('移出了群聊','移出群聊'),('已解散该群聊','解散群聊')):
            if marker in notice_text and 'sysmsgtemplate' in notice_text:
                await _forget_group(msg_data["FromUserName"]["string"],reason)

        # Group announcement published.
        if 'mmchatroombarannouncememt' in notice_text:
            group_id=msg_data["FromUserName"]["string"]
            logger.info(f'发布群公告 chatroom_id {group_id}')
            await _refresh_and_publish(group_id)

        # Group to-do notice.
        if 'roomtoolstips' in notice_text:
            group_id=msg_data["FromUserName"]["string"]
            logger.info(f'群待办 chatroom_id {group_id} ')
            await _refresh_and_publish(group_id)

    except ET.ParseError as e:
        logger.error(f"解析XML失败: {e}")
    except Exception as e:
        logger.error(f"未知错误: {e}")
    return

async def handle_10000_msg_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle MsgType 10000 system notifications about a group chat.

    Notification kinds this handler looks for:
      group renamed, group owner changed, removed-from-group notice.

    When the notification text contains one of the keywords and ``from_wxid``
    passes ``check_chatroom``, the group's info and member caches are
    refreshed and the groups-info key message is published to Kafka.
    Anything else is ignored.

    Args:
        request: Current request; services are read from ``request.app.state``.
        token_id: Token passed through to the gewe service calls.
        app_id: App id passed through to the gewe service calls.
        wxid: WeChat id of the account that received the notification.
        msg_data: Callback payload; ``Content.string`` is read, and a missing
            or empty value is treated as an empty text.
        from_wxid: Sender id; used as the chatroom id.
        to_wxid: Unused by this handler.
    '''
    # `or {}` / `or ""` also cover keys that are present but None; the former
    # default of "" for "Content" had no .get() and raised AttributeError.
    content=(msg_data.get("Content") or {}).get("string") or ""

    # The condition used to be `'修改群名' or '新群主' or ... in content and ...`,
    # which is always truthy because a non-empty string literal is truthy, so
    # every 10000 message triggered a refresh. Each keyword is now tested
    # against the text.
    # NOTE(review): the third keyword is the docstring label itself; confirm it
    # matches the wording of real removal notifications.
    notice_keywords=('修改群名','新群主','被移除群聊通知')
    if any(keyword in content for keyword in notice_keywords) and check_chatroom(from_wxid):
        chatroom_id=from_wxid
        logger.info(f'{content} chatroom_id {chatroom_id}  ')
        await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id)
        await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id)

        # Publish the groups-info key message to Kafka.
        k_message=wx_groups_info_members_key_message(wxid)
        await request.app.state.kafka_service.send_message_async(k_message)

    return