"""Helpers for downloading voice and image messages through the Gewe HTTP API."""
import json
import os

import requests

# Headers sent with every Gewe API call. The token is read from the
# environment once, when this module is imported.
headers = {
    'X-GEWE-TOKEN': os.getenv("X_GEWE_TOKEN"),
    'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
    'Content-Type': 'application/json'
}

# NOTE(review): the endpoint is plain HTTP, so the token header is sent
# unencrypted -- confirm whether the service offers HTTPS.
_GEWE_MESSAGE_API = "http://api.geweapi.com/gewe/v2/api/message"

# Seconds to wait on any single HTTP call; without a timeout `requests`
# can block forever on a stalled connection.
_REQUEST_TIMEOUT = 30

# App id that used to be hard-coded in download_image_msg; kept only as a
# fallback so behavior is unchanged when GEWE_APP_ID is not set.
_FALLBACK_IMAGE_APP_ID = "wx_HtqjtTglwIKjQHSsSUYDi"


def _request_file_url(endpoint: str, payload: dict, label: str):
    """POST *payload* to a Gewe download endpoint and return the file URL.

    Args:
        endpoint: Last path segment of the API, e.g. ``"downloadVoice"``.
        payload: JSON body to send.
        label: Word used in the log messages (``"audio"`` or ``"image"``).

    Returns:
        The ``data.fileUrl`` string from the response when the HTTP call
        succeeds and the body's ``ret`` field equals 200, otherwise ``False``.
    """
    print(json.dumps(payload))
    failure_message = f"Gewe download {label} msg in error."

    try:
        response = requests.post(
            f"{_GEWE_MESSAGE_API}/{endpoint}",
            json=payload,
            headers=headers,
            timeout=_REQUEST_TIMEOUT,
        )
    except requests.RequestException as exc:
        # Connection failures and timeouts are reported the same way as
        # any other failed download: log and return False.
        print(failure_message)
        print(exc)
        return False

    if not response.ok:
        return False

    try:
        body = response.json()
    except ValueError:
        # The server answered 2xx but the body is not valid JSON.
        print(failure_message)
        return False

    if not isinstance(body, dict) or body.get('ret') != 200:
        print(failure_message)
        return False

    try:
        file_url = body['data']['fileUrl']
    except (KeyError, TypeError):
        # ret == 200 but the expected data.fileUrl field is absent.
        print(failure_message)
        return False

    print(f"Gewe download {label} msg successfully.")
    print(file_url)
    return file_url


def download_audio_msg(msgId: int, xml: str):
    """Ask Gewe for a download URL for a voice message.

    Args:
        msgId: Identifier of the message, sent as ``msgId``.
        xml: XML payload of the message, sent as ``xml``.

    Returns:
        The file URL string on success, ``False`` on any failure.
    """
    payload = {
        "appId": os.getenv("GEWE_APP_ID"),
        "msgId": msgId,
        "xml": xml
    }
    return _request_file_url("downloadVoice", payload, "audio")


def download_image_msg(xml: str):
    """Ask Gewe for a download URL for an image message.

    Args:
        xml: XML payload of the message, sent as ``xml``.

    Returns:
        The file URL string on success, ``False`` on any failure.
    """
    payload = {
        # Use the same configured app id as download_audio_msg; fall back
        # to the previously hard-coded value when the variable is unset.
        "appId": os.getenv("GEWE_APP_ID", _FALLBACK_IMAGE_APP_ID),
        "type": 2,
        "xml": xml
    }
    return _request_file_url("downloadImage", payload, "image")


def download_audio_file(fileUrl: str, file_name: str):
    """Download *fileUrl* and save it as ``./silk/<file_name>.silk``.

    The outcome is reported on stdout; nothing is returned.

    Args:
        fileUrl: URL of the file to fetch.
        file_name: Base name (without extension) of the local file.

    Raises:
        requests.RequestException: If the HTTP request itself fails or
            times out.
        OSError: If the target directory or file cannot be written.
    """
    # Local path the downloaded content is written to.
    local_filename = f'./silk/{file_name}.silk'

    # Stream the body so the whole file is never held in memory; the
    # `with` block releases the connection even if writing fails.
    with requests.get(fileUrl, stream=True, timeout=_REQUEST_TIMEOUT) as response:
        if response.status_code != 200:
            print(f"请求失败,状态码: {response.status_code}")
            return

        # Create the target directory on first use instead of failing
        # in open() when it does not exist yet.
        os.makedirs(os.path.dirname(local_filename), exist_ok=True)

        with open(local_filename, 'wb') as f:
            # Write the body in 1024-byte chunks.
            for chunk in response.iter_content(1024):
                f.write(chunk)

    print(f"文件已成功下载到 {local_filename}")