From 1e041b6ff53d2286e9991ffaa4f4e48323c9e5cd Mon Sep 17 00:00:00 2001 From: H Vs Date: Mon, 6 Jan 2025 18:14:33 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.py | 43 ++- common/redis_helper.py | 2 + common/utils.py | 25 +- config-dev.json | 38 +-- config-test.json | 35 +- config.json | 5 - docker/Dockerfile.latest | 42 +++ docker/build.latest.sh | 8 + docker/entrypoint.sh | 36 ++ resources/messages_resource-bk.py | 319 +++++++++++++++++ resources/messages_resource.py | 546 ++++++++++++++++++------------ resources/messages_resource.txt | 243 +++++++++++++ wechat/gewe_chat.py | 81 +++++ 13 files changed, 1122 insertions(+), 301 deletions(-) create mode 100644 docker/Dockerfile.latest create mode 100644 docker/build.latest.sh create mode 100644 docker/entrypoint.sh create mode 100644 resources/messages_resource-bk.py create mode 100644 resources/messages_resource.txt diff --git a/app.py b/app.py index 5533b14..31a704f 100644 --- a/app.py +++ b/app.py @@ -5,7 +5,7 @@ from resources.messages_resource import MessagesResource from common.log import logger, log_exception from common.interceptors import before_request, after_request, handle_exception import threading -from common import kafka_helper, redis_helper +from common import kafka_helper, redis_helper,utils import logging from config import load_config @@ -40,6 +40,23 @@ def save_friends_to_redis(wxid, friends): hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}" redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(friends,ensure_ascii=False)}) +# def save_contacts_brief_to_redis(wxid, friends): +# # 将联系人信息保存到 Redis,使用一个合适的 key +# hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}" + +# # 获取缓存中的数据,如果缓存不存在则初始化为空列表 +# cache_str = redis_helper.redis_helper.get_hash_field(hash_key, "data") +# cache = json.loads(cache_str) if cache_str else [] + +# # 合并联系人信息 +# cache.extend(friends) + +# # 将合并后的联系人数据保存回 Redis +# redis_helper.redis_helper.update_hash_field(hash_key, "data", { +# "data": json.dumps(cache, ensure_ascii=False) +# }) + + def worker(): kafka_helper.start() redis_helper.start() @@ -51,7 +68,7 @@ def start_wxchat_thread_free(): gewe_chat.start() wxchat=gewe_chat.wxchat token_id = wxchat.get_token_id() - # token_id='9ba29f73-e46a-40b5-873d-795490f732e3' + # token_id='f828cb3c-1039-489f-b9ae-7494d1778a15' print(f'tokenId: {token_id}') qr_code = wxchat.get_login_qr_code(token_id) @@ -84,7 +101,7 @@ def start_wxchat_thread(): gewe_chat.start() wxchat=gewe_chat.wxchat # token_id = wxchat.get_token_id() - token_id='9ba29f73-e46a-40b5-873d-795490f732e3' + token_id='f828cb3c-1039-489f-b9ae-7494d1778a15' tel='18029274615' region_id='440000' print(f'tokenId: {token_id}') @@ -123,11 +140,12 @@ def start_wxchat_thread(): print(app_id) contacts_list=wxchat.fetch_contacts_list(token_id, app_id) - friend_wxids=contacts_list['friends'] - # print(friend_wxids) + friend_wxids=contacts_list['friends'][3:] + print(friend_wxids) wxid=redis_helper.redis_helper.get_hash_field(hash_key,'wxid') - friends_brief=wxchat.get_brief_info(token_id,app_id,friend_wxids) - save_friends_to_redis(wxid,friends_brief) + # friends_brief=wxchat.get_brief_info(token_id,app_id,friend_wxids) + # utils.save_contacts_brief_to_redis(wxid,friends_brief) + wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,friend_wxids) print(f'{wxid} 登录 {app_id} 成功') else: app_id=login_info.get('appId') @@ -175,10 +193,11 @@ def start_wxchat_thread(): contacts_list=wxchat.fetch_contacts_list(token_id, app_id) friend_wxids=contacts_list['friends'][3:] + # friend_wxids=['wxid_95rrm8l6tznb21'] wxid=redis_helper.redis_helper.get_hash_field(hash_key,'wxid') - friends_brief=wxchat.get_brief_info(token_id,app_id,friend_wxids) - save_friends_to_redis(wxid,friends_brief) + wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,friend_wxids) print(f'{wxid} 登录 {app_id} 成功') + #wxchat.post_text(token_id,app_id,'wxid_eigw91zpsl1p22','你好') def app_run(): @@ -228,7 +247,11 @@ def app_run(): # channel = channel_factory.create_channel('wx') # channel.startup() - flask_app.run(debug=False,port=80) + environment = os.environ.get('environment', 'default') # 默认是生产环境 + port=5000 + if environment== 'default': + port=80 + flask_app.run(debug=False,host='0.0.0.0',port=port) if __name__ == '__main__': app_run() diff --git a/common/redis_helper.py b/common/redis_helper.py index a9080aa..ce25345 100644 --- a/common/redis_helper.py +++ b/common/redis_helper.py @@ -39,6 +39,8 @@ class RedisHelper: """更新哈希表中的某个字段""" self.client.hset(hash_key, field, value) + + def start(): global redis_helper diff --git a/common/utils.py b/common/utils.py index 246bb51..1d2ccec 100644 --- a/common/utils.py +++ b/common/utils.py @@ -8,6 +8,8 @@ from common.log import logger import oss2,time,json from urllib.parse import urlparse, unquote +from common import redis_helper + from datetime import datetime def fsize(file): @@ -205,7 +207,7 @@ def generate_guid_no_dashes(): """ return str(uuid.uuid4()).replace('-', '') -def dialogue_message(wxid_from:str,wxid_to:str,wx_content:list): +def dialogue_message(wxid_from:str,wxid_to:str,wx_content:list,is_ai:bool=False): """ 构造消息的 JSON 数据 :param contents: list,包含多个消息内容,每个内容为字典,如: @@ -229,6 +231,7 @@ def dialogue_message(wxid_from:str,wxid_to:str,wx_content:list): "time": current_time, "data": { "msg_type": "dialogue", + "is_ai":is_ai, "content": { "wxid_from": wxid_from, "wxid_to": wxid_to, @@ -237,4 +240,22 @@ def dialogue_message(wxid_from:str,wxid_to:str,wx_content:list): } } - return json.dumps(data, separators=(',', ':'), ensure_ascii=False) \ No newline at end of file + return json.dumps(data, separators=(',', ':'), ensure_ascii=False) + + + +def save_contacts_brief_to_redis(wxid, friends): + # 将联系人信息保存到 Redis,使用一个合适的 key + hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}" + + # 获取缓存中的数据,如果缓存不存在则初始化为空列表 + cache_str = redis_helper.redis_helper.get_hash_field(hash_key, "data") + cache = json.loads(cache_str) if cache_str else [] + + # 合并联系人信息 + cache.extend(friends) + + # 将合并后的联系人数据保存回 Redis + redis_helper.redis_helper.update_hash_field(hash_key, "data", { + "data": json.dumps(cache, ensure_ascii=False) + }) \ No newline at end of file diff --git a/config-dev.json b/config-dev.json index 3dd95c8..d5a45ad 100644 --- a/config-dev.json +++ b/config-dev.json @@ -1,43 +1,9 @@ { - "channel_type": "wx", - "model": "", - "open_ai_api_key": "sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI", - "open_ai_api_base": "http://106.15.182.218:3000/api/v1", - "claude_api_key": "YOUR API KEY", - "proxy": "", - "hot_reload": true, "debug": false, - "single_chat_reply_prefix": "", - "group_chat_prefix": [ - "zhushou" - ], - "group_name_white_list": [ - ], - "image_create_prefix": [ - - ], - "group_welcome_msg": "", - "trigger_by_self": true, - "voice_to_text":"ali", - "text_to_voice":"ali", - "speech_recognition": true, - "group_speech_recognition": false, - "voice_reply_voice": true, - "conversation_max_tokens": 2500, - "expires_in_seconds": 300, - "character_desc": "", - "temperature": 0.9, - "subscribe_msg": "", - "use_linkai": false, - "linkai_api_key": "", - "linkai_app_code": "", - - "redis_host":"47.116.142.20", + "redis_host":"192.168.2.121", "redis_port":8090, "redis_password":"telpo#1234", "redis_db":3, - - "kafka_bootstrap_servers":"172.19.42.53:9092", - + "kafka_bootstrap_servers":"192.168.2.121:9092", "aiops_api":"https://id.ssjlai.com/aiopsadmin" } diff --git a/config-test.json b/config-test.json index ed145d2..153f956 100644 --- a/config-test.json +++ b/config-test.json @@ -1,42 +1,9 @@ { - "channel_type": "wx", - "model": "", - "open_ai_api_key": "sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH", - "open_ai_api_base": "http://106.15.182.218:3000/api/v1", - "claude_api_key": "YOUR API KEY", - "proxy": "", - "hot_reload": true, "debug": false, - "single_chat_reply_prefix": "", - "group_chat_prefix": [ - "zhushou" - ], - "group_name_white_list": [ - ], - "image_create_prefix": [ - - ], - "group_welcome_msg": "", - "trigger_by_self": true, - "voice_to_text":"ali", - "speech_recognition": true, - "group_speech_recognition": true, - "voice_reply_voice": false, - "conversation_max_tokens": 2500, - "expires_in_seconds": 300, - "character_desc": "", - "temperature": 0.9, - "subscribe_msg": "", - "use_linkai": false, - "linkai_api_key": "", - "linkai_app_code": "", - "redis_host":"47.116.142.20", "redis_port":8090, "redis_password":"telpo#1234", "redis_db":3, - "kafka_bootstrap_servers":"172.19.42.53:9092", - - "aiops_api":"https://id.ssjlai.com/aiopsadmin" + "aiops_api":"https://id.ssjlai.com/aiopsadmin" } diff --git a/config.json b/config.json index ae8c567..d5a45ad 100644 --- a/config.json +++ b/config.json @@ -1,14 +1,9 @@ { - - "debug": false, "redis_host":"192.168.2.121", "redis_port":8090, "redis_password":"telpo#1234", "redis_db":3, - "kafka_bootstrap_servers":"192.168.2.121:9092", - "aiops_api":"https://id.ssjlai.com/aiopsadmin" - } diff --git a/docker/Dockerfile.latest b/docker/Dockerfile.latest new file mode 100644 index 0000000..8c0775e --- /dev/null +++ b/docker/Dockerfile.latest @@ -0,0 +1,42 @@ +FROM python:3.10-slim-bullseye + +LABEL maintainer="foo@bar.com" +ARG TZ='Asia/Shanghai' + +ARG CHATGPT_ON_WECHAT_VER + +# RUN echo /etc/apt/sources.list +RUN sed -i 's/deb.debian.org/mirrors.tuna.tsinghua.edu.cn/g' /etc/apt/sources.list + +# Set the timezone and configure tzdata +RUN apt-get update \ + && apt-get install -y --no-install-recommends tzdata \ + && ln -sf /usr/share/zoneinfo/$TZ /etc/localtime \ + && dpkg-reconfigure --frontend noninteractive tzdata \ + && apt-get clean + + +ENV BUILD_PREFIX=/app + +ADD . ${BUILD_PREFIX} + +RUN apt-get update \ + &&apt-get install -y --no-install-recommends bash ffmpeg espeak libavcodec-extra\ + && cd ${BUILD_PREFIX} \ + && cp config-template.json config.json \ + && /usr/local/bin/python -m pip install --no-cache --upgrade pip \ + && pip install --no-cache -r requirements.txt + +WORKDIR ${BUILD_PREFIX} + +ADD docker/entrypoint.sh /entrypoint.sh + +RUN chmod +x /entrypoint.sh \ + && mkdir -p /home/noroot \ + && groupadd -r noroot \ + && useradd -r -g noroot -s /bin/bash -d /home/noroot noroot \ + && chown -R noroot:noroot /home/noroot ${BUILD_PREFIX} /usr/local/lib + +USER noroot + +ENTRYPOINT ["/entrypoint.sh"] diff --git a/docker/build.latest.sh b/docker/build.latest.sh new file mode 100644 index 0000000..5162216 --- /dev/null +++ b/docker/build.latest.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +unset KUBECONFIG + +cd .. && docker build -f docker/Dockerfile.latest \ + -t 139.224.254.18:5000/ssjl/ai-ops-wechat . + +docker tag 139.224.254.18:5000/ssjl/ai-ops-wechat 139.224.254.18:5000/ssjl/ai-ops-wechat:$(date +%y%m%d) \ No newline at end of file diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh new file mode 100644 index 0000000..6c9338f --- /dev/null +++ b/docker/entrypoint.sh @@ -0,0 +1,36 @@ +#!/bin/bash +set -e + +# build prefix +AI_OPS_WECHAT_PREFIX=${AI_OPS_WECHAT_PREFIX:-""} +# path to config.json +AI_OPS_WECHAT_CONFIG_PATH=${AI_OPS_WECHAT_CONFIG_PATH:-""} +# execution command line +AI_OPS_WECHAT_EXEC=${AI_OPS_WECHAT_EXEC:-""} + +# Determine the environment and set the config file accordingly +if [ "$environment" == "test" ]; then + AI_OPS_WECHAT_CONFIG_PATH=${AI_OPS_WECHAT_CONFIG_PATH:-$AI_OPS_WECHAT_PREFIX/config-test.json} +elif [ "$environment" == "production" ]; then + AI_OPS_WECHAT_CONFIG_PATH=${AI_OPS_WECHAT_CONFIG_PATH:-$AI_OPS_WECHAT_PREFIX/config-production.json} +elif [ "$environment" == "dev" ]; then + AI_OPS_WECHAT_CONFIG_PATH=${AI_OPS_WECHAT_CONFIG_PATH:-$AI_OPS_WECHAT_PREFIX/config-dev.json} +else + echo "Invalid environment specified. Please set environment to 'test' or 'prod' or 'dev'." + exit 1 +fi + +# AI_OPS_WECHAT_PREFIX is empty, use /app +if [ "$AI_OPS_WECHAT_PREFIX" == "" ]; then + AI_OPS_WECHAT_PREFIX=/app +fi + +# AI_OPS_WECHAT_EXEC is empty, use ‘python app.py’ +if [ "$AI_OPS_WECHAT_EXEC" == "" ]; then + AI_OPS_WECHAT_EXEC="python app.py" +fi + +# go to prefix dir +cd $AI_OPS_WECHAT_PREFIX +# execute +$AI_OPS_WECHAT_EXEC diff --git a/resources/messages_resource-bk.py b/resources/messages_resource-bk.py new file mode 100644 index 0000000..49eec5a --- /dev/null +++ b/resources/messages_resource-bk.py @@ -0,0 +1,319 @@ +from flask_restful import Resource, reqparse +from flask import jsonify,request +from bridge.context import ContextType +import requests,json +from wechat import gewe_chat +from voice.ali.ali_voice import AliVoice +from common import utils,redis_helper,memory,kafka_helper +from common.log import logger +import openai +import xml.etree.ElementTree as ET + + +import os + +from voice import audio_convert + +class MessagesBKResource(Resource): + def __init__(self): + self.parser = reqparse.RequestParser() + + def post(self): + msg = request.get_json() + logger.debug(f"Received message: {msg}") + + if 'Data' in msg: + msg_data=msg.get("Data") + msg_type=msg_data.get("MsgType") + if msg_type == 1:#ContextType.TEXT: # 文字 + msg_content=msg_data["Content"]["string"] + #print(msg_content) + logger.info(msg_content) + + app_id=msg["Appid"] + wxid=msg["Wxid"] + from_wxid=msg_data["FromUserName"]["string"] + to_wxid=msg_data["ToUserName"]["string"] + + token_id="f828cb3c-1039-489f-b9ae-7494d1778a15" + hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" + + if wxid == from_wxid: #主动发送消息 + logger.info("Active message sending detected") + gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,[to_wxid]) + callback_to_user=msg_data["ToUserName"]["string"] + + input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] + input_message=utils.dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s",input_message) + else: + callback_to_user=msg_data["FromUserName"]["string"] + + prompt={"role": "user", "content": [{ + "type": "text", + "text": msg_content + }]} + messages_to_send=get_messages_from_cache(hash_key, prompt) + # 收到的对话 + input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] + input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s",input_message) + + cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid) + if cache_data and cache_data.get('interactive'): + messages_to_send=[{"role": "user", "content": msg_content}] + + res=fast_gpt_api(messages_to_send,wxid) + reply_content=res["choices"][0]["message"]["content"] + + description = '' + userSelectOptions = [] + + if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content): + for item in reply_content: + if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect": + params = item["interactive"]["params"] + description = params.get("description") + userSelectOptions = params.get("userSelectOptions", []) + values_string = "\n".join(option["value"] for option in userSelectOptions) + + if description is not None: + memory.USER_INTERACTIVE_CACHE[wxid] = { + "interactive":True + } + reply_content=description + '------------------------------\n'+values_string + + elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content): + memory.USER_INTERACTIVE_CACHE[wxid] = { + "interactive":False + } + text='' + for item in reply_content: + if item["type"] == "text": + text=item["text"]["content"] + if text=='': + # 去除上次上一轮对话再次请求 + cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data") + cache_messages = json.loads(cache_messages_str) if cache_messages_str else [] + + if len(cache_messages) >= 3: + cache_messages = cache_messages[:-3] + + redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False)) + messages_to_send=get_messages_from_cache(hash_key, prompt) + res=fast_gpt_api(messages_to_send,wxid) + reply_content=res["choices"][0]["message"]["content"] + else: + reply_content=text + else: + memory.USER_INTERACTIVE_CACHE[wxid] = { + "interactive":False + } + reply_content=res["choices"][0]["message"]["content"] + + print(f'token_id {token_id}') + print(f'app_id {app_id}') + print(f'touser: {callback_to_user}') + # print(f'towxuser:{towxuser}') + print(reply_content) + gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content) + get_messages_from_cache(hash_key, {"role": "assistant", "content": reply_content}) + # 回复的对话 + input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}] + input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s",input_message) + + if msg_type == 3: #图片 + + token_id="f828cb3c-1039-489f-b9ae-7494d1778a15" + app_id=msg["Appid"] + callback_to_user=msg_data["FromUserName"]["string"] + msg_content=msg_data["Content"]["string"] + + print(f'token_id {token_id}') + print(f'app_id {app_id}') + print(f'touser: {callback_to_user}') + # print(res_content) + wxid=msg["Wxid"] + hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" + wx_img_url=gewe_chat.wxchat.download_image_msg(token_id,app_id,msg_content) + + oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5" + oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN" + oss_endpoint="http://oss-cn-shanghai.aliyuncs.com" + oss_bucket_name="cow-agent" + oss_prefix="cow" + + img_url=utils.upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, wx_img_url, oss_prefix) + + prompt={ + "role": "user", + "content": [{ + "type": "image_url", + "image_url": {"url": img_url} + }] + } + + get_messages_from_cache(hash_key, prompt) + gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,'已经上传了图片,有什么可以为您服务') + logger.debug(f"Uploaded image URL: {img_url}") + + wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": img_url}}] + input_message=utils.dialogue_message(wxid,callback_to_user,wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s",input_message) + + if msg_type == 34: # 语音 + token_id="f828cb3c-1039-489f-b9ae-7494d1778a15" + callback_to_user=msg_data["FromUserName"]["string"] + app_id=msg["Appid"] + msg_content=msg_data["Content"]["string"] + msg_id=msg_data["MsgId"] + + wxid=msg["Wxid"] + hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" + + print(f'token_id {token_id}') + print(f'app_id {app_id}') + print(f'touser: {callback_to_user}') + print(f'msg_id:{msg_id}') + + + file_url=gewe_chat.wxchat.download_audio_msg(token_id,app_id,msg_id,msg_content) + react_silk_path=utils.save_to_local_from_url(file_url) + react_wav_path = os.path.splitext(react_silk_path)[0] + ".wav" + audio_convert.any_to_wav(react_silk_path,react_wav_path) + react_voice_text=AliVoice().voiceToText(react_wav_path) + + messages=get_messages_from_cache(hash_key, {"role": "user", "content": react_voice_text}) + ai_res=fast_gpt_api(messages,wxid) + ai_res_content=ai_res["choices"][0]["message"]["content"] + reply_text_voice=AliVoice().textToVoice(ai_res_content) + reply_text_voice_path=os.path.join(os.getcwd(), reply_text_voice) + reply_silk_path = os.path.splitext(reply_text_voice_path)[0] + ".silk" + reply_silk_during=audio_convert.any_to_sil(reply_text_voice_path,reply_silk_path) + + # print(int(reply_silk_during)) + # print(reply_silk_path) + oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5" + oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN" + oss_endpoint="http://oss-cn-shanghai.aliyuncs.com" + oss_bucket_name="cow-agent" + oss_prefix="cow" + + file_path=reply_silk_path + file_url = utils.upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, file_path, oss_prefix) + print(file_url) + res=gewe_chat.wxchat.post_voice(token_id,app_id,callback_to_user,file_url,int(reply_silk_during)) + # 删除临时文件 + os.remove(react_silk_path) + os.remove(react_wav_path) + os.remove(reply_text_voice_path) + os.remove(reply_silk_path) + + get_messages_from_cache(hash_key, {"role": "assistant", "content": ai_res}) + + if msg_type == 49: + msg_content_xml=msg_data["Content"]["string"] + root = ET.fromstring(msg_content_xml) + type_value = root.find(".//appmsg/type").text + if type_value==57: # 引用消息 + ''' + # 判断此类消息的逻辑:$.Data.MsgType=49 并且 解析$.Data.Content.string中的xml msg.appmsg.type=57 + ''' + wxid=msg["Wxid"] + hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" + + app_id=msg["Appid"] + callback_to_user=msg_data["FromUserName"]["string"] + # towxuser=touser.get("") + + # token_id="ce50e2c376c843a9a281af3a1a0f4420" + token_id="f828cb3c-1039-489f-b9ae-7494d1778a15" + + prompt={"role": "user", "content": [{ + "type": "text", + "text": msg_content + }]} + + # 收到的对话 + messages_to_send=get_messages_from_cache(hash_key, prompt) + input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] + input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s",input_message) + + # 回复的对话 + res=fast_gpt_api(messages_to_send,wxid) + reply_content=res["choices"][0]["message"]["content"] + input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}] + input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s",input_message) + get_messages_from_cache(hash_key, {"role": "assistant", "content": reply_content}) + gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content) + + return jsonify({"message": "微信回调成功"}) + + +def get_messages_from_cache(hash_key,object:object)->object: + messages=redis_helper.redis_helper.get_hash(hash_key) + wxid=hash_key.split(':')[-1] + if not messages: + messages=[{"role": "system", "content": ""}] + messages.append(object) + redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},3600) + else: + + messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data") + messages = json.loads(messages_str) if messages_str else [] + #判断是否含有图片 + last_message = messages[-1] + # print(last_message) + # print('~~~~~~~~~~') + # 获取 content 并判断其是否为列表 + content = last_message.get("content", []) + if isinstance(content, list) and content: + last_content_type = content[-1].get("type") + if last_content_type == 'image_url': + content.append(object['content'][0]) + messages[-1]['content']=content + else: + messages.append(object) + else: + messages.append(object) + + + # messages.append({"role": "user", "content": msg_content}) + #messages.append(object) + # print(messages) + redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},3600) + return messages + + +def fast_gpt_api(messages:list,session_id:str): + #api_key="sk-tdi7u0zuLsR0JpPMGBeFZxymOpL0zoFVafX8EEEvEakIDAGQ22NyQ6w" + api_key="sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI" + api_url = "http://106.15.182.218:3000/api/v1/chat/completions" + headers = { + + "Content-Type": "application/json", + "Authorization": f"Bearer {api_key}" + } + data={ + "model": "", + "messages":messages, + "chatId": session_id, + "detail": True + } + print(json.dumps(data,ensure_ascii=False)) + logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False))) + response = requests.post(url=api_url, headers=headers, data=json.dumps(data), timeout=600) + response.raise_for_status() + response_data = response.json() + logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'),ensure_ascii=False))) + print(response_data) + return response_data diff --git a/resources/messages_resource.py b/resources/messages_resource.py index 7f619f0..f7a012b 100644 --- a/resources/messages_resource.py +++ b/resources/messages_resource.py @@ -7,6 +7,7 @@ from voice.ali.ali_voice import AliVoice from common import utils,redis_helper,memory,kafka_helper from common.log import logger import openai +import xml.etree.ElementTree as ET import os @@ -19,195 +20,344 @@ class MessagesResource(Resource): def post(self): msg = request.get_json() - print(msg) + logger.info(f"Received message: {msg}") + + # if msg and 'Data' not in msg: + # logger.warning(f"未知消息") + # return jsonify({"message": "未知消息"}) + + + + type_name =msg.get("TypeName") + app_id = msg.get("Appid") + token_id = "f828cb3c-1039-489f-b9ae-7494d1778a15" + + if type_name=='AddMsg': + wxid = msg.get("Wxid") + msg_data = msg.get("Data") + msg_type = msg_data.get("MsgType",None) + from_wxid = msg_data["FromUserName"]["string"] + to_wxid = msg_data["ToUserName"]["string"] + message_hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" + handlers = { + 1: handle_text, + 3: handle_image, + 34: handle_voice, + 49: handle_xml, + 37: handle_add_friend_info + } + + handler = handlers.get(msg_type) + if handler: + return handler(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, message_hash_key) + else: + logger.warning(f"消息类型{msg_type}未处理") + elif type_name=='ModContacts': + ''' + 好友通过验证及好友资料变更的通知消息 + ''' + msg_data = msg.get("Data") + handle_mod_contacts(token_id,app_id,msg_data) + + elif type_name=="Offline": + ''' + 掉线通知 + ''' + elif type_name=="DelContacts": + ''' + 删除好友通知 + 退出群聊 + ''' + else: + logger.warning(f"不知道消息类型") + + return jsonify({"message": "Unsupported message type"}) + +def handle_text(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key): + msg_content=msg_data["Content"]["string"] + if wxid == from_wxid: #主动发送消息 + logger.info("Active message sending detected") + + gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,[to_wxid]) + callback_to_user=msg_data["ToUserName"]["string"] + + input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] + input_message=utils.dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s",input_message) + else: + callback_to_user=msg_data["FromUserName"]["string"] + + prompt={"role": "user", "content": [{ + "type": "text", + "text": msg_content + }]} + messages_to_send=get_messages_from_cache(hash_key, prompt) + # 收到的对话 + input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] + input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s",input_message) + + cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid) + if cache_data and cache_data.get('interactive'): + messages_to_send=[{"role": "user", "content": msg_content}] - if 'Data' in msg: - msg_data=msg.get("Data") - msg_type=msg_data.get("MsgType") - if msg_type == 1:#ContextType.TEXT: # 文字 - msg_content=msg_data["Content"]["string"] - #print(msg_content) - - wxid=msg["Wxid"] - hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" - - app_id=msg["Appid"] - callback_to_user=msg_data["FromUserName"]["string"] - # towxuser=touser.get("") + res=fast_gpt_api(messages_to_send,wxid) + reply_content=res["choices"][0]["message"]["content"] + + description = '' + userSelectOptions = [] + + if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content): + for item in reply_content: + if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect": + params = item["interactive"]["params"] + description = params.get("description") + userSelectOptions = params.get("userSelectOptions", []) + values_string = "\n".join(option["value"] for option in userSelectOptions) + + if description is not None: + memory.USER_INTERACTIVE_CACHE[wxid] = { + "interactive":True + } + reply_content=description + '------------------------------\n'+values_string + + elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content): + memory.USER_INTERACTIVE_CACHE[wxid] = { + "interactive":False + } + text='' + for item in reply_content: + if item["type"] == "text": + text=item["text"]["content"] + if text=='': + # 去除上次上一轮对话再次请求 + cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data") + cache_messages = json.loads(cache_messages_str) if cache_messages_str else [] - # token_id="ce50e2c376c843a9a281af3a1a0f4420" - token_id="9ba29f73-e46a-40b5-873d-795490f732e3" + if len(cache_messages) >= 3: + cache_messages = cache_messages[:-3] - prompt={"role": "user", "content": [{ - "type": "text", - "text": msg_content - }]} + redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False)) messages_to_send=get_messages_from_cache(hash_key, prompt) - # 收到的对话 - input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] - input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message) - kafka_helper.kafka_client.produce_message(input_message) - logger.info("发送对话 %s",input_message) - - cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid) - if cache_data and cache_data.get('interactive'): - messages_to_send=[{"role": "user", "content": msg_content}] - res=fast_gpt_api(messages_to_send,wxid) reply_content=res["choices"][0]["message"]["content"] - - description = '' - userSelectOptions = [] - - if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content): - for item in reply_content: - if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect": - params = item["interactive"]["params"] - description = params.get("description") - userSelectOptions = params.get("userSelectOptions", []) - values_string = "\n".join(option["value"] for option in userSelectOptions) - - if description is not None: - memory.USER_INTERACTIVE_CACHE[wxid] = { - "interactive":True - } - reply_content=description + '------------------------------\n'+values_string - - elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content): - memory.USER_INTERACTIVE_CACHE[wxid] = { - "interactive":False - } - text='' - for item in reply_content: - if item["type"] == "text": - text=item["text"]["content"] - if text=='': - # 去除上次上一轮对话再次请求 - cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data") - cache_messages = json.loads(cache_messages_str) if cache_messages_str else [] - - if len(cache_messages) >= 3: - cache_messages = cache_messages[:-3] - - redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False)) - messages_to_send=get_messages_from_cache(hash_key, prompt) - res=fast_gpt_api(messages_to_send,wxid) - reply_content=res["choices"][0]["message"]["content"] - else: - reply_content=text - else: - memory.USER_INTERACTIVE_CACHE[wxid] = { - "interactive":False - } - reply_content=res["choices"][0]["message"]["content"] - - print(f'token_id {token_id}') - print(f'app_id {app_id}') - print(f'touser: {callback_to_user}') - # print(f'towxuser:{towxuser}') - print(reply_content) - gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content) - get_messages_from_cache(hash_key, {"role": "assistant", "content": reply_content}) - # 回复的对话 - input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}] - input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message) - kafka_helper.kafka_client.produce_message(input_message) - logger.info("发送对话 %s",input_message) - - if msg_type == 3: #图片 - - token_id="9ba29f73-e46a-40b5-873d-795490f732e3" - app_id=msg["Appid"] - callback_to_user=msg_data["FromUserName"]["string"] - msg_content=msg_data["Content"]["string"] - - print(f'token_id {token_id}') - print(f'app_id {app_id}') - print(f'touser: {callback_to_user}') - # print(res_content) - wxid=msg["Wxid"] - hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" - wx_img_url=gewe_chat.wxchat.download_image_msg(token_id,app_id,msg_content) - - oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5" - oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN" - oss_endpoint="http://oss-cn-shanghai.aliyuncs.com" - oss_bucket_name="cow-agent" - oss_prefix="cow" - - img_url=utils.upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, wx_img_url, oss_prefix) - - prompt={ - "role": "user", - "content": [{ - "type": "image_url", - "image_url": {"url": img_url} - }] - } - - get_messages_from_cache(hash_key, prompt) - gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,'已经上传了图片,有什么可以为您服务') - print(img_url) - - wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": img_url}}] - input_message=utils.dialogue_message(wxid,callback_to_user,wx_content_dialogue_message) - kafka_helper.kafka_client.produce_message(input_message) - logger.info("发送对话 %s",input_message) - - - if msg_type == 34: # 语音 - token_id="9ba29f73-e46a-40b5-873d-795490f732e3" - callback_to_user=msg_data["FromUserName"]["string"] - app_id=msg["Appid"] - msg_content=msg_data["Content"]["string"] - msg_id=msg_data["MsgId"] - - wxid=msg["Wxid"] - hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" - - print(f'token_id {token_id}') - print(f'app_id {app_id}') - print(f'touser: {callback_to_user}') - print(f'msg_id:{msg_id}') + else: + reply_content=text + else: + memory.USER_INTERACTIVE_CACHE[wxid] = { + "interactive":False + } + reply_content=res["choices"][0]["message"]["content"] + + # print(f'token_id {token_id}') + # print(f'app_id {app_id}') + # print(f'touser: {callback_to_user}') + # # print(f'towxuser:{towxuser}') + # print(reply_content) + gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content) + get_messages_from_cache(hash_key, {"role": "assistant", "content": reply_content}) + # 回复的对话 + input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}] + input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s",input_message) + +def handle_image(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key): + msg_content=msg_data["Content"]["string"] + + callback_to_user=from_wxid + hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" + wx_img_url=gewe_chat.wxchat.download_image_msg(token_id,app_id,msg_content) + + oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5" + oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN" + oss_endpoint="http://oss-cn-shanghai.aliyuncs.com" + oss_bucket_name="cow-agent" + oss_prefix="cow" + + img_url=utils.upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, wx_img_url, oss_prefix) + + prompt={ + "role": "user", + "content": [{ + "type": "image_url", + "image_url": {"url": img_url} + }] + } + get_messages_from_cache(hash_key, prompt) + gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,'已经上传了图片,有什么可以为您服务') + logger.debug(f"Uploaded image URL: {img_url}") + + wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": img_url}}] + input_message=utils.dialogue_message(wxid,callback_to_user,wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s",input_message) + +def handle_voice(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key): + callback_to_user=from_wxid + msg_content=msg_data["Content"]["string"] + msg_id=msg_data["MsgId"] + + + file_url=gewe_chat.wxchat.download_audio_msg(token_id,app_id,msg_id,msg_content) + react_silk_path=utils.save_to_local_from_url(file_url) + react_wav_path = os.path.splitext(react_silk_path)[0] + ".wav" + audio_convert.any_to_wav(react_silk_path,react_wav_path) + react_voice_text=AliVoice().voiceToText(react_wav_path) + + messages=get_messages_from_cache(hash_key, {"role": "user", "content": react_voice_text}) + ai_res=fast_gpt_api(messages,wxid) + ai_res_content=ai_res["choices"][0]["message"]["content"] + reply_text_voice=AliVoice().textToVoice(ai_res_content) + reply_text_voice_path=os.path.join(os.getcwd(), reply_text_voice) + reply_silk_path = os.path.splitext(reply_text_voice_path)[0] + ".silk" + reply_silk_during=audio_convert.any_to_sil(reply_text_voice_path,reply_silk_path) + + # print(int(reply_silk_during)) + # print(reply_silk_path) + oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5" + oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN" + oss_endpoint="http://oss-cn-shanghai.aliyuncs.com" + oss_bucket_name="cow-agent" + oss_prefix="cow" + + file_path=reply_silk_path + file_url = utils.upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, file_path, oss_prefix) + print(file_url) + res=gewe_chat.wxchat.post_voice(token_id,app_id,callback_to_user,file_url,int(reply_silk_during)) + # 删除临时文件 + os.remove(react_silk_path) + os.remove(react_wav_path) + os.remove(reply_text_voice_path) + os.remove(reply_silk_path) + + get_messages_from_cache(hash_key, {"role": "assistant", "content": ai_res_content}) + +def handle_xml(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key): + msg_content_xml=msg_data["Content"]["string"] + root = ET.fromstring(msg_content_xml) + type_value = root.find(".//appmsg/type").text + handlers = { + 57: handle_xml_reference, + } + handler = handlers.get(type_value) + if handler: + return handler(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key) + else: + print(f"xml消息 {type_value} 未解析") + +def handle_xml_reference(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key): + ''' + 判断此类消息的逻辑:$.Data.MsgType=49 并且 解析$.Data.Content.string中的xml msg.appmsg.type=57 + ''' + callback_to_user=from_wxid + msg_content= msg_data["PushContent"] + + prompt={"role": "user", "content": [{ + "type": "text", + "text": msg_content + }]} + + # 收到的对话 + messages_to_send=get_messages_from_cache(hash_key, prompt) + input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] + input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s",input_message) + + # 回复的对话 + res=fast_gpt_api(messages_to_send,wxid) + reply_content=res["choices"][0]["message"]["content"] + input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}] + input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s",input_message) + get_messages_from_cache(hash_key, {"role": "assistant", "content": reply_content}) + gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content) + +def handle_add_friend_info(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key): + ''' + 好友添加请求通知 + ''' + msg_content_xml=msg_data["Content"]["string"] + + root = ET.fromstring(msg_content_xml) + msg_content = root.attrib.get('content', None) + + message_hash_key = f"__AI_OPS_WX__:MESSAGES:{msg_data["ToUserName"]["string"]}" + prompt={"role": "user", "content": [{"type": "text","text": msg_content}]} + messages_to_send=get_messages_from_cache(message_hash_key, prompt) + + input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] + input_message=utils.dialogue_message(wxid,msg_data["ToUserName"]["string"],input_wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s",input_message) + + + # # 更改通信方向 + to_wxid=wxid + wxid=msg_data["ToUserName"]["string"] + + res=fast_gpt_api(messages_to_send,wxid) + reply_content=res["choices"][0]["message"]["content"] + + #保存好友信息 + gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id, wxid,[to_wxid]) + + # 保存到缓存 + get_messages_from_cache(hash_key, {"role": "assistant", "content": reply_content}) + + # 发送信息 + gewe_chat.wxchat.post_text(token_id,app_id, to_wxid,reply_content) + + # 发送到kafka + input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}] + input_message=utils.dialogue_message(wxid,to_wxid,input_wx_content_dialogue_message,True) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s",input_message) + + +def handle_mod_contacts(token_id,app_id,msg_data): + ''' + 好友通过验证及好友资料变更的通知消息 + ''' + contact_wxid = msg_data["UserName"]["string"] + keys=redis_helper.redis_helper.client.keys('__AI_OPS_WX__:LOGININFO:*') + + wxid="" + for k in keys: + # 将字节字符串转换为普通字符串 + # key_set.add(key.decode('utf-8')) + key=k.decode('utf-8') + hash_key=f"__AI_OPS_WX__:LOGININFO:{key}" + cache_app_id=redis_helper.redis_helper.get_hash_field(hash_key,"appId") + if app_id==cache_app_id: + wxid=redis_helper.redis_helper.get_hash_field(hash_key,"wxid") + break + + if wxid!="": + hash_key=f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}" + cache_str=redis_helper.redis_helper.get_hash_field(hash_key,"data") + cache = json.loads(cache_str) if cache_str else [] + for c in cache: + if c["userName"]==contact_wxid: + c["nickName"] = msg_data["NickName"] + c["snsBgImg"] = msg_data["SnsBgimgId"] + c["smallHeadImgUrl"] = msg_data["SmallHeadImgUrl"] - file_url=gewe_chat.wxchat.download_audio_msg(token_id,app_id,msg_id,msg_content) - react_silk_path=utils.save_to_local_from_url(file_url) - react_wav_path = os.path.splitext(react_silk_path)[0] + ".wav" - audio_convert.any_to_wav(react_silk_path,react_wav_path) - react_voice_text=AliVoice().voiceToText(react_wav_path) - - messages=get_messages_from_cache(hash_key, {"role": "user", "content": react_voice_text}) - ai_res=fast_gpt_api(messages,wxid) - ai_res_content=ai_res["choices"][0]["message"]["content"] - reply_text_voice=AliVoice().textToVoice(ai_res_content) - reply_text_voice_path=os.path.join(os.getcwd(), reply_text_voice) - reply_silk_path = os.path.splitext(reply_text_voice_path)[0] + ".silk" - reply_silk_during=audio_convert.any_to_sil(reply_text_voice_path,reply_silk_path) - - # print(int(reply_silk_during)) - # print(reply_silk_path) - oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5" - oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN" - oss_endpoint="http://oss-cn-shanghai.aliyuncs.com" - oss_bucket_name="cow-agent" - oss_prefix="cow" - - file_path=reply_silk_path - file_url = utils.upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, file_path, oss_prefix) - print(file_url) - res=gewe_chat.wxchat.post_voice(token_id,app_id,callback_to_user,file_url,int(reply_silk_during)) - # 删除临时文件 - os.remove(react_silk_path) - os.remove(react_wav_path) - os.remove(reply_text_voice_path) - os.remove(reply_silk_path) - - get_messages_from_cache(hash_key, {"role": "assistant", "content": ai_res}) - - - return jsonify({"message": "微信回调成功"}) + # 更新缓存,如果有修改过的话 + if cache: + updated_cache_str = json.dumps(cache) + redis_helper.redis_helper.set_hash_field(hash_key, "data", updated_cache_str) + + + + + def get_messages_from_cache(hash_key,object:object)->object: messages=redis_helper.redis_helper.get_hash(hash_key) @@ -243,9 +393,9 @@ def get_messages_from_cache(hash_key,object:object)->object: redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},3600) return messages - def fast_gpt_api(messages:list,session_id:str): - api_key="sk-tdi7u0zuLsR0JpPMGBeFZxymOpL0zoFVafX8EEEvEakIDAGQ22NyQ6w" + #api_key="sk-tdi7u0zuLsR0JpPMGBeFZxymOpL0zoFVafX8EEEvEakIDAGQ22NyQ6w" + api_key="sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI" api_url = "http://106.15.182.218:3000/api/v1/chat/completions" headers = { @@ -266,35 +416,3 @@ def fast_gpt_api(messages:list,session_id:str): logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'),ensure_ascii=False))) print(response_data) return response_data - - -def fast_gpt_api1(messages:list,session_id:str): - - - api_key="sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI" - api_url = "http://106.15.182.218:3000/api/v1/chat/completions" - # headers = { - - # "Content-Type": "application/json", - # "Authorization": f"Bearer {api_key}" - # } - - logger.info("[CHATGPT] 请求={}".format(json.dumps(messages, ensure_ascii=False))) - args = { - "model": "", - "request_timeout": 600, # 请求超时时间,openai接口默认设置为600,对于难问题一般需要较长时间 - "timeout": 600, # 重试超时时间,在这个时间内,将会自动重试, - "chatId": session_id, - "detail": True - } - # response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) - openai.api_base=api_url - response = openai.ChatCompletion.create(api_key=api_key, messages=messages, **args) - - - - response.raise_for_status() - response_data = response.json() - logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'),ensure_ascii=False))) - print(response_data) - return response_data \ No newline at end of file diff --git a/resources/messages_resource.txt b/resources/messages_resource.txt new file mode 100644 index 0000000..8cf3ef9 --- /dev/null +++ b/resources/messages_resource.txt @@ -0,0 +1,243 @@ +from flask_restful import Resource, reqparse +from flask import jsonify,request +from bridge.context import ContextType +import requests,json +from wechat import gewe_chat +from voice.ali.ali_voice import AliVoice +from common import utils,redis_helper,memory,kafka_helper +from common.log import logger +import openai +import xml.etree.ElementTree as ET + +import os + +from voice import audio_convert + + +class MessagesResource(Resource): + def __init__(self): + self.parser = reqparse.RequestParser() + + def post(self): + """处理微信回调的POST请求""" + msg = request.get_json() + if 'Data' not in msg: + return jsonify({"message": "无效的数据格式"}), 400 # 如果数据格式不正确,返回错误信息 + + msg_data = msg.get("Data") + msg_type = msg_data.get("MsgType") + + # 根据不同的消息类型调用对应的处理函数 + if msg_type == 1: # 文字消息 + return self.handle_text_message(msg) + elif msg_type == 3: # 图片消息 + return self.handle_image_message(msg) + elif msg_type == 34: # 语音消息 + return self.handle_voice_message(msg) + elif msg_type == 49: # 引用消息 + return self.handle_replied_message(msg) + + return jsonify({"message": "不支持的消息类型"}), 400 # 如果消息类型不支持,返回错误信息 + + def handle_text_message(self, msg): + """处理文字消息""" + msg_data = msg["Data"] + wxid = msg["Wxid"] + hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" + app_id = msg["Appid"] + callback_to_user = msg_data["FromUserName"]["string"] + msg_content = msg_data["Content"]["string"] + + # 构造 GPT 请求的 prompt + prompt = {"role": "user", "content": [{"type": "text", "text": msg_content}]} + messages_to_send = self.get_messages_from_cache(hash_key, prompt) + + # 将消息发送到 Kafka 中 + self.send_to_kafka(wxid, callback_to_user, msg_content) + + # 调用 GPT API 获取回复 + res = self.fast_gpt_api(messages_to_send, wxid) + reply_content = self.process_reply_content(res, wxid) + + # 将 GPT 回复的内容发送到微信 + gewe_chat.wxchat.post_text(msg["TokenId"], app_id, callback_to_user, reply_content) + self.get_messages_from_cache(hash_key, {"role": "assistant", "content": reply_content}) + + return jsonify({"message": "文字消息处理成功"}) + + def handle_image_message(self, msg): + """处理图片消息""" + msg_data = msg["Data"] + wxid = msg["Wxid"] + hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" + app_id = msg["Appid"] + callback_to_user = msg_data["FromUserName"]["string"] + img_url = msg_data["Content"]["string"] + + # 图片下载并上传至 OSS + wx_img_url = gewe_chat.wxchat.download_image_msg(msg["TokenId"], app_id, img_url) + # img_url = self.upload_image_to_oss(wx_img_url) + oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5" + oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN" + oss_endpoint="http://oss-cn-shanghai.aliyuncs.com" + oss_bucket_name="cow-agent" + oss_prefix="cow" + + img_url=utils.upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, wx_img_url, oss_prefix) + + + # 发送确认消息 + gewe_chat.wxchat.post_text(msg["TokenId"], app_id, callback_to_user, '已经上传了图片,有什么可以为您服务') + + # 构造消息并发送到 Kafka + wx_content_dialogue_message = [{"type": "image_url", "image_url": {"url": img_url}}] + self.send_to_kafka(wxid, callback_to_user, wx_content_dialogue_message) + + return jsonify({"message": "图片消息处理成功"}) + + def handle_voice_message(self, msg): + """处理语音消息""" + msg_data = msg["Data"] + wxid = msg["Wxid"] + hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" + app_id = msg["Appid"] + callback_to_user = msg_data["FromUserName"]["string"] + msg_id = msg_data["MsgId"] + msg_content = msg_data["Content"]["string"] + + # 下载语音文件并转为文本 + file_url = gewe_chat.wxchat.download_audio_msg(msg["TokenId"], app_id, msg_id, msg_content) + react_silk_path = utils.save_to_local_from_url(file_url) + react_wav_path = os.path.splitext(react_silk_path)[0] + ".wav" + audio_convert.any_to_wav(react_silk_path, react_wav_path) + react_voice_text = AliVoice().voiceToText(react_wav_path) + + # 发送到 GPT 获取回复 + messages_to_send = self.get_messages_from_cache(hash_key, {"role": "user", "content": react_voice_text}) + ai_res = self.fast_gpt_api(messages_to_send, wxid) + ai_res_content = ai_res["choices"][0]["message"]["content"] + + # 将 GPT 的文本转换成语音并上传到 OSS + reply_text_voice = AliVoice().textToVoice(ai_res_content) + reply_text_voice_path = os.path.join(os.getcwd(), reply_text_voice) + reply_silk_path = os.path.splitext(reply_text_voice_path)[0] + ".silk" + audio_convert.any_to_sil(reply_text_voice_path, reply_silk_path) + + file_url = self.upload_audio_to_oss(reply_silk_path) + + # 发送语音回复 + res = gewe_chat.wxchat.post_voice(msg["TokenId"], app_id, callback_to_user, file_url, int(reply_silk_during)) + + # 删除临时文件 + self.delete_temp_files([react_silk_path, react_wav_path, reply_text_voice_path, reply_silk_path]) + + return jsonify({"message": "语音消息处理成功"}) + + def handle_replied_message(self, msg): + """处理引用消息""" + msg_data = msg["Data"] + wxid = msg["Wxid"] + hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" + app_id = msg["Appid"] + callback_to_user = msg_data["FromUserName"]["string"] + msg_content_xml = msg_data["Content"]["string"] + + # 解析 XML,获取引用消息 + root = ET.fromstring(msg_content_xml) + type_value = root.find(".//appmsg/type").text + if type_value == '57': # 如果是引用消息 + prompt = {"role": "user", "content": [{"type": "text", "text": msg_content_xml}]} + messages_to_send = self.get_messages_from_cache(hash_key, prompt) + + # 发送到 Kafka + self.send_to_kafka(wxid, callback_to_user, msg_content_xml) + + # 获取 GPT 回复并发送 + res = self.fast_gpt_api(messages_to_send, wxid) + reply_content = res["choices"][0]["message"]["content"] + gewe_chat.wxchat.post_text(msg["TokenId"], app_id, callback_to_user, reply_content) + + return jsonify({"message": "引用消息处理成功"}) + + def get_messages_from_cache(self, hash_key, object: object) -> object: + """从缓存中获取消息并更新缓存""" + messages = redis_helper.redis_helper.get_hash(hash_key) + if not messages: + messages = [{"role": "system", "content": ""}] + messages.append(object) + redis_helper.redis_helper.set_hash(hash_key, {"data": json.dumps(messages, ensure_ascii=False)}, 3600) + else: + messages_str = redis_helper.redis_helper.get_hash_field(hash_key, "data") + messages = json.loads(messages_str) if messages_str else [] + last_message = messages[-1] + content = last_message.get("content", []) + if isinstance(content, list) and content: + last_content_type = content[-1].get("type") + if last_content_type == 'image_url': + content.append(object['content'][0]) + messages[-1]['content'] = content + else: + messages.append(object) + else: + messages.append(object) + + redis_helper.redis_helper.set_hash(hash_key, {"data": json.dumps(messages, ensure_ascii=False)}, 3600) + return messages + + def send_to_kafka(self, wxid, callback_to_user, msg_content): + """将消息发送到 Kafka""" + input_wx_content_dialogue_message = [{"type": "text", "text": msg_content}] + input_message = utils.dialogue_message(callback_to_user, wxid, input_wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s", input_message) + + def fast_gpt_api(self, messages: list, session_id: str): + """调用 GPT API 获取回复""" + api_key = "sk-tdi7u0zuLsR0JpPMGBeFZxymOpL0zoFVafX8EEEvEakIDAGQ22NyQ6w" + api_url = "http://106.15.182.218:3000/api/v1/chat/completions" + headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"} + data = {"model": "", "messages": messages, "chatId": session_id, "detail": True} + + try: + response = requests.post(url=api_url, headers=headers, data=json.dumps(data), timeout=600) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + logger.error(f"GPT API 请求失败: {e}") + return {"error": "API 请求失败"} + + def process_reply_content(self, res, wxid): + """处理 GPT 回复内容""" + reply_content = res["choices"][0]["message"]["content"] + if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content): + return self.process_interactive_reply(reply_content, wxid) + elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content): + return self.process_text_reply(reply_content, wxid) + else: + return reply_content + + def process_interactive_reply(self, reply_content, wxid): + """处理交互式回复""" + description = '' + user_select_options = [] + for item in reply_content: + if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect": + params = item["interactive"]["params"] + description = params.get("description") + user_select_options = params.get("userSelectOptions", []) + values_string = "\n".join(option["value"] for option in user_select_options) + + if description: + memory.USER_INTERACTIVE_CACHE[wxid] = {"interactive": True} + return f"{description}\n------------------------------\n{values_string}" + return reply_content + + def process_text_reply(self, reply_content, wxid): + """处理文本回复""" + memory.USER_INTERACTIVE_CACHE[wxid] = {"interactive": False} + text = next((item["text"]["content"] for item in reply_content if item["type"] == "text"), '') + if not text: + messages_to_send = self.get_messages_from_cache(f"__AI_OPS_WX__:MESSAGES:{wxid}", {"role": "user", "content": text}) + res = self.fast_gpt_api(messages_to_send, wxid) + return res["choices"][0]["message"]["content"] + return text diff --git a/wechat/gewe_chat.py b/wechat/gewe_chat.py index 7bdf629..239f595 100644 --- a/wechat/gewe_chat.py +++ b/wechat/gewe_chat.py @@ -10,6 +10,7 @@ import requests from io import BytesIO from PIL import Image +from common import redis_helper wxchat=None @@ -710,7 +711,87 @@ class GeWeChatCom: else: print(f"请求失败,状态码: {response.status_code}") +############################### 其他 ############################### + # def save_contacts_brief_to_cache(self, token_id, app_id, wxid, contacts_wxids: list): + # # 将联系人信息保存到 Redis,使用一个合适的 key + # hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}" + + # # 获取缓存中的数据,如果缓存不存在则初始化为空列表 + # cache_str = redis_helper.redis_helper.get_hash_field(hash_key, "data") + # cache = json.loads(cache_str) if cache_str else [] + + # # 如果缓存为空,获取并保存联系人详细信息 + # if not cache: + # friends_brief = self.get_brief_info(token_id, app_id, contacts_wxids) + # contacts_no_info=[] + # for f in friends_brief: + # if f["nickName"]: + # cache.append(f) + # else: + # contacts_no_info.append(f) + # if len(contacts_no_info)!=0: + # f_detail = self.get_detail_info(token_id, app_id, contacts_no_info) + # cache.extend(f_detail) + + # # 更新缓存 + # redis_helper.redis_helper.update_hash_field(hash_key, "data", json.dumps(cache, ensure_ascii=False)) + # else: + # # 获取现有缓存中的 userName 列表,避免重复 + # existing_usernames = {contact['userName'] for contact in cache} + # new_contacts_wxids = [contact_wxid for contact_wxid in contacts_wxids if contact_wxid not in existing_usernames] + + # if new_contacts_wxids: + # f_detail = self.get_detail_info(token_id, app_id, new_contacts_wxids) + # print(f_detail) + # cache.extend(f_detail) + # # 更新缓存 + # redis_helper.redis_helper.update_hash_field(hash_key, "data", json.dumps(cache, ensure_ascii=False)) + def save_contacts_brief_to_cache(self, token_id, app_id, wxid, contacts_wxids: list): + """ + 将联系人信息保存到 Redis 缓存。 + """ + # Redis 缓存的 key + hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}" + + # 获取缓存中的数据 + cache_str = redis_helper.redis_helper.get_hash_field(hash_key, "data") + cache = json.loads(cache_str) if cache_str else [] + + if not cache: + # 缓存为空,分批处理 contacts_wxids + batch_size = 100 + for i in range(0, len(contacts_wxids), batch_size): + batch = contacts_wxids[i:i + batch_size] + friends_brief = self.get_brief_info(token_id, app_id, batch) + + friends_no_brief_wxid = [f['userName'] for f in friends_brief if not f["nickName"]] + cache.extend(f for f in friends_brief if f["nickName"]) + + if friends_no_brief_wxid: + detailed_info = self.get_detail_info(token_id, app_id, friends_no_brief_wxid) + cache.extend(detailed_info) + + # 更新缓存 + redis_helper.redis_helper.update_hash_field(hash_key, "data", json.dumps(cache, ensure_ascii=False)) + return + + # 缓存已存在,检查新联系人 + existing_usernames = {contact['userName'] for contact in cache} + new_contacts_wxids = [wxid for wxid in contacts_wxids if wxid not in existing_usernames] + + # 如果有新联系人,分批获取详细信息并更新缓存 + if new_contacts_wxids: + batch_size = 20 + for i in range(0, len(new_contacts_wxids), batch_size): + batch = new_contacts_wxids[i:i + batch_size] + detailed_info = self.get_detail_info(token_id, app_id, batch) + cache.extend(detailed_info) + + redis_helper.redis_helper.update_hash_field(hash_key, "data", json.dumps(cache, ensure_ascii=False)) + + + def start(): global wxchat # base_url = "http://192.168.88.11:2531"