|
- import requests
- import json
- import base64
- import io
- import json
- import os
- import threading
- import time
- import uuid
- import requests
-
- from io import BytesIO
- from PIL import Image
- from common import redis_helper
- from common.log import logger
-
- wxchat=None
-
-
- class GeWeChatCom:
- def __init__(self, base_url):
- self.base_url = base_url
-
- ############################### 登录模块 ###############################
- def check_login(self, token_id, app_id, uuid,captch_code=""):
- '''
- 执行登录(步骤3)
-
- 获取到登录二维码后需每间隔5s调用本接口来判断是否登录成功
-
- 新设备登录平台,次日凌晨会掉线一次,重新登录时需调用获取二维码且传appId取码,登录成功后则可以长期在线
-
- 登录成功后请保存appId与wxid的对应关系,后续接口中会用到
-
- '''
- api_url = f"{self.base_url}/v2/api/login/checkLogin"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "uuid": uuid,
- "captchCode":captch_code
- }
- if captch_code=="":
- data = {
- "appId": app_id,
- "uuid": uuid
- }
-
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- #response_data = response.json()
- # print(response_data)
- # return response_data.get('data')
- response_object = response.json()
- return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None)
-
- def get_login_qr_code(self, token_id,app_id="",region_id="440000"):
- '''
- 获取登录二维码(步骤2)
-
-
- appId参数为设备ID,首次登录传空,会自动触发创建设备,掉线后重新登录则必须传接口返回的appId,注意同一个号避免重复创建设备,以免触发官方风控
-
- 取码时传的appId需要与上次登录扫码的微信一致,否则会导致登录失败
-
- 响应结果中的qrImgBase64为微信二维码图片的base64,前端需要将二维码图片展示给用户并进行手机扫码操作(PS: 扫码后调用步骤2,手机上才显示登录)。
- (或使用响应结果中的qrData生成二维码)
- '''
- api_url = f"{self.base_url}/v2/api/login/getLoginQrCode"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- # if app_id=="":
-
- # data = {
- # "appId": app_id
- # }
- # else:
- # data = {
- # "appId": app_id,
- # "regionId":region_id
- # }
-
- data = {
- "appId": app_id,
- "regionId":region_id
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_data = response.json()
- data=json.dumps(response_data, separators=(',', ':'),ensure_ascii=False)
- logger.info(f'{token_id} 的登录APP信息:{data}')
- return response_data.get('data')
-
- def qrCallback(self,uuid, base64_string):
- try:
- from PIL import Image
- base64_string = base64_string.split(',')[1]
- img_data = base64.b64decode(base64_string)
- img = Image.open(io.BytesIO(img_data))
- _thread = threading.Thread(target=img.show, args=("QRCode",))
- _thread.setDaemon(True)
- _thread.start()
- except Exception as e:
- pass
-
- import qrcode
-
- # url = f"https://login.weixin.qq.com/l/{uuid}"
- # http://weixin.qq.com/x/4b7fY2d93zNCXhHFkNk8
-
- url = f"http://weixin.qq.com/x/{uuid}"
-
- qr_api1 = "https://api.isoyu.com/qr/?m=1&e=L&p=20&url={}".format(url)
- qr_api2 = "https://api.qrserver.com/v1/create-qr-code/?size=400×400&data={}".format(url)
- qr_api3 = "https://api.pwmqr.com/qrcode/create/?url={}".format(url)
- qr_api4 = "https://my.tv.sohu.com/user/a/wvideo/getQRCode.do?text={}".format(url)
- print("You can also scan QRCode in any website below:")
- print(qr_api3)
- print(qr_api4)
- print(qr_api2)
- print(qr_api1)
- # _send_qr_code([qr_api3, qr_api4, qr_api2, qr_api1])
- qr = qrcode.QRCode(border=1)
- qr.add_data(url)
- qr.make(fit=True)
- qr.print_ascii(invert=True)
- return [qr_api1 ,qr_api2, qr_api3, qr_api4]
- ############################### 账号管理 ###############################
- def reconnection(self,token_id,app_id):
- '''
- 断线重连
-
- 当系统返回账号已离线,但是手机顶部还显示ipad在线,可用此接口尝试重连,若返回错误/失败则必须重新调用步骤一登录
-
- 本接口非常用接口,可忽略
-
- '''
- api_url = f"{self.base_url}/v2/api/login/reconnection"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response = response.json()
- print(response)
- return response
-
- def logout(self,token_id,app_id):
- '''
- 退出
- '''
-
- api_url = f"{self.base_url}/v2/api/login/logout"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_data = response.json()
- print(response_data)
- return response_data.get('data')
-
- def check_online(self,token_id,app_id):
- '''
- 检查是否在线
-
- 响应结果的data=true则是在线,反之为离线
- '''
-
- api_url = f"{self.base_url}/v2/api/login/checkOnline"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_data = response.json()
- print(response_data)
- return response_data.get('data')
-
-
- ############################### 联系人模块 ###############################
- def fetch_contacts_list(self, token_id, app_id):
- '''
- 获取通讯录列表
-
- 本接口为长耗时接口,耗时时间根据好友数量递增,若接口返回超时可通过获取通讯录列表缓存接口获取响应结果
-
- 本接口返回的群聊仅为保存到通讯录中的群聊,若想获取会话列表中的所有群聊,需要通过消息订阅做二次处理。
- 原因:当未获取的群有成员在群内发消息的话会有消息回调, 开发者此刻调用获取群详情接口查询群信息入库保存即可,
- 比如说手机上三年前不说话的群,侧滑删除了,用户手机上也不会看到被删除的群聊的 ,但是有群成员说了话他会显示,
- 原理就是各个终端(Android、IOS、桌面版微信)取得了消息回调,又去获取群详情信息,本地数据库缓存了下来,显示的手机群聊,让用户感知的。
- '''
-
- api_url = f"{self.base_url}/v2/api/contacts/fetchContactsList"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None)
-
- def fetch_contacts_list_cache(self, token_id, app_id):
- '''
- 获取通讯录列表缓存
-
- 通讯录列表数据缓存10分钟,超时则需要重新调用获取通讯录列表接口
- '''
-
- api_url = f"{self.base_url}/v2/api/contacts/fetchContactsListCache"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_data = response.json()
- print(response_data)
- return response_data.get('data')
-
- def get_brief_info(self,token_id, app_id,wxids):
- '''
- 获取群/好友简要信息
- 1<= wxids <=100
- '''
- api_url = f"{self.base_url}/v2/api/contacts/getBriefInfo"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "wxids":wxids # list 1<= wxids <=100
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_data = response.json()
- # print(response_data)
- return response_data.get('data')
-
- def get_detail_info(self,token_id, app_id,wxids):
- '''
- 获取群/好友详细信息
- 1<= wxids <=20
- '''
- api_url = f"{self.base_url}/v2/api/contacts/getDetailInfo"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "wxids":wxids # list 1<= wxids <=20
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_data = response.json()
- print(response_data)
- return response_data.get('data')
-
- def delete_friend(self,token_id, app_id,friend_wxid):
- '''
- 删除好友
- '''
- api_url = f"{self.base_url}/v2/api/contacts/deleteFriend"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "wxid":friend_wxid
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None)
-
- def set_friend_remark(self,token_id, app_id,friend_wxid,remark):
- '''
- 设置好友备注
- '''
- api_url = f"{self.base_url}/v2/api/contacts/setFriendRemark"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "wxid":friend_wxid,
- "remark":remark
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None)
-
- ############################### 消息模块 ###############################
- def post_text(self,token_id,app_id,to_wxid,content):
- api_url = f"{self.base_url}/v2/api/message/postText"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "content": content
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None)
-
- def post_image(self,token_id,app_id,to_wxid,img_url):
- api_url = f"{self.base_url}/v2/api/message/postImage"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "imgUrl": img_url
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None)
-
- def post_voice(self,token_id,app_id,to_wxid,voice_url,voice_duration):
- api_url = f"{self.base_url}/v2/api/message/postVoice"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "voiceUrl": voice_url,
- "voiceDuration":voice_duration
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None)
-
- def post_video(self,token_id,app_id,to_wxid,video_url,video_thumb_url,video_duration):
- api_url = f"{self.base_url}/v2/api/message/postVideo"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "videoUrl": video_url,
- "videoDuration":video_duration,
- "videoThumbUrl":video_thumb_url
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None)
-
- def forward_image(self,token_id,app_id,to_wxid,aeskey,cdnthumburl,cdnthumblength,cdnthumbheight,cdnthumbwidth,length,md5):
- api_url = f"{self.base_url}/v2/api/message/forwardImage"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "xml": f"<?xml version=\"1.0\"?>\n<msg>\n\t<img aeskey=\"{aeskey}\" encryver=\"1\" cdnthumbaeskey=\"{aeskey}\" cdnthumburl=\"{cdnthumburl}\" cdnthumblength=\"{cdnthumblength}\" cdnthumbheight=\"{cdnthumbheight}\" cdnthumbwidth=\"{cdnthumbwidth}\" cdnmidheight=\"0\" cdnmidwidth=\"0\" cdnhdheight=\"0\" cdnhdwidth=\"0\" cdnmidimgurl=\"{cdnthumburl}\" length=\"{length}\" md5=\"{md5}\" />\n\t<platform_signature></platform_signature>\n\t<imgdatahash></imgdatahash>\n</msg>"
-
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- return response_object.get('data',None),response_object.get('ret',None),response_object.get('msg',None)
-
- def forward_video(self,token_id,app_id,to_wxid,aeskey,cdnvideourl,length):
- api_url = f"{self.base_url}/v2/api/message/forwardVideo"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "xml": f"<?xml version=\"1.0\"?>\n<msg>\n\t<videomsg aeskey=\"{aeskey}\" cdnvideourl=\"{cdnvideourl}\" cdnthumbaeskey=\"{aeskey}\" cdnthumburl=\"{cdnvideourl}\" length=\"{length}\" playlength=\"7\" cdnthumblength=\"8192\" cdnthumbwidth=\"135\" cdnthumbheight=\"240\" fromusername=\"zhangchuan2288\" md5=\"8804c121e9db91dd844f7a34035beb88\" newmd5=\"\" isplaceholder=\"0\" rawmd5=\"\" rawlength=\"0\" cdnrawvideourl=\"\" cdnrawvideoaeskey=\"\" overwritenewmsgid=\"0\" originsourcemd5=\"\" isad=\"0\" />\n</msg>"
-
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- return response_object.get('data',None),response_object.get('ret',None),response_object.get('msg',None)
-
- def add_contacts(self,token_id:str,app_id:str,scene:int,option:int,v3:str,v4:str,content:str):
- api_url = f"{self.base_url}/v2/api/contacts/addContacts"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "scene": scene,
- "option": option,
- "v3":v3,
- "v4":v4,
- "content":content
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- print(response_object)
- return response_object.get('ret',None),response_object.get('msg',None)
-
- def check_relation(self,token_id, app_id,wxids:list):
- '''
- 检查好友关系
- '''
- api_url = f"{self.base_url}/v2/api/contacts/checkRelation"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
-
- data = {
- "appId": app_id,
- "wxids":wxids # list 1<= wxids <=20
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None)
-
- ############################### 下载模块 ###############################
- def download_audio_msg(self,token_id:str,app_id:str,msg_id: int, xml: str):
- data = {
- "appId": app_id,
- "msgId": msg_id,
- "xml": xml
- }
- print(json.dumps(data))
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- # http://api.geweapi.com/gewe/v2/api/gewe/v2/api/message/downloadVoice
- # response = requests.post(f"{self.base_url}/v2/api/gewe/v2/api/message/downloadVoice", json=data, headers=headers)
- url='http://api.geweapi.com/gewe/v2/api/message/downloadVoice'
- # url='http://api.geweapi.com/gewe/v2/api/gewe/v2/api/message/downloadVoice'
- response = requests.post(f"{url}", json=data, headers=headers)
-
- if response.ok:
- data = response.json()
- print(data)
- if data['ret'] == 200:
- print("Gewe download audio msg successfully.")
- print(data['data']['fileUrl'])
- return data['data']['fileUrl']
- else:
- print("Gewe download audio msg in error.")
- return False
- else:
- return False
-
- def download_image_msg(self,token_id:str,app_id:str,xml: str):
- data = {
- "appId": app_id,
- "type": 2,
- "xml": xml
- }
- print(json.dumps(data))
-
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
-
- response = requests.post(f"{self.base_url}/v2/api/message/downloadImage", json=data, headers=headers)
- if response.ok:
- data = response.json()
- # print(data)
- if data['ret'] == 200:
- print("Gewe download image msg successfully.")
- print(data['data']['fileUrl'])
- return data['data']['fileUrl']
- else:
- print("Gewe download image msg in error.")
- return False
- else:
- return False
-
- def download_audio_file(fileUrl: str, file_name: str):
- # 定义保存文件的本地路径和文件名
- local_filename = f'./silk/{file_name}.silk'
-
- # 使用requests库的get方法获取文件内容
- response = requests.get(fileUrl, stream=True)
-
- # 检查请求是否成功
- if response.status_code == 200:
- # 打开文件以二进制写入模式
- with open(local_filename, 'wb') as f:
- # 逐块写入文件,通常使用1024字节的块大小
- for chunk in response.iter_content(1024):
- f.write(chunk)
- print(f"文件已成功下载到 {local_filename}")
- else:
- print(f"请求失败,状态码: {response.status_code}")
-
-
-
- ############################### 群模块 ###############################
- def get_chatroom_info(self, token_id, app_id, chatroom_id):
- '''
- 获取群信息
- '''
- api_url = f"{self.base_url}/v2/api/group/getChatroomInfo"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "chatroomId": chatroom_id
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None)
-
- def add_group_member_as_friend(self, token_id, app_id, chatroom_id, member_wxid, content):
- '''
- 添加群成员为好友
- '''
- api_url = f"{self.base_url}/v2/api/group/addGroupMemberAsFriend"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "chatroomId": chatroom_id,
- "content": content,
- "memberWxid": member_wxid,
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None)
-
- def save_contract_list(self, token_id, app_id, chatroom_id,oper_type):
- '''
- 群保存到通讯录
- 操作类型 3保存到通讯录 2从通讯录移除
- '''
- api_url = f"{self.base_url}/v2/api/group/saveContractList"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
-
- data = {
- "appId": app_id,
- "chatroomId": chatroom_id,
- "operType": oper_type
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None)
-
- def get_group_memberlist(self,token_id,app_id,chatroom_id):
- api_url = f"{self.base_url}/v2/api/group/getChatroomMemberList"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "chatroomId": chatroom_id,
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- #print(response_object)
- return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None)
-
- ############################### 朋友圈模块 ###################################
- # 在新设备登录后的1-3天内,您将无法使用朋友圈发布、点赞、评论等功能。在此期间,如果尝试进行这些操作,您将收到来自微信团队的提醒。请注意遵守相关规定。
-
- def sns_visible_scope(self, token_id, app_id,option):
- '''
- 朋友圈可见范围 option 可选项
- 1:全部
-
- 2:最近半年
-
- 3:最近一个月
-
- 4:最近三天
- '''
- api_url = f"{self.base_url}/v2/api/sns/snsVisibleScope"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "option": option,
-
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None)
-
- def stranger_visibility_enabled(self, token_id, app_id,enabled:bool):
- '''
- 是否允许陌生人查看朋友圈
- '''
- api_url = f"{self.base_url}/v2/api/sns/strangerVisibilityEnabled"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "enabled": enabled
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None)
-
- def send_text_sns(self, token_id, app_id,content):
- '''
- 发送文字朋友圈
- '''
- api_url = f"{self.base_url}/v2/api/sns/sendTextSns"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "content": content
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None)
-
- def send_image_sns(self, token_id, app_id,content,img_infos:list):
- '''
- 发送图片朋友圈
- '''
- api_url = f"{self.base_url}/v2/api/sns/sendImgSns"
- print(api_url)
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "allowWxIds": [],
- "atWxIds": [],
- "disableWxIds": [],
- "content":content,
- "imgInfos": img_infos, # 通过上传朋友圈图片接口获取
- "privacy": False
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- print(response_object)
- return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None)
-
- def send_video_sns(self, token_id, app_id,content:str,video_info:object):
- '''
- 发送视频朋友圈
- '''
- api_url = f"{self.base_url}/v2/api/sns/sendVideoSns"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "content":content,
- "allowWxIds": [],
- "atWxIds": [],
- "disableWxIds": [],
- "videoInfo":video_info,
- "privacy": False
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None)
-
- def upload_sns_image(self, token_id, app_id,img_urls:list):
- '''
- 上传朋友圈图片
- '''
- api_url = f"{self.base_url}/v2/api/sns/uploadSnsImage"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "imgUrls": img_urls
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None)
-
- def upload_sns_video(self, token_id, app_id,video_url:str,video_thumb_url:str):
- '''
- 上传朋友圈视频
- '''
- api_url = f"{self.base_url}/v2/api/sns/uploadSnsVideo"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "thumbUrl": video_thumb_url,
- "videoUrl": video_url,
- }
- response = requests.post(url=api_url, headers=headers, data=json.dumps(data))
- response_object = response.json()
- return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None)
-
- ############################### 其他 ###############################
- def save_session_messages_to_cache(self, hash_key,item:object)->list:
- '''
- 对话列表
- '''
- messages=redis_helper.redis_helper.get_hash(hash_key)
- wxid=hash_key.split(':')[-1]
- if not messages:
- messages=[{"role": "system", "content": ""}]
- messages.append(item)
- redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},600)
- else:
- messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
- messages = json.loads(messages_str) if messages_str else []
- #判断是否含有图片
- last_message = messages[-1]
- content = last_message.get("content", [])
- if isinstance(content, list) and content:
- last_content_type = content[-1].get("type")
- if last_content_type == 'image_url':
- content.append(item['content'][0])
- messages[-1]['content']=content
- else:
- messages.append(item)
- else:
- if last_message!= item:
- messages.append(item)
- redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},600)
- return messages
-
- def get_contacts_brief_from_cache(self, wxid)->list:
- """
- 获取联系人信息保存到 Redis 缓存。
- """
- hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}"
- cache_str = redis_helper.redis_helper.get_hash_field(hash_key, "data")
- return json.loads(cache_str) if cache_str else []
-
- def save_contacts_brief_to_cache(self, token_id, app_id, wxid, contacts_wxids: list)->list:
- """
- 将联系人信息保存到 Redis 缓存。
- """
- # Redis 缓存的 key
- hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}"
-
- # 获取缓存中的数据
- cache_str = redis_helper.redis_helper.get_hash_field(hash_key, "data")
- cache = json.loads(cache_str) if cache_str else []
- # 回调处理
- if len(contacts_wxids) == 1:
- cache_wxids = [f['userName'] for f in cache]
- friends_brief = self.get_brief_info(token_id, app_id, contacts_wxids)
- if contacts_wxids[0] in cache_wxids:
- # 替换已经存在的数据
- for i in range(len(cache)):
- if cache[i]['userName'] == contacts_wxids[0]:
- cache[i] = friends_brief[0]
- else:
- cache.extend(f for f in friends_brief if f["nickName"])
- friends_no_brief_wxid = [f['userName'] for f in friends_brief if not f["nickName"]]
- if friends_no_brief_wxid:
- detailed_info = self.get_detail_info(token_id, app_id, friends_no_brief_wxid)
- cache.extend(detailed_info)
-
- # 分批处理
- else:
- cache=[]
- # 缓存为空,分批处理 contacts_wxids
- batch_size = 100
- for i in range(0, len(contacts_wxids), batch_size):
- batch = contacts_wxids[i:i + batch_size]
- friends_brief = self.get_brief_info(token_id, app_id, batch)
-
- cache.extend(f for f in friends_brief if f["nickName"])
-
- friends_no_brief_wxid = [f['userName'] for f in friends_brief if not f["nickName"]]
- if friends_no_brief_wxid:
- detailed_info = self.get_detail_info(token_id, app_id, friends_no_brief_wxid)
- cache.extend(detailed_info)
-
- # 更新缓存
-
- redis_helper.redis_helper.update_hash_field(hash_key, "data", json.dumps(cache, ensure_ascii=False))
- return cache
-
- def delete_contacts_brief_from_cache(self, wxid, contacts_wxids: list):
- """
- 删除联系人信息保存到 Redis 缓存。
- """
- hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}"
- cache_str = redis_helper.redis_helper.get_hash_field(hash_key, "data")
- cache = json.loads(cache_str) if cache_str else []
-
-
- # 将 contacts_wxids 转换为集合,提高查找效率
- wxids_set = set(contacts_wxids)
-
- # 过滤 cache:保留 userName 不在集合中的对象
- filtered_cache = [contact for contact in cache if contact["userName"] not in wxids_set]
-
- # # 如果需要原地修改原 cache 列表:
- # cache[:] = filtered_cache
- redis_helper.redis_helper.update_hash_field(hash_key, "data", json.dumps(filtered_cache, ensure_ascii=False))
-
- def save_groups_info_to_cache(self, token_id, app_id, wxid, chatroom_ids: list):
- """
- 将群信息保存到 Redis 缓存。
- """
- # Redis 缓存的 key
- hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}"
-
- # 获取当前缓存中所有的 chatroom_id
- existing_chatrooms = redis_helper.redis_helper.get_hash(hash_key)
-
- # 找出需要删除的 chatroom_ids
- chatrooms_to_delete = set(existing_chatrooms.keys()) - set(chatroom_ids)
-
- # 删除缓存中不再需要的 chatroom_id 数据
- for chatroom_id in chatrooms_to_delete:
- redis_helper.redis_helper.delete_hash_field(hash_key, chatroom_id)
-
- for chatroom_id in chatroom_ids:
- # 获取群信息
- ret, msg, data = self.get_chatroom_info(token_id, app_id, chatroom_id)
- if ret != 200:
- continue
-
- # 更新缓存
- redis_helper.redis_helper.update_hash_field(hash_key, chatroom_id, json.dumps(data, ensure_ascii=False))
- time.sleep(1)
-
- def save_groups_members_to_cache(self, token_id, app_id, wxid, chatroom_ids: list):
- """
- 将群成员保存到 Redis 缓存。
- """
- # Redis 缓存的 key
- hash_key = f"__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}"
-
- # 获取当前缓存中所有的 chatroom_id
- existing_chatrooms = redis_helper.redis_helper.get_hash(hash_key)
-
- # 找出需要删除的 chatroom_ids
- chatrooms_to_delete = set(existing_chatrooms.keys()) - set(chatroom_ids)
-
- # 删除缓存中不再需要的 chatroom_id 数据
- for chatroom_id in chatrooms_to_delete:
- redis_helper.redis_helper.delete_hash_field(hash_key, chatroom_id)
-
- for chatroom_id in chatroom_ids:
- # 获取群信息
- ret, msg, data = self.get_group_memberlist(token_id, app_id, chatroom_id)
- if ret != 200:
- continue
-
- # 更新缓存
- redis_helper.redis_helper.update_hash_field(hash_key, chatroom_id, json.dumps(data, ensure_ascii=False))
- time.sleep(1)
-
- def update_group_members_to_cache(self, token_id, app_id, wxid, chatroom_id: str):
- """
- 更新将群信息保存到 Redis 缓存。
- """
- # Redis 缓存的 key
- hash_key = f"__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}"
-
- # 获取群信息
- ret, msg, data = self.get_group_memberlist(token_id, app_id, chatroom_id)
- redis_helper.redis_helper.update_hash_field(hash_key, chatroom_id, json.dumps(data, ensure_ascii=False))
-
- def get_group_members_from_cache(self, wxid,chatroom_id)->dict:
- """
- 获取缓存中群成员。
- """
- hash_key = f"__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}"
- cache = redis_helper.redis_helper.get_hash_field(hash_key,chatroom_id)
- groups=json.loads(cache) if cache else {}
- return groups
-
- def update_group_info_to_cache(self, token_id, app_id, wxid, chatroom_id: str):
- """
- 更新将群信息保存到 Redis 缓存。
- """
- # Redis 缓存的 key
- hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}"
-
- # 获取群信息
- ret, msg, data = self.get_chatroom_info(token_id, app_id, chatroom_id)
- redis_helper.redis_helper.update_hash_field(hash_key, chatroom_id, json.dumps(data, ensure_ascii=False))
-
-
- def get_groups_info_from_cache(self, wxid)->list:
- """
- 获取群信息保存到 Redis 缓存。
- """
- hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}"
- cache = redis_helper.redis_helper.get_hash(hash_key)
- groups=[json.loads(v) for v in cache.values()]
- return groups
-
- def get_group_info_from_cache(self, wxid,chatroom_id)->dict:
- """
- 获取群信息保存到 Redis 缓存。
- """
- hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}"
- cache = redis_helper.redis_helper.get_hash_field(hash_key,chatroom_id)
- groups=json.loads(cache) if cache else {}
- return groups
-
- def get_wxchat_config_from_cache(self, wxid):
- """
- 获取配置信息
- """
- hash_key = f"__AI_OPS_WX__:WXCHAT_CONFIG"
- config = redis_helper.redis_helper.get_hash_field(hash_key, wxid)
- return json.loads(config) if config else {}
-
- def save_wxchat_config(self, wxid, config):
- """
- 保存配置信息
- """
- hash_key = f"__AI_OPS_WX__:WXCHAT_CONFIG"
- redis_helper.redis_helper.update_hash_field(hash_key, wxid, json.dumps(config, ensure_ascii=False))
-
- def get_login_info_from_cache(self,tel):
- hash_key = f"__AI_OPS_WX__:LOGININFO:{tel}"
- cache = redis_helper.redis_helper.get_hash(hash_key)
- return cache
-
- def save_login_wx_captch_code_to_cache(self,token_id,captch_code):
- hash_key = f"__AI_OPS_WX__:WXCAPTCHCODE:{token_id}"
- redis_helper.redis_helper.set_hash(hash_key,{"data":captch_code},15)
-
- def get_login_wx_captch_code_to_cache(self,token_id)->str:
- hash_key = f"__AI_OPS_WX__:WXCAPTCHCODE:{token_id}"
- r=redis_helper.redis_helper.get_hash_field(hash_key,"data")
- return r
-
- def acquire_login_lock(self, token_id, expire_time=10):
- hash_key = f"__AI_OPS_WX__:LOGINLOCK:{token_id}"
- identifier=str(uuid.uuid4())
- if redis_helper.redis_helper.client.setnx(hash_key, identifier):
- redis_helper.redis_helper.client.expire(hash_key, expire_time)
- return True
- return False
-
- def release_login_lock(self, token_id):
- hash_key = f"__AI_OPS_WX__:LOGINLOCK:{token_id}"
- redis_helper.redis_helper.client.delete(hash_key)
-
-
- def start():
- global wxchat
- # base_url = "http://192.168.88.11:2531"
- # wxchat = GeWeChat(base_url)
- wxchat = GeWeChatCom('http://api.geweapi.com/gewe')
|