import base64
import io
import json
import os
import threading
import time
import uuid
from io import BytesIO

import requests
from PIL import Image

from common import redis_helper
from common.log import logger

# Module-level client instance; populated by start().
wxchat = None


class GeWeChatCom:
    """Thin HTTP client for the GeWe WeChat API plus Redis-backed cache helpers.

    Conventions shared by almost every endpoint method:

    * ``token_id`` is sent as the ``X-GEWE-TOKEN`` request header.
    * ``app_id`` is sent as ``appId`` in the JSON body (the device ID
      returned by the login endpoints).
    * Unless stated otherwise, endpoint methods return the tuple
      ``(ret, msg, data)`` taken from the JSON response body; a missing
      key yields ``None`` in that position.
    """

    # Class-level default so objects built without __init__ still work.
    timeout = None

    # Batch limits documented by get_brief_info / get_detail_info.
    _BRIEF_BATCH_SIZE = 100
    _DETAIL_BATCH_SIZE = 20

    def __init__(self, base_url, timeout=None):
        """Create a client.

        Args:
            base_url: API root, e.g. ``http://host/gewe`` (no trailing slash).
            timeout: Optional ``requests`` timeout in seconds applied to every
                API call. ``None`` (default) waits indefinitely, which is the
                historical behaviour.
        """
        self.base_url = base_url
        self.timeout = timeout

    # ------------------------------------------------------------------ #
    # Internal HTTP helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def _headers(token_id):
        """Build the request headers carrying the API token."""
        return {
            'X-GEWE-TOKEN': token_id,
            'Content-Type': 'application/json'
        }

    def _post(self, path, token_id, payload):
        """POST ``payload`` as JSON to ``base_url + path``; return the Response."""
        return requests.post(
            url=f"{self.base_url}{path}",
            headers=self._headers(token_id),
            data=json.dumps(payload),
            timeout=self.timeout,
        )

    def _call(self, path, token_id, payload):
        """POST ``payload`` and return the decoded JSON response body."""
        return self._post(path, token_id, payload).json()

    @staticmethod
    def _ret_msg_data(body):
        """Split a response body into the ``(ret, msg, data)`` tuple."""
        return body.get('ret', None), body.get('msg', None), body.get('data', None)

    ############################### Login ###############################
    def check_login(self, token_id, app_id, uuid, captch_code=""):
        """Perform login (step 3).

        After the login QR code has been obtained, call this endpoint every
        5 seconds to find out whether login succeeded. A device that logs in
        for the first time is dropped once in the early morning of the next
        day; to log in again, fetch a new QR code passing the appId, after
        which the session can stay online long-term. Once login succeeds,
        persist the appId <-> wxid mapping; later endpoints need it.

        Args:
            uuid: QR-code uuid returned by get_login_qr_code.
            captch_code: Optional verification code; omitted from the request
                when it is the empty string.

        Returns:
            ``(ret, msg, data)``.
        """
        payload = {"appId": app_id, "uuid": uuid}
        if captch_code != "":
            payload["captchCode"] = captch_code
        return self._ret_msg_data(self._call("/v2/api/login/checkLogin", token_id, payload))

    def get_login_qr_code(self, token_id, app_id="", region_id="440000"):
        """Fetch the login QR code (step 2).

        ``app_id`` is the device ID. Pass an empty string on first login and
        a device is created automatically; when logging in again after being
        dropped you must pass the appId the API returned earlier. Avoid
        creating several devices for the same account, which may trigger the
        platform's risk control. The appId must belong to the same WeChat
        account that scanned last time, otherwise login fails.

        ``qrImgBase64`` in the response is the QR image as base64 and should
        be shown to the user for scanning (alternatively build the QR code
        from ``qrData``).

        Returns:
            The ``data`` member of the response (or ``None``).
        """
        payload = {"appId": app_id, "regionId": region_id}
        return self._call("/v2/api/login/getLoginQrCode", token_id, payload).get('data')

    def qrCallback(self, uuid, base64_string):
        """Show the login QR code locally and print fallback links.

        Best effort: tries to open the base64 image in the system viewer on a
        daemon thread, then prints several web QR-generator links and an
        ASCII rendering of the QR code to stdout.

        Args:
            uuid: QR-code uuid used to build the ``weixin.qq.com/x/`` URL.
            base64_string: Data-URI style string; the part after the first
                comma is decoded as the image.

        Returns:
            The four fallback QR links as ``[api1, api2, api3, api4]``.
        """
        try:
            encoded_image = base64_string.split(',')[1]
            img = Image.open(io.BytesIO(base64.b64decode(encoded_image)))
            viewer = threading.Thread(target=img.show, args=("QRCode",), daemon=True)
            viewer.start()
        except Exception as e:
            # Displaying the image is optional (e.g. headless server); the
            # printed links / ASCII code below remain available.
            logger.warning(f"Unable to display login QR image: {e}")

        import qrcode

        url = f"http://weixin.qq.com/x/{uuid}"
        qr_api1 = "https://api.isoyu.com/qr/?m=1&e=L&p=20&url={}".format(url)
        # qrserver expects size as "<width>x<height>" with an ASCII "x".
        qr_api2 = "https://api.qrserver.com/v1/create-qr-code/?size=400x400&data={}".format(url)
        qr_api3 = "https://api.pwmqr.com/qrcode/create/?url={}".format(url)
        qr_api4 = "https://my.tv.sohu.com/user/a/wvideo/getQRCode.do?text={}".format(url)
        print("You can also scan QRCode in any website below:")
        print(qr_api3)
        print(qr_api4)
        print(qr_api2)
        print(qr_api1)

        qr = qrcode.QRCode(border=1)
        qr.add_data(url)
        qr.make(fit=True)
        qr.print_ascii(invert=True)
        return [qr_api1, qr_api2, qr_api3, qr_api4]

    ############################### Account management ###############################
    def reconnection(self, token_id, app_id):
        """Reconnect a dropped session.

        Use when the API reports the account as offline while the phone still
        shows the iPad session online. If this returns an error, the full
        login flow must be repeated. Rarely needed.

        Returns:
            The whole decoded response body (also printed to stdout).
        """
        response = self._call("/v2/api/login/reconnection", token_id, {"appId": app_id})
        print(response)
        return response

    def logout(self, token_id, app_id):
        """Log out. Prints the response and returns its ``data`` member."""
        response_data = self._call("/v2/api/login/logout", token_id, {"appId": app_id})
        print(response_data)
        return response_data.get('data')

    def check_online(self, token_id, app_id):
        """Check whether the account is online.

        Returns:
            The response ``data``; ``True`` means online, otherwise offline.
            The response is also printed to stdout.
        """
        response_data = self._call("/v2/api/login/checkOnline", token_id, {"appId": app_id})
        print(response_data)
        return response_data.get('data')

    ############################### Contacts ###############################
    def fetch_contacts_list(self, token_id, app_id):
        """Fetch the address-book list.

        This is a slow endpoint whose latency grows with the number of
        friends; if it times out, the result can be read through
        fetch_contacts_list_cache. Only group chats saved to the address book
        are returned. Groups that merely appear in the session list have to
        be collected from message callbacks (query the group detail when a
        callback for an unknown group arrives and store it).

        Returns:
            ``(ret, msg, data)``.
        """
        body = self._call("/v2/api/contacts/fetchContactsList", token_id, {"appId": app_id})
        return self._ret_msg_data(body)

    def fetch_contacts_list_cache(self, token_id, app_id):
        """Fetch the cached address-book list.

        The server caches the list for 10 minutes; after that
        fetch_contacts_list has to be called again.

        Returns:
            The response ``data`` (the response is also printed).
        """
        response_data = self._call(
            "/v2/api/contacts/fetchContactsListCache", token_id, {"appId": app_id}
        )
        print(response_data)
        return response_data.get('data')

    def get_brief_info(self, token_id, app_id, wxids):
        """Fetch brief info for groups/friends.

        Args:
            wxids: List of 1 to 100 wxids.

        Returns:
            The response ``data`` (or ``None``).
        """
        payload = {"appId": app_id, "wxids": wxids}
        return self._call("/v2/api/contacts/getBriefInfo", token_id, payload).get('data')

    def get_detail_info(self, token_id, app_id, wxids):
        """Fetch detailed info for groups/friends.

        Args:
            wxids: List of 1 to 20 wxids.

        Returns:
            The response ``data`` (the response is also printed).
        """
        payload = {"appId": app_id, "wxids": wxids}
        response_data = self._call("/v2/api/contacts/getDetailInfo", token_id, payload)
        print(response_data)
        return response_data.get('data')

    def delete_friend(self, token_id, app_id, friend_wxid):
        """Delete a friend. Returns ``(ret, msg, data)``."""
        payload = {"appId": app_id, "wxid": friend_wxid}
        return self._ret_msg_data(self._call("/v2/api/contacts/deleteFriend", token_id, payload))

    def set_friend_remark(self, token_id, app_id, friend_wxid, remark):
        """Set the remark (alias) of a friend. Returns ``(ret, msg, data)``."""
        payload = {"appId": app_id, "wxid": friend_wxid, "remark": remark}
        return self._ret_msg_data(self._call("/v2/api/contacts/setFriendRemark", token_id, payload))

    ############################### Messages ###############################
    def post_text(self, token_id, app_id, to_wxid, content):
        """Send a text message to ``to_wxid``. Returns ``(ret, msg, data)``."""
        payload = {"appId": app_id, "toWxid": to_wxid, "content": content}
        return self._ret_msg_data(self._call("/v2/api/message/postText", token_id, payload))

    def post_image(self, token_id, app_id, to_wxid, img_url):
        """Send an image (by URL) to ``to_wxid``. Returns ``(ret, msg, data)``."""
        payload = {"appId": app_id, "toWxid": to_wxid, "imgUrl": img_url}
        return self._ret_msg_data(self._call("/v2/api/message/postImage", token_id, payload))

    def post_voice(self, token_id, app_id, to_wxid, voice_url, voice_duration):
        """Send a voice message (by URL). Returns ``(ret, msg, data)``."""
        payload = {
            "appId": app_id,
            "toWxid": to_wxid,
            "voiceUrl": voice_url,
            "voiceDuration": voice_duration
        }
        return self._ret_msg_data(self._call("/v2/api/message/postVoice", token_id, payload))

    def post_video(self, token_id, app_id, to_wxid, video_url, video_thumb_url, video_duration):
        """Send a video (by URL, with thumbnail). Returns ``(ret, msg, data)``."""
        payload = {
            "appId": app_id,
            "toWxid": to_wxid,
            "videoUrl": video_url,
            "videoDuration": video_duration,
            "videoThumbUrl": video_thumb_url
        }
        return self._ret_msg_data(self._call("/v2/api/message/postVideo", token_id, payload))

    def forward_image(self, token_id, app_id, to_wxid, aeskey, cdnthumburl, cdnthumblength,
                      cdnthumbheight, cdnthumbwidth, length, md5):
        """Forward an image message.

        Returns:
            ``(data, ret, msg)`` -- note the order differs from most other
            methods; kept as-is for existing callers.
        """
        payload = {
            "appId": app_id,
            "toWxid": to_wxid,
            # NOTE(review): this payload contains only whitespace and none of
            # the image parameters (aeskey, cdnthumburl, ...) are used. The
            # XML template looks like it was lost; restore it from the API
            # documentation before relying on this method.
            "xml": "\n\n\t\n\t\n\t\n"
        }
        body = self._call("/v2/api/message/forwardImage", token_id, payload)
        return body.get('data', None), body.get('ret', None), body.get('msg', None)

    def forward_video(self, token_id, app_id, to_wxid, aeskey, cdnvideourl, length):
        """Forward a video message.

        Returns:
            ``(data, ret, msg)`` -- note the order differs from most other
            methods; kept as-is for existing callers.
        """
        payload = {
            "appId": app_id,
            "toWxid": to_wxid,
            # NOTE(review): whitespace-only payload, video parameters unused;
            # the XML template appears to be missing -- confirm and restore.
            "xml": "\n\n\t\n"
        }
        body = self._call("/v2/api/message/forwardVideo", token_id, payload)
        return body.get('data', None), body.get('ret', None), body.get('msg', None)

    def add_contacts(self, token_id: str, app_id: str, scene: int, option: int, v3: str, v4: str,
                     content: str):
        """Send an add-contact request.

        Returns:
            ``(ret, msg)`` only (the response is also printed).
        """
        payload = {
            "appId": app_id,
            "scene": scene,
            "option": option,
            "v3": v3,
            "v4": v4,
            "content": content
        }
        response_object = self._call("/v2/api/contacts/addContacts", token_id, payload)
        print(response_object)
        return response_object.get('ret', None), response_object.get('msg', None)

    def check_relation(self, token_id, app_id, wxids: list):
        """Check the friend relationship for ``wxids`` (list of 1 to 20).

        Returns:
            ``(ret, msg, data)``.
        """
        payload = {"appId": app_id, "wxids": wxids}
        return self._ret_msg_data(self._call("/v2/api/contacts/checkRelation", token_id, payload))

    ############################### Downloads ###############################
    def _request_file_url(self, path, token_id, payload, kind, echo_response=False):
        """Call a download endpoint and return the resulting ``fileUrl`` or False."""
        print(json.dumps(payload))
        response = self._post(path, token_id, payload)
        if not response.ok:
            return False
        body = response.json()
        if echo_response:
            print(body)
        file_url = (body.get('data') or {}).get('fileUrl') if body.get('ret') == 200 else None
        if not file_url:
            print(f"Gewe download {kind} msg in error.")
            return False
        print(f"Gewe download {kind} msg successfully.")
        print(file_url)
        return file_url

    def download_audio_msg(self, token_id: str, app_id: str, msg_id: int, xml: str):
        """Ask the API for a download URL of a voice message.

        The request goes to ``base_url`` like every other endpoint (it used to
        be pinned to the public host regardless of the configured base URL).

        Returns:
            The ``fileUrl`` string on success, otherwise ``False``.
        """
        payload = {"appId": app_id, "msgId": msg_id, "xml": xml}
        return self._request_file_url(
            "/v2/api/message/downloadVoice", token_id, payload, "audio", echo_response=True
        )

    def download_image_msg(self, token_id: str, app_id: str, xml: str):
        """Ask the API for a download URL of an image message (``type`` 2).

        Returns:
            The ``fileUrl`` string on success, otherwise ``False``.
        """
        payload = {"appId": app_id, "type": 2, "xml": xml}
        return self._request_file_url(
            "/v2/api/message/downloadImage", token_id, payload, "image"
        )

    @staticmethod
    def download_audio_file(fileUrl: str, file_name: str):
        """Download ``fileUrl`` to ``./silk/<file_name>.silk``.

        Declared static so it can be called on the class or on an instance
        (it never took ``self``). The target directory is created on demand
        and the streamed response is always closed.

        Returns:
            ``None``; the outcome is reported on stdout.
        """
        local_filename = f'./silk/{file_name}.silk'
        with requests.get(fileUrl, stream=True) as response:
            if response.status_code != 200:
                print(f"请求失败,状态码: {response.status_code}")
                return
            os.makedirs(os.path.dirname(local_filename), exist_ok=True)
            with open(local_filename, 'wb') as f:
                # Stream in 1 KiB chunks to avoid loading the file in memory.
                for chunk in response.iter_content(1024):
                    f.write(chunk)
        print(f"文件已成功下载到 {local_filename}")

    ############################### Groups ###############################
    def get_chatroom_info(self, token_id, app_id, chatroom_id):
        """Fetch group (chatroom) info. Returns ``(ret, msg, data)``."""
        payload = {"appId": app_id, "chatroomId": chatroom_id}
        return self._ret_msg_data(self._call("/v2/api/group/getChatroomInfo", token_id, payload))

    def add_group_member_as_friend(self, token_id, app_id, chatroom_id, member_wxid, content):
        """Send a friend request to a group member. Returns ``(ret, msg, data)``."""
        payload = {
            "appId": app_id,
            "chatroomId": chatroom_id,
            "content": content,
            "memberWxid": member_wxid,
        }
        body = self._call("/v2/api/group/addGroupMemberAsFriend", token_id, payload)
        return self._ret_msg_data(body)

    def save_contract_list(self, token_id, app_id, chatroom_id, oper_type):
        """Save a group to / remove it from the address book.

        Args:
            oper_type: 3 = save to address book, 2 = remove from address book.

        Returns:
            ``(ret, msg, data)``.
        """
        payload = {"appId": app_id, "chatroomId": chatroom_id, "operType": oper_type}
        return self._ret_msg_data(self._call("/v2/api/group/saveContractList", token_id, payload))

    def get_group_memberlist(self, token_id, app_id, chatroom_id):
        """Fetch the member list of a group. Returns ``(ret, msg, data)``."""
        payload = {"appId": app_id, "chatroomId": chatroom_id}
        body = self._call("/v2/api/group/getChatroomMemberList", token_id, payload)
        return self._ret_msg_data(body)

    ############################### Moments (SNS) ###############################
    # For 1-3 days after logging in on a new device, posting, liking and
    # commenting on Moments is unavailable; attempts trigger a notice from the
    # WeChat team.
    def sns_visible_scope(self, token_id, app_id, option):
        """Set the Moments visibility window.

        Args:
            option: 1 = all, 2 = last six months, 3 = last month,
                4 = last three days.

        Returns:
            ``(ret, msg, data)``.
        """
        payload = {"appId": app_id, "option": option}
        return self._ret_msg_data(self._call("/v2/api/sns/snsVisibleScope", token_id, payload))

    def stranger_visibility_enabled(self, token_id, app_id, enabled: bool):
        """Allow or forbid strangers to view Moments. Returns ``(ret, msg, data)``."""
        payload = {"appId": app_id, "enabled": enabled}
        body = self._call("/v2/api/sns/strangerVisibilityEnabled", token_id, payload)
        return self._ret_msg_data(body)

    def send_text_sns(self, token_id, app_id, content):
        """Post a text-only Moment. Returns ``(ret, msg, data)``."""
        payload = {"appId": app_id, "content": content}
        return self._ret_msg_data(self._call("/v2/api/sns/sendTextSns", token_id, payload))

    def send_image_sns(self, token_id, app_id, content, img_infos: list):
        """Post a Moment with images.

        Args:
            img_infos: Image descriptors obtained from upload_sns_image.

        Returns:
            ``(ret, msg, data)`` (URL and response are also printed).
        """
        path = "/v2/api/sns/sendImgSns"
        print(f"{self.base_url}{path}")
        payload = {
            "appId": app_id,
            "allowWxIds": [],
            "atWxIds": [],
            "disableWxIds": [],
            "content": content,
            "imgInfos": img_infos,
            "privacy": False
        }
        response_object = self._call(path, token_id, payload)
        print(response_object)
        return self._ret_msg_data(response_object)

    def send_video_sns(self, token_id, app_id, content: str, video_info: object):
        """Post a Moment with a video. Returns ``(ret, msg, data)``."""
        payload = {
            "appId": app_id,
            "content": content,
            "allowWxIds": [],
            "atWxIds": [],
            "disableWxIds": [],
            "videoInfo": video_info,
            "privacy": False
        }
        return self._ret_msg_data(self._call("/v2/api/sns/sendVideoSns", token_id, payload))

    def upload_sns_image(self, token_id, app_id, img_urls: list):
        """Upload images for use in a Moment. Returns ``(ret, msg, data)``."""
        payload = {"appId": app_id, "imgUrls": img_urls}
        return self._ret_msg_data(self._call("/v2/api/sns/uploadSnsImage", token_id, payload))

    def upload_sns_video(self, token_id, app_id, video_url: str, video_thumb_url: str):
        """Upload a video for use in a Moment. Returns ``(ret, msg, data)``."""
        payload = {
            "appId": app_id,
            "thumbUrl": video_thumb_url,
            "videoUrl": video_url,
        }
        return self._ret_msg_data(self._call("/v2/api/sns/uploadSnsVideo", token_id, payload))

    ############################### Misc / Redis cache ###############################
    def save_session_messages_to_cache(self, hash_key, item: object) -> list:
        """Append ``item`` to the conversation stored under ``hash_key``.

        The conversation is a JSON list in the hash field ``data`` and is
        (re)written with a 600 second expiry. A missing or empty conversation
        is seeded with an empty system message. If the last stored message
        has list content ending in an ``image_url`` part, the first content
        part of ``item`` is merged into that message instead of appending a
        new one. For non-list content an identical consecutive message is
        not appended twice.

        Returns:
            The updated message list.
        """
        cache = redis_helper.redis_helper
        stored = cache.get_hash_field(hash_key, "data")
        messages = json.loads(stored) if stored else []

        if not messages:
            # New (or empty) conversation: avoid indexing an empty list.
            messages = [{"role": "system", "content": ""}, item]
        else:
            last_message = messages[-1]
            content = last_message.get("content", [])
            if isinstance(content, list) and content:
                if content[-1].get("type") == 'image_url':
                    # ``content`` is the stored list itself, so this updates
                    # the last message in place.
                    content.append(item['content'][0])
                else:
                    messages.append(item)
            elif last_message != item:
                messages.append(item)

        cache.set_hash(hash_key, {"data": json.dumps(messages, ensure_ascii=False)}, 600)
        return messages

    def get_contacts_brief_from_cache(self, wxid) -> list:
        """Return the cached contact list for ``wxid`` (``[]`` if absent)."""
        hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}"
        cache_str = redis_helper.redis_helper.get_hash_field(hash_key, "data")
        return json.loads(cache_str) if cache_str else []

    def _fetch_contact_profiles(self, token_id, app_id, wxids):
        """Fetch contact info for ``wxids``, respecting the API batch limits.

        Brief info is requested in batches of 100. Entries that come back
        without a nickname are re-queried through the detail endpoint in
        batches of 20 (its documented maximum).
        """
        profiles = []
        for start in range(0, len(wxids), self._BRIEF_BATCH_SIZE):
            batch = wxids[start:start + self._BRIEF_BATCH_SIZE]
            briefs = self.get_brief_info(token_id, app_id, batch) or []
            profiles.extend(b for b in briefs if b.get("nickName"))
            without_brief = [b['userName'] for b in briefs if not b.get("nickName")]
            for d_start in range(0, len(without_brief), self._DETAIL_BATCH_SIZE):
                detail_batch = without_brief[d_start:d_start + self._DETAIL_BATCH_SIZE]
                profiles.extend(self.get_detail_info(token_id, app_id, detail_batch) or [])
        return profiles

    def save_contacts_brief_to_cache(self, token_id, app_id, wxid, contacts_wxids: list) -> list:
        """Refresh the cached contact list of ``wxid``.

        With exactly one wxid (callback case) the matching cache entry is
        replaced, or the contact is appended if it is not cached yet. With
        any other number of wxids the cache is rebuilt from scratch.

        Returns:
            The contact list that was written to the cache.
        """
        hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}"
        cache_str = redis_helper.redis_helper.get_hash_field(hash_key, "data")
        cache = json.loads(cache_str) if cache_str else []

        if len(contacts_wxids) == 1:
            target = contacts_wxids[0]
            if any(entry['userName'] == target for entry in cache):
                briefs = self.get_brief_info(token_id, app_id, contacts_wxids) or []
                if briefs:
                    # Replace the existing entry; keep it if the API returned nothing.
                    cache = [briefs[0] if entry['userName'] == target else entry
                             for entry in cache]
            else:
                cache.extend(self._fetch_contact_profiles(token_id, app_id, contacts_wxids))
        else:
            cache = self._fetch_contact_profiles(token_id, app_id, contacts_wxids)

        redis_helper.redis_helper.update_hash_field(
            hash_key, "data", json.dumps(cache, ensure_ascii=False)
        )
        return cache

    def delete_contacts_brief_from_cache(self, wxid, contacts_wxids: list):
        """Remove the given contacts from the cached contact list of ``wxid``."""
        hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}"
        cache_str = redis_helper.redis_helper.get_hash_field(hash_key, "data")
        cache = json.loads(cache_str) if cache_str else []
        # Set lookup keeps the filter linear in the cache size.
        wxids_set = set(contacts_wxids)
        filtered_cache = [contact for contact in cache if contact["userName"] not in wxids_set]
        redis_helper.redis_helper.update_hash_field(
            hash_key, "data", json.dumps(filtered_cache, ensure_ascii=False)
        )

    def _sync_group_cache(self, hash_key, chatroom_ids, fetch):
        """Make the hash at ``hash_key`` mirror ``chatroom_ids``.

        Fields for chatrooms not in ``chatroom_ids`` are deleted; every listed
        chatroom is fetched with ``fetch(chatroom_id) -> (ret, msg, data)``
        and stored as JSON when ``ret == 200``.
        """
        cache = redis_helper.redis_helper
        existing = cache.get_hash(hash_key) or {}
        for stale_id in set(existing.keys()) - set(chatroom_ids):
            cache.delete_hash_field(hash_key, stale_id)
        for chatroom_id in chatroom_ids:
            ret, _msg, data = fetch(chatroom_id)
            if ret != 200:
                continue
            cache.update_hash_field(hash_key, chatroom_id, json.dumps(data, ensure_ascii=False))
            # Pause between successful API calls.
            time.sleep(1)

    def _refresh_group_entry(self, hash_key, chatroom_id, result):
        """Store one fetched ``(ret, msg, data)`` result unless the call failed."""
        ret, msg, data = result
        if ret != 200:
            # Keep the previous cache entry rather than overwriting it with null.
            logger.warning(f"Skip cache update for {chatroom_id}: ret={ret}, msg={msg}")
            return
        redis_helper.redis_helper.update_hash_field(
            hash_key, chatroom_id, json.dumps(data, ensure_ascii=False)
        )

    def save_groups_info_to_cache(self, token_id, app_id, wxid, chatroom_ids: list):
        """Synchronise cached group info of ``wxid`` with ``chatroom_ids``."""
        self._sync_group_cache(
            f"__AI_OPS_WX__:GROUPS_INFO:{wxid}",
            chatroom_ids,
            lambda chatroom_id: self.get_chatroom_info(token_id, app_id, chatroom_id),
        )

    def save_groups_members_to_cache(self, token_id, app_id, wxid, chatroom_ids: list):
        """Synchronise cached group member lists of ``wxid`` with ``chatroom_ids``."""
        self._sync_group_cache(
            f"__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}",
            chatroom_ids,
            lambda chatroom_id: self.get_group_memberlist(token_id, app_id, chatroom_id),
        )

    def update_group_members_to_cache(self, token_id, app_id, wxid, chatroom_id: str):
        """Refresh the cached member list of one group (skipped on API failure)."""
        self._refresh_group_entry(
            f"__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}",
            chatroom_id,
            self.get_group_memberlist(token_id, app_id, chatroom_id),
        )

    def get_group_members_from_cache(self, wxid, chatroom_id) -> dict:
        """Return the cached member data of a group (``{}`` if absent)."""
        hash_key = f"__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}"
        cache = redis_helper.redis_helper.get_hash_field(hash_key, chatroom_id)
        return json.loads(cache) if cache else {}

    def update_group_info_to_cache(self, token_id, app_id, wxid, chatroom_id: str):
        """Refresh the cached info of one group (skipped on API failure)."""
        self._refresh_group_entry(
            f"__AI_OPS_WX__:GROUPS_INFO:{wxid}",
            chatroom_id,
            self.get_chatroom_info(token_id, app_id, chatroom_id),
        )

    def get_groups_info_from_cache(self, wxid) -> list:
        """Return the cached info of every group of ``wxid``."""
        hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}"
        cache = redis_helper.redis_helper.get_hash(hash_key) or {}
        return [json.loads(v) for v in cache.values()]

    def get_group_info_from_cache(self, wxid, chatroom_id) -> dict:
        """Return the cached info of one group (``{}`` if absent)."""
        hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}"
        cache = redis_helper.redis_helper.get_hash_field(hash_key, chatroom_id)
        return json.loads(cache) if cache else {}

    def get_wxchat_config_from_cache(self, wxid):
        """Return the stored configuration of ``wxid`` (``{}`` if absent)."""
        hash_key = "__AI_OPS_WX__:WXCHAT_CONFIG"
        config = redis_helper.redis_helper.get_hash_field(hash_key, wxid)
        return json.loads(config) if config else {}

    def save_wxchat_config(self, wxid, config):
        """Store the configuration of ``wxid`` as JSON."""
        hash_key = "__AI_OPS_WX__:WXCHAT_CONFIG"
        redis_helper.redis_helper.update_hash_field(
            hash_key, wxid, json.dumps(config, ensure_ascii=False)
        )

    def get_login_info_from_cache(self, tel):
        """Return the login-info hash stored for phone number ``tel``."""
        hash_key = f"__AI_OPS_WX__:LOGININFO:{tel}"
        return redis_helper.redis_helper.get_hash(hash_key)

    def save_login_wx_captch_code_to_cache(self, token_id, captch_code):
        """Store the login verification code for ``token_id`` for 30 seconds."""
        hash_key = f"__AI_OPS_WX__:WXCAPTCHCODE:{token_id}"
        redis_helper.redis_helper.set_hash(hash_key, {"data": captch_code}, 30)

    def get_login_wx_captch_code_from_cache(self, token_id) -> str:
        """Return the stored login verification code for ``token_id``."""
        hash_key = f"__AI_OPS_WX__:WXCAPTCHCODE:{token_id}"
        return redis_helper.redis_helper.get_hash_field(hash_key, "data")

    def acquire_login_lock(self, token_id, expire_time=10):
        """Try to take the per-token login lock.

        The key is created and given its expiry in a single ``SET NX EX``
        command, so a crash can no longer leave a lock without a TTL.
        Assumes ``redis_helper.redis_helper.client`` is a redis-py client.

        Args:
            expire_time: Lock lifetime in seconds.

        Returns:
            ``True`` if the lock was acquired, ``False`` if it is already held.
        """
        hash_key = f"__AI_OPS_WX__:LOGINLOCK:{token_id}"
        identifier = str(uuid.uuid4())
        acquired = redis_helper.redis_helper.client.set(
            hash_key, identifier, nx=True, ex=expire_time
        )
        return bool(acquired)

    def release_login_lock(self, token_id):
        """Release the per-token login lock.

        NOTE(review): the lock is deleted unconditionally; ownership is not
        verified because the identifier is not exposed to callers.
        """
        hash_key = f"__AI_OPS_WX__:LOGINLOCK:{token_id}"
        redis_helper.redis_helper.client.delete(hash_key)


def start():
    """Create the module-level ``wxchat`` client for the public GeWe API."""
    global wxchat
    wxchat = GeWeChatCom('http://api.geweapi.com/gewe')