from voice.ali.ali_voice import AliVoice
from common.log import logger
import xml.etree.ElementTree as ET
import os,json,asyncio,aiohttp,random
from aiohttp import ClientError

from voice import audio_convert

from fastapi import APIRouter,Request
from pydantic import BaseModel
from fastapi import APIRouter, Depends
from typing import Dict, Any
from model.models import AgentConfig,OperationType
from common.utils import *
from common.memory import *

import traceback
import sys

# Seconds check_timeout_async waits before checking whether the AI reply task
# is still running (and possibly sending a "please wait" phrase).
timeout_duration = 8.0 

# Router on which the "/messages" callback endpoint below is registered.
messages_router = APIRouter()



# Sender IDs whose AddMsg callbacks get_messages drops (unless the sender is the
# account itself). The disabled line is an earlier list that also had 'fmessage'.
#WX_BACKLIST=['fmessage', 'medianote','weixin','weixingongzhong','tmessage']
WX_BACKLIST=['medianote','weixin','weixingongzhong','tmessage']
@messages_router.post("/messages",response_model=None)
async def get_messages(request: Request, body: Dict[str, Any]):
    """Receive a WeChat callback payload and dispatch it to the matching handler.

    The payload is ignored when no login info is found for its ``Appid``.
    For ``AddMsg`` payloads, self-issued commands are processed first; the
    message is then dropped when the sender is a *different* account that is
    listed in the ``WXCHAT_CONFIG`` hash or in ``WX_BACKLIST``, or when the
    sender id starts with ``gh_`` or ``wxid_wi_``.

    Args:
        request: Incoming request; services are read from ``request.app.state``.
        body: Decoded JSON callback payload.

    Returns:
        A dict with a single ``message`` field. Exceptions are logged and
        reported through that field instead of being raised.
    """
    logger.info(f"收到微信回调消息: {json.dumps(body, separators=(',', ':'),ensure_ascii=False)}")
    try:
        msg = body

        type_name = msg.get("TypeName")
        app_id = msg.get("Appid")
        k, loginfo = await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id)
        if not k:
            logger.warning('找不到登录信息,不处理')
            return {"message": f"收到微信回调消息: {type_name}"}

        token_id = loginfo.get('tokenId', '')
        wxid = msg.get("Wxid", '')

        if type_name == 'AddMsg':
            await handle_self_cmd_async(request, wxid, msg)
            msg_data = msg.get("Data")
            from_wxid = msg_data["FromUserName"]["string"]
            config = await request.app.state.redis_service.get_hash("__AI_OPS_WX__:WXCHAT_CONFIG")
            merged_blacklist_wxids = list(config.keys()) + WX_BACKLIST

            # Blacklisted senders are ignored unless the sender is this account itself.
            is_blacklisted = from_wxid in merged_blacklist_wxids and from_wxid != wxid
            # Official-account ids start with "gh_"; "wxid_wi_" ids are blocked too.
            # A prefix test is used so that ordinary ids that merely contain
            # "gh_" (e.g. "zhangh_88") are not dropped by accident.
            is_blocked_prefix = from_wxid.startswith(('gh_', 'wxid_wi_'))
            if is_blacklisted or is_blocked_prefix:
                logger.warning(f'来自微信ID {from_wxid} 在黑名单,发送给 {wxid} ,不处理')
                return {"message": "收到微信回调消息"}

        await handle_messages_async(request, token_id, msg)

        return {"message": "收到微信回调消息,处理完成"}
    except Exception as e:
        # Top-level boundary: log everything needed to diagnose, never raise to the caller.
        logger.error(f"处理微信回调消息出错: {json.dumps(body, separators=(',', ':'),ensure_ascii=False)}")
        logger.error(f"异常类型: {type(e).__name__}")
        logger.error(f"异常信息: {str(e)}")
        logger.error(f"堆栈跟踪: {traceback.format_exc()}")

    return {"message": "处理微信回调消息错误"}

    


async def handle_self_cmd_async(request: Request,wxid,msg):
    '''
    Process commands an account sends to itself.

    A message counts as a self command when its sender, its receiver and
    ``wxid`` are all the same id. Recognised commands:
        '启用托管' -> agentEnabled = True
        '关闭托管' -> agentEnabled = False

    Returns a small status dict for self-addressed messages, otherwise None.
    '''
    payload = msg.get("Data")
    receiver = payload["ToUserName"]["string"]
    sender = payload["FromUserName"]["string"]
    text = payload["Content"]["string"]

    # Not a message from the account to itself: nothing to do.
    if not (sender == receiver and wxid == receiver):
        return None

    toggles = {
        '启用托管': True,
        '关闭托管': False
    }
    if text in toggles:
        enable = toggles[text]
        gewe = request.app.state.gewe_service
        cached = await gewe.get_wxchat_config_from_cache_async(wxid)
        # Only accounts that already have a cached config can be toggled.
        if cached:
            agent_config = AgentConfig.model_validate(cached)
            agent_config.agentEnabled = enable
            await gewe.save_wxchat_config_async(wxid, agent_config.model_dump())
            logger.info(f'{wxid} {"启动" if enable else "关闭"}托管')

    return {"message": "收到微信回调消息"}

   

async def handle_messages_async(request: Request,token_id,msg):
    '''
    Route a callback payload to the handler for its ``TypeName``.

    Known types: AddMsg, ModContacts, DelContacts, Offline. Any other type
    is logged as unknown and ignored.
    '''
    kind = msg.get("TypeName")
    account_id = msg.get("Wxid", '')

    if kind == 'AddMsg':
        await handle_add_messages_async(request, token_id, msg, account_id)
    elif kind == 'ModContacts':
        await handle_mod_contacts_async(request, token_id, msg, account_id)
    elif kind == 'DelContacts':
        await handle_del_contacts_async(request, token_id, msg, account_id)
    elif kind == 'Offline':
        await handle_offline_async(request, token_id, msg, account_id)
    else:
        logger.warning(f'未知消息类型:{kind}')
        
async def gpt_client_async(request, messages: list, wixd: str, friend_wxid: str):
    """POST a chat-completion request and return the decoded JSON response.

    The request is retried up to three times, five seconds apart, when it
    fails with an aiohttp ``ClientError`` (including non-2xx statuses raised
    by ``raise_for_status``) or a timeout; the last failure is re-raised.

    Args:
        request: Incoming request; ``request.app.state.gewe_service`` supplies
            the cached account configuration.
        messages: Chat messages forwarded as the ``messages`` field.
        wixd: Account id; used for the config lookup and the chat id.
        friend_wxid: Peer id; combined with ``wixd`` into the chat id.

    Returns:
        The JSON-decoded response body.

    Raises:
        aiohttp.ClientError, asyncio.TimeoutError: after the final attempt fails.
    """
    max_retries = 3
    retry_delay = 5  # seconds between attempts
    # An explicit ClientTimeout replaces the legacy bare-number form.
    request_timeout = aiohttp.ClientTimeout(total=1200)

    for attempt in range(max_retries):
        try:
            c = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wixd)
            # SECURITY NOTE(review): a hard-coded fallback API key lives in source
            # control; it is kept so behaviour does not change, but it should be
            # moved to configuration / secrets storage.
            api_key = c.get('agentTokenId', "sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH")
            base_url = "http://106.15.182.218:3000"
            environment = os.environ.get('environment', 'default')
            if environment != 'default':
                base_url = "http://172.19.42.59:3000"
            api_url = f"{base_url}/api/v1/chat/completions"
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {api_key}"
            }
            session_id = f'{wixd}-{friend_wxid}'
            data = {
                "model": "",
                "messages": messages,
                "chatId": session_id,
                "detail": True
            }
            logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False)))
            async with aiohttp.ClientSession() as session:
                async with session.post(url=api_url, headers=headers, data=json.dumps(data), timeout=request_timeout) as response:
                    response.raise_for_status()
                    response_data = await response.json()
                    return response_data
        except (ClientError, asyncio.TimeoutError) as e:
            logger.error(f"[CHATGPT] 请求失败(尝试 {attempt + 1}/{max_retries}): {e}")
            if attempt < max_retries - 1:
                await asyncio.sleep(retry_delay)
            else:
                raise

async def handle_add_messages_async(request: Request,token_id,msg,wxid):
    '''
    Handle an AddMsg callback by dispatching on the payload's ``MsgType``.

    Nothing is processed when the account's cached config has ``agentEnabled``
    switched off. When either side of the message is a chatroom, the chatroom
    is saved to the contact list, its info and members are refreshed in the
    cache, and the text/image/voice/video handlers are replaced by their
    group-chat variants.

    Returns whatever the selected handler returns, or None when the message
    type has no handler.
    '''
    gewe_service = request.app.state.gewe_service
    wx_config = await gewe_service.get_wxchat_config_from_cache_async(wxid)

    app_id = msg.get("Appid")
    msg_data = msg.get("Data")
    msg_type = msg_data.get("MsgType", None)
    from_wxid = msg_data["FromUserName"]["string"]
    to_wxid = msg_data["ToUserName"]["string"]

    validated_config = AgentConfig.model_validate(wx_config)

    if not validated_config.agentEnabled:
        logger.info(f'微信ID {wxid} 未托管,不处理 {msg_type}')
        return

    # MsgType -> handler for private chats.
    handlers = {
        1:  handle_text_async,
        3:  handle_image_async,
        34: handle_voice_async,
        42: handle_name_card_async,
        43: handle_video_async,
        49: handle_xml_async,
        51: handle_file_message_async,
        37: handle_add_friend_notice_async,
        10002: handle_10002_msg_async,
        10000: handle_10000_msg_async
    }

    # Pick whichever side of the message is the chatroom. The sender is not
    # the chatroom when this account posts into a group, so the receiver has
    # to be considered as well instead of always using the sender.
    if check_chatroom(from_wxid):
        chatroom_id = from_wxid
    elif check_chatroom(to_wxid):
        chatroom_id = to_wxid
    else:
        chatroom_id = None

    # Group message (e.g. joined by QR code): make sure the group is saved to
    # the contact list and its cached info is current.
    if chatroom_id is not None:
        logger.info('群信息')
        # A separate name keeps the ``msg`` parameter from being shadowed.
        ret, save_msg, data = await gewe_service.save_contract_list_async(token_id, app_id, chatroom_id, 3)
        logger.info(f'保存到通讯录 chatroom_id {chatroom_id}  {save_msg}')
        await gewe_service.update_group_info_to_cache_async(token_id, app_id, wxid, chatroom_id)
        await gewe_service.update_group_members_to_cache_async(token_id, app_id, wxid, chatroom_id)
        handlers[1] = handle_text_group_async
        handlers[3] = handle_image_group_async
        handlers[34] = handle_voice_group_async
        handlers[43] = handle_video_group_async

    handler = handlers.get(msg_type)

    if handler:
        return await handler(request, token_id, app_id, wxid, msg_data, from_wxid, to_wxid)
    else:
        logger.warning(f"微信回调消息类型 {msg_type} 未处理")

async def handle_mod_contacts_async(request: Request,token_id,msg,wxid):
    '''
    Handle a ModContacts callback: a friend passed verification / changed
    profile, or a group's information changed.

    The ``wxid`` argument is replaced by the ``Wxid`` field of ``msg``.
    Contact changes are delegated to ``handle_mod_contacts_worker_async``.
    Group changes save the group to the contact list, refresh its cached
    info and members, and push a group-info key message to Kafka.
    (An earlier version built and pushed per-contact payloads inline; that
    disabled code has been dropped from this function.)
    '''
    wxid = msg.get("Wxid")
    msg_data = msg.get("Data")
    app_id = msg.get("Appid")

    user_name = msg_data.get("UserName", {}).get("string", "")

    # Friend / contact change: hand over to the worker and stop.
    if not check_chatroom(user_name):
        await handle_mod_contacts_worker_async(request, token_id, app_id, wxid, msg_data)
        return

    # Group information change.
    logger.info('群信息变更通知')
    gewe = request.app.state.gewe_service
    chatroom_id = user_name
    _status, save_msg, _payload = await gewe.save_contract_list_async(token_id, app_id, chatroom_id, 3)
    logger.info(f'保存到通讯录 chatroom_id {chatroom_id}  {save_msg}')
    await gewe.update_group_info_to_cache_async(token_id, app_id, wxid, chatroom_id)
    await gewe.update_group_members_to_cache_async(token_id, app_id, wxid, chatroom_id)

    # Only a key message is pushed, not the full group data.
    group_key_message = wx_groups_info_members_key_message(wxid)
    await request.app.state.kafka_service.send_message_async(group_key_message)

async def handle_del_contacts_async(request: Request,token_id,msg,wxid):
    '''
    Handle a DelContacts callback: a friend was deleted or a group was left.

    Friend deletion pushes a delete-contact message to Kafka. Leaving a group
    removes the group's info and member entries from the Redis hashes and
    pushes a delete-group message to Kafka.
    '''
    payload = msg.get("Data")
    username = payload["UserName"]["string"]

    if not check_chatroom(username):
        logger.info('删除好友通知')
        # Push to Kafka.
        contact_deleted = wx_del_contact_message(wxid, username)
        await request.app.state.kafka_service.send_message_async(contact_deleted)
        return

    logger.info('退出群聊')
    # For group exits the account id is taken from the payload itself.
    wxid = msg.get("Wxid")
    chatroom_id = username
    for hash_name in (f'__AI_OPS_WX__:GROUPS_INFO:{wxid}', f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}'):
        await request.app.state.redis_service.delete_hash_field(hash_name, chatroom_id)

    logger.info(f'清除 chatroom_id{chatroom_id} 数据')

    # Push the group removal to Kafka.
    group_deleted = wx_del_group_message(wxid, chatroom_id)
    await request.app.state.kafka_service.send_message_async(group_deleted)

async def handle_offline_async(request: Request,token_id,msg,wxid):
    '''
    Handle an Offline callback: mark the account's login record as offline
    (status 0, refreshed ``modify_at``) and push an offline message to Kafka.

    The ``wxid`` argument is replaced by the ``Wxid`` field of ``msg``.
    '''
    wxid = msg.get("Wxid")
    app_id = msg.get("Appid")
    logger.warning(f'微信ID {wxid}在设备{app_id}已经离线')

    login_key, _login_info = await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id)
    print(login_key)

    redis = request.app.state.redis_service
    await redis.update_hash_field(login_key, 'status', 0)
    await redis.update_hash_field(login_key, 'modify_at', int(time.time()))

    # Push to Kafka.
    offline_notice = wx_offline_message(app_id, wxid)
    await request.app.state.kafka_service.send_message_async(offline_notice)

async def handle_mod_contacts_worker_async(request:Request,token_id,app_id,wxid,msg_data):
    '''
    Refresh the cached friend briefs after a friend passed verification or
    changed profile, then push an all-contacts key message to Kafka.

    Chatroom ids are only logged; nothing is refreshed for them.
    '''
    logger.info('好友通过验证及好友资料变更的通知消息')

    if check_chatroom(msg_data["UserName"]["string"]):
        logger.info('群聊好友通过验证及好友资料变更的通知消息')
        return

    gewe = request.app.state.gewe_service
    _ret, _ret_msg, contacts_list = await gewe.fetch_contacts_list_async(token_id, app_id)

    # Ids excluded from the friend list before caching.
    excluded = ('fmessage', 'medianote', 'weixin', 'weixingongzhong', 'tmessage')
    friend_wxids = []
    for friend in contacts_list['friends']:
        if friend not in excluded:
            friend_wxids.append(friend)

    logger.info(f'{wxid}的好友资料变更,数量 {len(friend_wxids)}')
    await gewe.save_contacts_brief_to_cache_async(token_id, app_id, wxid, friend_wxids)

    # Only a key message is pushed, not the contact data itself.
    contacts_key_message = wx_all_contacts_key_message(wxid)
    await request.app.state.kafka_service.send_message_async(contacts_key_message)

async def handle_text_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle a private-chat text message.

    Flow:
      * Skip when ``privateGroupChatEnabled`` is off in the account config, or
        when a human currently handles the conversation with ``from_wxid``.
      * A message sent manually from the account itself (``wxid == from_wxid``)
        refreshes the peer's cached brief, switches the conversation to human
        handling for 30 minutes and pushes the dialogue to Kafka.
      * An incoming message containing a sales keyword switches to human
        handling for 30 minutes, sends a holding reply to the friend and
        notifies the assistant account.
      * Any other incoming message is answered by the AI; if the answer takes
        longer than ``timeout_duration`` seconds a waiting phrase may be sent
        by ``check_timeout_async``.
    '''
    gewe_service = request.app.state.gewe_service

    config = await gewe_service.get_wxchat_config_from_cache_async(wxid)
    validated_config = AgentConfig.model_validate(config)
    if not validated_config.privateGroupChatEnabled:
        logger.warning(f"微信号 {wxid} 关闭好友和群自动回复功能,{handle_text_async.__name__} 不回复消息")
        return

    # A human has taken over this conversation: stay silent.
    is_human_handle_msg = await gewe_service.is_human_handle_msg_with_contact_wxid_async(wxid, from_wxid)
    if is_human_handle_msg:
        logger.warning(f'微信号 {wxid} 发送到微信号{to_wxid} 暂时工人接管30分钟中')
        return

    msg_content = msg_data["Content"]["string"]

    if wxid == from_wxid:  # message typed manually by the account owner
        logger.info(f"{wxid} 手动发送消息")

        await gewe_service.save_contacts_brief_to_cache_async(token_id, app_id, wxid, [to_wxid])
        # Hand the conversation over to the human for 30 minutes.
        await gewe_service.set_human_handle_msg_with_contact_wxid_async(wxid, to_wxid, 60*30)

        # Push the dialogue to Kafka.
        input_wx_content_dialogue_message = [{"type": "text", "text": msg_content}]
        input_message = dialogue_message(from_wxid, to_wxid, input_wx_content_dialogue_message)
        await request.app.state.kafka_service.send_message_async(input_message)
        logger.info("发送对话 %s", input_message)
        return

    callback_to_user = msg_data["FromUserName"]["string"]

    # Keywords that trigger the hand-over to a human.
    keywords = ["报价","预约","买","价钱","价格","多少钱","下单","优惠","打折","折扣","几钱","便宜","贵","最低","续费","试用","体验","免费","费用","开票","开通","注册"]
    found_keywords = [keyword for keyword in keywords if keyword in msg_content]
    if found_keywords:
        await gewe_service.set_human_handle_msg_with_contact_wxid_key_word_async(wxid, callback_to_user, 60*30)
        logger.info(f"{wxid} 收到 {callback_to_user} 私聊消息匹配到关键词:{', '.join(found_keywords)}")

        # Holding reply to the friend.
        await gewe_service.post_text_async(token_id, app_id, callback_to_user, "我这边目前有点忙,稍后回复您好吗?")
        await asyncio.sleep(random.uniform(1.5, 3))

        # Notify the assistant account that receives business-lead reminders.
        print('推送到助理')
        assistant_wxid = 'wxid_9pocbage7cdb22'
        contacts_brief = await gewe_service.get_contacts_brief_from_cache_async(wxid)

        contact_nick_name = next(filter(lambda x: x.get("userName") == callback_to_user, contacts_brief), {}).get("nickName", "")
        assistant_msg_content = f'AI管家转人工提醒:【{contact_nick_name}】有{found_keywords[0]}需求'
        await gewe_service.post_text_async(token_id, app_id, assistant_wxid, assistant_msg_content)

        # The assistant acknowledges only when it is logged in and online.
        k, loginfo = await gewe_service.get_login_info_by_wxid_async(assistant_wxid)
        if not k:
            logger.warning(f'助理微信号 {assistant_wxid} 不存在')
            return

        status = loginfo.get('status', '0')
        if status != '1':
            logger.warning(f'助理微信号 {assistant_wxid} 不在线')
            return

        kf_token_id = loginfo.get('tokenId', '')
        kf_appid = loginfo.get('appId', '')
        await asyncio.sleep(random.uniform(1.5, 3))
        assistant_msg_content = f'收到,{assistant_msg_content}'
        await gewe_service.post_text_async(kf_token_id, kf_appid, wxid, assistant_msg_content)
        return

    # A keyword match from an earlier message is still in effect.
    is_human_handle_msg_with_contact_wxid = await gewe_service.is_human_handle_msg_with_contact_wxid_key_word_async(wxid, callback_to_user)
    if is_human_handle_msg_with_contact_wxid:
        logger.warning(f'微信号 {wxid} 与 {callback_to_user} 有关键字匹配,暂时工人接管30分钟中,请查看长服AI商机提醒助理')
        return

    # Reuse the per-app lock. Creating a fresh lock for every message would
    # replace one that another in-flight reply may be holding, so concurrent
    # replies for the same app would no longer exclude each other.
    message_locks = request.app.state.message_lock
    if app_id not in message_locks:
        message_locks[app_id] = asyncio.Lock()

    # Run the AI reply as its own task so the watchdog can observe it.
    task = asyncio.create_task(
        ai_chat_text_async(request, token_id, app_id, wxid, msg_data, msg_content)
    )

    # Watchdog: sends a waiting phrase if the reply takes too long.
    timeout_timer = asyncio.create_task(
        check_timeout_async(task, request, token_id, wxid, app_id, callback_to_user)
    )

    try:
        await task
    finally:
        # Cancel the watchdog even when the reply task failed; otherwise a
        # waiting phrase could still go out after the error.
        timeout_timer.cancel()
        
#message_lock = asyncio.Lock()

async def check_timeout_async(task: asyncio.Task, request: Request,token_id, wxid, app_id, callback_to_user):
    '''
    Watchdog for an AI reply task.

    Sleeps for ``timeout_duration`` seconds. If ``task`` is still running, it
    takes the per-app message lock and, unless ``chatWaitingMsgEnabled`` is
    falsy in the account config, posts a randomly chosen waiting phrase to
    ``callback_to_user``.
    '''
    await asyncio.sleep(timeout_duration)  # wait out the grace period
    if task.done():
        return

    app_lock = request.app.state.message_lock[app_id]
    async with app_lock:
        print(f"任务运行时间超过{timeout_duration}秒,token_id={token_id}, app_id={app_id}, callback_to_user={callback_to_user}")
        gewe = request.app.state.gewe_service
        wx_config = await gewe.get_wxchat_config_from_cache_async(wxid)
        # Waiting messages are on by default.
        if not wx_config.get("chatWaitingMsgEnabled", True):
            return

        waiting_phrases = ["稍等一下", "辛苦等等", "马上就好", "很快就好", "请稍后", "请等会", "稍等1分钟","请别急,在整理","就好了"]
        await gewe.post_text_async(token_id, app_id, callback_to_user, random.choice(waiting_phrases))

async def ai_chat_text_async(request: Request,token_id, app_id, wxid, msg_data, msg_content):
    '''
    Answer a private text message with the AI and send the reply via WeChat.

    Steps:
      1. Store the user prompt in the session cache and push the incoming
         dialogue to Kafka.
      2. If the previous AI turn offered interactive options and the message
         starts with a digit, translate the 1-based choice into that option's
         value.
      3. Call the AI; interactive answers are rendered as a description plus
         option list, text answers are extracted, and an empty text answer
         triggers one retry after trimming the cached history.
      4. A reply of '不回复' is stored but not sent.
      5. Otherwise placeholders are replaced, image/video URLs are extracted
         and sent separately, the text is sent in parts, and the reply is
         stored and pushed to Kafka.

    NOTE(review): the interactive cache is keyed by ``wxid`` only, so it looks
    shared across all of the account's conversations - confirm this is intended.
    NOTE(review): when a reply contains both image and video URLs, the text is
    sent once per media kind - confirm this is intended.
    '''
    start_time = time.time()  # task start, used for the duration log

    callback_to_user = msg_data["FromUserName"]["string"]

    k,loginfo=await request.app.state.gewe_service.get_login_info_by_wxid_async(wxid)
    wxid_nickname=loginfo.get("nickName")

    contacts_brief:list=await request.app.state.gewe_service.get_contacts_brief_from_cache_async(wxid)
    callback_to_user_nickname = next(filter(lambda x: x.get("userName") == callback_to_user, contacts_brief), {}).get("nickName", "")

    hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
    prompt = {"role": "user", "content": [{
        "type": "text",
        "text": msg_content
    }]}
    messages_to_send = await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
    # Incoming dialogue.
    input_wx_content_dialogue_message = [{"type": "text", "text": msg_content}]
    input_message = dialogue_message(callback_to_user, wxid, input_wx_content_dialogue_message)
    await request.app.state.kafka_service.send_message_async(input_message)
    logger.info("发送对话 %s", input_message)

    cache_data = USER_INTERACTIVE_CACHE.get(wxid)
    if cache_data and cache_data.get('interactive'):
        # In interactive mode only the current message is sent to the AI.
        selected = get_first_char_if_digit(msg_content)
        options = cache_data.get('options') or []
        # The choice is 1-based: valid values are 1..len(options). The bounds
        # check keeps 0 from wrapping around to the last option and lets the
        # last option actually be selected.
        if selected is not None and 1 <= selected <= len(options):
            msg_content = options[selected - 1].get("value")
        messages_to_send = [{"role": "user", "content": msg_content}]

    res = await gpt_client_async(request,messages_to_send, wxid, callback_to_user)
    reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"])
    description = ''
    userSelectOptions = []
    if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
        # Interactive answer: show the description followed by the option values.
        for item in reply_content:
            if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
                params = item["interactive"]["params"]
                description = params.get("description")
                userSelectOptions = params.get("userSelectOptions", [])
        values_string = "\n".join(option["value"] for option in userSelectOptions)

        if description is not None:
            USER_INTERACTIVE_CACHE[wxid] = {
                "interactive": True,
                "options": userSelectOptions,
            }
            reply_content = description + '------------------------------\n' + values_string

    elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
        USER_INTERACTIVE_CACHE[wxid] = {
            "interactive": False
        }
        # The last text item wins.
        text = ''
        for item in reply_content:
            if item["type"] == "text":
                text = item["text"]["content"]
        if text == '':
            # Empty answer: drop the tail of the cached history and ask again.
            cache_messages_str = await request.app.state.redis_service.get_hash_field(hash_key, "data")
            cache_messages = json.loads(cache_messages_str) if cache_messages_str else []

            if len(cache_messages) >= 3:
                cache_messages = cache_messages[:-3]
            await request.app.state.redis_service.update_hash_field(hash_key, "data", json.dumps(cache_messages, ensure_ascii=False))
            messages_to_send = await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
            res = await gpt_client_async(request,messages_to_send, wxid, callback_to_user)
            reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"])

            if isinstance(reply_content, list):
                reply_content = remove_markdown_symbol(reply_content[0].get('text').get("content"))
        else:
            reply_content = text
    else:
        USER_INTERACTIVE_CACHE[wxid] = {
            "interactive": False
        }
        reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"])

    if reply_content == '不回复':
        # The AI decided not to answer: record it, send nothing.
        execution_time = time.time() - start_time
        logger.warning(f"AI回答任务完成,耗时 {execution_time:.2f} 秒,AI回答<不回复>,跳过微信回复")
        await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})
        return

    message_lock=request.app.state.message_lock[app_id]
    # Nickname placeholders.
    replacements = {
        '{昵称}': wxid_nickname,
        '{好友昵称}': callback_to_user_nickname
    }
    reply_content=replace_placeholders(reply_content, replacements)

    # Image URLs: send the text first, then each image.
    img_urls,reply_content=extract_and_replace_image_urls(reply_content)
    if img_urls:
        async with message_lock:
            await ai_post_text_split_async_async(request, token_id, app_id, callback_to_user, reply_content)
            await asyncio.sleep(random.uniform(1.5, 3))
            for img_url in img_urls:
                await request.app.state.gewe_service.post_image_async(token_id, app_id, callback_to_user, img_url)
                await asyncio.sleep(random.uniform(1.5, 3))

    # Video URLs: send the text first, then each video with its thumbnail.
    video_urls,reply_content=extract_and_replace_video_urls(reply_content)
    if video_urls:
        async with message_lock:
            await ai_post_text_split_async_async(request, token_id, app_id, callback_to_user, reply_content)
            await asyncio.sleep(random.uniform(1.5, 3))
            for video_url in video_urls:
                parsed_url = urlparse(video_url)
                filename = os.path.basename(parsed_url.path)
                tmp_file_path = os.path.join(os.getcwd(),'tmp', filename)  # full local path
                thumbnail_path=tmp_file_path.replace('.mp4','.jpg')

                video_thumb_url,video_duration =download_video_and_get_thumbnail(video_url,thumbnail_path)
                logger.info(f'{wxid} 视频缩略图 {video_thumb_url} 时长 {video_duration}')
                ret,ret_msg,res = await request.app.state.gewe_service.post_video_async(token_id, app_id, callback_to_user, video_url,video_thumb_url,video_duration)
                if ret!=200:
                    logger.warning(f'{wxid} 发送视频{video_url} 到 {callback_to_user} 失败,{ret_msg}')
                await asyncio.sleep(random.uniform(1.5, 3))

    # Plain text reply (no media).
    if (not video_urls) and (not img_urls):
        async with message_lock:
            await ai_post_text_split_async_async(request, token_id, app_id, callback_to_user, reply_content)

    await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})
    # Outgoing dialogue.
    input_wx_content_dialogue_message = [{"type": "text", "text": reply_content}]
    input_message = dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message, True)
    await request.app.state.kafka_service.send_message_async(input_message)
    logger.info("发送对话 %s", input_message)

    execution_time = time.time() - start_time
    logger.info(f"AI回答任务完成,耗时 {execution_time:.2f} 秒")

# async def ai_post_text_split_async_async(request: Request, token_id, app_id, callback_to_user, reply_content):
#     parts = reply_content.split('\n\n')
#     for part in parts:
#         await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, part)
#         await asyncio.sleep(random.uniform(1.5, 3))

async def ai_post_text_split_async_async(request: Request, token_id, app_id, callback_to_user, reply_content):
    '''
    Send ``reply_content`` to ``callback_to_user`` as a series of text messages.

    The text is split on blank lines. A part shorter than 30 characters is
    merged with the following part(s) until it reaches 30 characters or no
    parts are left. Empty or whitespace-only parts are skipped, so an empty
    reply sends nothing. A random 1.5-3 second pause follows every message.
    '''
    # Strip each paragraph and drop empty ones so that no blank message is posted.
    stripped_parts = (part.strip() for part in reply_content.split('\n\n'))
    parts = [part for part in stripped_parts if part]

    i = 0
    while i < len(parts):
        current_part = parts[i]
        # Merge short paragraphs with the ones that follow.
        while len(current_part) < 30 and i + 1 < len(parts):
            i += 1
            current_part += "\n " + parts[i]
        await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, current_part)
        await asyncio.sleep(random.uniform(1.5, 3))
        i += 1

async def handle_text_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle a text message that arrived in a group chat.

    Flow, as implemented below:
      * Return immediately when ``privateGroupChatEnabled`` is off in the account config.
      * ``from_wxid == to_wxid`` is treated as a manually sent message: the contact
        brief is cached and the text is forwarded to Kafka, with no AI reply.
      * Otherwise the group (``from_wxid``) must be listed in ``chatroomIdWhiteList``.
      * If the push content shows the account was @-mentioned, the text is sent to
        the AI and the answer is posted back to the group (text, images, videos);
        both sides of the dialogue are cached and forwarded to Kafka.
      * Any other group message is only forwarded to Kafka.

    Parameters:
        request: FastAPI request; services are read from ``request.app.state``.
        token_id, app_id: passed through to the gewe service calls.
        wxid: WeChat id of the account that received the callback.
        msg_data: the ``Data`` payload of the callback message.
        from_wxid, to_wxid: sender and receiver ids of the message.
    '''
    msg_content=msg_data["Content"]["string"]
    # PushContent may be missing; use '' so the substring checks below cannot fail on None.
    msg_push_content=msg_data.get("PushContent") or ''
    _,login_info=await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id)
    nickname=login_info.get("nickName")

    config=await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
    validated_config = AgentConfig.model_validate(config)
    if not validated_config.privateGroupChatEnabled:
        logger.warning(f"微信号 {wxid} 关闭好友和群自动回复功能,{handle_text_group_async.__name__} 不回复消息")
        return

    if from_wxid == to_wxid:  # message sent manually from the account itself
        logger.info("Active message sending detected")

        await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id,wxid,[to_wxid])

        input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
        input_message=dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
        await request.app.state.kafka_service.send_message_async(input_message)
        logger.info("发送对话 %s",input_message)
        return

    # The config fetched above is reused for the whitelist lookup.
    chatroom_id_white_list = config.get("chatroomIdWhiteList", [])

    if not chatroom_id_white_list:
        logger.info('白名单为空或未定义,不处理')
        return

    if from_wxid not in chatroom_id_white_list:
        logger.info(f'群ID {from_wxid} 不在白名单中,不处理')
        return

    # A missing nickname must not break the mention check ('@' + None raises TypeError).
    mentioned = '在群聊中@了你' in msg_push_content or bool(nickname and '@'+nickname in msg_push_content)
    if not mentioned:
        # Ordinary group message: forward it to Kafka only.
        logger.info('群聊公开消息')
        callback_to_user=msg_data["FromUserName"]["string"]
        group_dialogue_message=[{"type": "text", "text": msg_content}]
        input_message=dialogue_message(callback_to_user,wxid,group_dialogue_message)
        await request.app.state.kafka_service.send_message_async(input_message)
        logger.info("发送对话 %s",input_message)
        return

    callback_to_user=msg_data["FromUserName"]["string"]
    hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'

    prompt={"role": "user", "content": [{
        "type": "text",
        "text": msg_content
    }]}
    messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
    # Incoming side of the dialogue.
    input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
    input_message=dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
    await request.app.state.kafka_service.send_message_async(input_message)
    logger.info("发送对话 %s",input_message)

    cache_data = USER_INTERACTIVE_CACHE.get(wxid)
    if cache_data and cache_data.get('interactive'):
        selected=get_first_char_if_digit(msg_content)
        options=cache_data.get('options') or []
        # Choices are 1-based: accept 1..len(options) so the last option is selectable
        # and 0 cannot wrap around to the last option through index -1.
        if selected is not None and 1 <= selected <= len(options):
            msg_content=options[selected-1].get("value")
        # While an interactive prompt is pending only the current message is sent to the AI.
        messages_to_send=[{"role": "user", "content": msg_content}]

    res=await gpt_client_async(request,messages_to_send,wxid,callback_to_user)
    reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])

    description = ''
    userSelectOptions = []

    if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
        # Interactive answer: show the description followed by the selectable values.
        for item in reply_content:
            if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
                params = item["interactive"]["params"]
                description = params.get("description")
                userSelectOptions = params.get("userSelectOptions", [])
        values_string = "\n".join(option["value"] for option in userSelectOptions)

        if description is not None:
            USER_INTERACTIVE_CACHE[wxid] = {
                "interactive":True,
                "options": userSelectOptions,
            }
            reply_content=description + '------------------------------\n'+values_string

    elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
        USER_INTERACTIVE_CACHE[wxid] = {
            "interactive":False
        }
        text=''
        for item in reply_content:
            if item["type"] == "text":
                text=item["text"]["content"]
        if text=='':
            # Empty text answer: drop the last three cached messages and ask again.
            cache_messages_str=await request.app.state.redis_service.get_hash_field(hash_key,"data")
            cache_messages = json.loads(cache_messages_str) if cache_messages_str else []

            if len(cache_messages) >= 3:
                cache_messages = cache_messages[:-3]

            await request.app.state.redis_service.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
            messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
            res=await gpt_client_async(request,messages_to_send,wxid,callback_to_user)
            reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
        else:
            reply_content=text
    else:
        USER_INTERACTIVE_CACHE[wxid] = {
            "interactive":False
        }
        # NOTE(review): this replaces the markdown-stripped text with the raw AI content;
        # kept as-is, confirm whether that is intended.
        reply_content=res["choices"][0]["message"]["content"]

    reply_content='@'+extract_nickname(msg_push_content) + reply_content

    if '不回复' in reply_content:
        logger.warning(f"{from_wxid} 群聊中 AI回答<不回复>,跳过微信回复")
        await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})
        return

    k,loginfo=await request.app.state.gewe_service.get_login_info_by_wxid_async(wxid)
    wxid_nickname=loginfo.get("nickName")

    message_lock=request.app.state.message_lock[app_id]
    # Nickname placeholder substitution.
    replacements = {
        '{昵称}': wxid_nickname,
        '{好友昵称}': extract_nickname(msg_push_content)
    }
    reply_content=replace_placeholders(reply_content, replacements)

    # Image URLs: send the remaining text first, then each image.
    img_urls,reply_content=extract_and_replace_image_urls(reply_content)
    if img_urls:
        async with message_lock:
            await ai_post_text_split_async_async(request, token_id, app_id, callback_to_user, reply_content)
            await asyncio.sleep(random.uniform(1.5, 3))
            for img_url in img_urls:
                await request.app.state.gewe_service.post_image_async(token_id, app_id, callback_to_user, img_url)
                await asyncio.sleep(random.uniform(1.5, 3))

    # Video URLs: send the remaining text first, then each video with its thumbnail.
    video_urls,reply_content=extract_and_replace_video_urls(reply_content)
    if video_urls:
        async with message_lock:
            await ai_post_text_split_async_async(request, token_id, app_id, callback_to_user, reply_content)
            await asyncio.sleep(random.uniform(1.5, 3))
            for video_url in video_urls:
                parsed_url = urlparse(video_url)
                filename = os.path.basename(parsed_url.path)
                tmp_file_path = os.path.join(os.getcwd(),'tmp', filename)  # full local path
                thumbnail_path=tmp_file_path.replace('.mp4','.jpg')

                video_thumb_url,video_duration =download_video_and_get_thumbnail(video_url,thumbnail_path)
                logger.info(f'{wxid} 视频缩略图 {video_thumb_url} 时长 {video_duration}')
                ret,ret_msg,res = await request.app.state.gewe_service.post_video_async(token_id, app_id, callback_to_user, video_url,video_thumb_url,video_duration)
                if ret!=200:
                    logger.warning(f'{wxid} 发送视频{video_url} 到 {callback_to_user} 失败,{ret_msg}')
                await asyncio.sleep(random.uniform(1.5, 3))

    # Plain text reply when no media was extracted.
    if (not video_urls) and (not img_urls):
        async with message_lock:
            await ai_post_text_split_async_async(request, token_id, app_id, callback_to_user, reply_content)

    await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})
    # Outgoing side of the dialogue, prefixed with the sending account id.
    reply_content=f'{wxid}:\n'+ reply_content
    input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
    input_message=dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
    await request.app.state.kafka_service.send_message_async(input_message)
    logger.info("发送对话 %s",input_message)
    
async def handle_image_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle an image received in a private chat.

    When auto-reply is enabled for the account, the image is downloaded through
    the gewe service, uploaded to OSS, stored in the session cache as an
    ``image_url`` user prompt, acknowledged to the sender with a fixed text,
    and forwarded to Kafka as a dialogue message.
    '''
    gewe = request.app.state.gewe_service

    agent_cfg = AgentConfig.model_validate(await gewe.get_wxchat_config_from_cache_async(wxid))
    if not agent_cfg.privateGroupChatEnabled:
        logger.warning(f"微信号 {wxid} 关闭好友和群自动回复功能,{handle_image_async.__name__} 不回复消息")
        return

    image_xml = msg_data["Content"]["string"]
    sender = from_wxid
    session_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{sender}'

    wx_img_url = await gewe.download_image_msg_async(token_id, app_id, image_xml)
    if not wx_img_url:
        logger.warning(f'{wxid} 下载 {sender} 图片失败')
        return

    # NOTE(review): OSS credentials are hard-coded in source; they should be
    # rotated and loaded from configuration instead.
    key_id = "LTAI5tRTG6pLhTpKACJYoPR5"
    key_secret = "E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN"
    endpoint = "http://oss-cn-shanghai.aliyuncs.com"
    bucket = "cow-agent"
    prefix = "cow"
    img_url = upload_oss(key_id, key_secret, endpoint, bucket, wx_img_url, prefix)

    # Remember the image in the conversation so a later question can refer to it.
    image_prompt = {
        "role": "user",
        "content": [{"type": "image_url", "image_url": {"url": img_url}}],
    }
    await gewe.save_session_messages_to_cache_async(session_key, image_prompt)
    await gewe.post_text_async(token_id, app_id, sender, '已经上传了图片,有什么可以为您服务')
    logger.info(f"上传图片 URL: {img_url}")

    kafka_payload = dialogue_message(
        from_wxid,
        to_wxid,
        [{"type": "image_url", "image_url": {"url": img_url}}],
    )
    await request.app.state.kafka_service.send_message_async(kafka_payload)
    logger.info("发送对话 %s", kafka_payload)

async def handle_image_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle an image posted in a group chat.

    The message content carries an XML document; it is extracted, the image is
    downloaded through the gewe service and converted to an OSS URL, and the
    original content with the XML part replaced by that URL is forwarded to
    Kafka. No reply is sent to the group.
    '''
    logger.info('群聊图片消息')
    raw_content = msg_data["Content"]["string"]
    group_id = msg_data["FromUserName"]["string"]

    gewe = request.app.state.gewe_service
    agent_cfg = AgentConfig.model_validate(await gewe.get_wxchat_config_from_cache_async(wxid))
    if not agent_cfg.privateGroupChatEnabled:
        logger.warning(f"微信号 {wxid} 关闭好友和群自动回复功能,{handle_image_group_async.__name__} 不回复消息")
        return

    # Take everything from the XML declaration up to the closing </msg> tag.
    embedded_xml = re.search(r'<\?xml.*</msg>', raw_content, re.DOTALL)
    if embedded_xml is None:
        logger.info(f'找不到图片地址')
        return

    wx_img_url = await gewe.download_image_msg_async(token_id, app_id, embedded_xml.group(0))
    if not wx_img_url:
        logger.warning(f'{wxid} 下载 {group_id} 图片失败')
        return

    oss_url = wx_img_url_to_oss_url(wx_img_url)

    # Keep whatever precedes the XML and substitute the XML itself with the OSS URL.
    text_with_url = re.sub(r'<\?xml.*', f'{oss_url}', raw_content, flags=re.DOTALL)
    kafka_payload = dialogue_message(group_id, wxid, [{"type": "text", "text": text_with_url}], False)
    await request.app.state.kafka_service.send_message_async(kafka_payload)
    logger.info("发送对话 %s", kafka_payload)


async def handle_voice_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle a voice message received in a private chat.

    Steps, as implemented below:
      1. Return when ``privateGroupChatEnabled`` is off for the account.
      2. Download the audio, convert it to WAV and transcribe it with ``AliVoice``.
      3. Store the transcript as the user prompt and ask the AI for an answer.
      4. If the answer is exactly '不回复', cache it and send nothing.
      5. If the answer contains no URL, reply with synthesized voice (or text when
         the voice would last 60 seconds or more); on failure fall back to text.
      6. If the answer contains URLs, send text plus the extracted images/videos.
      7. Cache the assistant answer and forward it to Kafka.
    '''
    callback_to_user=from_wxid
    msg_content=msg_data["Content"]["string"]
    msg_id=msg_data["MsgId"]

    config=await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
    validated_config = AgentConfig.model_validate(config)
    if not validated_config.privateGroupChatEnabled:
        logger.warning(f"微信号 {wxid} 关闭好友和群自动回复功能,{handle_voice_async.__name__} 不回复消息")
        return

    # Create the per-app send lock only when it is missing. Replacing an existing
    # lock would let this handler send while another coroutine still holds the old one.
    message_locks=request.app.state.message_lock
    if app_id not in message_locks:
        message_locks[app_id]=asyncio.Lock()

    file_url=await request.app.state.gewe_service.download_audio_msg_async(token_id,app_id,msg_id,msg_content)
    react_silk_path=await save_to_local_from_url_async(file_url)
    react_wav_path = os.path.splitext(react_silk_path)[0] + ".wav"
    try:
        audio_convert.any_to_wav(react_silk_path,react_wav_path)
        react_voice_text=AliVoice().voiceToText(react_wav_path)
    finally:
        # Always clean up the temporary audio files, even if conversion or ASR fails.
        for tmp_path in (react_silk_path, react_wav_path):
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

    hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
    messages=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "user", "content": react_voice_text})
    ai_res=await gpt_client_async(request,messages,wxid,callback_to_user)
    ai_res_content=remove_markdown_symbol(ai_res["choices"][0]["message"]["content"])

    if ai_res_content == '不回复':
        logger.warning(f"语音消息,AI回答任务不回复,AI回答<不回复>,跳过微信语音回复")
        await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": ai_res_content})
        return

    has_url=contains_url(ai_res_content)
    if not has_url:
        voice_during,voice_url=wx_voice(ai_res_content)

        if voice_during < 60 * 1000:
            ret,ret_msg,res=await request.app.state.gewe_service.post_voice_async(token_id,app_id,callback_to_user,voice_url,voice_during)
        else:
            ret,ret_msg,res=await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,ai_res_content)
            logger.warning(f'回应语音消息长度 {voice_during/1000}秒,超过60秒,转为文本回复')

        if ret==200:
            logger.info((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
        else:
            logger.warning((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
            # Fall back to plain text; assign the result so the log below reports this attempt.
            ret,ret_msg,res=await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,ai_res_content)
            logger.info((f'{wxid} 向 {callback_to_user} 发送文本【{ai_res_content}】{ret_msg}'))

    else:
        # The answer contains URLs: no voice reply, send text and media instead.
        message_lock=request.app.state.message_lock[app_id]
        reply_content=ai_res_content

        # Image URLs: send the remaining text first, then each image.
        img_urls,reply_content=extract_and_replace_image_urls(reply_content)
        if img_urls:
            async with message_lock:
                await ai_post_text_split_async_async(request, token_id, app_id, callback_to_user, reply_content)
                await asyncio.sleep(random.uniform(1.5, 3))
                for img_url in img_urls:
                    await request.app.state.gewe_service.post_image_async(token_id, app_id, callback_to_user, img_url)
                    await asyncio.sleep(random.uniform(1.5, 3))

        # Video URLs: send the remaining text first, then each video with its thumbnail.
        video_urls,reply_content=extract_and_replace_video_urls(reply_content)
        if video_urls:
            async with message_lock:
                await ai_post_text_split_async_async(request, token_id, app_id, callback_to_user, reply_content)
                await asyncio.sleep(random.uniform(1.5, 3))
                for video_url in video_urls:
                    parsed_url = urlparse(video_url)
                    filename = os.path.basename(parsed_url.path)
                    tmp_file_path = os.path.join(os.getcwd(),'tmp', filename)  # full local path
                    thumbnail_path=tmp_file_path.replace('.mp4','.jpg')

                    video_thumb_url,video_duration =download_video_and_get_thumbnail(video_url,thumbnail_path)
                    logger.info(f'{wxid} 视频缩略图 {video_thumb_url} 时长 {video_duration}')
                    ret,ret_msg,res = await request.app.state.gewe_service.post_video_async(token_id, app_id, callback_to_user, video_url,video_thumb_url,video_duration)
                    if ret!=200:
                        logger.warning(f'{wxid} 发送视频{video_url} 到 {callback_to_user} 失败,{ret_msg}')
                    await asyncio.sleep(random.uniform(1.5, 3))

        # Plain text reply when no media was extracted.
        if (not video_urls) and (not img_urls):
            async with message_lock:
                await ai_post_text_split_async_async(request, token_id, app_id, callback_to_user, reply_content)

    await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": ai_res_content})
    # Build the outgoing dialogue message and forward it to Kafka.
    input_wx_content_dialogue_message = [{"type": "text", "text": ai_res_content}]
    input_message = dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message,True)
    await request.app.state.kafka_service.send_message_async(input_message)
    logger.info("发送对话 %s", input_message)

async def handle_voice_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Placeholder for group-chat voice messages: only writes a log line.
    '''
    logger.info('群聊语音消息')

async def handle_name_card_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle a name-card (contact card) message by sending a friend request.

    Cards sent by the account itself are ignored. The card XML supplies the
    ``scene``, ``username`` (v3) and ``antispamticket`` (v4) attributes used for
    the request. The request is only sent when the login info's ``create_at``
    timestamp is at least three days old. XML, key and other errors are logged
    and swallowed.
    '''
    logger.info('名片消息')

    try:
        if from_wxid == wxid:
            logger.info(f'自己发的名片消息,不处理')
            return

        msg_content_xml=msg_data["Content"]["string"]
        root = ET.fromstring(msg_content_xml)

        # Attributes of the card needed for the friend request.
        alias_value = root.get("alias")
        scene = int(root.get("scene"))
        v3 = root.get("username")
        v4 = root.get("antispamticket")

        logger.info(f"alias_value: {alias_value}, scene: {scene}, v3: {v3}, v4: {v4}")

        # One lookup serves both the account-age check and the greeting nickname.
        _, login_info = await request.app.state.gewe_service.get_login_info_by_wxid_async(wxid)

        # A missing 'create_at' defaults to "now", which fails the three-day check.
        creation_timestamp=int(login_info.get('create_at',time.time()))
        three_days_seconds = 3 * 24 * 60 * 60
        if (time.time() - creation_timestamp) < three_days_seconds:
            log_content=f'名片添加好友功能,{wxid} 用户创建不够三天,不能使用该功能'
            logger.warning(log_content)
            return

        nickname=login_info.get('nickName')
        add_contact_content=f'您好,我是{nickname}'
        await request.app.state.gewe_service.add_contacts_async(token_id,app_id,scene,OperationType.ADD_FRIEND.value,v3,v4,add_contact_content)

    except ET.ParseError as e:
        logger.error(f"XML解析错误: {e}")
    except KeyError as e:
        logger.error(f"字典键错误: {e}")
    except Exception as e:
        logger.error(f"未知错误: {e}")

async def handle_video_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle a video received in a private chat.

    The video is downloaded through the gewe service, uploaded to OSS and
    forwarded to Kafka as a ``file`` dialogue message. Any exception is logged
    and swallowed.
    '''
    logger.info(f'{wxid} 视频消息')
    try:
        video_xml = msg_data["Content"]["string"]
        wx_video_url = await request.app.state.gewe_service.download_video_msg_async(token_id, app_id, video_xml)
        if not wx_video_url:
            logger.warning(f'处理微信视频消息异常')
            return
        print(wx_video_url)

        oss_file_url = url_file_to_oss(wx_video_url)
        if not oss_file_url:
            logger.warning(f'处理微信视频上传到oss异常')
            return

        kafka_payload = dialogue_message(
            from_wxid,
            wxid,
            [{"type": "file", "file_url": {"url": oss_file_url}}],
        )
        await request.app.state.kafka_service.send_message_async(kafka_payload)
        logger.info("发送对话 %s", kafka_payload)
    except Exception as e:
        logger.error(f"出现错误: {e}")

async def handle_video_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle a video posted in a group chat.

    The XML part of the message content is extracted and used to download the
    video, which is then uploaded to OSS. The original content, with the XML
    part replaced by the OSS URL, is forwarded to Kafka as a text dialogue
    message. Any exception is logged and swallowed.
    '''
    logger.info('群聊视频消息')
    try:
        raw_content = msg_data["Content"]["string"]
        # Take everything from the XML declaration up to the closing </msg> tag.
        embedded_xml = re.search(r'<\?xml.*</msg>', raw_content, re.DOTALL)
        if embedded_xml is None:
            logger.info(f'找不到视频地址')
            return

        wx_video_url = await request.app.state.gewe_service.download_video_msg_async(token_id, app_id, embedded_xml.group(0))
        if not wx_video_url:
            logger.warning(f'处理微信视频消息异常')
            return
        print(wx_video_url)

        oss_file_url = url_file_to_oss(wx_video_url)
        if not oss_file_url:
            logger.warning(f'处理微信视频上传到oss异常')
            return

        text_with_url = re.sub(r'<\?xml.*', f'{oss_file_url}', raw_content, flags=re.DOTALL)
        kafka_payload = dialogue_message(from_wxid, wxid, [{"type": "text", "text": text_with_url}], False)
        await request.app.state.kafka_service.send_message_async(kafka_payload)
        logger.info("发送对话 %s", kafka_payload)
    except Exception as e:
        logger.error(f"出现错误: {e}")

async def handle_xml_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Dispatch an XML (app) message according to its ``appmsg/type`` value.

    Type 57 goes to ``handle_xml_reference_async`` and type 5 to
    ``handle_xml_invite_group_async``; any other type is only printed.
    Parse errors and other exceptions are logged and swallowed.
    '''
    try:
        root = ET.fromstring(msg_data["Content"]["string"])
        type_value = int(root.find(".//appmsg/type").text)

        if type_value == 57:
            target = handle_xml_reference_async
        elif type_value == 5:
            target = handle_xml_invite_group_async
        else:
            target = None

        if target is None:
            print(f"xml消息 {type_value} 未解析")
        else:
            return await target(request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid)
    except ET.ParseError as e:
        logger.error(f"解析XML失败: {e}")
    except Exception as e:
        logger.error(f"未知错误: {e}")
    return None
    
async def handle_xml_reference_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle a quoted ("reference") message.

    Such messages have $.Data.MsgType=49 and an XML body in
    $.Data.Content.string whose msg.appmsg.type is 57.

    The ``PushContent`` text is used as the user prompt: it is cached, forwarded
    to Kafka, answered by the AI, and the answer is forwarded to Kafka, cached
    and posted back to the sender.
    '''
    callback_to_user=from_wxid
    hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'

    msg_content= msg_data["PushContent"]

    prompt={"role": "user", "content": [{
        "type": "text",
        "text": msg_content
    }]}

    # Incoming side of the dialogue.
    messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
    input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
    input_message=dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
    await request.app.state.kafka_service.send_message_async(input_message)
    logger.info("发送对话 %s",input_message)

    # Outgoing side of the dialogue.
    res=await gpt_client_async(request,messages_to_send,wxid,callback_to_user)
    reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
    input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
    input_message=dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
    # Publish through the same service method every other handler uses, rather
    # than reaching into the service's internal kafka_client.
    await request.app.state.kafka_service.send_message_async(input_message)
    logger.info("发送对话 %s",input_message)
    await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})
    await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,reply_content)

async def handle_xml_invite_group_async (request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle a group-chat invitation.

    Such messages have $.Data.MsgType=49 and an XML body in
    $.Data.Content.string whose msg.appmsg.title contains '邀请你加入群聊'
    (the title keyword depends on the phone's system language).

    On a matching title the invitation URL is accepted; on success the group is
    saved to the contact list, the cached group info and member list are
    refreshed, and a groups-info key message is sent to Kafka.
    '''
    logger.info(f'{wxid} 群聊邀请')
    root = ET.fromstring(msg_data["Content"]["string"])
    title_value = root.find(".//appmsg/title").text

    if '邀请你加入群聊' not in title_value:
        return

    gewe = request.app.state.gewe_service
    invite_url = root.find('.//url').text
    ret, msg, data = await gewe.agree_join_room_async(token_id, app_id, invite_url)
    if ret != 200:
        logger.warning(f'群聊邀请,同意加入群聊失败 {msg} {data}')
        return

    logger.info(f'群聊邀请,同意加入群聊 {msg} {data}')
    chatroom_id = data.get('chatroomId','')

    _, save_msg, _ = await gewe.save_contract_list_async(token_id, app_id, chatroom_id, 3)
    logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id}  {save_msg}')

    # Refresh the cached group details and member list for the new group.
    await gewe.update_group_info_to_cache_async(token_id, app_id, wxid, chatroom_id)
    await gewe.update_group_members_to_cache_async(token_id, app_id, wxid, chatroom_id)

    # Send the groups-info key message for this account to Kafka.
    k_message = wx_groups_info_members_key_message(wxid)
    await request.app.state.kafka_service.send_message_async(k_message)

async def handle_file_message_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle a file received in a private chat.

    The file is downloaded through the gewe service, uploaded to OSS and
    forwarded to Kafka as a ``file`` dialogue message. Nothing is forwarded when
    the download or the OSS upload yields no URL. Any exception is logged and
    swallowed.
    '''
    logger.info(f'{wxid} 微信文件消息处理')
    try:
        msg_content_xml=msg_data["Content"]["string"]
        callback_to_user=from_wxid
        wx_file_url=await request.app.state.gewe_service.download_file_msg_async(token_id,app_id,msg_content_xml)
        if not wx_file_url:
            logger.warning(f'处理微信文件消息异常')
            return
        file_url=url_file_to_oss(wx_file_url)
        # Same guard as the video handlers: do not forward a message with an empty URL.
        if not file_url:
            logger.warning('处理微信文件上传到oss异常')
            return
        wx_content_dialogue_message = [{"type": "file", "file_url": {"url":file_url}}]
        k_message = dialogue_message(callback_to_user,wxid, wx_content_dialogue_message)
        await request.app.state.kafka_service.send_message_async(k_message)
        logger.info("发送对话 %s",k_message)
    except Exception as e:
        logger.error(f"出现错误: {e}")


async def handle_file_message_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handle a file posted in a group chat.

    The XML part of the message content is extracted and used to download the
    file, which is then uploaded to OSS. The original content, with the XML part
    replaced by the OSS URL, is forwarded to Kafka as a text dialogue message.
    Any exception is logged and swallowed.
    '''
    logger.info('群聊文件消息处理')
    try:
        msg_content=msg_data["Content"]["string"]
        # Take everything from the XML declaration up to the closing </msg> tag.
        xml_match = re.search(r'<\?xml.*</msg>', msg_content, re.DOTALL)
        if not xml_match:
            logger.info(f'找不到文件地址')
            return

        msg_content_xml=xml_match.group(0)
        # Use the file download endpoint, as the private-chat file handler does,
        # instead of the video download endpoint.
        wx_file_url=await request.app.state.gewe_service.download_file_msg_async(token_id,app_id,msg_content_xml)
        if not wx_file_url:
            logger.warning(f'群聊文件消息处理异常')
            return
        callback_to_user=from_wxid
        print(wx_file_url)

        file_url=url_file_to_oss(wx_file_url)
        if not file_url:
            logger.warning('处理微信文件上传到oss异常')
            return
        reply_content = re.sub(r'<\?xml.*', f'{file_url}', msg_content, flags=re.DOTALL)

        input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
        k_message=dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message,False)
        await request.app.state.kafka_service.send_message_async(k_message)
        logger.info("发送对话 %s",k_message)
    except Exception as e:
        logger.error(f"出现错误: {e}")

async def handle_add_friend_notice_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Friend-request notification handler.

    Parses the request XML, accepts the request, stores the requester's
    greeting in the session cache, asks the AI for a reply and, unless the
    reply is the literal "do not reply" marker, sends it back to the new
    contact. Both the greeting and the reply are published to Kafka.

    The ``wxid`` argument is overwritten with ``msg_data["ToUserName"]``.
    All exceptions are logged and swallowed; the function returns None.
    '''
    logger.info('好友添加请求通知')
    try:
        request_xml = msg_data["Content"]["string"]
        attrs = ET.fromstring(request_xml).attrib

        greeting = attrs.get('content', None)
        encrypt_username = attrs.get('encryptusername', None)  # sent as v3
        ticket = attrs.get('ticket', None)  # sent as v4
        scene = attrs.get('scene', None)
        requester_wxid = attrs.get('fromusername', None)
        wxid = msg_data["ToUserName"]["string"]

        services = request.app.state

        # Operation type: 2 = add friend, 3 = accept friend, 4 = reject friend.
        accept_option = 3
        accept_text = "亲,我是你的好友"
        ret, _ = await services.gewe_service.add_contacts_async(
            token_id, app_id, scene, accept_option, encrypt_username, ticket, accept_text
        )
        if ret != 200:
            logger.warning("添加好友失败")
            return
        logger.info('自动添加好友成功')

        # Record the greeting as the first user turn of the session.
        session_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{requester_wxid}'
        user_turn = {"role": "user", "content": [{"type": "text","text": greeting}]}
        history = await services.gewe_service.save_session_messages_to_cache_async(session_key, user_turn)

        inbound = dialogue_message(requester_wxid, wxid, [{"type": "text", "text": greeting}])
        await services.kafka_service.send_message_async(inbound)
        logger.info("发送对话 %s", inbound)

        completion = await gpt_client_async(request, history, wxid, requester_wxid)
        reply_content = remove_markdown_symbol(completion["choices"][0]["message"]["content"])

        # Store the new contact's brief info.
        await services.gewe_service.save_contacts_brief_to_cache_async(token_id, app_id, wxid, [requester_wxid])

        assistant_turn = {"role": "assistant", "content": reply_content}
        if reply_content == '不回复':
            # The AI chose not to answer: keep the turn in the cache, send nothing.
            logger.warning("加好友消息,AI回答<不回复>,跳过微信加好友打招呼消息回复")
            await services.gewe_service.save_session_messages_to_cache_async(session_key, assistant_turn)
            return

        # Cache the reply, deliver it, then publish it to Kafka.
        await services.gewe_service.save_session_messages_to_cache_async(session_key, assistant_turn)
        await services.gewe_service.post_text_async(token_id, app_id, requester_wxid, reply_content)

        outbound = dialogue_message(wxid, requester_wxid, [{"type": "text", "text": reply_content}], True)
        await services.kafka_service.send_message_async(outbound)
        logger.info("发送对话 %s", outbound)
    except ET.ParseError as e:
        logger.error(f"解析XML失败: {e}")
    except Exception as e:
        logger.error(f"未知错误: {e}")
    return

async def handle_10002_msg_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handler for the following notices:
      - group invitation
      - message recall
      - "pat" message
      - location
      - removed from a group
      - group dissolved
      - group announcement published

    Only the branches below act on the message: invitation, removal,
    dissolution, announcement and the "roomtoolstips" marker. Each check is
    an independent ``if``, so one message may trigger several branches.
    Exceptions are logged and swallowed; the function returns None.
    '''

    async def refresh_group_and_broadcast(group_id):
        # Refresh cached group info and members, then publish the
        # groups-info key message for this account to Kafka.
        await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,group_id)
        await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,group_id)
        groups_key_message = wx_groups_info_members_key_message(wxid)
        await request.app.state.kafka_service.send_message_async(groups_key_message)

    async def purge_group_and_broadcast(group_id):
        # Drop the group's cached info and members, then publish the
        # group-deleted message to Kafka.
        await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',group_id)
        await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',group_id)
        logger.info(f'清除 chatroom_id{group_id} 数据')
        deletion_message = wx_del_group_message(wxid,group_id)
        await request.app.state.kafka_service.send_message_async(deletion_message)

    try:
        notice = msg_data["Content"]["string"]

        # Group invitation: save the group to the contact list (operation 3).
        if '邀请你加入了群聊' in notice and check_chatroom(msg_data["FromUserName"]["string"]):
            group_id = msg_data["FromUserName"]["string"]
            _, result_msg, _ = await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,group_id,3)
            logger.info(f'群聊邀请,保存到通讯录 chatroom_id {group_id}  {result_msg}')
            await refresh_group_and_broadcast(group_id)

        # Removed from the group: drop it from the contact list (operation 2).
        if '移出了群聊' in notice and 'sysmsgtemplate' in notice:
            group_id = msg_data["FromUserName"]["string"]
            _, result_msg, _ = await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,group_id,2)
            logger.info(f'移出群聊,移除从通讯录 chatroom_id {group_id}  {result_msg}')
            await purge_group_and_broadcast(group_id)

        # Group dissolved: same cleanup as a removal.
        if '已解散该群聊' in notice and 'sysmsgtemplate' in notice:
            group_id = msg_data["FromUserName"]["string"]
            _, result_msg, _ = await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,group_id,2)
            logger.info(f'解散群聊,移除从通讯录 chatroom_id {group_id}  {result_msg}')
            await purge_group_and_broadcast(group_id)

        # Group announcement published.
        if 'mmchatroombarannouncememt' in notice:
            group_id = msg_data["FromUserName"]["string"]
            logger.info(f'发布群公告 chatroom_id {group_id}')
            await refresh_group_and_broadcast(group_id)

        # Group to-do item.
        if 'roomtoolstips' in notice:
            group_id = msg_data["FromUserName"]["string"]
            logger.info(f'群待办 chatroom_id {group_id} ')
            await refresh_group_and_broadcast(group_id)

    except ET.ParseError as e:
        logger.error(f"解析XML失败: {e}")
    except Exception as e:
        logger.error(f"未知错误: {e}")
    return

async def handle_10000_msg_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    '''
    Handler for the following notices:
      - group renamed
      - group owner changed
      - removed from a group

    When the notice text contains one of the keywords below and the sender
    is a chatroom, the cached group info and members are refreshed and the
    groups-info key message for ``wxid`` is published to Kafka.

    Args:
        request: Current request; services are read from ``request.app.state``.
        token_id: Token forwarded to the gewe service.
        app_id: App id forwarded to the gewe service.
        wxid: Account whose group cache is refreshed.
        msg_data: Callback payload; ``Content.string`` is read if present.
        from_wxid: Sender id; treated as the chatroom id.
        to_wxid: Not used by this handler.

    Returns:
        None.
    '''
    # Tolerate a missing or null "Content"/"string" instead of failing on
    # ``"".get`` or on a substring test against None.
    content=(msg_data.get("Content") or {}).get("string") or ""

    # NOTE(review): '被移除群聊通知' is matched literally — confirm it is the
    # text the notice really carries.
    keywords=('修改群名','新群主','被移除群聊通知')

    # Each keyword must be tested against the content. The former
    # ``'a' or 'b' or 'c' in content and ...`` was always true because a
    # non-empty string literal is truthy, so every notice triggered a refresh.
    if any(keyword in content for keyword in keywords) and check_chatroom(from_wxid):
        chatroom_id=from_wxid
        logger.info(f'{content} chatroom_id {chatroom_id}  ')
        await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id)
        await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id)

        # Publish the groups-info key message to Kafka.
        k_message=wx_groups_info_members_key_message(wxid)
        await request.app.state.kafka_service.send_message_async(k_message)

    return