|
- import json
- import os
-
- import requests
-
# Default HTTP headers passed to the Gewe API POST calls in this module.
# The access token is read from the X_GEWE_TOKEN environment variable once,
# when the module is imported.
headers = {
    "X-GEWE-TOKEN": os.environ.get("X_GEWE_TOKEN"),
    "User-Agent": "Apifox/1.0.0 (https://apifox.com)",
    "Content-Type": "application/json",
}
-
-
def download_audio_msg(msgId: int, xml: str, timeout: float = 30):
    """Ask the Gewe API for the download URL of a voice message.

    Args:
        msgId: Message id, forwarded as the ``msgId`` request field.
        xml: Message XML, forwarded as the ``xml`` request field.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The ``data.fileUrl`` value of the response on success, otherwise
        ``False`` (network error or timeout, non-2xx status, non-JSON body,
        ``ret`` other than 200, or a missing ``fileUrl``).
    """
    payload = {
        "appId": os.getenv("GEWE_APP_ID"),
        "msgId": msgId,
        "xml": xml,
    }
    print(json.dumps(payload))

    try:
        response = requests.post(
            "http://api.geweapi.com/gewe/v2/api/message/downloadVoice",
            json=payload,
            headers=headers,
            # Without a timeout a stalled server would block the caller forever.
            timeout=timeout,
        )
    except requests.RequestException as exc:
        # Failure is reported through the False return value, like every
        # other error path of this function.
        print(f"Gewe download audio msg in error: {exc}")
        return False

    if not response.ok:
        return False

    try:
        result = response.json()
    except ValueError:
        # The body was not valid JSON (requests' JSON errors subclass ValueError).
        print("Gewe download audio msg in error.")
        return False

    # Walk the payload defensively: an unexpected shape must not raise.
    file_url = None
    if isinstance(result, dict) and result.get("ret") == 200:
        inner = result.get("data")
        if isinstance(inner, dict):
            file_url = inner.get("fileUrl")

    if not file_url:
        print("Gewe download audio msg in error.")
        return False

    print("Gewe download audio msg successfully.")
    print(file_url)
    return file_url
-
-
def download_image_msg(xml: str, timeout: float = 30):
    """Ask the Gewe API for the download URL of an image message.

    Args:
        xml: Message XML, forwarded as the ``xml`` request field.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The ``data.fileUrl`` value of the response on success, otherwise
        ``False`` (network error or timeout, non-2xx status, non-JSON body,
        ``ret`` other than 200, or a missing ``fileUrl``).
    """
    payload = {
        # Read the app id from the same environment variable as
        # download_audio_msg; the previously hard-coded id is kept only as a
        # fallback so existing deployments keep working.
        "appId": os.getenv("GEWE_APP_ID", "wx_HtqjtTglwIKjQHSsSUYDi"),
        "type": 2,
        "xml": xml,
    }
    print(json.dumps(payload))

    try:
        response = requests.post(
            "http://api.geweapi.com/gewe/v2/api/message/downloadImage",
            json=payload,
            headers=headers,
            # Without a timeout a stalled server would block the caller forever.
            timeout=timeout,
        )
    except requests.RequestException as exc:
        # Failure is reported through the False return value, like every
        # other error path of this function.
        print(f"Gewe download image msg in error: {exc}")
        return False

    if not response.ok:
        return False

    try:
        result = response.json()
    except ValueError:
        # The body was not valid JSON (requests' JSON errors subclass ValueError).
        print("Gewe download image msg in error.")
        return False

    # Walk the payload defensively: an unexpected shape must not raise.
    file_url = None
    if isinstance(result, dict) and result.get("ret") == 200:
        inner = result.get("data")
        if isinstance(inner, dict):
            file_url = inner.get("fileUrl")

    if not file_url:
        print("Gewe download image msg in error.")
        return False

    print("Gewe download image msg successfully.")
    print(file_url)
    return file_url
-
-
def download_audio_file(fileUrl: str, file_name: str, timeout: float = 30):
    """Download ``fileUrl`` and save it as ``./silk/<file_name>.silk``.

    Prints a success message once the file is written, or the HTTP status
    code when the server does not answer with 200. Returns ``None`` either way.

    Args:
        fileUrl: URL to fetch with an HTTP GET.
        file_name: Base name (without extension) of the file to create
            under ``./silk``.
        timeout: Seconds to wait for the server before giving up.

    Raises:
        requests.RequestException: If the request fails or times out.
        OSError: If the target directory or file cannot be created or written.
    """
    # Local path and file name the download is saved under.
    local_filename = f'./silk/{file_name}.silk'

    # Stream the body; the context manager releases the connection on every
    # path, including the non-200 early return.
    with requests.get(fileUrl, stream=True, timeout=timeout) as response:
        # Check whether the request succeeded.
        if response.status_code != 200:
            print(f"请求失败,状态码: {response.status_code}")
            return

        # Create the target directory on first use so open() cannot fail on it.
        os.makedirs(os.path.dirname(local_filename), exist_ok=True)

        # Write the file in binary mode, chunk by chunk.
        with open(local_filename, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)

    print(f"文件已成功下载到 {local_filename}")
|