import requests
import json
import base64
import io
import json
import os
import threading
import time
import requests
from io import BytesIO
from PIL import Image
from common import redis_helper
from common.log import logger
# Module-wide GeWeChatCom client; remains None until start() assigns it.
wxchat = None
class GeWeChatCom:
    """HTTP client for the GeWe WeChat gateway plus Redis-backed cache helpers.

    Every API method POSTs a JSON payload to ``{base_url}{path}`` with the
    caller's token in the ``X-GEWE-TOKEN`` header and returns selected fields
    of the decoded JSON reply (the exact shape is stated per method).
    """

    def __init__(self, base_url, timeout=None):
        """Remember the gateway root URL.

        Args:
            base_url: URL prefix placed in front of every ``/v2/api/...`` path.
            timeout: Value forwarded to ``requests.post(timeout=...)``. The
                default ``None`` keeps the previous behaviour (wait forever).
        """
        self.base_url = base_url
        self.timeout = timeout

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _headers(token_id):
        """Return the HTTP headers sent with every gateway request."""
        return {
            'X-GEWE-TOKEN': token_id,
            'Content-Type': 'application/json'
        }

    @staticmethod
    def _unpack(response_object):
        """Return ``(ret, msg, data)`` from a decoded reply (missing -> None)."""
        return (
            response_object.get('ret', None),
            response_object.get('msg', None),
            response_object.get('data', None),
        )

    @staticmethod
    def _chunks(items, size):
        """Split ``items`` into consecutive slices of at most ``size`` elements."""
        return [items[i:i + size] for i in range(0, len(items), size)]

    def _request(self, token_id, path, payload):
        """POST ``payload`` as JSON to ``base_url + path``; return the raw response."""
        return requests.post(
            url=f"{self.base_url}{path}",
            headers=self._headers(token_id),
            data=json.dumps(payload),
            timeout=self.timeout,
        )

    def _post(self, token_id, path, payload):
        """POST ``payload`` and return the decoded JSON body."""
        response_object = self._request(token_id, path, payload).json()
        logger.debug(f"{path} -> {response_object}")
        return response_object

    def _fetch_file_url(self, token_id, path, payload, label):
        """Shared body of the download endpoints.

        Returns ``data.fileUrl`` when the HTTP status is OK and ``ret`` is 200,
        otherwise ``False``.
        """
        logger.debug(json.dumps(payload))
        response = self._request(token_id, path, payload)
        if not response.ok:
            return False
        body = response.json()
        if body.get('ret') != 200:
            logger.warning(f"Gewe download {label} msg in error. {body}")
            return False
        file_url = body['data']['fileUrl']
        logger.info(f"Gewe download {label} msg successfully. {file_url}")
        return file_url

    @staticmethod
    def _forward_image_xml(aeskey, cdnthumburl, cdnthumblength, cdnthumbheight,
                           cdnthumbwidth, length, md5):
        """Build the ``xml`` field for ``forwardImage``.

        NOTE(review): the markup of the original literal was lost (only its
        ``\\n``/``\\t`` escapes survived and it no longer parsed). This layout
        follows the surviving whitespace skeleton and the method parameters;
        the ``cdnthumbaeskey``/``cdnmidimgurl`` reuse and the constant
        attributes are assumptions -- confirm against the GeWe forwardImage
        documentation before relying on it.
        """
        return (
            '<?xml version="1.0"?>\n<msg>\n\t'
            f'<img aeskey="{aeskey}" encryver="1" cdnthumbaeskey="{aeskey}" '
            f'cdnthumburl="{cdnthumburl}" cdnthumblength="{cdnthumblength}" '
            f'cdnthumbheight="{cdnthumbheight}" cdnthumbwidth="{cdnthumbwidth}" '
            'cdnmidheight="0" cdnmidwidth="0" cdnhdheight="0" cdnhdwidth="0" '
            f'cdnmidimgurl="{cdnthumburl}" length="{length}" md5="{md5}" />'
            '\n\t<platform_signature></platform_signature>'
            '\n\t<imgdatahash></imgdatahash>\n</msg>'
        )

    @staticmethod
    def _forward_video_xml(aeskey, cdnvideourl, length):
        """Build the ``xml`` field for ``forwardVideo``.

        NOTE(review): reconstructed for the same reason as the image variant;
        the attribute set beyond the three parameters is an assumption --
        confirm against the GeWe forwardVideo documentation.
        """
        return (
            '<?xml version="1.0"?>\n<msg>\n\t'
            f'<videomsg aeskey="{aeskey}" cdnvideourl="{cdnvideourl}" '
            f'cdnthumbaeskey="{aeskey}" cdnthumburl="{cdnvideourl}" '
            f'length="{length}" />'
            '\n</msg>'
        )

    @staticmethod
    def _merge_session_message(messages, item):
        """Fold ``item`` into a conversation history and return the history.

        * Empty history: start with an empty system message, then ``item``.
        * Last message's content is a non-empty list ending in an
          ``image_url`` part: append ``item['content'][0]`` to that list.
        * Last message's content is a non-empty list ending otherwise:
          append ``item`` as a new message.
        * Otherwise: append ``item`` unless it equals the last message.
        """
        if not messages:
            return [{"role": "system", "content": ""}, item]
        last_message = messages[-1]
        content = last_message.get("content", [])
        if isinstance(content, list) and content:
            if content[-1].get("type") == 'image_url':
                # ``content`` is the list stored in the message, so this
                # mutates the history in place.
                content.append(item['content'][0])
            else:
                messages.append(item)
        elif last_message != item:
            messages.append(item)
        return messages

    ############################### Login ###############################
    def check_login(self, token_id, app_id, uuid, captch_code=""):
        '''
        Perform the login check (step 3).

        After obtaining the login QR code, call this every 5 s to see whether
        the login succeeded. A newly logged-in device is dropped once in the
        early hours of the next day; when logging in again, fetch the QR code
        with the appId, after which the session can stay online long-term.
        On success, store the appId <-> wxid mapping; later calls need it.

        ``captchCode`` is only sent when ``captch_code`` is non-empty.
        Returns ``(ret, msg, data)``.
        '''
        payload = {
            "appId": app_id,
            "uuid": uuid
        }
        if captch_code != "":
            payload["captchCode"] = captch_code
        return self._unpack(self._post(token_id, "/v2/api/login/checkLogin", payload))

    def get_login_qr_code(self, token_id, app_id="", region_id="440000"):
        '''
        Fetch the login QR code (step 2).

        ``app_id`` is the device id: pass an empty value on first login (a
        device is created automatically); after a drop, pass the appId the
        API returned earlier. Avoid creating several devices for one account
        so as not to trigger WeChat risk control. The appId must belong to
        the WeChat account that scanned last time, or login fails.
        ``qrImgBase64`` in the reply is the QR image to show to the user
        (or build one from ``qrData``).

        Logs the full reply at info level and returns its ``data`` field.
        '''
        payload = {
            "appId": app_id,
            "regionId": region_id
        }
        response_data = self._post(token_id, "/v2/api/login/getLoginQrCode", payload)
        data = json.dumps(response_data, separators=(',', ':'), ensure_ascii=False)
        logger.info(f'{token_id} 的登录APP信息:{data}')
        return response_data.get('data')

    def qrCallback(self, uuid, base64_string):
        """Show the login QR code locally and return fallback QR image URLs.

        Best effort: decode the data-URI image and open it in a viewer thread;
        any failure there is logged and ignored. Then print four web URLs and
        an ASCII rendering of ``http://weixin.qq.com/x/{uuid}``.

        Returns:
            ``[qr_api1, qr_api2, qr_api3, qr_api4]``.
        """
        try:
            from PIL import Image
            base64_string = base64_string.split(',')[1]
            img_data = base64.b64decode(base64_string)
            img = Image.open(io.BytesIO(img_data))
            # daemon=True replaces the deprecated Thread.setDaemon(True).
            _thread = threading.Thread(target=img.show, args=("QRCode",), daemon=True)
            _thread.start()
        except Exception as e:
            # Opening a viewer is optional (e.g. headless hosts); keep going.
            logger.warning(f"Unable to display QR code image: {e}")
        import qrcode
        url = f"http://weixin.qq.com/x/{uuid}"
        qr_api1 = "https://api.isoyu.com/qr/?m=1&e=L&p=20&url={}".format(url)
        # The size separator must be the ASCII letter "x"; the original had a
        # multiplication sign here.
        qr_api2 = "https://api.qrserver.com/v1/create-qr-code/?size=400x400&data={}".format(url)
        qr_api3 = "https://api.pwmqr.com/qrcode/create/?url={}".format(url)
        qr_api4 = "https://my.tv.sohu.com/user/a/wvideo/getQRCode.do?text={}".format(url)
        print("You can also scan QRCode in any website below:")
        print(qr_api3)
        print(qr_api4)
        print(qr_api2)
        print(qr_api1)
        qr = qrcode.QRCode(border=1)
        qr.add_data(url)
        qr.make(fit=True)
        qr.print_ascii(invert=True)
        return [qr_api1, qr_api2, qr_api3, qr_api4]

    ############################### Account management ###############################
    def reconnection(self, token_id, app_id):
        '''
        Reconnect after a drop.

        Use when the system reports the account offline while the phone still
        shows the iPad session online; on error/failure the login flow must be
        restarted. Rarely needed. Returns the whole decoded reply.
        '''
        return self._post(token_id, "/v2/api/login/reconnection", {"appId": app_id})

    def logout(self, token_id, app_id):
        '''
        Log out. Returns the reply's ``data`` field.
        '''
        return self._post(token_id, "/v2/api/login/logout", {"appId": app_id}).get('data')

    def check_online(self, token_id, app_id):
        '''
        Check whether the account is online.

        Returns the reply's ``data`` field (true means online).
        '''
        return self._post(token_id, "/v2/api/login/checkOnline", {"appId": app_id}).get('data')

    ############################### Contacts ###############################
    def fetch_contacts_list(self, token_id, app_id):
        '''
        Fetch the address book.

        Long-running: duration grows with the number of friends; on a timeout
        the result can be read through ``fetch_contacts_list_cache``. Only
        group chats saved to the address book are returned; other groups have
        to be collected from message callbacks plus the group-detail API.

        Returns ``(ret, msg, data)``.
        '''
        return self._unpack(
            self._post(token_id, "/v2/api/contacts/fetchContactsList", {"appId": app_id})
        )

    def fetch_contacts_list_cache(self, token_id, app_id):
        '''
        Fetch the cached address book.

        The gateway caches the list for 10 minutes; after that call
        ``fetch_contacts_list`` again. Returns the reply's ``data`` field.
        '''
        return self._post(
            token_id, "/v2/api/contacts/fetchContactsListCache", {"appId": app_id}
        ).get('data')

    def get_brief_info(self, token_id, app_id, wxids):
        '''
        Fetch brief info for groups/friends (1 <= len(wxids) <= 100).

        Returns the reply's ``data`` field.
        '''
        payload = {
            "appId": app_id,
            "wxids": wxids
        }
        return self._post(token_id, "/v2/api/contacts/getBriefInfo", payload).get('data')

    def get_detail_info(self, token_id, app_id, wxids):
        '''
        Fetch detailed info for groups/friends (1 <= len(wxids) <= 20).

        Returns the reply's ``data`` field.
        '''
        payload = {
            "appId": app_id,
            "wxids": wxids
        }
        return self._post(token_id, "/v2/api/contacts/getDetailInfo", payload).get('data')

    def delete_friend(self, token_id, app_id, friend_wxid):
        '''
        Delete a friend. Returns ``(ret, msg, data)``.
        '''
        payload = {
            "appId": app_id,
            "wxid": friend_wxid
        }
        return self._unpack(self._post(token_id, "/v2/api/contacts/deleteFriend", payload))

    def set_friend_remark(self, token_id, app_id, friend_wxid, remark):
        '''
        Set a friend's remark name. Returns ``(ret, msg, data)``.
        '''
        payload = {
            "appId": app_id,
            "wxid": friend_wxid,
            "remark": remark
        }
        return self._unpack(self._post(token_id, "/v2/api/contacts/setFriendRemark", payload))

    ############################### Messages ###############################
    def post_text(self, token_id, app_id, to_wxid, content):
        """Send a text message. Returns ``(ret, msg, data)``."""
        payload = {
            "appId": app_id,
            "toWxid": to_wxid,
            "content": content
        }
        return self._unpack(self._post(token_id, "/v2/api/message/postText", payload))

    def post_image(self, token_id, app_id, to_wxid, img_url):
        """Send an image by URL. Returns ``(ret, msg, data)``."""
        payload = {
            "appId": app_id,
            "toWxid": to_wxid,
            "imgUrl": img_url
        }
        return self._unpack(self._post(token_id, "/v2/api/message/postImage", payload))

    def post_voice(self, token_id, app_id, to_wxid, voice_url, voice_duration):
        """Send a voice message by URL. Returns ``(ret, msg, data)``."""
        payload = {
            "appId": app_id,
            "toWxid": to_wxid,
            "voiceUrl": voice_url,
            "voiceDuration": voice_duration
        }
        return self._unpack(self._post(token_id, "/v2/api/message/postVoice", payload))

    def post_video(self, token_id, app_id, to_wxid, video_url, video_thumb_url, video_duration):
        """Send a video by URL. Returns ``(ret, msg, data)``."""
        payload = {
            "appId": app_id,
            "toWxid": to_wxid,
            "videoUrl": video_url,
            "videoDuration": video_duration,
            "videoThumbUrl": video_thumb_url
        }
        return self._unpack(self._post(token_id, "/v2/api/message/postVideo", payload))

    def forward_image(self, token_id, app_id, to_wxid, aeskey, cdnthumburl, cdnthumblength,
                      cdnthumbheight, cdnthumbwidth, length, md5):
        """Forward an image already on the WeChat CDN.

        Returns ``(data, ret, msg)`` -- note the order differs from most
        other methods of this class.
        """
        payload = {
            "appId": app_id,
            "toWxid": to_wxid,
            "xml": self._forward_image_xml(
                aeskey, cdnthumburl, cdnthumblength, cdnthumbheight,
                cdnthumbwidth, length, md5
            )
        }
        ret, msg, data = self._unpack(
            self._post(token_id, "/v2/api/message/forwardImage", payload)
        )
        return data, ret, msg

    def forward_video(self, token_id, app_id, to_wxid, aeskey, cdnvideourl, length):
        """Forward a video already on the WeChat CDN.

        Returns ``(data, ret, msg)`` -- note the order differs from most
        other methods of this class.
        """
        payload = {
            "appId": app_id,
            "toWxid": to_wxid,
            "xml": self._forward_video_xml(aeskey, cdnvideourl, length)
        }
        ret, msg, data = self._unpack(
            self._post(token_id, "/v2/api/message/forwardVideo", payload)
        )
        return data, ret, msg

    def add_contacts(self, token_id: str, app_id: str, scene: int, option: int,
                     v3: str, v4: str, content: str):
        """Call the ``addContacts`` endpoint. Returns ``(ret, msg)``."""
        payload = {
            "appId": app_id,
            "scene": scene,
            "option": option,
            "v3": v3,
            "v4": v4,
            "content": content
        }
        response_object = self._post(token_id, "/v2/api/contacts/addContacts", payload)
        return response_object.get('ret', None), response_object.get('msg', None)

    def check_relation(self, token_id, app_id, wxids: list):
        '''
        Check friend relationships (1 <= len(wxids) <= 20).

        Returns ``(ret, msg, data)``.
        '''
        payload = {
            "appId": app_id,
            "wxids": wxids
        }
        return self._unpack(self._post(token_id, "/v2/api/contacts/checkRelation", payload))

    ############################### Downloads ###############################
    def download_audio_msg(self, token_id: str, app_id: str, msg_id: int, xml: str):
        """Resolve a voice message to a downloadable URL.

        Uses ``self.base_url`` like every other endpoint; the previous
        hard-coded host equals the default base URL set by ``start()``.

        Returns the ``fileUrl`` string, or ``False`` on any failure.
        """
        payload = {
            "appId": app_id,
            "msgId": msg_id,
            "xml": xml
        }
        return self._fetch_file_url(
            token_id, "/v2/api/message/downloadVoice", payload, "audio"
        )

    def download_image_msg(self, token_id: str, app_id: str, xml: str):
        """Resolve an image message to a downloadable URL (request ``type`` 2).

        Returns the ``fileUrl`` string, or ``False`` on any failure.
        """
        payload = {
            "appId": app_id,
            "type": 2,
            "xml": xml
        }
        return self._fetch_file_url(
            token_id, "/v2/api/message/downloadImage", payload, "image"
        )

    @staticmethod
    def download_audio_file(fileUrl: str, file_name: str):
        """Stream ``fileUrl`` to ``./silk/{file_name}.silk``.

        Declared static because the function never took ``self``; it can be
        called on the class (as before) or on an instance. Returns ``None``;
        a non-200 status is logged, not raised.
        """
        local_filename = f'./silk/{file_name}.silk'
        # The context manager releases the streamed connection on every path.
        with requests.get(fileUrl, stream=True) as response:
            if response.status_code != 200:
                logger.warning(f"请求失败,状态码: {response.status_code}")
                return
            os.makedirs(os.path.dirname(local_filename), exist_ok=True)
            with open(local_filename, 'wb') as f:
                for chunk in response.iter_content(1024):
                    f.write(chunk)
        logger.info(f"文件已成功下载到 {local_filename}")

    ############################### Groups ###############################
    def get_chatroom_info(self, token_id, app_id, chatroom_id):
        '''
        Fetch group info. Returns ``(ret, msg, data)``.
        '''
        payload = {
            "appId": app_id,
            "chatroomId": chatroom_id
        }
        return self._unpack(self._post(token_id, "/v2/api/group/getChatroomInfo", payload))

    def add_group_member_as_friend(self, token_id, app_id, chatroom_id, member_wxid, content):
        '''
        Send a friend request to a group member. Returns ``(ret, msg, data)``.
        '''
        payload = {
            "appId": app_id,
            "chatroomId": chatroom_id,
            "content": content,
            "memberWxid": member_wxid,
        }
        return self._unpack(
            self._post(token_id, "/v2/api/group/addGroupMemberAsFriend", payload)
        )

    def save_contract_list(self, token_id, app_id, chatroom_id, oper_type):
        '''
        Save a group to / remove it from the address book.

        ``oper_type``: 3 saves to the address book, 2 removes from it.
        Returns ``(ret, msg, data)``.
        '''
        payload = {
            "appId": app_id,
            "chatroomId": chatroom_id,
            "operType": oper_type
        }
        return self._unpack(self._post(token_id, "/v2/api/group/saveContractList", payload))

    ############################### Moments (SNS) ###############################
    # For 1-3 days after logging in on a new device, posting, liking and
    # commenting on Moments is unavailable; attempts trigger a reminder from
    # the WeChat team.
    def sns_visible_scope(self, token_id, app_id, option):
        '''
        Set the Moments visibility range.

        ``option``: 1 all, 2 last six months, 3 last month, 4 last three days.
        Returns ``(ret, msg, data)``.
        '''
        payload = {
            "appId": app_id,
            "option": option,
        }
        return self._unpack(self._post(token_id, "/v2/api/sns/snsVisibleScope", payload))

    def stranger_visibility_enabled(self, token_id, app_id, enabled: bool):
        '''
        Allow or forbid strangers to view Moments. Returns ``(ret, msg, data)``.
        '''
        payload = {
            "appId": app_id,
            "enabled": enabled
        }
        return self._unpack(
            self._post(token_id, "/v2/api/sns/strangerVisibilityEnabled", payload)
        )

    def send_text_sns(self, token_id, app_id, content):
        '''
        Publish a text-only Moments post. Returns ``(ret, msg, data)``.
        '''
        payload = {
            "appId": app_id,
            "content": content
        }
        return self._unpack(self._post(token_id, "/v2/api/sns/sendTextSns", payload))

    def send_image_sns(self, token_id, app_id, content, img_infos: list):
        '''
        Publish a Moments post with images.

        ``img_infos`` comes from ``upload_sns_image``.
        Returns ``(ret, msg, data)``.
        '''
        payload = {
            "appId": app_id,
            "allowWxIds": [],
            "atWxIds": [],
            "disableWxIds": [],
            "content": content,
            "imgInfos": img_infos,
            "privacy": False
        }
        return self._unpack(self._post(token_id, "/v2/api/sns/sendImgSns", payload))

    def send_video_sns(self, token_id, app_id, content: str, video_info: object):
        '''
        Publish a Moments post with a video. Returns ``(ret, msg, data)``.
        '''
        payload = {
            "appId": app_id,
            "content": content,
            "allowWxIds": [],
            "atWxIds": [],
            "disableWxIds": [],
            "videoInfo": video_info,
            "privacy": False
        }
        return self._unpack(self._post(token_id, "/v2/api/sns/sendVideoSns", payload))

    def upload_sns_image(self, token_id, app_id, img_urls: list):
        '''
        Upload images for a Moments post. Returns ``(ret, msg, data)``.
        '''
        payload = {
            "appId": app_id,
            "imgUrls": img_urls
        }
        return self._unpack(self._post(token_id, "/v2/api/sns/uploadSnsImage", payload))

    def upload_sns_video(self, token_id, app_id, video_url: str, video_thumb_url: str):
        '''
        Upload a video for a Moments post. Returns ``(ret, msg, data)``.
        '''
        payload = {
            "appId": app_id,
            "thumbUrl": video_thumb_url,
            "videoUrl": video_url,
        }
        return self._unpack(self._post(token_id, "/v2/api/sns/uploadSnsVideo", payload))

    ############################### Caches and misc ###############################
    def save_session_messages_to_cache(self, hash_key, item: object) -> list:
        '''
        Append ``item`` to the conversation stored under ``hash_key``.

        The history lives as a JSON list in the hash's ``data`` field and is
        rewritten with an expiry argument of 600 (presumably seconds --
        confirm against ``redis_helper.set_hash``). A missing hash or an
        empty ``data`` field both start a fresh history.

        Returns the updated message list.
        '''
        history = []
        if redis_helper.redis_helper.get_hash(hash_key):
            messages_str = redis_helper.redis_helper.get_hash_field(hash_key, "data")
            history = json.loads(messages_str) if messages_str else []
        messages = self._merge_session_message(history, item)
        redis_helper.redis_helper.set_hash(
            hash_key, {"data": json.dumps(messages, ensure_ascii=False)}, 600
        )
        return messages

    def get_contacts_brief_from_cache(self, wxid) -> list:
        """Return the cached contact list for ``wxid`` (``[]`` when absent)."""
        hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}"
        cache_str = redis_helper.redis_helper.get_hash_field(hash_key, "data")
        return json.loads(cache_str) if cache_str else []

    def save_contacts_brief_to_cache(self, token_id, app_id, wxid, contacts_wxids: list) -> list:
        """Rebuild the cached contact list for ``wxid`` from the API.

        Brief info is fetched 100 ids at a time. Entries whose ``nickName``
        is empty are looked up again through ``get_detail_info`` (20 ids at a
        time, that endpoint's documented limit) and the detailed records are
        stored instead. A ``None`` ``data`` reply is treated as empty.

        Returns the list written to the cache.
        """
        hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}"
        cache = []
        for batch in self._chunks(contacts_wxids, 100):
            friends_brief = self.get_brief_info(token_id, app_id, batch) or []
            friends_no_brief_wxid = [
                f['userName'] for f in friends_brief if not f.get("nickName")
            ]
            cache.extend(f for f in friends_brief if f.get("nickName"))
            for detail_batch in self._chunks(friends_no_brief_wxid, 20):
                detailed_info = self.get_detail_info(token_id, app_id, detail_batch) or []
                cache.extend(detailed_info)
        redis_helper.redis_helper.update_hash_field(
            hash_key, "data", json.dumps(cache, ensure_ascii=False)
        )
        return cache

    def delete_contacts_brief_from_cache(self, wxid, contacts_wxids: list):
        """Remove the given ``userName`` values from the cached contact list."""
        hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}"
        cache_str = redis_helper.redis_helper.get_hash_field(hash_key, "data")
        cache = json.loads(cache_str) if cache_str else []
        # A set gives O(1) membership tests while filtering.
        wxids_set = set(contacts_wxids)
        filtered_cache = [contact for contact in cache if contact["userName"] not in wxids_set]
        redis_helper.redis_helper.update_hash_field(
            hash_key, "data", json.dumps(filtered_cache, ensure_ascii=False)
        )

    def save_groups_info_to_cache(self, token_id, app_id, wxid, chatroom_ids: list):
        """Synchronise the cached group info of ``wxid`` with ``chatroom_ids``.

        Cached fields whose key is not in ``chatroom_ids`` are deleted; each
        listed group is then fetched and stored when ``ret`` is 200, pausing
        one second after every successful store.
        """
        hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}"
        existing_chatrooms = redis_helper.redis_helper.get_hash(hash_key) or {}
        chatrooms_to_delete = set(existing_chatrooms) - set(chatroom_ids)
        for chatroom_id in chatrooms_to_delete:
            redis_helper.redis_helper.delete_hash_field(hash_key, chatroom_id)
        for chatroom_id in chatroom_ids:
            ret, msg, data = self.get_chatroom_info(token_id, app_id, chatroom_id)
            if ret != 200:
                continue
            redis_helper.redis_helper.update_hash_field(
                hash_key, chatroom_id, json.dumps(data, ensure_ascii=False)
            )
            time.sleep(1)

    def update_group_info_to_cache(self, token_id, app_id, wxid, chatroom_id: str):
        """Refresh one group's cached info.

        The cache is only written when the API reports ``ret == 200``, so a
        failed call no longer overwrites a good entry with ``null``.
        """
        hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}"
        ret, msg, data = self.get_chatroom_info(token_id, app_id, chatroom_id)
        if ret != 200:
            logger.warning(f"get_chatroom_info failed for {chatroom_id}: {ret} {msg}")
            return
        redis_helper.redis_helper.update_hash_field(
            hash_key, chatroom_id, json.dumps(data, ensure_ascii=False)
        )

    def get_groups_info_from_cache(self, wxid) -> list:
        """Return every cached group record of ``wxid`` as a list."""
        hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}"
        cache = redis_helper.redis_helper.get_hash(hash_key) or {}
        return [json.loads(v) for v in cache.values()]

    def get_group_info_from_cache(self, wxid, chatroom_id) -> dict:
        """Return one cached group record (``{}`` when absent)."""
        hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}"
        cache = redis_helper.redis_helper.get_hash_field(hash_key, chatroom_id)
        return json.loads(cache) if cache else {}

    def get_wxchat_config_from_cache(self, wxid):
        """Return the stored configuration of ``wxid`` (``{}`` when absent)."""
        hash_key = "__AI_OPS_WX__:WXCHAT_CONFIG"
        config = redis_helper.redis_helper.get_hash_field(hash_key, wxid)
        return json.loads(config) if config else {}

    def save_wxchat_config(self, wxid, config):
        """Store ``config`` as JSON under the ``wxid`` field of the config hash."""
        hash_key = "__AI_OPS_WX__:WXCHAT_CONFIG"
        redis_helper.redis_helper.update_hash_field(
            hash_key, wxid, json.dumps(config, ensure_ascii=False)
        )

    def get_login_info_from_cache(self, tel):
        """Return the login-info hash stored for ``tel``."""
        hash_key = f"__AI_OPS_WX__:LOGININFO:{tel}"
        return redis_helper.redis_helper.get_hash(hash_key)

    def save_login_wx_captch_code_to_cache(self, token_id, captch_code):
        """Store a login captcha code for ``token_id`` with expiry argument 15."""
        hash_key = f"__AI_OPS_WX__:WXCAPTCHCODE:{token_id}"
        redis_helper.redis_helper.set_hash(hash_key, {"data": captch_code}, 15)

    def get_login_wx_captch_code_to_cache(self, token_id) -> str:
        """Read the captcha code stored for ``token_id`` (despite the name)."""
        hash_key = f"__AI_OPS_WX__:WXCAPTCHCODE:{token_id}"
        return redis_helper.redis_helper.get_hash_field(hash_key, "data")
def start(base_url='http://api.geweapi.com/gewe'):
    """Create the module-wide GeWeChatCom client and bind it to ``wxchat``.

    Args:
        base_url: Gateway root URL. Defaults to the previously hard-coded
            public endpoint, so existing ``start()`` calls behave the same.
    """
    global wxchat
    wxchat = GeWeChatCom(base_url)