from voice.ali.ali_voice import AliVoice from common.log import logger import xml.etree.ElementTree as ET import os,json,asyncio,aiohttp from voice import audio_convert from fastapi import APIRouter,Request from pydantic import BaseModel from fastapi import APIRouter, Depends from typing import Dict, Any from model.models import AgentConfig,OperationType from common.utils import * from common.memory import * timeout_duration = 2.0 messages_router = APIRouter() WX_BACKLIST=['fmessage', 'medianote','weixin','weixingongzhong','tmessage'] @messages_router.post("/messages",response_model=None) async def get_chatroominfo(request: Request, body: Dict[str, Any]): logger.info(f"收到微信回调消息: {json.dumps(msg, separators=(',', ':'),ensure_ascii=False)}") try: msg = body type_name =msg.get("TypeName") app_id = msg.get("Appid") k, loginfo = await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id) if not k: logger.warning('找不到登录信息,不处理') return {"message": f"收到微信回调消息: {type_name}"} token_id=loginfo.get('tokenId','') wxid = msg.get("Wxid",'') if type_name == 'AddMsg': await handle_self_cmd_async(request,wxid,msg) msg_data = msg.get("Data") from_wxid = msg_data["FromUserName"]["string"] config=await request.app.state.redis_service.get_hash(f"__AI_OPS_WX__:WXCHAT_CONFIG") wxids=config.keys() WX_BACKLIST.extend(wxids) # 公众号ID已gh_开头 if from_wxid in WX_BACKLIST or 'gh_' in from_wxid: logger.warning(f'微信ID {wxid} 在黑名单,不处理') return {"message": "收到微信回调消息"} await handle_messages_async(request,token_id,msg) return {"message": "收到微信回调消息,处理完成"} except Exception as e: logger.error(f"无法解析微信回调消息: {body} {e}") return {"message": "无法解析微信回调消息"} async def handle_self_cmd_async(request: Request,wxid,msg): ''' 个人微信命令处理 如果是个人微信的指令,wxid == from_wxid commands = { '启用托管': True, '关闭托管': False } ''' msg_data=msg.get("Data") from_wxid=msg_data["FromUserName"]["string"] msg_content=msg_data["Content"]["string"] if wxid == from_wxid: commands = { '启用托管': True, '关闭托管': False } if msg_content in commands: c_dict = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid) if c_dict: agent_config=AgentConfig.model_validate(c_dict) agent_config.agentEnabled=commands[msg_content] await request.app.state.gewe_service.wxchat.save_wxchat_config_async(wxid, agent_config.model_dump()) logger.info(f'{wxid} {"启动" if commands[msg_content] else "关闭"}托管') async def handle_messages_async(request: Request,token_id,msg): #msg_data=msg.get("Data") type_name =msg.get("TypeName") # app_id = msg.get("Appid") # from_wxid=msg_data["FromUserName"]["string"] # msg_content=msg_data["Content"]["string"] wxid = msg.get("Wxid",'') match type_name: case 'AddMsg': await handle_add_messages_async(request,token_id,msg,wxid) case 'ModContacts': await handle_mod_contacts_async(request,token_id,msg,wxid) case 'DelContacts': await handle_del_contacts_async(request,token_id,msg,wxid) case 'Offline': await handle_offline_async(request,token_id,msg,wxid) case _: logger.warning(f'未知消息类型:{type_name}') async def gpt_client_async(request,messages: list, wixd: str, friend_wxid: str): c = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wixd) api_key = c.get('agentTokenId', "sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH") print(f'流程key:{api_key}\n') #api_key="sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH" #测试 #api_key="sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI" #开发2 api_url = "http://106.15.182.218:3000/api/v1/chat/completions" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {api_key}" } session_id = f'{wixd}-{friend_wxid}' data = { "model": "", "messages": messages, "chatId": session_id, "detail": True } logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False))) async with aiohttp.ClientSession() as session: try: async with session.post(url=api_url, headers=headers, data=json.dumps(data), timeout=600) as response: response.raise_for_status() response_data = await response.json() logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'), ensure_ascii=False))) return response_data except aiohttp.ClientError as e: logger.error(f"[CHATGPT] 请求失败: {e}") raise async def handle_add_messages_async(request: Request,token_id,msg,wxid): wx_config =await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid) if not bool(wx_config.get("agentEnabled",False)): logger.info(f'微信ID {wxid} 未托管,不处理') return app_id = msg.get("Appid") msg_data = msg.get("Data") msg_type = msg_data.get("MsgType",None) from_wxid = msg_data["FromUserName"]["string"] to_wxid = msg_data["ToUserName"]["string"] msg_push_content=msg_data.get("PushContent") handlers = { 1: handle_text_async, 3: handle_image_async, 34: handle_voice_async, 42: handle_name_card_async, 49: handle_xml_async, 37: handle_add_friend_notice_async, 10002: handle_10002_msg_async, 10000: handle_10000_msg_async } # (扫码进群情况)判断受否是群聊,并添加到通信录 if check_chatroom(from_wxid) or check_chatroom(to_wxid): logger.info('群信息') chatroom_id=from_wxid ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,3) logger.info(f'保存到通讯录 chatroom_id {chatroom_id} {msg}') await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) handlers[1]=handle_text_group_async handlers[3]=handle_image_group_async handlers[34]=handle_voice_group_async handler = handlers.get(msg_type) if handler: return await handler(request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid) else: logger.warning(f"微信回调消息类型 {msg_type} 未处理") async def handle_mod_contacts_async(request: Request,token_id,msg,wxid): ''' 好友通过验证及好友资料变更的通知消息 ''' wxid = msg.get("Wxid") msg_data = msg.get("Data") app_id = msg.get("Appid") # #handle_mod_contacts(token_id,app_id,wxid,msg_data) # user_name=msg_data.get("UserName",{}).get("string","") if not check_chatroom(user_name): loop = asyncio.get_event_loop() future = asyncio.run_coroutine_threadsafe( handle_mod_contacts_worker_async(request,token_id,app_id,wxid,msg_data), loop ) contact_wxid =msg_data.get("UserName",{}).get("string","") #msg_data["UserName"]["string"] nickname= msg_data.get("NickName",{}).get("string","")#msg_data["NickName"]["string"] city=msg_data.get("City",None) signature=msg_data.get("Signature",None) province=msg_data.get("Province",None) bigHeadImgUrl=msg_data.get("SnsUserInfo",{}).get("SnsBgimgId",None) #msg_data["SnsUserInfo"]["SnsBgimgId"] country=msg_data.get("Country",None) sex=msg_data.get("Sex",None) pyInitial= msg_data.get("PyInitial",{}).get("string",None)#msg_data["PyInitial"]["string"] quanPin=msg_data.get("QuanPin",{}).get("string",None) #msg_data["QuanPin"]["string"] remark=msg_data.get("Remark",{}).get("string",None) remarkPyInitial=msg_data.get("RemarkPyInitial",{}).get("string",None) remarkQuanPin=msg_data.get("RemarkQuanPin",{}).get("string",None) smallHeadImgUrl=msg_data.get("smallHeadImgUrl",None) # data=gewe_chat.wxchat.get_brief_info(token_id,app_id,[contact_wxid]) # contact=data[0] # alias=contact.get("alias") #推动到kafka contact_data = { "alias": None, "bigHeadImgUrl": bigHeadImgUrl, "cardImgUrl": None, "city": city, "country": country, "description": None, "labelList": None, "nickName": nickname, "phoneNumList": None, "province": province, "pyInitial": pyInitial, "quanPin": quanPin, "remark": remark, "remarkPyInitial": remarkPyInitial, "remarkQuanPin": remarkQuanPin, "sex": sex, "signature": signature, "smallHeadImgUrl": smallHeadImgUrl, "snsBgImg": None, "userName": contact_wxid } input_message=wx_mod_contact_message(wxid,contact_data) await request.app.state.kafka_service.send_message_async(input_message) else: logger.info('群信息变更通知') chatroom_id=user_name await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) k_message=wx_mod_group_info_members_message(wxid,group_info_members) await request.app.state.kafka_service.send_message_async(k_message) async def handle_del_contacts_async(request: Request,token_id,msg,wxid): ''' 删除好友通知/退出群聊 ''' msg_data = msg.get("Data") username=msg_data["UserName"]["string"] if check_chatroom(username): logger.info('退出群聊') wxid = msg.get("Wxid") chatroom_id=username await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id) await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id) logger.info(f'清除 chatroom_id{chatroom_id} 数据') # 推送删除群资料到kafka k_message=wx_del_group_message(wxid,chatroom_id) await request.app.state.kafka_service.send_message_async(k_message) else: logger.info('删除好友通知') # 推送到kafka input_message=wx_del_contact_message(wxid,username) await request.app.state.kafka_service.send_message_async(input_message) async def handle_offline_async(request: Request,token_id,msg,wxid): ''' 已经离线 ''' wxid = msg.get("Wxid") app_id = msg.get("Appid") logger.warning(f'微信ID {wxid}在设备{app_id}已经离线') k,r=await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id) print(k) await request.app.state.redis_service.update_hash_field(k,'status',0) await request.app.state.redis_service.update_hash_field(k,'modify_at',int(time.time())) # 推送到kafka input_message=wx_offline_message(app_id,wxid) await request.app.state.kafka_service.send_message_async(input_message) async def handle_mod_contacts_worker_async(request:Request,token_id,app_id,wxid,msg_data): ''' 好友通过验证及好友资料变更的通知消息 ''' logger.info('好友通过验证及好友资料变更的通知消息') if not check_chatroom(msg_data["UserName"]["string"]): contact_wxid = msg_data["UserName"]["string"] # 更新好友信息 # 检查好友关系,不是好友则删除 # ret,msg,check_relation=gewe_chat.wxchat.check_relation(token_id, app_id,[contact_wxid]) # first_item = check_relation[0] # check_relation_status=first_item.get('relation') # logger.info(f'{wxid} 好友 {contact_wxid} 关系检查:{check_relation_status}') # if check_relation_status != 0: # gewe_chat.wxchat.delete_contacts_brief_from_cache(wxid, [contact_wxid]) # logger.info(f'好友关系异常:{check_relation_status},删除好友 {contact_wxid} 信息') # else: # gewe_chat.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, [contact_wxid]) ret,msg,contacts_list = await request.app.state.gewe_service.fetch_contacts_list_async(token_id, app_id) # friend_wxids = contacts_list['friends'][3:] # 可以调整截取范围 # print(friend_wxids) #friend_wxids.remove('fmessage') #friend_wxids.remove('weixin') friend_wxids = [c for c in contacts_list['friends'] if c not in ['fmessage', 'medianote','weixin','weixingongzhong','tmessage']] # 可以调整截取范围 print(f'{wxid}的好友数量 {len(friend_wxids)}') await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id, app_id, wxid, friend_wxids) else: logger.info('群聊好友通过验证及好友资料变更的通知消息') async def handle_text_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 私聊文本消息 ''' msg_content=msg_data["Content"]["string"] if wxid == from_wxid: #手动发送消息 logger.info("Active message sending detected") await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id,wxid,[to_wxid]) callback_to_user=msg_data["ToUserName"]["string"] input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] input_message=dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message) await request.app.state.kafaka_service.send_message_async(input_message) logger.info("发送对话 %s",input_message) else: callback_to_user=msg_data["FromUserName"]["string"] # 创建并启动任务协程,将参数传递给 ai_chat_text 函数 task = asyncio.create_task( ai_chat_text_async(request,token_id, app_id, wxid, msg_data, msg_content) ) # 设置定时器,1秒后检查任务是否超时。这里需要使用 lambda 来传递参数 timeout_timer = asyncio.create_task( check_timeout_async(task, request,token_id, wxid, app_id, callback_to_user) ) # 等待任务协程完成 await task # 取消定时器 timeout_timer.cancel() async def check_timeout_async(task: asyncio.Task, request: Request,token_id, wxid, app_id, callback_to_user): await asyncio.sleep(timeout_duration) # 等待超时时间 if not task.done(): print(f"任务运行时间超过{timeout_duration}秒,token_id={token_id}, app_id={app_id}, callback_to_user={callback_to_user}") wx_config = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid) if bool(wx_config.get("chatWaitingMsgEnabled", True)): await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, "亲,我正在组织回复的信息,请稍等一会") async def ai_chat_text_async(request: Request,token_id, app_id, wxid, msg_data, msg_content): start_time = time.time() # 记录任务开始时间 callback_to_user = msg_data["FromUserName"]["string"] hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}' prompt = {"role": "user", "content": [{ "type": "text", "text": msg_content }]} messages_to_send = await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt) # 收到的对话 input_wx_content_dialogue_message = [{"type": "text", "text": msg_content}] input_message = dialogue_message(callback_to_user, wxid, input_wx_content_dialogue_message) await request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s", input_message) cache_data = USER_INTERACTIVE_CACHE.get(wxid) if cache_data and cache_data.get('interactive'): o = get_first_char_if_digit(msg_content) if o is not None: userSelectOptions = cache_data.get('options') if o < len(userSelectOptions): o = o - 1 msg_content = userSelectOptions[o].get("value") messages_to_send = [{"role": "user", "content": msg_content}] else: messages_to_send = [{"role": "user", "content": msg_content}] else: messages_to_send = [{"role": "user", "content": msg_content}] res = await gpt_client_async(request,messages_to_send, wxid, callback_to_user) reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"]) description = '' userSelectOptions = [] if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content): for item in reply_content: if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect": params = item["interactive"]["params"] description = params.get("description") userSelectOptions = params.get("userSelectOptions", []) values_string = "\n".join(option["value"] for option in userSelectOptions) if description is not None: USER_INTERACTIVE_CACHE[wxid] = { "interactive": True, "options": userSelectOptions, } reply_content = description + '------------------------------\n' + values_string elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content): USER_INTERACTIVE_CACHE[wxid] = { "interactive": False } text = '' for item in reply_content: if item["type"] == "text": text = item["text"]["content"] if text == '': # 去除上次上一轮对话再次请求 cache_messages_str = await request.app.state.redis_service.get_hash_field(hash_key, "data") cache_messages = json.loads(cache_messages_str) if cache_messages_str else [] if len(cache_messages) >= 3: cache_messages = cache_messages[:-3] await request.app.state.redis_service.update_hash_field(hash_key, "data", json.dumps(cache_messages, ensure_ascii=False)) messages_to_send = await request.app.state.redis_service.save_session_messages_to_cache_async(hash_key, prompt) res = await gpt_client_async(request,messages_to_send, wxid, callback_to_user) reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"]) if isinstance(reply_content, list): reply_content = remove_markdown_symbol(reply_content[0].get('text').get("content")) else: reply_content = text else: USER_INTERACTIVE_CACHE[wxid] = { "interactive": False } reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"]) if callback_to_user not in 'wx_nBGqh6_Rw6pW8KXg0AudW': logger.info(f'{callback_to_user} 不发送到微信有关{msg_content}的AI回复到微信') await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, reply_content) await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content}) # 回复的对话 input_wx_content_dialogue_message = [{"type": "text", "text": reply_content}] input_message = dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message, True) await request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s", input_message) end_time = time.time() # 记录任务结束时间 execution_time = end_time - start_time # 计算执行时间 logger.info(f"AI回答任务完成,耗时 {execution_time:.2f} 秒") async def handle_text_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 群聊文本消息 ''' msg_content=msg_data["Content"]["string"] msg_push_content=msg_data.get("PushContent") k,login_info=await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id) nickname=login_info.get("nickName") if wxid == from_wxid: #手动发送消息 logger.info("Active message sending detected") await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id,wxid,[to_wxid]) callback_to_user=msg_data["ToUserName"]["string"] input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] input_message=dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message) await request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s",input_message) else: c = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid) chatroom_id_white_list = c.get("chatroomIdWhiteList", []) if not chatroom_id_white_list: logger.info('白名单为空或未定义,不处理') return if from_wxid not in chatroom_id_white_list: logger.info(f'群ID {from_wxid} 不在白名单中,不处理') return if '在群聊中@了你' in msg_push_content or '@'+nickname in msg_push_content: callback_to_user=msg_data["FromUserName"]["string"] hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}' prompt={"role": "user", "content": [{ "type": "text", "text": msg_content }]} messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt) # 收到的对话 input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] input_message=dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message) await request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s",input_message) cache_data = USER_INTERACTIVE_CACHE.get(wxid) if cache_data and cache_data.get('interactive') : o=get_first_char_if_digit(msg_content) if o is not None: userSelectOptions=cache_data.get('options') if o < len(userSelectOptions): o=o-1 msg_content=userSelectOptions[o].get("value") messages_to_send=[{"role": "user", "content": msg_content}] else: messages_to_send=[{"role": "user", "content": msg_content}] else: messages_to_send=[{"role": "user", "content": msg_content}] res=await gpt_client_async(request,messages_to_send,wxid,callback_to_user) reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"]) description = '' userSelectOptions = [] if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content): for item in reply_content: if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect": params = item["interactive"]["params"] description = params.get("description") userSelectOptions = params.get("userSelectOptions", []) values_string = "\n".join(option["value"] for option in userSelectOptions) if description is not None: USER_INTERACTIVE_CACHE[wxid] = { "interactive":True, "options": userSelectOptions, } reply_content=description + '------------------------------\n'+values_string elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content): USER_INTERACTIVE_CACHE[wxid] = { "interactive":False } text='' for item in reply_content: if item["type"] == "text": text=item["text"]["content"] if text=='': # 去除上次上一轮对话再次请求 cache_messages_str=await request.app.state.redis_service.get_hash_field(hash_key,"data") cache_messages = json.loads(cache_messages_str) if cache_messages_str else [] if len(cache_messages) >= 3: cache_messages = cache_messages[:-3] await request.app.state.redis_service.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False)) messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt) res=await gpt_client_async(request,messages_to_send,wxid,callback_to_user) reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"]) else: reply_content=text else: USER_INTERACTIVE_CACHE[wxid] = { "interactive":False } reply_content=res["choices"][0]["message"]["content"] reply_content='@'+extract_nickname(msg_push_content) + reply_content await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,reply_content) await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content}) # 回复的对话 input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}] input_message=dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True) await request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s",input_message) else: logger.info('群聊公开消息') callback_to_user=msg_data["FromUserName"]["string"] group_dialogue_message=[{"type": "text", "text": msg_content}] input_message=dialogue_message(callback_to_user,wxid,group_dialogue_message) await request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s",input_message) return async def handle_image_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 私聊图片消息 ''' msg_content=msg_data["Content"]["string"] callback_to_user=from_wxid hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}' wx_img_url= await request.app.state.gewe_service.download_image_msg_async(token_id,app_id,msg_content) oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5" oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN" oss_endpoint="http://oss-cn-shanghai.aliyuncs.com" oss_bucket_name="cow-agent" oss_prefix="cow" img_url=upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, wx_img_url, oss_prefix) prompt={ "role": "user", "content": [{ "type": "image_url", "image_url": {"url": img_url} }] } await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt) await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,'已经上传了图片,有什么可以为您服务') logger.info(f"上传图片 URL: {img_url}") wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": img_url}}] input_message=dialogue_message(wxid,callback_to_user,wx_content_dialogue_message) await request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s",input_message) async def handle_image_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): logger.info('群聊图片消息') async def handle_voice_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 私聊语音消息 ''' callback_to_user=from_wxid msg_content=msg_data["Content"]["string"] msg_id=msg_data["MsgId"] file_url=await request.app.state.gewe_service.download_audio_msg_async(token_id,app_id,msg_id,msg_content) react_silk_path=await save_to_local_from_url_async(file_url) react_wav_path = os.path.splitext(react_silk_path)[0] + ".wav" audio_convert.any_to_wav(react_silk_path,react_wav_path) react_voice_text=AliVoice().voiceToText(react_wav_path) os.remove(react_silk_path) os.remove(react_wav_path) hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}' messages=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "user", "content": react_voice_text}) ai_res=await gpt_client_async(request,messages,wxid,callback_to_user) ai_res_content=remove_markdown_symbol(ai_res["choices"][0]["message"]["content"]) has_url=contains_url(ai_res_content) if not has_url: voice_during,voice_url=wx_voice(ai_res_content) if voice_during < 60 * 1000: ret,ret_msg,res=await request.app.state.gewe_service.post_voice_async(token_id,app_id,callback_to_user,voice_url,voice_during) else: ret,ret_msg,res=await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,ai_res_content) logger.warning(f'回应语音消息长度 {voice_during/1000}秒,超过60秒,转为文本回复') if ret==200: logger.info((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}')) else: logger.warning((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}')) ret,ret_msg,res==await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,ai_res_content) logger.info((f'{wxid} 向 {callback_to_user} 发送文本【{ai_res_content}】{ret_msg}')) else: logger.info(f"回复内容包含网址,不发送语音,回复文字内容:{ai_res_content}") ret,ret_msg,res=await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,ai_res_content) await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": ai_res_content}) # 构造对话消息并发送到 Kafka input_wx_content_dialogue_message = [{"type": "text", "text": ai_res_content}] input_message = dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message,True) await request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s", input_message) async def handle_voice_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): logger.info('群聊语音消息') async def handle_name_card_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): logger.info('名片消息') try: msg_content_xml=msg_data["Content"]["string"] # 解析XML字符串 root = ET.fromstring(msg_content_xml) # 提取alias属性 alias_value = root.get("alias") # 加好友资料 scene = int(root.get("scene")) v3 = root.get("username") v4 = root.get("antispamticket") logger.info(f"alias_value: {alias_value}, scene: {scene}, v3: {v3}, v4: {v4}") # 判断appid 是否已经创建3天 k, login_info = await request.app.state.gewe_service.get_login_info_by_wxid_async(wxid) creation_timestamp=int(login_info.get('create_at',time.time())) current_timestamp = time.time() three_days_seconds = 3 * 24 * 60 * 60 # 三天的秒数 diff_flag=(current_timestamp - creation_timestamp) >= three_days_seconds if not diff_flag: log_content=f'名片添加好友功能,{wxid} 用户创建不够三天,不能使用该功能' logger.warning(log_content) return # 将加好友资料添加到待加好友队列 #gewe_chat.wxchat.enqueue_to_add_contacts(wxid,scene,v3,v4) _,loginfo=await request.app.state.gewe_service.get_login_info_by_wxid_async(wxid) nickname=loginfo.get('nickName') add_contact_content=f'您好,我是{nickname}' #gewe_chat.wxchat.add_contacts(token_id,app_id,scene,Models.OperationType.ADD_FRIEND,v3,v4,add_contact_content) await request.app.state.gewe_service.add_contacts_async(token_id,app_id,scene,OperationType.ADD_FRIEND.value,v3,v4,add_contact_content) except ET.ParseError as e: logger.error(f"XML解析错误: {e}") except KeyError as e: logger.error(f"字典键错误: {e}") except Exception as e: logger.error(f"未知错误: {e}") async def handle_xml_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 处理xml ''' try: msg_content_xml=msg_data["Content"]["string"] root = ET.fromstring(msg_content_xml) type_value = int(root.find(".//appmsg/type").text) handlers = { 57: handle_xml_reference_async, 5: handle_xml_invite_group_async } handler = handlers.get(type_value) if handler: return await handler(request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid) # elif "邀请你加入了群聊" in msg_content_xml: # 邀请加入群聊 # logger.warning(f"xml消息 {type_value} 邀请你加入了群聊.todo") else: print(f"xml消息 {type_value} 未解析") except ET.ParseError as e: logger.error(f"解析XML失败: {e}") except Exception as e: logger.error(f"未知错误: {e}") return async def handle_xml_reference_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 引用消息 判断此类消息的逻辑:$.Data.MsgType=49 并且 解析$.Data.Content.string中的xml msg.appmsg.type=57 ''' callback_to_user=from_wxid hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}' msg_content= msg_data["PushContent"] prompt={"role": "user", "content": [{ "type": "text", "text": msg_content }]} # 收到的对话 messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt) input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] input_message=dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message) await request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s",input_message) # 回复的对话 res=await gpt_client_async(request,messages_to_send,wxid,callback_to_user) reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"]) input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}] input_message=dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True) await request.app.state.kafka_service.kafka_client.produce_message(input_message) logger.info("发送对话 %s",input_message) await request.app.state.kafka_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content}) await request.app.state.kafka_service.post_text_async(token_id,app_id,callback_to_user,reply_content) async def handle_xml_invite_group_async (request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 群聊邀请 判断此类消息的逻辑:$.Data.MsgType=49 并且 解析$.Data.Content.string中的xml msg.appmsg.title=邀请你加入群聊(根据手机设置的系统语言title会有调整,不同语言关键字不同) ''' logger.info(f'{wxid} 群聊邀请') msg_content_xml=msg_data["Content"]["string"] root = ET.fromstring(msg_content_xml) title_value = root.find(".//appmsg/title").text if '邀请你加入群聊' in title_value: invite_url = root.find('.//url').text ret,msg,data=await request.app.state.gewe_service.agree_join_room_async(token_id,app_id,invite_url) if ret==200: logger.info(f'群聊邀请,同意加入群聊 {msg} {data}') chatroom_id=data.get('chatroomId','') # if not chatroom_id: # logger.warning(f'群聊邀请,同意加入群聊失败 {msg} {data}') # return ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,3) logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id} {msg}') await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) # 单个群信息推送到kafka group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) k_message=wx_mod_group_info_members_message(wxid,group_info_members) await request.app.state.kafka_service.send_message_async(k_message) else: logger.warning(f'群聊邀请,同意加入群聊失败 {msg} {data}') async def handle_add_friend_notice_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 好友添加请求通知 ''' logger.info('好友添加请求通知') try: msg_content_xml=msg_data["Content"]["string"] root = ET.fromstring(msg_content_xml) msg_content = root.attrib.get('content', None) v3= root.attrib.get('encryptusername', None) v4= root.attrib.get('ticket', None) scene=root.attrib.get('scene', None) to_contact_wxid=root.attrib.get('fromusername', None) wxid=msg_data["ToUserName"]["string"] # 自动同意好友 # print(v3) # print(v4) # print(scene) # print(msg_content) # 操作类型,2添加好友 3同意好友 4拒绝好友 #option=2 option=3 reply_add_contact_contact="亲,我是你的好友" ret,ret_msg=await request.app.state.gewe_service.add_contacts_async(token_id,app_id,scene,option,v3,v4,reply_add_contact_contact) if ret==200: logger.info('自动添加好友成功') # 好友发送的文字 hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{to_contact_wxid}' prompt={"role": "user", "content": [{"type": "text","text": msg_content}]} messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt) input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] input_message=dialogue_message(to_contact_wxid,wxid,input_wx_content_dialogue_message) await request.app.state.gewe_service.send_message_async(input_message) logger.info("发送对话 %s",input_message) callback_to_user=to_contact_wxid res=await gpt_client_async(messages_to_send,wxid,callback_to_user) reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"]) #保存好友信息 await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id, wxid,[to_contact_wxid]) # 保存到缓存 await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content}) # 发送信息 await request.app.state.gewe_service.post_text_async(token_id,app_id, to_contact_wxid,reply_content) # 发送到kafka input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}] input_message=dialogue_message(wxid,to_contact_wxid,input_wx_content_dialogue_message,True) request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s",input_message) else: logger.warning("添加好友失败") except ET.ParseError as e: logger.error(f"解析XML失败: {e}") except Exception as e: logger.error(f"未知错误: {e}") return async def handle_10002_msg_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 群聊邀请 撤回消息 拍一拍消息 地理位置 踢出群聊通知 解散群聊通知 发布群公告 ''' try: msg_content_xml=msg_data["Content"]["string"] # 群聊邀请 if '邀请你加入了群聊' in msg_content_xml and check_chatroom(msg_data["FromUserName"]["string"]): chatroom_id=msg_data["FromUserName"]["string"] ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,3) logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id} {msg}') await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) #推送群资料到kafka group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) k_message=wx_mod_group_info_members_message(wxid,group_info_members) await request.app.state.kafka_service.send_message_async(k_message) if '移出了群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml : chatroom_id=msg_data["FromUserName"]["string"] ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,2) logger.info(f'移出群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}') await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id) await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id) logger.info(f'清除 chatroom_id{chatroom_id} 数据') # 推送删除群资料到kafka k_message=wx_del_group_message(wxid,chatroom_id) await request.app.state.kafka_service.send_message_async(k_message) if '已解散该群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml : chatroom_id=msg_data["FromUserName"]["string"] ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,2) logger.info(f'解散群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}') await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id) await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id) logger.info(f'清除 chatroom_id{chatroom_id} 数据') # 推送删除群资料到kafka k_message=wx_del_group_message(wxid,chatroom_id) await request.app.state.kafka_service.send_message_async(k_message) if 'mmchatroombarannouncememt' in msg_content_xml : chatroom_id=msg_data["FromUserName"]["string"] logger.info(f'发布群公告 chatroom_id {chatroom_id}') await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) #推送群资料到kafka group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) k_message=wx_mod_group_info_members_message(wxid,group_info_members) await request.app.state.kafka_service.send_message_async(k_message) if 'roomtoolstips' in msg_content_xml : chatroom_id=msg_data["FromUserName"]["string"] logger.info(f'群待办 chatroom_id {chatroom_id} ') await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) #推送群资料到kafka group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) k_message=wx_mod_group_info_members_message(wxid,group_info_members) await request.app.state.kafka_service.send_message_async(k_message) except ET.ParseError as e: logger.error(f"解析XML失败: {e}") except Exception as e: logger.error(f"未知错误: {e}") return async def handle_10000_msg_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 修改群名称 更换群主通知 被移除群聊通知 ''' content=msg_data.get("Content","").get("string","") if '修改群名' or '新群主' or '被移除群聊通知' in content and check_chatroom(from_wxid): logger.info(f'{content} chatroom_id {chatroom_id} ') chatroom_id=from_wxid await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) k_message=wx_mod_group_info_members_message(wxid,group_info_members) await request.app.state.kafka_service.send_message_async(k_message) return