|
- from voice.ali.ali_voice import AliVoice
- from common.log import logger
- import xml.etree.ElementTree as ET
- import os,json,asyncio,aiohttp
-
- from voice import audio_convert
-
- from fastapi import APIRouter,Request
- from pydantic import BaseModel
- from fastapi import APIRouter, Depends
- from typing import Dict, Any
- from model.models import AgentConfig,OperationType
- from common.utils import *
- from common.memory import *
-
- timeout_duration = 2.0
-
- messages_router = APIRouter()
-
-
-
- WX_BACKLIST=['fmessage', 'medianote','weixin','weixingongzhong','tmessage']
- @messages_router.post("/messages",response_model=None)
- async def get_chatroominfo(request: Request, body: Dict[str, Any]):
- msg = body
- logger.info(f"收到微信回调消息: {json.dumps(msg, separators=(',', ':'),ensure_ascii=False)}")
- type_name =msg.get("TypeName")
- app_id = msg.get("Appid")
- k, loginfo = await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id)
- if not k:
- logger.warning('找不到登录信息,不处理')
- return {"message": f"收到微信回调消息: {type_name}"}
-
- token_id=loginfo.get('tokenId','')
-
- wxid = msg.get("Wxid",'')
-
- if type_name == 'AddMsg':
- await handle_self_cmd_async(request,wxid,msg)
- msg_data = msg.get("Data")
- from_wxid = msg_data["FromUserName"]["string"]
- config=await request.app.state.redis_service.get_hash(f"__AI_OPS_WX__:WXCHAT_CONFIG")
- wxids=config.keys()
- WX_BACKLIST.extend(wxids)
- if from_wxid in WX_BACKLIST:
- logger.warning(f'微信ID {wxid} 在黑名单,不处理')
- return {"message": "收到微信回调消息"}
-
- await handle_messages_async(request,token_id,msg)
-
- return {"message": "收到微信回调消息"}
-
-
- async def handle_self_cmd_async(request: Request,wxid,msg):
- '''
- 个人微信命令处理
- 如果是个人微信的指令,wxid == from_wxid
- commands = {
- '启用托管': True,
- '关闭托管': False
- }
- '''
- msg_data=msg.get("Data")
- from_wxid=msg_data["FromUserName"]["string"]
- msg_content=msg_data["Content"]["string"]
- if wxid == from_wxid:
- commands = {
- '启用托管': True,
- '关闭托管': False
- }
- if msg_content in commands:
- c_dict = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
- if c_dict:
- agent_config=AgentConfig.model_validate(c_dict)
- agent_config.agentEnabled=commands[msg_content]
-
- await request.app.state.gewe_service.wxchat.save_wxchat_config_async(wxid, agent_config.model_dump())
- logger.info(f'{wxid} {"启动" if commands[msg_content] else "关闭"}托管')
-
- async def handle_messages_async(request: Request,token_id,msg):
- #msg_data=msg.get("Data")
- type_name =msg.get("TypeName")
-
- # app_id = msg.get("Appid")
- # from_wxid=msg_data["FromUserName"]["string"]
- # msg_content=msg_data["Content"]["string"]
- wxid = msg.get("Wxid",'')
-
- match type_name:
- case 'AddMsg':
- await handle_add_messages_async(request,token_id,msg,wxid)
- case 'ModContacts':
- await handle_mod_contacts_async(request,token_id,msg,wxid)
- case 'DelContacts':
- await handle_del_contacts_async(request,token_id,msg,wxid)
- case 'Offline':
- await handle_offline_async(request,token_id,msg,wxid)
- case _:
- logger.warning(f'未知消息类型:{type_name}')
-
- async def gpt_client_async(request,messages: list, wixd: str, friend_wxid: str):
- c = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wixd)
- api_key = c.get('agentTokenId', "sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH")
- print(f'流程key:{api_key}\n')
-
- #api_key="sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH" #测试
- #api_key="sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI" #开发2
-
- api_url = "http://106.15.182.218:3000/api/v1/chat/completions"
- headers = {
- "Content-Type": "application/json",
- "Authorization": f"Bearer {api_key}"
- }
-
- session_id = f'{wixd}-{friend_wxid}'
- data = {
- "model": "",
- "messages": messages,
- "chatId": session_id,
- "detail": True
- }
-
- logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False)))
-
- async with aiohttp.ClientSession() as session:
- try:
- async with session.post(url=api_url, headers=headers, data=json.dumps(data), timeout=600) as response:
- response.raise_for_status()
- response_data = await response.json()
- logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'), ensure_ascii=False)))
- return response_data
- except aiohttp.ClientError as e:
- logger.error(f"请求失败: {e}")
- raise
-
- async def handle_add_messages_async(request: Request,token_id,msg,wxid):
-
- wx_config =await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
-
- if not bool(wx_config.get("agentEnabled",False)):
- logger.info(f'微信ID {wxid} 未托管,不处理')
- return
- app_id = msg.get("Appid")
- msg_data = msg.get("Data")
- msg_type = msg_data.get("MsgType",None)
- from_wxid = msg_data["FromUserName"]["string"]
- to_wxid = msg_data["ToUserName"]["string"]
- msg_push_content=msg_data.get("PushContent")
-
- handlers = {
- 1: handle_text_async,
- 3: handle_image_async,
- 34: handle_voice_async,
- 42: handle_name_card_async,
- 49: handle_xml_async,
- 37: handle_add_friend_notice_async,
- 10002: handle_10002_msg
- }
-
- # (扫码进群情况)判断受否是群聊,并添加到通信录
- if check_chatroom(from_wxid) or check_chatroom(to_wxid):
- logger.info('群信息')
- chatroom_id=from_wxid
- ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,3)
- logger.info(f'保存到通讯录 chatroom_id {chatroom_id} {msg}')
- await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id)
- await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id)
- handlers[1]=handle_text_group_async
- handlers[3]=handle_image_group_async
- handlers[34]=handle_voice_group_async
-
- handler = handlers.get(msg_type)
- if handler:
- return await handler(request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid)
- else:
- logger.warning(f"微信回调消息类型 {msg_type} 未处理")
-
- async def handle_mod_contacts_async(request: Request,token_id,msg,wxid):
- '''
- 好友通过验证及好友资料变更的通知消息
- '''
- wxid = msg.get("Wxid")
- msg_data = msg.get("Data")
- app_id = msg.get("Appid")
- #
- #handle_mod_contacts(token_id,app_id,wxid,msg_data)
-
- #
- loop = asyncio.get_event_loop()
- future = asyncio.run_coroutine_threadsafe(
- handle_mod_contacts_worker_async(request,token_id,app_id,wxid,msg_data),
- loop
- )
-
- contact_wxid =msg_data.get("UserName",{}).get("string","") #msg_data["UserName"]["string"]
- nickname= msg_data.get("NickName",{}).get("string","")#msg_data["NickName"]["string"]
- city=msg_data.get("City","")
- signature=msg_data.get("Signature","")
- province=msg_data.get("Province","")
- bigHeadImgUrl=msg_data.get("SnsUserInfo",{}).get("SnsBgimgId","") #msg_data["SnsUserInfo"]["SnsBgimgId"]
- country=msg_data.get("Country","")
- sex=msg_data.get("Sex",None)
- pyInitial= msg_data.get("PyInitial",{}).get("string","")#msg_data["PyInitial"]["string"]
- quanPin=msg_data.get("QuanPin",{}).get("string","") #msg_data["QuanPin"]["string"]
- remark=msg_data.get("Remark",{}).get("string","")
- remarkPyInitial=msg_data.get("RemarkPyInitial",{}).get("string","")
- remarkQuanPin=msg_data.get("RemarkQuanPin",{}).get("string","")
- smallHeadImgUrl=msg_data.get("smallHeadImgUrl","")
-
- # data=gewe_chat.wxchat.get_brief_info(token_id,app_id,[contact_wxid])
- # contact=data[0]
- # alias=contact.get("alias")
- #推动到kafka
- contact_data = {
- "alias": None,
- "bigHeadImgUrl": bigHeadImgUrl,
- "cardImgUrl": None,
- "city": city,
- "country": country,
- "description": None,
- "labelList": None,
- "nickName": nickname,
- "phoneNumList": None,
- "province": province,
- "pyInitial": pyInitial,
- "quanPin": quanPin,
- "remark": remark,
- "remarkPyInitial": remarkPyInitial,
- "remarkQuanPin": remarkQuanPin,
- "sex": sex,
- "signature": signature,
- "smallHeadImgUrl": smallHeadImgUrl,
- "snsBgImg": None,
- "userName": contact_wxid
- }
- input_message=wx_mod_contact_message(wxid,contact_data)
- await request.app.state.kafka_service.send_message_async(input_message)
-
- async def handle_del_contacts_async(request: Request,token_id,msg,wxid):
- '''
- 删除好友通知/退出群聊
- '''
- msg_data = msg.get("Data")
- username=msg_data["UserName"]["string"]
- if check_chatroom(username):
- logger.info('退出群聊')
- wxid = msg.get("Wxid")
- chatroom_id=username
- await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
- logger.info(f'清除 chatroom_id{chatroom_id} 数据')
- else:
- logger.info('删除好友通知')
- # 推送到kafka
- input_message=wx_del_contact_message(wxid,username)
- await request.app.state.kafka_service.send_message_async(input_message)
-
- async def handle_offline_async(request: Request,token_id,msg,wxid):
- '''
- 已经离线
- '''
- wxid = msg.get("Wxid")
- app_id = msg.get("Appid")
- logger.warning(f'微信ID {wxid}在设备{app_id}已经离线')
- k,r=await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id)
- print(k)
- await request.app.state.redis_service.update_hash_field(k,'status',0)
- await request.app.state.redis_service.update_hash_field(k,'modify_at',int(time.time()))
-
- # 推送到kafka
- input_message=wx_offline_message(app_id,wxid)
- await request.app.state.kafka_service.send_message_async(input_message)
-
- async def handle_mod_contacts_worker_async(request:Request,token_id,app_id,wxid,msg_data):
- '''
- 好友通过验证及好友资料变更的通知消息
- '''
- logger.info('好友通过验证及好友资料变更的通知消息')
- if not check_chatroom(msg_data["UserName"]["string"]):
- contact_wxid = msg_data["UserName"]["string"]
-
- # 更新好友信息
- # 检查好友关系,不是好友则删除
- # ret,msg,check_relation=gewe_chat.wxchat.check_relation(token_id, app_id,[contact_wxid])
- # first_item = check_relation[0]
- # check_relation_status=first_item.get('relation')
- # logger.info(f'{wxid} 好友 {contact_wxid} 关系检查:{check_relation_status}')
- # if check_relation_status != 0:
- # gewe_chat.wxchat.delete_contacts_brief_from_cache(wxid, [contact_wxid])
- # logger.info(f'好友关系异常:{check_relation_status},删除好友 {contact_wxid} 信息')
- # else:
- # gewe_chat.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, [contact_wxid])
-
-
- ret,msg,contacts_list = await request.app.state.gewe_service.fetch_contacts_list_async(token_id, app_id)
- # friend_wxids = contacts_list['friends'][3:] # 可以调整截取范围
- # print(friend_wxids)
-
- #friend_wxids.remove('fmessage')
- #friend_wxids.remove('weixin')
-
- friend_wxids = [c for c in contacts_list['friends'] if c not in ['fmessage', 'medianote','weixin','weixingongzhong','tmessage']] # 可以调整截取范围
- print(f'{wxid}的好友数量 {len(friend_wxids)}')
- await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id, app_id, wxid, friend_wxids)
-
- else:
- logger.info('群聊好友通过验证及好友资料变更的通知消息')
-
- async def handle_text_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
- '''
- 私聊文本消息
- '''
- msg_content=msg_data["Content"]["string"]
- if wxid == from_wxid: #手动发送消息
- logger.info("Active message sending detected")
-
- await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id,wxid,[to_wxid])
- callback_to_user=msg_data["ToUserName"]["string"]
-
- input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
- input_message=dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
- await request.app.state.kafaka_service.send_message_async(input_message)
- logger.info("发送对话 %s",input_message)
- else:
- callback_to_user=msg_data["FromUserName"]["string"]
-
- # 创建并启动任务协程,将参数传递给 ai_chat_text 函数
- task = asyncio.create_task(
- ai_chat_text_async(request,token_id, app_id, wxid, msg_data, msg_content)
- )
-
- # 设置定时器,1秒后检查任务是否超时。这里需要使用 lambda 来传递参数
- timeout_timer = asyncio.create_task(
- check_timeout_async(task, request,token_id, wxid, app_id, callback_to_user)
- )
-
- # 等待任务协程完成
- await task
- # 取消定时器
- timeout_timer.cancel()
-
- async def check_timeout_async(task: asyncio.Task, request: Request,token_id, wxid, app_id, callback_to_user):
- await asyncio.sleep(timeout_duration) # 等待超时时间
- if not task.done():
- print(f"任务运行时间超过{timeout_duration}秒,token_id={token_id}, app_id={app_id}, callback_to_user={callback_to_user}")
- wx_config = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
- if bool(wx_config.get("chatWaitingMsgEnabled", True)):
- await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, "亲,我正在组织回复的信息,请稍等一会")
-
- async def ai_chat_text_async(request: Request,token_id, app_id, wxid, msg_data, msg_content):
- start_time = time.time() # 记录任务开始时间
- callback_to_user = msg_data["FromUserName"]["string"]
- hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
- prompt = {"role": "user", "content": [{
- "type": "text",
- "text": msg_content
- }]}
- messages_to_send = await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
- # 收到的对话
- input_wx_content_dialogue_message = [{"type": "text", "text": msg_content}]
- input_message = dialogue_message(callback_to_user, wxid, input_wx_content_dialogue_message)
- await request.app.state.kafka_service.send_message_async(input_message)
- logger.info("发送对话 %s", input_message)
-
- cache_data = USER_INTERACTIVE_CACHE.get(wxid)
- if cache_data and cache_data.get('interactive'):
- o = get_first_char_if_digit(msg_content)
- if o is not None:
- userSelectOptions = cache_data.get('options')
- if o < len(userSelectOptions):
- o = o - 1
- msg_content = userSelectOptions[o].get("value")
- messages_to_send = [{"role": "user", "content": msg_content}]
- else:
- messages_to_send = [{"role": "user", "content": msg_content}]
- else:
- messages_to_send = [{"role": "user", "content": msg_content}]
- res = await gpt_client_async(request,messages_to_send, wxid, callback_to_user)
- reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"])
- description = ''
- userSelectOptions = []
- if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
- for item in reply_content:
- if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
- params = item["interactive"]["params"]
- description = params.get("description")
- userSelectOptions = params.get("userSelectOptions", [])
- values_string = "\n".join(option["value"] for option in userSelectOptions)
-
- if description is not None:
- USER_INTERACTIVE_CACHE[wxid] = {
- "interactive": True,
- "options": userSelectOptions,
- }
- reply_content = description + '------------------------------\n' + values_string
-
- elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
- USER_INTERACTIVE_CACHE[wxid] = {
- "interactive": False
- }
- text = ''
- for item in reply_content:
- if item["type"] == "text":
- text = item["text"]["content"]
- if text == '':
- # 去除上次上一轮对话再次请求
- cache_messages_str = await request.app.state.redis_service.get_hash_field(hash_key, "data")
- cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
-
- if len(cache_messages) >= 3:
- cache_messages = cache_messages[:-3]
- await request.app.state.redis_service.update_hash_field(hash_key, "data", json.dumps(cache_messages, ensure_ascii=False))
- messages_to_send = await request.app.state.redis_service.save_session_messages_to_cache_async(hash_key, prompt)
- res = await gpt_client_async(request,messages_to_send, wxid, callback_to_user)
- reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"])
-
- if isinstance(reply_content, list):
- reply_content = remove_markdown_symbol(reply_content[0].get('text').get("content"))
- else:
- reply_content = text
- else:
- USER_INTERACTIVE_CACHE[wxid] = {
- "interactive": False
- }
- reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"])
-
- await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, reply_content)
- await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})
- # 回复的对话
- input_wx_content_dialogue_message = [{"type": "text", "text": reply_content}]
- input_message = dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message, True)
- await request.app.state.kafka_service.send_message_async(input_message)
- logger.info("发送对话 %s", input_message)
-
- end_time = time.time() # 记录任务结束时间
- execution_time = end_time - start_time # 计算执行时间
- logger.info(f"AI回答任务完成,耗时 {execution_time:.2f} 秒")
-
- async def handle_text_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
- '''
- 群聊文本消息
- '''
- msg_content=msg_data["Content"]["string"]
- msg_push_content=msg_data.get("PushContent")
- k,login_info=await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id)
- nickname=login_info.get("nickName")
-
- if wxid == from_wxid: #手动发送消息
- logger.info("Active message sending detected")
-
- await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id,wxid,[to_wxid])
- callback_to_user=msg_data["ToUserName"]["string"]
-
- input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
- input_message=dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
- await request.app.state.kafka_service.send_message_async(input_message)
- logger.info("发送对话 %s",input_message)
- else:
- c = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
- chatroom_id_white_list = c.get("chatroomIdWhiteList", [])
-
- if not chatroom_id_white_list:
- logger.info('白名单为空或未定义,不处理')
- return
-
- if from_wxid not in chatroom_id_white_list:
- logger.info(f'群ID {from_wxid} 不在白名单中,不处理')
- return
-
- if '在群聊中@了你' in msg_push_content or '@'+nickname in msg_push_content:
-
- callback_to_user=msg_data["FromUserName"]["string"]
- hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
-
- prompt={"role": "user", "content": [{
- "type": "text",
- "text": msg_content
- }]}
- messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
- # 收到的对话
- input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
- input_message=dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
- await request.app.state.kafka_service.send_message_async(input_message)
- logger.info("发送对话 %s",input_message)
-
- cache_data = USER_INTERACTIVE_CACHE.get(wxid)
- if cache_data and cache_data.get('interactive') :
- o=get_first_char_if_digit(msg_content)
- if o is not None:
- userSelectOptions=cache_data.get('options')
- if o < len(userSelectOptions):
- o=o-1
- msg_content=userSelectOptions[o].get("value")
- messages_to_send=[{"role": "user", "content": msg_content}]
- else:
- messages_to_send=[{"role": "user", "content": msg_content}]
- else:
- messages_to_send=[{"role": "user", "content": msg_content}]
-
- res=await gpt_client_async(request,messages_to_send,wxid,callback_to_user)
- reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
-
- description = ''
- userSelectOptions = []
-
- if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
- for item in reply_content:
- if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
- params = item["interactive"]["params"]
- description = params.get("description")
- userSelectOptions = params.get("userSelectOptions", [])
- values_string = "\n".join(option["value"] for option in userSelectOptions)
-
- if description is not None:
- USER_INTERACTIVE_CACHE[wxid] = {
- "interactive":True,
- "options": userSelectOptions,
- }
- reply_content=description + '------------------------------\n'+values_string
-
- elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
- USER_INTERACTIVE_CACHE[wxid] = {
- "interactive":False
- }
- text=''
- for item in reply_content:
- if item["type"] == "text":
- text=item["text"]["content"]
- if text=='':
- # 去除上次上一轮对话再次请求
- cache_messages_str=await request.app.state.redis_service.get_hash_field(hash_key,"data")
- cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
-
- if len(cache_messages) >= 3:
- cache_messages = cache_messages[:-3]
-
- await request.app.state.redis_service.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
- messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
- res=await gpt_client_async(request,messages_to_send,wxid,callback_to_user)
- reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
- else:
- reply_content=text
- else:
- USER_INTERACTIVE_CACHE[wxid] = {
- "interactive":False
- }
- reply_content=res["choices"][0]["message"]["content"]
-
-
- reply_content='@'+extract_nickname(msg_push_content) + reply_content
-
- await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,reply_content)
- await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})
- # 回复的对话
- input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
- input_message=dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
- await request.app.state.kafka_service.send_message_async(input_message)
- logger.info("发送对话 %s",input_message)
- else:
- logger.info('群聊公开消息')
- callback_to_user=msg_data["FromUserName"]["string"]
- group_dialogue_message=[{"type": "text", "text": msg_content}]
- input_message=dialogue_message(callback_to_user,wxid,group_dialogue_message)
- await request.app.state.kafka_service.send_message_async(input_message)
- logger.info("发送对话 %s",input_message)
- return
-
- async def handle_image_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
- '''
- 私聊图片消息
- '''
- msg_content=msg_data["Content"]["string"]
-
- callback_to_user=from_wxid
- hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
- wx_img_url= await request.app.state.gewe_service.download_image_msg_async(token_id,app_id,msg_content)
-
- oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5"
- oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN"
- oss_endpoint="http://oss-cn-shanghai.aliyuncs.com"
- oss_bucket_name="cow-agent"
- oss_prefix="cow"
-
- img_url=upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, wx_img_url, oss_prefix)
-
- prompt={
- "role": "user",
- "content": [{
- "type": "image_url",
- "image_url": {"url": img_url}
- }]
- }
-
- await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
- await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,'已经上传了图片,有什么可以为您服务')
- logger.info(f"上传图片 URL: {img_url}")
-
- wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": img_url}}]
- input_message=dialogue_message(wxid,callback_to_user,wx_content_dialogue_message)
- await request.app.state.kafka_service.send_message_async(input_message)
- logger.info("发送对话 %s",input_message)
-
- async def handle_image_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
- logger.info('群聊图片消息')
-
- async def handle_voice_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
- '''
- 私聊语音消息
- '''
- callback_to_user=from_wxid
- msg_content=msg_data["Content"]["string"]
- msg_id=msg_data["MsgId"]
-
-
- file_url=await request.app.state.gewe_service.download_audio_msg_async(token_id,app_id,msg_id,msg_content)
- react_silk_path=await save_to_local_from_url_async(file_url)
- react_wav_path = os.path.splitext(react_silk_path)[0] + ".wav"
- audio_convert.any_to_wav(react_silk_path,react_wav_path)
- react_voice_text=AliVoice().voiceToText(react_wav_path)
-
- os.remove(react_silk_path)
- os.remove(react_wav_path)
-
- hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
- messages=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "user", "content": react_voice_text})
- ai_res=await gpt_client_async(request,messages,wxid,callback_to_user)
- ai_res_content=remove_markdown_symbol(ai_res["choices"][0]["message"]["content"])
- has_url=contains_url(ai_res_content)
- if not has_url:
- voice_during,voice_url=wx_voice(ai_res_content)
-
- if voice_during < 60 * 1000:
- ret,ret_msg,res=await request.app.state.gewe_service.post_voice_async(token_id,app_id,callback_to_user,voice_url,voice_during)
-
- else:
- ret,ret_msg,res=await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,ai_res_content)
- logger.warning(f'回应语音消息长度 {voice_during/1000}秒,超过60秒,转为文本回复')
- if ret==200:
- logger.info((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
-
- else:
- logger.warning((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
- ret,ret_msg,res==await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,ai_res_content)
- logger.info((f'{wxid} 向 {callback_to_user} 发送文本【{ai_res_content}】{ret_msg}'))
-
- else:
- logger.info(f"回复内容包含网址,不发送语音,回复文字内容:{ai_res_content}")
- ret,ret_msg,res=await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,ai_res_content)
-
-
-
-
- await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": ai_res_content})
- # 构造对话消息并发送到 Kafka
- input_wx_content_dialogue_message = [{"type": "text", "text": ai_res_content}]
- input_message = dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message,True)
- await request.app.state.kafka_service.send_message_async(input_message)
- logger.info("发送对话 %s", input_message)
-
- async def handle_voice_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
- logger.info('群聊语音消息')
-
- async def handle_name_card_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
- logger.info('名片消息')
-
- try:
- msg_content_xml=msg_data["Content"]["string"]
- # 解析XML字符串
- root = ET.fromstring(msg_content_xml)
-
- # 提取alias属性
- alias_value = root.get("alias")
- # 加好友资料
- scene = int(root.get("scene"))
- v3 = root.get("username")
- v4 = root.get("antispamticket")
-
- logger.info(f"alias_value: {alias_value}, scene: {scene}, v3: {v3}, v4: {v4}")
- # 判断appid 是否已经创建3天
- k, login_info = await request.app.state.gewe_service.get_login_info_by_wxid_async(wxid)
- creation_timestamp=int(login_info.get('create_at',time.time()))
- current_timestamp = time.time()
- three_days_seconds = 3 * 24 * 60 * 60 # 三天的秒数
- diff_flag=(current_timestamp - creation_timestamp) >= three_days_seconds
- if not diff_flag:
- log_content=f'名片添加好友功能,{wxid} 用户创建不够三天,不能使用该功能'
- logger.warning(log_content)
- return
-
- # 将加好友资料添加到待加好友队列
- #gewe_chat.wxchat.enqueue_to_add_contacts(wxid,scene,v3,v4)
- _,loginfo=await request.app.state.gewe_service.get_login_info_by_wxid_async(wxid)
-
- nickname=loginfo.get('nickName')
- add_contact_content=f'您好,我是{nickname}'
- #gewe_chat.wxchat.add_contacts(token_id,app_id,scene,Models.OperationType.ADD_FRIEND,v3,v4,add_contact_content)
- await request.app.state.gewe_service.add_contacts_async(token_id,app_id,scene,OperationType.ADD_FRIEND.value,v3,v4,add_contact_content)
-
- except ET.ParseError as e:
- logger.error(f"XML解析错误: {e}")
- except KeyError as e:
- logger.error(f"字典键错误: {e}")
- except Exception as e:
- logger.error(f"未知错误: {e}")
-
- async def handle_xml_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
- '''
- 处理xml
- '''
- try:
- msg_content_xml=msg_data["Content"]["string"]
- root = ET.fromstring(msg_content_xml)
- type_value = int(root.find(".//appmsg/type").text)
- handlers = {
- 57: handle_xml_reference_async,
- 5: handle_xml_invite_group_async
- }
- handler = handlers.get(type_value)
- if handler:
- return await handler(request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid)
- # elif "邀请你加入了群聊" in msg_content_xml: # 邀请加入群聊
- # logger.warning(f"xml消息 {type_value} 邀请你加入了群聊.todo")
- else:
- print(f"xml消息 {type_value} 未解析")
- except ET.ParseError as e:
- logger.error(f"解析XML失败: {e}")
- except Exception as e:
- logger.error(f"未知错误: {e}")
- return
-
- async def handle_xml_reference_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
- '''
- 引用消息
- 判断此类消息的逻辑:$.Data.MsgType=49 并且 解析$.Data.Content.string中的xml msg.appmsg.type=57
- '''
- callback_to_user=from_wxid
- hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
-
- msg_content= msg_data["PushContent"]
-
- prompt={"role": "user", "content": [{
- "type": "text",
- "text": msg_content
- }]}
-
- # 收到的对话
- messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
- input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
- input_message=dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
- await request.app.state.kafka_service.send_message_async(input_message)
- logger.info("发送对话 %s",input_message)
-
- # 回复的对话
- res=await gpt_client_async(request,messages_to_send,wxid,callback_to_user)
- reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
- input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
- input_message=dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
- await request.app.state.kafka_service.kafka_client.produce_message(input_message)
- logger.info("发送对话 %s",input_message)
- await request.app.state.kafka_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})
- await request.app.state.kafka_service.post_text_async(token_id,app_id,callback_to_user,reply_content)
-
- async def handle_xml_invite_group_async (request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
- '''
- 群聊邀请
- 判断此类消息的逻辑:$.Data.MsgType=49
- 并且 解析$.Data.Content.string中的xml msg.appmsg.title=邀请你加入群聊(根据手机设置的系统语言title会有调整,不同语言关键字不同)
- '''
- logger.info(f'{wxid} 群聊邀请')
- msg_content_xml=msg_data["Content"]["string"]
- root = ET.fromstring(msg_content_xml)
- title_value = root.find(".//appmsg/title").text
-
- if '邀请你加入群聊' in title_value:
- invite_url = root.find('.//url').text
- ret,msg,data=await request.app.state.gewe_service.agree_join_room_async(token_id,app_id,invite_url)
- if ret==200:
- logger.info(f'群聊邀请,同意加入群聊 {msg} {data}')
- chatroom_id=data.get('chatroomId','')
- # if not chatroom_id:
- # logger.warning(f'群聊邀请,同意加入群聊失败 {msg} {data}')
- # return
- ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,3)
- logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id} {msg}')
- await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id)
- await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id)
- else:
- logger.warning(f'群聊邀请,同意加入群聊失败 {msg} {data}')
-
- async def handle_add_friend_notice_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
- '''
- 好友添加请求通知
- '''
- logger.info('好友添加请求通知')
- try:
- msg_content_xml=msg_data["Content"]["string"]
-
- root = ET.fromstring(msg_content_xml)
-
- msg_content = root.attrib.get('content', None)
- v3= root.attrib.get('encryptusername', None)
- v4= root.attrib.get('ticket', None)
- scene=root.attrib.get('scene', None)
-
-
- to_contact_wxid=root.attrib.get('fromusername', None)
- wxid=msg_data["ToUserName"]["string"]
-
- # 自动同意好友
- # print(v3)
- # print(v4)
- # print(scene)
- # print(msg_content)
- # 操作类型,2添加好友 3同意好友 4拒绝好友
- #option=2
- option=3
- reply_add_contact_contact="亲,我是你的好友"
- ret,ret_msg=await request.app.state.gewe_service.add_contacts_async(token_id,app_id,scene,option,v3,v4,reply_add_contact_contact)
- if ret==200:
- logger.info('自动添加好友成功')
-
- # 好友发送的文字
- hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{to_contact_wxid}'
- prompt={"role": "user", "content": [{"type": "text","text": msg_content}]}
- messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
-
- input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
- input_message=dialogue_message(to_contact_wxid,wxid,input_wx_content_dialogue_message)
- await request.app.state.gewe_service.send_message_async(input_message)
- logger.info("发送对话 %s",input_message)
-
- callback_to_user=to_contact_wxid
- res=await gpt_client_async(messages_to_send,wxid,callback_to_user)
- reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
-
- #保存好友信息
- await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id, wxid,[to_contact_wxid])
-
- # 保存到缓存
- await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})
-
- # 发送信息
- await request.app.state.gewe_service.post_text_async(token_id,app_id, to_contact_wxid,reply_content)
-
- # 发送到kafka
- input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
- input_message=dialogue_message(wxid,to_contact_wxid,input_wx_content_dialogue_message,True)
- request.app.state.kafka_service.send_message_async(input_message)
- logger.info("发送对话 %s",input_message)
-
- else:
- logger.warning("添加好友失败")
- except ET.ParseError as e:
- logger.error(f"解析XML失败: {e}")
- except Exception as e:
- logger.error(f"未知错误: {e}")
- return
-
- async def handle_10002_msg(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
-
- '''
- 群聊邀请
- 撤回消息
- 拍一拍消息
- 地理位置
- 踢出群聊通知
- 解散群聊通知
- 发布群公告
- '''
-
- try:
- msg_content_xml=msg_data["Content"]["string"]
- # 群聊邀请
- if '邀请你加入了群聊' in msg_content_xml and check_chatroom(msg_data["FromUserName"]["string"]):
- chatroom_id=msg_data["FromUserName"]["string"]
- ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,3)
- logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id} {msg}')
- await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id)
- await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id)
-
- if '移出了群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml :
- chatroom_id=msg_data["FromUserName"]["string"]
- ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,2)
- logger.info(f'踢出群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}')
- await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
- logger.info(f'清除 chatroom_id{chatroom_id} 数据')
-
- if '已解散该群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml :
- chatroom_id=msg_data["FromUserName"]["string"]
- ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,2)
- logger.info(f'解散群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}')
- await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
- logger.info(f'清除 chatroom_id{chatroom_id} 数据')
-
- print('撤回消息,拍一拍消息,地理位置')
- except ET.ParseError as e:
- logger.error(f"解析XML失败: {e}")
- except Exception as e:
- logger.error(f"未知错误: {e}")
- return
-
-
-
-
-
|