diff --git a/app.py b/app.py index 31a704f..00f67e5 100644 --- a/app.py +++ b/app.py @@ -40,22 +40,6 @@ def save_friends_to_redis(wxid, friends): hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}" redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(friends,ensure_ascii=False)}) -# def save_contacts_brief_to_redis(wxid, friends): -# # 将联系人信息保存到 Redis,使用一个合适的 key -# hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}" - -# # 获取缓存中的数据,如果缓存不存在则初始化为空列表 -# cache_str = redis_helper.redis_helper.get_hash_field(hash_key, "data") -# cache = json.loads(cache_str) if cache_str else [] - -# # 合并联系人信息 -# cache.extend(friends) - -# # 将合并后的联系人数据保存回 Redis -# redis_helper.redis_helper.update_hash_field(hash_key, "data", { -# "data": json.dumps(cache, ensure_ascii=False) -# }) - def worker(): kafka_helper.start() @@ -97,133 +81,195 @@ def start_wxchat_thread_free(): wxchat.fetch_contacts_list(token_id, app_id) +# def start_wxchat_thread(): +# gewe_chat.start() +# wxchat=gewe_chat.wxchat +# # token_id = wxchat.get_token_id() +# token_id='f828cb3c-1039-489f-b9ae-7494d1778a15' +# tel='18029274615' +# region_id='440000' +# print(f'tokenId: {token_id}') + +# hash_key = f"__AI_OPS_WX__:LOGININFO:{tel}" +# login_info=redis_helper.redis_helper.get_hash(hash_key) +# if not login_info: +# qr_code = wxchat.get_login_qr_code(token_id) +# # print(qr_code) +# base64_string = qr_code.get('qrImgBase64') +# app_id = qr_code.get('appId') +# uuid = qr_code.get('uuid') +# # print(f'appId: {app_id}') +# # print(f'uuid: {uuid}') +# wxchat.qrCallback(uuid,base64_string) +# while True: +# res=wxchat.check_login(token_id,app_id,uuid) +# flag=res.get('status') +# if flag == 2: +# print(res) +# login_info=res.get('loginInfo',{}) +# login_info['appId'] = app_id +# login_info['uuid'] = uuid +# login_info['tokenId'] = token_id +# print(login_info) +# cleaned_login_info = {k: (v if v is not None else '') for k, v in login_info.items()} +# redis_helper.redis_helper.set_hash(hash_key,cleaned_login_info) +# break +# time.sleep(3) + + +# # call_back_url="http://xgejpm.natappfree.cc/messages" +# # res=wxchat.callback_collect(token_id,call_back_url) +# # print(f'设置回调地址:{call_back_url}') +# print(token_id) +# print(app_id) +# contacts_list=wxchat.fetch_contacts_list(token_id, app_id) + +# friend_wxids=contacts_list['friends'][3:] +# print(friend_wxids) +# wxid=redis_helper.redis_helper.get_hash_field(hash_key,'wxid') +# # friends_brief=wxchat.get_brief_info(token_id,app_id,friend_wxids) +# # utils.save_contacts_brief_to_redis(wxid,friends_brief) +# wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,friend_wxids) +# print(f'{wxid} 登录 {app_id} 成功') +# else: +# app_id=login_info.get('appId') +# token_id=login_info.get('tokenId') + +# is_online=wxchat.check_online(token_id,app_id) +# if is_online: +# print("已经登录微信") +# else: +# # 尝试重连 +# res=wxchat.reconnection(token_id,app_id) +# flag=res.get('ret') +# if flag==200: +# print(f'重连成功') +# else: +# # 删除缓存 +# # redis_helper.redis_helper.delete_hash(hash_key) +# # 重新登录 +# qr_code = wxchat.get_login_qr_code(token_id,app_id) +# # print(qr_code) +# base64_string = qr_code.get('qrImgBase64') +# app_id = qr_code.get('appId') +# uuid = qr_code.get('uuid') +# print(f'appId: {app_id}') +# print(f'uuid: {uuid}') +# wxchat.qrCallback(uuid,base64_string) +# while True: +# res=wxchat.check_login(token_id,app_id,uuid) +# flag=res.get('status') +# if flag == 2: +# print(res) +# login_info=res.get('loginInfo',{}) +# login_info['appId'] = app_id +# login_info['uuid'] = uuid +# login_info['tokenId'] = token_id +# print(login_info) +# cleaned_login_info = {k: (v if v is not None else '') for k, v in login_info.items()} +# redis_helper.redis_helper.set_hash(hash_key,cleaned_login_info) +# break +# time.sleep(3) + +# print('重新登录成功') +# print(token_id) +# print(app_id) + +# contacts_list=wxchat.fetch_contacts_list(token_id, app_id) +# friend_wxids=contacts_list['friends'][3:] +# # friend_wxids=['wxid_95rrm8l6tznb21'] +# wxid=redis_helper.redis_helper.get_hash_field(hash_key,'wxid') +# wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,friend_wxids) +# print(f'{wxid} 登录 {app_id} 成功') +# # wxchat.fetch_contacts_list(token_id,app_id) + +# # wxchat.get_detail_info(token_id,app_id,["wxid_qycp69orck8412"]) + +def login_or_reconnect(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, hash_key, is_reconnect=False): + """ + 封装微信登录或重连的逻辑 + """ + if is_reconnect: + print("尝试重连...") + else: + print("获取二维码进行登录...") + + qr_code = wxchat.get_login_qr_code(token_id, app_id) + base64_string = qr_code.get('qrImgBase64') + uuid = qr_code.get('uuid') + + app_id = app_id or qr_code.get('appId') + + wxchat.qrCallback(uuid, base64_string) + + while True: + res = wxchat.check_login(token_id, app_id, uuid) + flag = res.get('status') + if flag == 2: + print(f"登录成功: {res}") + login_info = res.get('loginInfo', {}) + login_info.update({'appId': app_id, 'uuid': uuid, 'tokenId': token_id}) + + cleaned_login_info = {k: (v if v is not None else '') for k, v in login_info.items()} + redis_helper.redis_helper.set_hash(hash_key, cleaned_login_info) + return login_info + time.sleep(3) + +def fetch_and_save_contacts(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, hash_key): + """ + 获取联系人列表并保存到缓存 + """ + contacts_list = wxchat.fetch_contacts_list(token_id, app_id) + friend_wxids = contacts_list['friends'][3:] # 可以调整截取范围 + wxid = redis_helper.redis_helper.get_hash_field(hash_key, 'wxid') + + wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids) + print(f'微信ID {wxid} 登录APPID {app_id} 成功,联系人已保存') + def start_wxchat_thread(): gewe_chat.start() - wxchat=gewe_chat.wxchat - # token_id = wxchat.get_token_id() - token_id='f828cb3c-1039-489f-b9ae-7494d1778a15' - tel='18029274615' - region_id='440000' - print(f'tokenId: {token_id}') + wxchat = gewe_chat.wxchat + # token_id = 'f828cb3c-1039-489f-b9ae-7494d1778a15' + # tel = '18029274615' + tel=os.environ.get('tel', '18029274615') + token_id=os.environ.get('tokenId', 'f828cb3c-1039-489f-b9ae-7494d1778a15') + + + # tel=os.environ.get('tel', '19200137635') + # token_id=os.environ.get('tokenId', '9ba29f73-e46a-40b5-873d-795490f732e3') + + # region_id = '440000' + # print(f'tokenId: {token_id}') hash_key = f"__AI_OPS_WX__:LOGININFO:{tel}" - login_info=redis_helper.redis_helper.get_hash(hash_key) + login_info = redis_helper.redis_helper.get_hash(hash_key) + if not login_info: - qr_code = wxchat.get_login_qr_code(token_id) - # print(qr_code) - base64_string = qr_code.get('qrImgBase64') - app_id = qr_code.get('appId') - uuid = qr_code.get('uuid') - print(f'appId: {app_id}') - print(f'uuid: {uuid}') - wxchat.qrCallback(uuid,base64_string) - while True: - res=wxchat.check_login(token_id,app_id,uuid) - flag=res.get('status') - if flag == 2: - print(res) - login_info=res.get('loginInfo',{}) - login_info['appId'] = app_id - login_info['uuid'] = uuid - login_info['tokenId'] = token_id - print(login_info) - cleaned_login_info = {k: (v if v is not None else '') for k, v in login_info.items()} - redis_helper.redis_helper.set_hash(hash_key,cleaned_login_info) - break - time.sleep(3) - - - # call_back_url="http://xgejpm.natappfree.cc/messages" - # res=wxchat.callback_collect(token_id,call_back_url) - # print(f'设置回调地址:{call_back_url}') - print(token_id) - print(app_id) - contacts_list=wxchat.fetch_contacts_list(token_id, app_id) - - friend_wxids=contacts_list['friends'][3:] - print(friend_wxids) - wxid=redis_helper.redis_helper.get_hash_field(hash_key,'wxid') - # friends_brief=wxchat.get_brief_info(token_id,app_id,friend_wxids) - # utils.save_contacts_brief_to_redis(wxid,friends_brief) - wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,friend_wxids) - print(f'{wxid} 登录 {app_id} 成功') - else: - app_id=login_info.get('appId') - token_id=login_info.get('tokenId') - - is_online=wxchat.check_online(token_id,app_id) + login_info = login_or_reconnect(wxchat, token_id, '', hash_key) + else: + app_id = login_info.get('appId') + token_id = login_info.get('tokenId') + wxid= login_info.get('wxid') + # 检查是否已经登录 + is_online = wxchat.check_online(token_id, app_id) if is_online: - print("已经登录微信") + logger.info(f'微信ID {wxid} 在APPID {app_id} 已经在线') else: # 尝试重连 - res=wxchat.reconnection(token_id,app_id) - flag=res.get('ret') - if flag==200: - print(f'重连成功') + res = wxchat.reconnection(token_id, app_id) + if res.get('ret') == 200: + logger.info(f'微信ID {wxid} 在APPID {app_id} 重连成功') else: - # 删除缓存 - # redis_helper.redis_helper.delete_hash(hash_key) - # 重新登录 - qr_code = wxchat.get_login_qr_code(token_id,app_id) - # print(qr_code) - base64_string = qr_code.get('qrImgBase64') - app_id = qr_code.get('appId') - uuid = qr_code.get('uuid') - print(f'appId: {app_id}') - print(f'uuid: {uuid}') - wxchat.qrCallback(uuid,base64_string) - while True: - res=wxchat.check_login(token_id,app_id,uuid) - flag=res.get('status') - if flag == 2: - print(res) - login_info=res.get('loginInfo',{}) - login_info['appId'] = app_id - login_info['uuid'] = uuid - login_info['tokenId'] = token_id - print(login_info) - cleaned_login_info = {k: (v if v is not None else '') for k, v in login_info.items()} - redis_helper.redis_helper.set_hash(hash_key,cleaned_login_info) - break - time.sleep(3) - - print('重新登录成功') - print(token_id) - print(app_id) - - contacts_list=wxchat.fetch_contacts_list(token_id, app_id) - friend_wxids=contacts_list['friends'][3:] - # friend_wxids=['wxid_95rrm8l6tznb21'] - wxid=redis_helper.redis_helper.get_hash_field(hash_key,'wxid') - wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,friend_wxids) - print(f'{wxid} 登录 {app_id} 成功') - #wxchat.post_text(token_id,app_id,'wxid_eigw91zpsl1p22','你好') + print("重连失败,重新登录...") + login_info = login_or_reconnect(wxchat, token_id, app_id, hash_key, is_reconnect=True) + + fetch_and_save_contacts(wxchat, token_id, login_info.get('appId'), hash_key) + def app_run(): flask_app = Flask(__name__) - # @flask_app.route('/silk/') - # def serve_mp3(filename): - # # 指定 audios 目录的路径 - # directory = 'silk' - # # 检查文件是否存在 - # if not filename.endswith('.silk') or not os.path.isfile(os.path.join(directory, filename)): - # return 'File not found', 404 - # # 使用 send_from_directory 发送文件 - # return send_from_directory(directory, filename, as_attachment=False) - - # @flask_app.route('/messages', methods=['POST']) - # def gewe_message(): - # msg = request.get_json() - # print(msg) - # # PushContent = msg['Data']['PushContent'] - # # print(msg) - - # # print(PushContent) - - # return jsonify({'message': 'PushContent'}) - # api = Api(app) flask_api = Api(flask_app,errors=errors, catch_all_404s=True) @@ -247,7 +293,7 @@ def app_run(): # channel = channel_factory.create_channel('wx') # channel.startup() - environment = os.environ.get('environment', 'default') # 默认是生产环境 + environment = os.environ.get('environment', 'default') port=5000 if environment== 'default': port=80 diff --git a/common/kafka_helper.py b/common/kafka_helper.py index 019d45d..c32cf5b 100644 --- a/common/kafka_helper.py +++ b/common/kafka_helper.py @@ -1,5 +1,5 @@ from confluent_kafka import Producer, Consumer, KafkaException, KafkaError -import os +import os,time from config import conf # 定义全局 redis_helper kafka_client = None @@ -98,6 +98,75 @@ class KafkaClient: finally: self.consumer.close() + def consume_messages(self,agent_tel,process_callback): + """ + 消费消息并调用回调处理业务逻辑,只有当回调返回 True 时才提交偏移量 + :param process_callback: 业务逻辑回调函数,返回布尔值 + :param agent_tel: 代理商手机号 + """ + consumer=Consumer({ + 'bootstrap.servers': self.bootstrap_servers, + 'group.id': f'aiops-wx_{agent_tel}', + 'auto.offset.reset': 'earliest', + # 'enable.auto.commit': False # 禁用自动提交,使用手动提交 + 'enable.auto.commit': True + }) + consumer.subscribe([self.topic]) + + try: + while True: + msg = consumer.poll(0.3) + if msg is None: + continue + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + print(f"End of partition {msg.partition}, offset {msg.offset()}") + else: + raise KafkaException(msg.error()) + else: + # 调用业务处理逻辑 + # process_callback(msg.value().decode('utf-8')) + # 调用业务处理逻辑,传递 user_nickname 和消息 + process_callback(agent_tel,msg.value().decode('utf-8')) + # if process_callback(user_nickname, msg.value().decode('utf-8')): + # # 如果返回 True,表示处理成功,可以提交偏移量 + # try: + # # self.consumer.commit(msg) + # self.consumer.commit(message=msg, asynchronous=True) + # print(f"Manually committed offset: {msg.offset()}") + # except KafkaException as e: + # print(f"Error committing offset: {e}") + except KafkaException as e: + print(f"Kafka exception occurred: {e}") + if 'KafkaError._ALL_BROKERS_DOWN' in str(e): + print(f"Kafka brokers for agent {agent_tel} are down, retrying in 5 seconds...") + time.sleep(5) + self._reconnect_consumer_with_agent_tel(consumer, agent_tel) + except Exception as e: + print(f"An unexpected error occurred: {e}") + time.sleep(5) + + + def _reconnect_consumer_with_agent_tel(self, consumer, agent_tel): + """ + 尝试为指定的代理商重新连接 Kafka 消费者 + """ + print(f"Attempting to reconnect Kafka consumer for agent {agent_tel}...") + try: + consumer.close() # Close the old consumer + consumer = Consumer({ + 'bootstrap.servers': self.bootstrap_servers, + 'group.id': f'aiops-wx_{agent_tel}', + 'auto.offset.reset': 'earliest', + 'enable.auto.commit': True + }) + consumer.subscribe([self.topic]) + print(f"Reconnected successfully for agent {agent_tel}.") + except KafkaException as e: + print(f"Error while reconnecting for agent {agent_tel}: {e}") + time.sleep(5) # Retry after 5 seconds + + def start(): global kafka_client kafka_client = KafkaClient() \ No newline at end of file diff --git a/common/utils.py b/common/utils.py index 1d2ccec..6557aa6 100644 --- a/common/utils.py +++ b/common/utils.py @@ -7,6 +7,8 @@ from PIL import Image from common.log import logger import oss2,time,json from urllib.parse import urlparse, unquote +from voice.ali.ali_voice import AliVoice +from voice import audio_convert from common import redis_helper @@ -243,6 +245,42 @@ def dialogue_message(wxid_from:str,wxid_to:str,wx_content:list,is_ai:bool=False) return json.dumps(data, separators=(',', ':'), ensure_ascii=False) +def wx_voice(text: str): + try: + # 将文本转换为语音 + reply_text_voice = AliVoice().textToVoice(text) + reply_text_voice_path = os.path.join(os.getcwd(), reply_text_voice) + + # 转换为 Silk 格式 + reply_silk_path = os.path.splitext(reply_text_voice_path)[0] + ".silk" + reply_silk_during = audio_convert.any_to_sil(reply_text_voice_path, reply_silk_path) + + # OSS 配置(建议将凭证存储在安全的地方) + oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5" + oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN" + oss_endpoint="http://oss-cn-shanghai.aliyuncs.com" + oss_bucket_name="cow-agent" + oss_prefix="cow" + + # 上传文件到 OSS + file_path = reply_silk_path + file_url = upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, file_path, oss_prefix) + + # 删除临时文件 + try: + os.remove(reply_text_voice_path) + except FileNotFoundError: + pass # 如果文件未找到,跳过删除 + try: + os.remove(reply_silk_path) + except FileNotFoundError: + pass # 如果文件未找到,跳过删除 + + return int(reply_silk_during), file_url + except Exception as e: + print(f"发生错误:{e}") + return None, None # 发生错误时返回 None + def save_contacts_brief_to_redis(wxid, friends): # 将联系人信息保存到 Redis,使用一个合适的 key diff --git a/requirements.txt b/requirements.txt index 1bb28cf..821f777 100644 --- a/requirements.txt +++ b/requirements.txt @@ -33,8 +33,12 @@ flask_restful confluent_kafka av #pilk -silk-python -# pysilk-mod + +# silk-python +# pysilk + + +pysilk-mod #pip3 install pysilk-mod oss2 diff --git a/resources/messages_resource-bk.py b/resources/messages_resource-bk.py deleted file mode 100644 index 49eec5a..0000000 --- a/resources/messages_resource-bk.py +++ /dev/null @@ -1,319 +0,0 @@ -from flask_restful import Resource, reqparse -from flask import jsonify,request -from bridge.context import ContextType -import requests,json -from wechat import gewe_chat -from voice.ali.ali_voice import AliVoice -from common import utils,redis_helper,memory,kafka_helper -from common.log import logger -import openai -import xml.etree.ElementTree as ET - - -import os - -from voice import audio_convert - -class MessagesBKResource(Resource): - def __init__(self): - self.parser = reqparse.RequestParser() - - def post(self): - msg = request.get_json() - logger.debug(f"Received message: {msg}") - - if 'Data' in msg: - msg_data=msg.get("Data") - msg_type=msg_data.get("MsgType") - if msg_type == 1:#ContextType.TEXT: # 文字 - msg_content=msg_data["Content"]["string"] - #print(msg_content) - logger.info(msg_content) - - app_id=msg["Appid"] - wxid=msg["Wxid"] - from_wxid=msg_data["FromUserName"]["string"] - to_wxid=msg_data["ToUserName"]["string"] - - token_id="f828cb3c-1039-489f-b9ae-7494d1778a15" - hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" - - if wxid == from_wxid: #主动发送消息 - logger.info("Active message sending detected") - gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,[to_wxid]) - callback_to_user=msg_data["ToUserName"]["string"] - - input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] - input_message=utils.dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message) - kafka_helper.kafka_client.produce_message(input_message) - logger.info("发送对话 %s",input_message) - else: - callback_to_user=msg_data["FromUserName"]["string"] - - prompt={"role": "user", "content": [{ - "type": "text", - "text": msg_content - }]} - messages_to_send=get_messages_from_cache(hash_key, prompt) - # 收到的对话 - input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] - input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message) - kafka_helper.kafka_client.produce_message(input_message) - logger.info("发送对话 %s",input_message) - - cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid) - if cache_data and cache_data.get('interactive'): - messages_to_send=[{"role": "user", "content": msg_content}] - - res=fast_gpt_api(messages_to_send,wxid) - reply_content=res["choices"][0]["message"]["content"] - - description = '' - userSelectOptions = [] - - if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content): - for item in reply_content: - if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect": - params = item["interactive"]["params"] - description = params.get("description") - userSelectOptions = params.get("userSelectOptions", []) - values_string = "\n".join(option["value"] for option in userSelectOptions) - - if description is not None: - memory.USER_INTERACTIVE_CACHE[wxid] = { - "interactive":True - } - reply_content=description + '------------------------------\n'+values_string - - elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content): - memory.USER_INTERACTIVE_CACHE[wxid] = { - "interactive":False - } - text='' - for item in reply_content: - if item["type"] == "text": - text=item["text"]["content"] - if text=='': - # 去除上次上一轮对话再次请求 - cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data") - cache_messages = json.loads(cache_messages_str) if cache_messages_str else [] - - if len(cache_messages) >= 3: - cache_messages = cache_messages[:-3] - - redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False)) - messages_to_send=get_messages_from_cache(hash_key, prompt) - res=fast_gpt_api(messages_to_send,wxid) - reply_content=res["choices"][0]["message"]["content"] - else: - reply_content=text - else: - memory.USER_INTERACTIVE_CACHE[wxid] = { - "interactive":False - } - reply_content=res["choices"][0]["message"]["content"] - - print(f'token_id {token_id}') - print(f'app_id {app_id}') - print(f'touser: {callback_to_user}') - # print(f'towxuser:{towxuser}') - print(reply_content) - gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content) - get_messages_from_cache(hash_key, {"role": "assistant", "content": reply_content}) - # 回复的对话 - input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}] - input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True) - kafka_helper.kafka_client.produce_message(input_message) - logger.info("发送对话 %s",input_message) - - if msg_type == 3: #图片 - - token_id="f828cb3c-1039-489f-b9ae-7494d1778a15" - app_id=msg["Appid"] - callback_to_user=msg_data["FromUserName"]["string"] - msg_content=msg_data["Content"]["string"] - - print(f'token_id {token_id}') - print(f'app_id {app_id}') - print(f'touser: {callback_to_user}') - # print(res_content) - wxid=msg["Wxid"] - hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" - wx_img_url=gewe_chat.wxchat.download_image_msg(token_id,app_id,msg_content) - - oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5" - oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN" - oss_endpoint="http://oss-cn-shanghai.aliyuncs.com" - oss_bucket_name="cow-agent" - oss_prefix="cow" - - img_url=utils.upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, wx_img_url, oss_prefix) - - prompt={ - "role": "user", - "content": [{ - "type": "image_url", - "image_url": {"url": img_url} - }] - } - - get_messages_from_cache(hash_key, prompt) - gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,'已经上传了图片,有什么可以为您服务') - logger.debug(f"Uploaded image URL: {img_url}") - - wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": img_url}}] - input_message=utils.dialogue_message(wxid,callback_to_user,wx_content_dialogue_message) - kafka_helper.kafka_client.produce_message(input_message) - logger.info("发送对话 %s",input_message) - - if msg_type == 34: # 语音 - token_id="f828cb3c-1039-489f-b9ae-7494d1778a15" - callback_to_user=msg_data["FromUserName"]["string"] - app_id=msg["Appid"] - msg_content=msg_data["Content"]["string"] - msg_id=msg_data["MsgId"] - - wxid=msg["Wxid"] - hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" - - print(f'token_id {token_id}') - print(f'app_id {app_id}') - print(f'touser: {callback_to_user}') - print(f'msg_id:{msg_id}') - - - file_url=gewe_chat.wxchat.download_audio_msg(token_id,app_id,msg_id,msg_content) - react_silk_path=utils.save_to_local_from_url(file_url) - react_wav_path = os.path.splitext(react_silk_path)[0] + ".wav" - audio_convert.any_to_wav(react_silk_path,react_wav_path) - react_voice_text=AliVoice().voiceToText(react_wav_path) - - messages=get_messages_from_cache(hash_key, {"role": "user", "content": react_voice_text}) - ai_res=fast_gpt_api(messages,wxid) - ai_res_content=ai_res["choices"][0]["message"]["content"] - reply_text_voice=AliVoice().textToVoice(ai_res_content) - reply_text_voice_path=os.path.join(os.getcwd(), reply_text_voice) - reply_silk_path = os.path.splitext(reply_text_voice_path)[0] + ".silk" - reply_silk_during=audio_convert.any_to_sil(reply_text_voice_path,reply_silk_path) - - # print(int(reply_silk_during)) - # print(reply_silk_path) - oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5" - oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN" - oss_endpoint="http://oss-cn-shanghai.aliyuncs.com" - oss_bucket_name="cow-agent" - oss_prefix="cow" - - file_path=reply_silk_path - file_url = utils.upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, file_path, oss_prefix) - print(file_url) - res=gewe_chat.wxchat.post_voice(token_id,app_id,callback_to_user,file_url,int(reply_silk_during)) - # 删除临时文件 - os.remove(react_silk_path) - os.remove(react_wav_path) - os.remove(reply_text_voice_path) - os.remove(reply_silk_path) - - get_messages_from_cache(hash_key, {"role": "assistant", "content": ai_res}) - - if msg_type == 49: - msg_content_xml=msg_data["Content"]["string"] - root = ET.fromstring(msg_content_xml) - type_value = root.find(".//appmsg/type").text - if type_value==57: # 引用消息 - ''' - # 判断此类消息的逻辑:$.Data.MsgType=49 并且 解析$.Data.Content.string中的xml msg.appmsg.type=57 - ''' - wxid=msg["Wxid"] - hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" - - app_id=msg["Appid"] - callback_to_user=msg_data["FromUserName"]["string"] - # towxuser=touser.get("") - - # token_id="ce50e2c376c843a9a281af3a1a0f4420" - token_id="f828cb3c-1039-489f-b9ae-7494d1778a15" - - prompt={"role": "user", "content": [{ - "type": "text", - "text": msg_content - }]} - - # 收到的对话 - messages_to_send=get_messages_from_cache(hash_key, prompt) - input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] - input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message) - kafka_helper.kafka_client.produce_message(input_message) - logger.info("发送对话 %s",input_message) - - # 回复的对话 - res=fast_gpt_api(messages_to_send,wxid) - reply_content=res["choices"][0]["message"]["content"] - input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}] - input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True) - kafka_helper.kafka_client.produce_message(input_message) - logger.info("发送对话 %s",input_message) - get_messages_from_cache(hash_key, {"role": "assistant", "content": reply_content}) - gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content) - - return jsonify({"message": "微信回调成功"}) - - -def get_messages_from_cache(hash_key,object:object)->object: - messages=redis_helper.redis_helper.get_hash(hash_key) - wxid=hash_key.split(':')[-1] - if not messages: - messages=[{"role": "system", "content": ""}] - messages.append(object) - redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},3600) - else: - - messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data") - messages = json.loads(messages_str) if messages_str else [] - #判断是否含有图片 - last_message = messages[-1] - # print(last_message) - # print('~~~~~~~~~~') - # 获取 content 并判断其是否为列表 - content = last_message.get("content", []) - if isinstance(content, list) and content: - last_content_type = content[-1].get("type") - if last_content_type == 'image_url': - content.append(object['content'][0]) - messages[-1]['content']=content - else: - messages.append(object) - else: - messages.append(object) - - - # messages.append({"role": "user", "content": msg_content}) - #messages.append(object) - # print(messages) - redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},3600) - return messages - - -def fast_gpt_api(messages:list,session_id:str): - #api_key="sk-tdi7u0zuLsR0JpPMGBeFZxymOpL0zoFVafX8EEEvEakIDAGQ22NyQ6w" - api_key="sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI" - api_url = "http://106.15.182.218:3000/api/v1/chat/completions" - headers = { - - "Content-Type": "application/json", - "Authorization": f"Bearer {api_key}" - } - data={ - "model": "", - "messages":messages, - "chatId": session_id, - "detail": True - } - print(json.dumps(data,ensure_ascii=False)) - logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False))) - response = requests.post(url=api_url, headers=headers, data=json.dumps(data), timeout=600) - response.raise_for_status() - response_data = response.json() - logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'),ensure_ascii=False))) - print(response_data) - return response_data diff --git a/resources/messages_resource.py b/resources/messages_resource.py index f7a012b..26350b6 100644 --- a/resources/messages_resource.py +++ b/resources/messages_resource.py @@ -18,19 +18,18 @@ class MessagesResource(Resource): def __init__(self): self.parser = reqparse.RequestParser() + def post(self): msg = request.get_json() - logger.info(f"Received message: {msg}") - - # if msg and 'Data' not in msg: - # logger.warning(f"未知消息") - # return jsonify({"message": "未知消息"}) - - - + logger.info(f"收到微信回调消息: {msg}") type_name =msg.get("TypeName") app_id = msg.get("Appid") - token_id = "f828cb3c-1039-489f-b9ae-7494d1778a15" + # token_id = "f828cb3c-1039-489f-b9ae-7494d1778a15" + token_id=get_token_id_by_app_id(app_id) + + if token_id=="": + logger.warning('找不到登录信息,不处理') + return jsonify({"message": "收到微信回调消息"}) if type_name=='AddMsg': wxid = msg.get("Wxid") @@ -38,20 +37,19 @@ class MessagesResource(Resource): msg_type = msg_data.get("MsgType",None) from_wxid = msg_data["FromUserName"]["string"] to_wxid = msg_data["ToUserName"]["string"] - message_hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" handlers = { 1: handle_text, 3: handle_image, 34: handle_voice, 49: handle_xml, - 37: handle_add_friend_info + 37: handle_add_friend_notice } handler = handlers.get(msg_type) if handler: - return handler(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, message_hash_key) + return handler(token_id,app_id, wxid,msg_data,from_wxid, to_wxid) else: - logger.warning(f"消息类型{msg_type}未处理") + logger.warning(f"微信回调消息类型 {msg_type} 未处理") elif type_name=='ModContacts': ''' 好友通过验证及好友资料变更的通知消息 @@ -69,13 +67,18 @@ class MessagesResource(Resource): 退出群聊 ''' else: - logger.warning(f"不知道消息类型") + logger.warning(f"未知消息类型") - return jsonify({"message": "Unsupported message type"}) + return jsonify({"message": "收到微信回调消息"}) + -def handle_text(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key): +def handle_text(token_id,app_id, wxid,msg_data,from_wxid, to_wxid): + ''' + 文本消息 + ''' msg_content=msg_data["Content"]["string"] - if wxid == from_wxid: #主动发送消息 + + if wxid == from_wxid: #手动发送消息 logger.info("Active message sending detected") gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,[to_wxid]) @@ -86,7 +89,9 @@ def handle_text(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key): kafka_helper.kafka_client.produce_message(input_message) logger.info("发送对话 %s",input_message) else: + callback_to_user=msg_data["FromUserName"]["string"] + hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}' prompt={"role": "user", "content": [{ "type": "text", @@ -103,7 +108,7 @@ def handle_text(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key): if cache_data and cache_data.get('interactive'): messages_to_send=[{"role": "user", "content": msg_content}] - res=fast_gpt_api(messages_to_send,wxid) + res=fast_gpt_api(messages_to_send,f'{wxid}-{callback_to_user}') reply_content=res["choices"][0]["message"]["content"] description = '' @@ -141,7 +146,7 @@ def handle_text(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key): redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False)) messages_to_send=get_messages_from_cache(hash_key, prompt) - res=fast_gpt_api(messages_to_send,wxid) + res=fast_gpt_api(messages_to_send,f'{wxid}-{callback_to_user}') reply_content=res["choices"][0]["message"]["content"] else: reply_content=text @@ -164,11 +169,14 @@ def handle_text(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key): kafka_helper.kafka_client.produce_message(input_message) logger.info("发送对话 %s",input_message) -def handle_image(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key): +def handle_image(token_id,app_id, wxid,msg_data,from_wxid, to_wxid): + ''' + 图片消息 + ''' msg_content=msg_data["Content"]["string"] callback_to_user=from_wxid - hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" + hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}' wx_img_url=gewe_chat.wxchat.download_image_msg(token_id,app_id,msg_content) oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5" @@ -189,14 +197,17 @@ def handle_image(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key): get_messages_from_cache(hash_key, prompt) gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,'已经上传了图片,有什么可以为您服务') - logger.debug(f"Uploaded image URL: {img_url}") + logger.info(f"上传图片 URL: {img_url}") wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": img_url}}] input_message=utils.dialogue_message(wxid,callback_to_user,wx_content_dialogue_message) kafka_helper.kafka_client.produce_message(input_message) logger.info("发送对话 %s",input_message) -def handle_voice(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key): +def handle_voice(token_id,app_id, wxid,msg_data,from_wxid, to_wxid): + ''' + 语音消息 + ''' callback_to_user=from_wxid msg_content=msg_data["Content"]["string"] msg_id=msg_data["MsgId"] @@ -208,35 +219,34 @@ def handle_voice(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key): audio_convert.any_to_wav(react_silk_path,react_wav_path) react_voice_text=AliVoice().voiceToText(react_wav_path) + os.remove(react_silk_path) + os.remove(react_wav_path) + + hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}' messages=get_messages_from_cache(hash_key, {"role": "user", "content": react_voice_text}) - ai_res=fast_gpt_api(messages,wxid) + ai_res=fast_gpt_api(messages,f'{wxid}-{callback_to_user}') ai_res_content=ai_res["choices"][0]["message"]["content"] - reply_text_voice=AliVoice().textToVoice(ai_res_content) - reply_text_voice_path=os.path.join(os.getcwd(), reply_text_voice) - reply_silk_path = os.path.splitext(reply_text_voice_path)[0] + ".silk" - reply_silk_during=audio_convert.any_to_sil(reply_text_voice_path,reply_silk_path) - - # print(int(reply_silk_during)) - # print(reply_silk_path) - oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5" - oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN" - oss_endpoint="http://oss-cn-shanghai.aliyuncs.com" - oss_bucket_name="cow-agent" - oss_prefix="cow" - - file_path=reply_silk_path - file_url = utils.upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, file_path, oss_prefix) - print(file_url) - res=gewe_chat.wxchat.post_voice(token_id,app_id,callback_to_user,file_url,int(reply_silk_during)) + voice_during,voice_url=utils.wx_voice(ai_res_content) + + ret,ret_msg,res=gewe_chat.wxchat.post_voice(token_id,app_id,callback_to_user,voice_url,voice_during) # 删除临时文件 - os.remove(react_silk_path) - os.remove(react_wav_path) - os.remove(reply_text_voice_path) - os.remove(reply_silk_path) - get_messages_from_cache(hash_key, {"role": "assistant", "content": ai_res_content}) + if ret==200: + get_messages_from_cache(hash_key, {"role": "assistant", "content": ai_res_content}) + logger.info((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}')) -def handle_xml(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key): + # 构造对话消息并发送到 Kafka + input_wx_content_dialogue_message = [{"type": "text", "text": ai_res_content}] + input_message = utils.dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s", input_message) + else: + logger.warning((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}')) + +def handle_xml(token_id,app_id, wxid,msg_data,from_wxid, to_wxid): + ''' + 处理xml + ''' msg_content_xml=msg_data["Content"]["string"] root = ET.fromstring(msg_content_xml) type_value = root.find(".//appmsg/type").text @@ -245,15 +255,17 @@ def handle_xml(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key): } handler = handlers.get(type_value) if handler: - return handler(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key) + return handler(token_id,app_id, wxid,msg_data,from_wxid, to_wxid) else: print(f"xml消息 {type_value} 未解析") -def handle_xml_reference(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key): +def handle_xml_reference(token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 判断此类消息的逻辑:$.Data.MsgType=49 并且 解析$.Data.Content.string中的xml msg.appmsg.type=57 ''' callback_to_user=from_wxid + hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}' + msg_content= msg_data["PushContent"] prompt={"role": "user", "content": [{ @@ -269,7 +281,7 @@ def handle_xml_reference(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash logger.info("发送对话 %s",input_message) # 回复的对话 - res=fast_gpt_api(messages_to_send,wxid) + res=fast_gpt_api(messages_to_send,f'{wxid}-{callback_to_user}') reply_content=res["choices"][0]["message"]["content"] input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}] input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True) @@ -278,59 +290,82 @@ def handle_xml_reference(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash get_messages_from_cache(hash_key, {"role": "assistant", "content": reply_content}) gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content) -def handle_add_friend_info(token_id,app_id, wxid,msg_data,from_wxid, to_wxid, hash_key): +def handle_add_friend_notice(token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 好友添加请求通知 ''' + logger.info('好友添加请求通知') + msg_content_xml=msg_data["Content"]["string"] root = ET.fromstring(msg_content_xml) + msg_content = root.attrib.get('content', None) + v3= root.attrib.get('encryptusername', None) + v4= root.attrib.get('ticket', None) + scene=root.attrib.get('scene', None) - message_hash_key = f"__AI_OPS_WX__:MESSAGES:{msg_data["ToUserName"]["string"]}" - prompt={"role": "user", "content": [{"type": "text","text": msg_content}]} - messages_to_send=get_messages_from_cache(message_hash_key, prompt) + + to_contact_wxid=root.attrib.get('fromusername', None) + wxid=msg_data["ToUserName"]["string"] - input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] - input_message=utils.dialogue_message(wxid,msg_data["ToUserName"]["string"],input_wx_content_dialogue_message) - kafka_helper.kafka_client.produce_message(input_message) - logger.info("发送对话 %s",input_message) + # 自动同意好友 + # print(v3) + # print(v4) + # print(scene) + # print(msg_content) + # 操作类型,2添加好友 3同意好友 4拒绝好友 + #option=2 + option=3 + reply_add_contact_contact="亲,我是你的美丽顾问" + ret,ret_msg=gewe_chat.wxchat.add_contacts(token_id,app_id,scene,option,v3,v4,reply_add_contact_contact) + if ret==200: + logger.info('自动添加好友成功') + + # 好友发送的文字 + hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{to_contact_wxid}' + prompt={"role": "user", "content": [{"type": "text","text": msg_content}]} + messages_to_send=get_messages_from_cache(hash_key, prompt) + input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] + input_message=utils.dialogue_message(to_contact_wxid,wxid,input_wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s",input_message) - # # 更改通信方向 - to_wxid=wxid - wxid=msg_data["ToUserName"]["string"] + callback_to_user=to_contact_wxid + res=fast_gpt_api(messages_to_send,f'{wxid}-{callback_to_user}') + reply_content=res["choices"][0]["message"]["content"] - res=fast_gpt_api(messages_to_send,wxid) - reply_content=res["choices"][0]["message"]["content"] + #保存好友信息 + gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id, wxid,[to_contact_wxid]) - #保存好友信息 - gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id, wxid,[to_wxid]) + # 保存到缓存 + get_messages_from_cache(hash_key, {"role": "assistant", "content": reply_content}) - # 保存到缓存 - get_messages_from_cache(hash_key, {"role": "assistant", "content": reply_content}) + # 发送信息 + gewe_chat.wxchat.post_text(token_id,app_id, to_contact_wxid,reply_content) + + # 发送到kafka + input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}] + input_message=utils.dialogue_message(wxid,to_contact_wxid,input_wx_content_dialogue_message,True) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s",input_message) - # 发送信息 - gewe_chat.wxchat.post_text(token_id,app_id, to_wxid,reply_content) - - # 发送到kafka - input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}] - input_message=utils.dialogue_message(wxid,to_wxid,input_wx_content_dialogue_message,True) - kafka_helper.kafka_client.produce_message(input_message) - logger.info("发送对话 %s",input_message) + else: + logger.warning("添加好友失败") def handle_mod_contacts(token_id,app_id,msg_data): ''' 好友通过验证及好友资料变更的通知消息 ''' + logger.info('好友通过验证及好友资料变更的通知消息') + contact_wxid = msg_data["UserName"]["string"] keys=redis_helper.redis_helper.client.keys('__AI_OPS_WX__:LOGININFO:*') wxid="" for k in keys: - # 将字节字符串转换为普通字符串 - # key_set.add(key.decode('utf-8')) key=k.decode('utf-8') hash_key=f"__AI_OPS_WX__:LOGININFO:{key}" cache_app_id=redis_helper.redis_helper.get_hash_field(hash_key,"appId") @@ -347,17 +382,12 @@ def handle_mod_contacts(token_id,app_id,msg_data): c["nickName"] = msg_data["NickName"] c["snsBgImg"] = msg_data["SnsBgimgId"] c["smallHeadImgUrl"] = msg_data["SmallHeadImgUrl"] + c["signature"]= msg_data["Signature"] # 更新缓存,如果有修改过的话 if cache: updated_cache_str = json.dumps(cache) redis_helper.redis_helper.set_hash_field(hash_key, "data", updated_cache_str) - - - - - - def get_messages_from_cache(hash_key,object:object)->object: messages=redis_helper.redis_helper.get_hash(hash_key) @@ -367,14 +397,10 @@ def get_messages_from_cache(hash_key,object:object)->object: messages.append(object) redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},3600) else: - messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data") messages = json.loads(messages_str) if messages_str else [] #判断是否含有图片 last_message = messages[-1] - # print(last_message) - # print('~~~~~~~~~~') - # 获取 content 并判断其是否为列表 content = last_message.get("content", []) if isinstance(content, list) and content: last_content_type = content[-1].get("type") @@ -385,11 +411,6 @@ def get_messages_from_cache(hash_key,object:object)->object: messages.append(object) else: messages.append(object) - - - # messages.append({"role": "user", "content": msg_content}) - #messages.append(object) - # print(messages) redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},3600) return messages @@ -416,3 +437,22 @@ def fast_gpt_api(messages:list,session_id:str): logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'),ensure_ascii=False))) print(response_data) return response_data + + +def get_token_id_by_app_id(app_id: str) -> str: + # 使用 SCAN 避免一次性返回所有的匹配键,逐步扫描 + cursor = 0 + while True: + cursor, login_keys = redis_helper.redis_helper.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*') + + # 批量获取所有键的 hash 数据 + for k in login_keys: + r = redis_helper.redis_helper.get_hash(k) + if r.get("appId") == app_id: + return r.get("tokenId", "") + + # 如果游标为 0,则表示扫描完成 + if cursor == 0: + break + + return "" diff --git a/resources/messages_resource.txt b/resources/messages_resource.txt deleted file mode 100644 index 8cf3ef9..0000000 --- a/resources/messages_resource.txt +++ /dev/null @@ -1,243 +0,0 @@ -from flask_restful import Resource, reqparse -from flask import jsonify,request -from bridge.context import ContextType -import requests,json -from wechat import gewe_chat -from voice.ali.ali_voice import AliVoice -from common import utils,redis_helper,memory,kafka_helper -from common.log import logger -import openai -import xml.etree.ElementTree as ET - -import os - -from voice import audio_convert - - -class MessagesResource(Resource): - def __init__(self): - self.parser = reqparse.RequestParser() - - def post(self): - """处理微信回调的POST请求""" - msg = request.get_json() - if 'Data' not in msg: - return jsonify({"message": "无效的数据格式"}), 400 # 如果数据格式不正确,返回错误信息 - - msg_data = msg.get("Data") - msg_type = msg_data.get("MsgType") - - # 根据不同的消息类型调用对应的处理函数 - if msg_type == 1: # 文字消息 - return self.handle_text_message(msg) - elif msg_type == 3: # 图片消息 - return self.handle_image_message(msg) - elif msg_type == 34: # 语音消息 - return self.handle_voice_message(msg) - elif msg_type == 49: # 引用消息 - return self.handle_replied_message(msg) - - return jsonify({"message": "不支持的消息类型"}), 400 # 如果消息类型不支持,返回错误信息 - - def handle_text_message(self, msg): - """处理文字消息""" - msg_data = msg["Data"] - wxid = msg["Wxid"] - hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" - app_id = msg["Appid"] - callback_to_user = msg_data["FromUserName"]["string"] - msg_content = msg_data["Content"]["string"] - - # 构造 GPT 请求的 prompt - prompt = {"role": "user", "content": [{"type": "text", "text": msg_content}]} - messages_to_send = self.get_messages_from_cache(hash_key, prompt) - - # 将消息发送到 Kafka 中 - self.send_to_kafka(wxid, callback_to_user, msg_content) - - # 调用 GPT API 获取回复 - res = self.fast_gpt_api(messages_to_send, wxid) - reply_content = self.process_reply_content(res, wxid) - - # 将 GPT 回复的内容发送到微信 - gewe_chat.wxchat.post_text(msg["TokenId"], app_id, callback_to_user, reply_content) - self.get_messages_from_cache(hash_key, {"role": "assistant", "content": reply_content}) - - return jsonify({"message": "文字消息处理成功"}) - - def handle_image_message(self, msg): - """处理图片消息""" - msg_data = msg["Data"] - wxid = msg["Wxid"] - hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" - app_id = msg["Appid"] - callback_to_user = msg_data["FromUserName"]["string"] - img_url = msg_data["Content"]["string"] - - # 图片下载并上传至 OSS - wx_img_url = gewe_chat.wxchat.download_image_msg(msg["TokenId"], app_id, img_url) - # img_url = self.upload_image_to_oss(wx_img_url) - oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5" - oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN" - oss_endpoint="http://oss-cn-shanghai.aliyuncs.com" - oss_bucket_name="cow-agent" - oss_prefix="cow" - - img_url=utils.upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, wx_img_url, oss_prefix) - - - # 发送确认消息 - gewe_chat.wxchat.post_text(msg["TokenId"], app_id, callback_to_user, '已经上传了图片,有什么可以为您服务') - - # 构造消息并发送到 Kafka - wx_content_dialogue_message = [{"type": "image_url", "image_url": {"url": img_url}}] - self.send_to_kafka(wxid, callback_to_user, wx_content_dialogue_message) - - return jsonify({"message": "图片消息处理成功"}) - - def handle_voice_message(self, msg): - """处理语音消息""" - msg_data = msg["Data"] - wxid = msg["Wxid"] - hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" - app_id = msg["Appid"] - callback_to_user = msg_data["FromUserName"]["string"] - msg_id = msg_data["MsgId"] - msg_content = msg_data["Content"]["string"] - - # 下载语音文件并转为文本 - file_url = gewe_chat.wxchat.download_audio_msg(msg["TokenId"], app_id, msg_id, msg_content) - react_silk_path = utils.save_to_local_from_url(file_url) - react_wav_path = os.path.splitext(react_silk_path)[0] + ".wav" - audio_convert.any_to_wav(react_silk_path, react_wav_path) - react_voice_text = AliVoice().voiceToText(react_wav_path) - - # 发送到 GPT 获取回复 - messages_to_send = self.get_messages_from_cache(hash_key, {"role": "user", "content": react_voice_text}) - ai_res = self.fast_gpt_api(messages_to_send, wxid) - ai_res_content = ai_res["choices"][0]["message"]["content"] - - # 将 GPT 的文本转换成语音并上传到 OSS - reply_text_voice = AliVoice().textToVoice(ai_res_content) - reply_text_voice_path = os.path.join(os.getcwd(), reply_text_voice) - reply_silk_path = os.path.splitext(reply_text_voice_path)[0] + ".silk" - audio_convert.any_to_sil(reply_text_voice_path, reply_silk_path) - - file_url = self.upload_audio_to_oss(reply_silk_path) - - # 发送语音回复 - res = gewe_chat.wxchat.post_voice(msg["TokenId"], app_id, callback_to_user, file_url, int(reply_silk_during)) - - # 删除临时文件 - self.delete_temp_files([react_silk_path, react_wav_path, reply_text_voice_path, reply_silk_path]) - - return jsonify({"message": "语音消息处理成功"}) - - def handle_replied_message(self, msg): - """处理引用消息""" - msg_data = msg["Data"] - wxid = msg["Wxid"] - hash_key = f"__AI_OPS_WX__:MESSAGES:{wxid}" - app_id = msg["Appid"] - callback_to_user = msg_data["FromUserName"]["string"] - msg_content_xml = msg_data["Content"]["string"] - - # 解析 XML,获取引用消息 - root = ET.fromstring(msg_content_xml) - type_value = root.find(".//appmsg/type").text - if type_value == '57': # 如果是引用消息 - prompt = {"role": "user", "content": [{"type": "text", "text": msg_content_xml}]} - messages_to_send = self.get_messages_from_cache(hash_key, prompt) - - # 发送到 Kafka - self.send_to_kafka(wxid, callback_to_user, msg_content_xml) - - # 获取 GPT 回复并发送 - res = self.fast_gpt_api(messages_to_send, wxid) - reply_content = res["choices"][0]["message"]["content"] - gewe_chat.wxchat.post_text(msg["TokenId"], app_id, callback_to_user, reply_content) - - return jsonify({"message": "引用消息处理成功"}) - - def get_messages_from_cache(self, hash_key, object: object) -> object: - """从缓存中获取消息并更新缓存""" - messages = redis_helper.redis_helper.get_hash(hash_key) - if not messages: - messages = [{"role": "system", "content": ""}] - messages.append(object) - redis_helper.redis_helper.set_hash(hash_key, {"data": json.dumps(messages, ensure_ascii=False)}, 3600) - else: - messages_str = redis_helper.redis_helper.get_hash_field(hash_key, "data") - messages = json.loads(messages_str) if messages_str else [] - last_message = messages[-1] - content = last_message.get("content", []) - if isinstance(content, list) and content: - last_content_type = content[-1].get("type") - if last_content_type == 'image_url': - content.append(object['content'][0]) - messages[-1]['content'] = content - else: - messages.append(object) - else: - messages.append(object) - - redis_helper.redis_helper.set_hash(hash_key, {"data": json.dumps(messages, ensure_ascii=False)}, 3600) - return messages - - def send_to_kafka(self, wxid, callback_to_user, msg_content): - """将消息发送到 Kafka""" - input_wx_content_dialogue_message = [{"type": "text", "text": msg_content}] - input_message = utils.dialogue_message(callback_to_user, wxid, input_wx_content_dialogue_message) - kafka_helper.kafka_client.produce_message(input_message) - logger.info("发送对话 %s", input_message) - - def fast_gpt_api(self, messages: list, session_id: str): - """调用 GPT API 获取回复""" - api_key = "sk-tdi7u0zuLsR0JpPMGBeFZxymOpL0zoFVafX8EEEvEakIDAGQ22NyQ6w" - api_url = "http://106.15.182.218:3000/api/v1/chat/completions" - headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"} - data = {"model": "", "messages": messages, "chatId": session_id, "detail": True} - - try: - response = requests.post(url=api_url, headers=headers, data=json.dumps(data), timeout=600) - response.raise_for_status() - return response.json() - except requests.exceptions.RequestException as e: - logger.error(f"GPT API 请求失败: {e}") - return {"error": "API 请求失败"} - - def process_reply_content(self, res, wxid): - """处理 GPT 回复内容""" - reply_content = res["choices"][0]["message"]["content"] - if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content): - return self.process_interactive_reply(reply_content, wxid) - elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content): - return self.process_text_reply(reply_content, wxid) - else: - return reply_content - - def process_interactive_reply(self, reply_content, wxid): - """处理交互式回复""" - description = '' - user_select_options = [] - for item in reply_content: - if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect": - params = item["interactive"]["params"] - description = params.get("description") - user_select_options = params.get("userSelectOptions", []) - values_string = "\n".join(option["value"] for option in user_select_options) - - if description: - memory.USER_INTERACTIVE_CACHE[wxid] = {"interactive": True} - return f"{description}\n------------------------------\n{values_string}" - return reply_content - - def process_text_reply(self, reply_content, wxid): - """处理文本回复""" - memory.USER_INTERACTIVE_CACHE[wxid] = {"interactive": False} - text = next((item["text"]["content"] for item in reply_content if item["type"] == "text"), '') - if not text: - messages_to_send = self.get_messages_from_cache(f"__AI_OPS_WX__:MESSAGES:{wxid}", {"role": "user", "content": text}) - res = self.fast_gpt_api(messages_to_send, wxid) - return res["choices"][0]["message"]["content"] - return text diff --git a/wechat/biz.py b/wechat/biz.py index cd8a571..86045c0 100644 --- a/wechat/biz.py +++ b/wechat/biz.py @@ -2,93 +2,155 @@ import threading from common import kafka_helper,redis_helper,utils -import json,time,re,random +import json,time,re,random,os from common.log import logger, log_exception from wechat import gewe_chat -def wx_messages_process_callback(message): - wxchat=gewe_chat.wxchat - msg_content= message - cleaned_content = clean_json_string(msg_content) - content=json.loads(cleaned_content) - data = content.get("data", {}) - msg_type_data=data.get("msg_type",None) - content_data = data.get("content",{}) - agent_tel=content_data.get("agent_tel",None) - # print(message) - print(msg_type_data) +def wx_messages_process_callback(agent_tel,message): + try: + # print(f'手机号 {agent_tel}') + wxchat = gewe_chat.wxchat + msg_content = message + cleaned_content = clean_json_string(msg_content) + content = json.loads(cleaned_content) + data = content.get("data", {}) + + msg_type_data = data.get("msg_type", None) + content_data = data.get("content", {}) + agent_tel = content_data.get("agent_tel", None) + + if msg_type_data == 'group-sending': + process_group_sending(wxchat, content_data, agent_tel) + + except json.JSONDecodeError as e: + print(f"JSON解码错误: {e}, 消息内容: {message}") + except Exception as e: + print(f"处理消息时发生错误: {e}, 消息内容: {message}") +def process_group_sending(wxchat:gewe_chat.GeWeChatCom, content_data, agent_tel:str): + # 获取登录信息 + hash_key = f"__AI_OPS_WX__:LOGININFO:{agent_tel}" + logininfo = redis_helper.redis_helper.get_hash(hash_key) + + if not logininfo: + logger.warning(f"未找到 {agent_tel} 的登录信息") + return + + token_id = logininfo.get('tokenId') + app_id = logininfo.get('appId') + agent_wxid = logininfo.get('wxid') + + # 获取联系人列表并计算交集 + # contacts_list = wxchat.fetch_contacts_list(token_id, app_id) + # contacts_list = wxchat.fetch_contacts_list(token_id, app_id) + # friend_wxids = contacts_list['friends'] + hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{agent_wxid}" + friend_wxids_str=redis_helper.redis_helper.get_hash_field(hash_key,"data") + friend_wxids_list=json.loads(friend_wxids_str) if friend_wxids_str else [] + friend_wxids=[f["userName"] for f in friend_wxids_list] + print(friend_wxids) + wxid_contact_list_content_data = [c['wxid'] for c in content_data.get("contact_list", [])] + intersection_wxids = list(set(friend_wxids) & set(wxid_contact_list_content_data)) + + # 发送消息 + wx_content_list = content_data.get("wx_content", []) + for wx_content in wx_content_list: + if wx_content["type"] == "text": + send_text_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"]) + elif wx_content["type"] == "image_url": + send_image_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content.get("image_url", {}).get("url")) + elif wx_content["type"] == "tts": + send_tts_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"]) - # # 更新好友缓存 - # save_friends_to_redis(agent_tel,agent_nickname_data,friends) - if msg_type_data=='group-sending': - hash_key = f"__AI_OPS_WX__:LOGININFO:{agent_tel}" - # print('群发') +def send_text_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, text): + for t in intersection_wxids: + # 发送文本消息 + ret,ret_msg,res = wxchat.post_text(token_id, app_id, t, text) + logger.info(f'{agent_wxid} 向 {t} 发送文字【{text}】') + + # 构造对话消息并发送到 Kafka + input_wx_content_dialogue_message = [{"type": "text", "text": text}] + input_message = utils.dialogue_message(agent_wxid, t, input_wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s", input_message) - logininfo=redis_helper.redis_helper.get_hash(hash_key) - if logininfo: - token_id=logininfo.get('tokenId') - app_id=logininfo.get('appId') - agent_wxid=logininfo.get('wxid') - # print(token_id) - # print(app_id) - contacts_list=wxchat.fetch_contacts_list(token_id, app_id) - friend_wxids=contacts_list['friends'] - wxid_contact_list_content_data=[c['wxid'] for c in content_data.get("contact_list",None)] - intersection = list(set(friend_wxids) & set(wxid_contact_list_content_data)) - # print(intersection) - wx_content_list=content_data.get("wx_content",[]) - for wx_content in wx_content_list: - if wx_content["type"]=="text": - for t in intersection: - res=wxchat.post_text(token_id,app_id,t,wx_content["text"]) - logger.info(f'{agent_wxid} 向 {t} 发送 文字【{wx_content["text"]}】') - - input_wx_content_dialogue_message=[{"type": "text", "text": wx_content["text"]}] - input_message=utils.dialogue_message(agent_wxid,t,input_wx_content_dialogue_message) - kafka_helper.kafka_client.produce_message(input_message) - logger.info("发送对话 %s",input_message) - # 等待随机时间 - time.sleep(random.uniform(5, 15)) - if wx_content["type"]=="image_url": - aeskey="" - cdnthumburl="" - cdnthumblength=0 - cdnthumbheight=0 - cdnthumbwidth=0 - length=0 - md5="" - image_url= wx_content.get("image_url",{}) - url=image_url.get("url",None) - for t in intersection: - if t == intersection[0]: - res=wxchat.post_image(token_id,app_id,t,url) - aeskey=res["aesKey"] - cdnthumburl=res["fileId"] - cdnthumblength=res["cdnThumbLength"] - cdnthumbheight=res["height"] - cdnthumbwidth=res["width"] - length=res["length"] - md5=res["md5"] - logger.info(f'{agent_wxid} 向 {t} 发送 图片【{url}】') - else: - res=wxchat.forward_image(token_id,app_id,t,aeskey,cdnthumburl,cdnthumblength,cdnthumbheight,cdnthumbwidth,length,md5) - logger.info(f'{agent_wxid} 向 {t} 转发送 图片【{url}】') - - wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": url}}] - input_message=utils.dialogue_message(agent_wxid,t,wx_content_dialogue_message) - kafka_helper.kafka_client.produce_message(input_message) - logger.info("发送对话 %s",input_message) + # 等待随机时间 + time.sleep(random.uniform(5, 15)) - time.sleep(random.uniform(5, 15)) - +def send_image_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, image_url): - + aeskey, cdnthumburl, cdnthumblength, cdnthumbheight, cdnthumbwidth, length, md5 = "", "", 0, 0, 0, 0, "" + + for t in intersection_wxids: + if t == intersection_wxids[0]: + # 发送图片 + ret,ret_msg,res = wxchat.post_image(token_id, app_id, t, image_url) + if ret==200: + aeskey = res["aesKey"] + cdnthumburl = res["fileId"] + cdnthumblength = res["cdnThumbLength"] + cdnthumbheight = res["height"] + cdnthumbwidth = res["width"] + length = res["length"] + md5 = res["md5"] + logger.info(f'{agent_wxid} 向 {t} 发送图片【{image_url}】{ret_msg}') + else: + logger.warning(f'{agent_wxid} 向 {t} 发送图片【{image_url}】{ret_msg}') + else: + if aeskey !="": + # 转发图片 + res,ret,ret_msg= wxchat.forward_image(token_id, app_id, t, aeskey, cdnthumburl, cdnthumblength, cdnthumbheight, cdnthumbwidth, length, md5) + logger.info(f'{agent_wxid} 向 {t} 转发图片【{image_url}】{ret_msg}') + else: + # 发送图片 + ret,ret_msg,res = wxchat.post_image(token_id, app_id, t, image_url) + if ret==200: + aeskey = res["aesKey"] + cdnthumburl = res["fileId"] + cdnthumblength = res["cdnThumbLength"] + cdnthumbheight = res["height"] + cdnthumbwidth = res["width"] + length = res["length"] + md5 = res["md5"] + logger.info(f'{agent_wxid} 向 {t} 发送图片【{image_url}】{ret_msg}') + else: + logger.warning(f'{agent_wxid} 向 {t} 发送图片【{image_url}】{ret_msg}') + + # 构造对话消息并发送到 Kafka + wx_content_dialogue_message = [{"type": "image_url", "image_url": {"url": image_url}}] + input_message = utils.dialogue_message(agent_wxid, t, wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s", input_message) + + # 等待随机时间 + time.sleep(random.uniform(5, 15)) +def send_tts_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, text): + + voice_during,voice_url=utils.wx_voice(text) + for t in intersection_wxids: + # 发送送语音消息 + if voice_url: + ret,ret_msg,res = wxchat.post_voice(token_id, app_id, t, voice_url,voice_during) + if ret==200: + logger.info(f'{agent_wxid} 向 {t} 发送语音文本【{text}】{ret_msg}') + + # 构造对话消息并发送到 Kafka + input_wx_content_dialogue_message = [{"type": "text", "text": text}] + input_message = utils.dialogue_message(agent_wxid, t, input_wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s", input_message) + else: + logger.warning((f'{agent_wxid} 向 {t} 发送语音文本【{text}】{ret_msg}')) + else: + logger.warning((f'{agent_wxid} 向 {t} 发送语音文本【{text}】出错')) + + # 等待随机时间 + time.sleep(random.uniform(5, 15)) def clean_json_string(json_str): @@ -97,6 +159,7 @@ def clean_json_string(json_str): # 启动 Kafka 消费者线程 def start_kafka_consumer_thread(): - consumer_thread = threading.Thread(target=kafka_helper.kafka_client.consume_messages, args=(wx_messages_process_callback,)) + agent_tel=os.environ.get('tel', '18029274615') + consumer_thread = threading.Thread(target=kafka_helper.kafka_client.consume_messages, args=(agent_tel,wx_messages_process_callback,)) consumer_thread.daemon = True # 设置为守护线程,应用退出时会自动结束 consumer_thread.start() \ No newline at end of file diff --git a/wechat/gewe_chat.py b/wechat/gewe_chat.py index 239f595..7ee3fe2 100644 --- a/wechat/gewe_chat.py +++ b/wechat/gewe_chat.py @@ -14,319 +14,7 @@ from common import redis_helper wxchat=None - -class GeWeChat: - def __init__(self, base_url): - self.base_url = base_url -############################### 登录模块 ############################### - def get_token_id(self): - ''' - 获取header 的 token - ''' - - api_url = f"{self.base_url}/v2/api/tools/getTokenId" - response = requests.post(url=api_url) - response_data = response.json() - print(response_data) - return response_data.get('data') - - def get_login_qr_code(self, token_id): - ''' - 获取登录二维码(步骤2) - - - appId参数为设备ID,首次登录传空,会自动触发创建设备,掉线后重新登录则必须传接口返回的appId,注意同一个号避免重复创建设备,以免触发官方风控 - - 取码时传的appId需要与上次登录扫码的微信一致,否则会导致登录失败 - - 响应结果中的qrImgBase64为微信二维码图片的base64,前端需要将二维码图片展示给用户并进行手机扫码操作(PS: 扫码后调用步骤2,手机上才显示登录)。 - (或使用响应结果中的qrData生成二维码) - ''' - api_url = f"{self.base_url}/v2/api/login/getLoginQrCode" - headers = { - 'X-GEWE-TOKEN': token_id, - 'Content-Type': 'application/json' - } - data = { - "appId": "" - } - response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) - response_data = response.json() - return response_data.get('data') - - def check_login(self, token_id, app_id, uuid): - ''' - 执行登录(步骤3) - - 获取到登录二维码后需每间隔5s调用本接口来判断是否登录成功 - - 新设备登录平台,次日凌晨会掉线一次,重新登录时需调用获取二维码且传appId取码,登录成功后则可以长期在线 - - 登录成功后请保存appId与wxid的对应关系,后续接口中会用到 - - ''' - api_url = f"{self.base_url}/v2/api/login/checkLogin" - headers = { - 'X-GEWE-TOKEN': token_id, - 'Content-Type': 'application/json' - } - data = { - "appId": app_id, - "uuid": uuid - } - response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) - response_data = response.json() - print(response_data) - return response_data.get('data') - - def qrCallback(self,uuid, base64_string): - try: - from PIL import Image - base64_string = base64_string.split(',')[1] - img_data = base64.b64decode(base64_string) - img = Image.open(io.BytesIO(img_data)) - _thread = threading.Thread(target=img.show, args=("QRCode",)) - _thread.setDaemon(True) - _thread.start() - except Exception as e: - pass - - import qrcode - - # url = f"https://login.weixin.qq.com/l/{uuid}" - # http://weixin.qq.com/x/4b7fY2d93zNCXhHFkNk8 - - url = f"http://weixin.qq.com/x/{uuid}" - - qr_api1 = "https://api.isoyu.com/qr/?m=1&e=L&p=20&url={}".format(url) - qr_api2 = "https://api.qrserver.com/v1/create-qr-code/?size=400×400&data={}".format(url) - qr_api3 = "https://api.pwmqr.com/qrcode/create/?url={}".format(url) - qr_api4 = "https://my.tv.sohu.com/user/a/wvideo/getQRCode.do?text={}".format(url) - print("You can also scan QRCode in any website below:") - print(qr_api3) - print(qr_api4) - print(qr_api2) - print(qr_api1) - # _send_qr_code([qr_api3, qr_api4, qr_api2, qr_api1]) - qr = qrcode.QRCode(border=1) - qr.add_data(url) - qr.make(fit=True) - qr.print_ascii(invert=True) - - def callback_collect(self,token_id,url): - ''' - 设置消息回调地址 - ''' - api_url = f"{self.base_url}/v2/api/tools/setCallback" - headers = { - 'X-GEWE-TOKEN': token_id, - 'Content-Type': 'application/json' - } - data = { - "token": token_id, - "callbackUrl": url - } - response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) - response_data = response.json() - print(response_data) - return response_data.get('msg') - -############################### 账号管理 ############################### - def reconnection(self,token_id,app_id): - ''' - 断线重连 - - 当系统返回账号已离线,但是手机顶部还显示ipad在线,可用此接口尝试重连,若返回错误/失败则必须重新调用步骤一登录 - - 本接口非常用接口,可忽略 - - ''' - api_url = f"{self.base_url}/v2/api/login/reconnection" - headers = { - 'X-GEWE-TOKEN': token_id, - 'Content-Type': 'application/json' - } - data = { - "appId": app_id - } - response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) - response_data = response.json() - print(response_data) - return response_data.get('data') - - def logout(self,token_id,app_id): - ''' - 退出 - ''' - - api_url = f"{self.base_url}/v2/api/login/logout" - headers = { - 'X-GEWE-TOKEN': token_id, - 'Content-Type': 'application/json' - } - data = { - "appId": app_id - } - response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) - response_data = response.json() - print(response_data) - return response_data.get('data') - - def check_online(self,token_id,app_id): - ''' - 检查是否在线 - - 响应结果的data=true则是在线,反之为离线 - ''' - - api_url = f"{self.base_url}/v2/api/login/checkOnline" - headers = { - 'X-GEWE-TOKEN': token_id, - 'Content-Type': 'application/json' - } - data = { - "appId": app_id - } - response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) - response_data = response.json() - print(response_data) - return response_data.get('data') - - -############################### 联系人模块 ############################### - def fetch_contacts_list(self, token_id, app_id): - ''' - 获取通讯录列表 - - 本接口为长耗时接口,耗时时间根据好友数量递增,若接口返回超时可通过获取通讯录列表缓存接口获取响应结果 - - 本接口返回的群聊仅为保存到通讯录中的群聊,若想获取会话列表中的所有群聊,需要通过消息订阅做二次处理。 - 原因:当未获取的群有成员在群内发消息的话会有消息回调, 开发者此刻调用获取群详情接口查询群信息入库保存即可, - 比如说手机上三年前不说话的群,侧滑删除了,用户手机上也不会看到被删除的群聊的 ,但是有群成员说了话他会显示, - 原理就是各个终端(Android、IOS、桌面版微信)取得了消息回调,又去获取群详情信息,本地数据库缓存了下来,显示的手机群聊,让用户感知的。 - ''' - - api_url = f"{self.base_url}/v2/api/contacts/fetchContactsList" - headers = { - 'X-GEWE-TOKEN': token_id, - 'Content-Type': 'application/json' - } - data = { - "appId": app_id - } - response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) - response_data = response.json() - print(response_data) - return response_data.get('data') - - def fetch_contacts_list_cache(self, token_id, app_id): - ''' - 获取通讯录列表缓存 - - 通讯录列表数据缓存10分钟,超时则需要重新调用获取通讯录列表接口 - ''' - - api_url = f"{self.base_url}/v2/api/contacts/fetchContactsListCache" - headers = { - 'X-GEWE-TOKEN': token_id, - 'Content-Type': 'application/json' - } - data = { - "appId": app_id - } - response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) - response_data = response.json() - print(response_data) - return response_data.get('data') - - def get_brief_info(self,token_id, app_id,wxids): - ''' - 获取群/好友简要信息 - 1<= wxids <=100 - ''' - api_url = f"{self.base_url}/v2/api/contacts/getBriefInfo" - headers = { - 'X-GEWE-TOKEN': token_id, - 'Content-Type': 'application/json' - } - data = { - "appId": app_id, - "wxids":wxids # list 1<= wxids <=100 - } - response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) - response_data = response.json() - print(response_data) - return response_data.get('data') - - def get_detail_info(self,token_id, app_id,wxids): - ''' - 获取群/好友详细信息 - 1<= wxids <=20 - ''' - api_url = f"{self.base_url}/v2/api/contacts/getDetailInfo" - headers = { - 'X-GEWE-TOKEN': token_id, - 'Content-Type': 'application/json' - } - data = { - "appId": app_id, - "wxids":wxids # list 1<= wxids <=20 - } - response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) - response_data = response.json() - print(response_data) - return response_data.get('data') - -############################### 消息模块 ############################### - def post_text(self,token_id,app_id,to_wxid,content): - api_url = f"{self.base_url}/v2/api/message/postText" - headers = { - 'X-GEWE-TOKEN': token_id, - 'Content-Type': 'application/json' - } - data = { - "appId": app_id, - "toWxid": to_wxid, - "content": content - } - response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) - response_data = response.json() - print(response_data) - return response_data.get('data') - - def post_image(self,token_id,app_id,to_wxid,img_url): - api_url = f"{self.base_url}/v2/api/message/postImage" - headers = { - 'X-GEWE-TOKEN': token_id, - 'Content-Type': 'application/json' - } - data = { - "appId": app_id, - "toWxid": to_wxid, - "imgUrl": img_url - } - response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) - response_data = response.json() - print(response_data) - return response_data.get('data') - - def post_voice(self,token_id,app_id,to_wxid,voice_url,voice_duration): - api_url = f"{self.base_url}/v2/api/message/postVoice" - headers = { - 'X-GEWE-TOKEN': token_id, - 'Content-Type': 'application/json' - } - data = { - "appId": app_id, - "toWxid": to_wxid, - "voiceUrl": voice_url, - "voiceDuration":voice_duration - } - response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) - response_data = response.json() - print(response_data) - return response_data.get('data') - + class GeWeChatCom: def __init__(self, base_url): self.base_url = base_url @@ -580,9 +268,8 @@ class GeWeChatCom: "content": content } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) - response_data = response.json() - print(response_data) - return response_data.get('data') + response_object = response.json() + return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) def post_image(self,token_id,app_id,to_wxid,img_url): api_url = f"{self.base_url}/v2/api/message/postImage" @@ -596,9 +283,8 @@ class GeWeChatCom: "imgUrl": img_url } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) - response_data = response.json() - print(response_data) - return response_data.get('data') + response_object = response.json() + return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) def post_voice(self,token_id,app_id,to_wxid,voice_url,voice_duration): api_url = f"{self.base_url}/v2/api/message/postVoice" @@ -613,9 +299,8 @@ class GeWeChatCom: "voiceDuration":voice_duration } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) - response_data = response.json() - print(response_data) - return response_data.get('data') + response_object = response.json() + return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) def forward_image(self,token_id,app_id,to_wxid,aeskey,cdnthumburl,cdnthumblength,cdnthumbheight,cdnthumbwidth,length,md5): api_url = f"{self.base_url}/v2/api/message/forwardImage" @@ -630,9 +315,28 @@ class GeWeChatCom: } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) - response_data = response.json() - print(response_data) - return response_data.get('data') + response_object = response.json() + return response_object.get('data',None),response_object.get('ret',None),response_object.get('msg',None) + + def add_contacts(self,token_id:str,app_id:str,scene:int,option:int,v3:str,v4:str,content:str): + api_url = f"{self.base_url}/v2/api/contacts/addContacts" + headers = { + 'X-GEWE-TOKEN': token_id, + 'Content-Type': 'application/json' + } + data = { + "appId": app_id, + "scene": scene, + "option": option, + "v3":v3, + "v4":v4, + "content":content + } + response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) + response_object = response.json() + print(response_object) + return response_object.get('ret',None),response_object.get('msg',None) + ############################### 下载模块 ############################### @@ -712,41 +416,7 @@ class GeWeChatCom: print(f"请求失败,状态码: {response.status_code}") ############################### 其他 ############################### - # def save_contacts_brief_to_cache(self, token_id, app_id, wxid, contacts_wxids: list): - # # 将联系人信息保存到 Redis,使用一个合适的 key - # hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}" - - # # 获取缓存中的数据,如果缓存不存在则初始化为空列表 - # cache_str = redis_helper.redis_helper.get_hash_field(hash_key, "data") - # cache = json.loads(cache_str) if cache_str else [] - - # # 如果缓存为空,获取并保存联系人详细信息 - # if not cache: - # friends_brief = self.get_brief_info(token_id, app_id, contacts_wxids) - # contacts_no_info=[] - # for f in friends_brief: - # if f["nickName"]: - # cache.append(f) - # else: - # contacts_no_info.append(f) - # if len(contacts_no_info)!=0: - # f_detail = self.get_detail_info(token_id, app_id, contacts_no_info) - # cache.extend(f_detail) - - # # 更新缓存 - # redis_helper.redis_helper.update_hash_field(hash_key, "data", json.dumps(cache, ensure_ascii=False)) - # else: - # # 获取现有缓存中的 userName 列表,避免重复 - # existing_usernames = {contact['userName'] for contact in cache} - # new_contacts_wxids = [contact_wxid for contact_wxid in contacts_wxids if contact_wxid not in existing_usernames] - - # if new_contacts_wxids: - # f_detail = self.get_detail_info(token_id, app_id, new_contacts_wxids) - # print(f_detail) - # cache.extend(f_detail) - # # 更新缓存 - # redis_helper.redis_helper.update_hash_field(hash_key, "data", json.dumps(cache, ensure_ascii=False)) - + def save_contacts_brief_to_cache(self, token_id, app_id, wxid, contacts_wxids: list): """ 将联系人信息保存到 Redis 缓存。