import requests import json import base64 import io import json import os import threading import time import requests from io import BytesIO from PIL import Image from common import redis_helper from common.log import logger wxchat=None class GeWeChatCom: def __init__(self, base_url): self.base_url = base_url ############################### 登录模块 ############################### def check_login(self, token_id, app_id, uuid,captch_code=""): ''' 执行登录(步骤3) 获取到登录二维码后需每间隔5s调用本接口来判断是否登录成功 新设备登录平台,次日凌晨会掉线一次,重新登录时需调用获取二维码且传appId取码,登录成功后则可以长期在线 登录成功后请保存appId与wxid的对应关系,后续接口中会用到 ''' api_url = f"{self.base_url}/v2/api/login/checkLogin" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "uuid": uuid, "captchCode":captch_code } if captch_code=="": data = { "appId": app_id, "uuid": uuid } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) #response_data = response.json() # print(response_data) # return response_data.get('data') response_object = response.json() return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) def get_login_qr_code(self, token_id,app_id="",region_id="440000"): ''' 获取登录二维码(步骤2) appId参数为设备ID,首次登录传空,会自动触发创建设备,掉线后重新登录则必须传接口返回的appId,注意同一个号避免重复创建设备,以免触发官方风控 取码时传的appId需要与上次登录扫码的微信一致,否则会导致登录失败 响应结果中的qrImgBase64为微信二维码图片的base64,前端需要将二维码图片展示给用户并进行手机扫码操作(PS: 扫码后调用步骤2,手机上才显示登录)。 (或使用响应结果中的qrData生成二维码) ''' api_url = f"{self.base_url}/v2/api/login/getLoginQrCode" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } # if app_id=="": # data = { # "appId": app_id # } # else: # data = { # "appId": app_id, # "regionId":region_id # } data = { "appId": app_id, "regionId":region_id } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_data = response.json() data=json.dumps(response_data, separators=(',', ':'),ensure_ascii=False) logger.info(f'{token_id} 的登录APP信息:{data}') return response_data.get('data') def qrCallback(self,uuid, base64_string): try: from PIL import Image base64_string = base64_string.split(',')[1] img_data = base64.b64decode(base64_string) img = Image.open(io.BytesIO(img_data)) _thread = threading.Thread(target=img.show, args=("QRCode",)) _thread.setDaemon(True) _thread.start() except Exception as e: pass import qrcode # url = f"https://login.weixin.qq.com/l/{uuid}" # http://weixin.qq.com/x/4b7fY2d93zNCXhHFkNk8 url = f"http://weixin.qq.com/x/{uuid}" qr_api1 = "https://api.isoyu.com/qr/?m=1&e=L&p=20&url={}".format(url) qr_api2 = "https://api.qrserver.com/v1/create-qr-code/?size=400×400&data={}".format(url) qr_api3 = "https://api.pwmqr.com/qrcode/create/?url={}".format(url) qr_api4 = "https://my.tv.sohu.com/user/a/wvideo/getQRCode.do?text={}".format(url) print("You can also scan QRCode in any website below:") print(qr_api3) print(qr_api4) print(qr_api2) print(qr_api1) # _send_qr_code([qr_api3, qr_api4, qr_api2, qr_api1]) qr = qrcode.QRCode(border=1) qr.add_data(url) qr.make(fit=True) qr.print_ascii(invert=True) return [qr_api1 ,qr_api2, qr_api3, qr_api4] ############################### 账号管理 ############################### def reconnection(self,token_id,app_id): ''' 断线重连 当系统返回账号已离线,但是手机顶部还显示ipad在线,可用此接口尝试重连,若返回错误/失败则必须重新调用步骤一登录 本接口非常用接口,可忽略 ''' api_url = f"{self.base_url}/v2/api/login/reconnection" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response = response.json() print(response) return response def logout(self,token_id,app_id): ''' 退出 ''' api_url = f"{self.base_url}/v2/api/login/logout" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_data = response.json() print(response_data) return response_data.get('data') def check_online(self,token_id,app_id): ''' 检查是否在线 响应结果的data=true则是在线,反之为离线 ''' api_url = f"{self.base_url}/v2/api/login/checkOnline" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_data = response.json() print(response_data) return response_data.get('data') ############################### 联系人模块 ############################### def fetch_contacts_list(self, token_id, app_id): ''' 获取通讯录列表 本接口为长耗时接口,耗时时间根据好友数量递增,若接口返回超时可通过获取通讯录列表缓存接口获取响应结果 本接口返回的群聊仅为保存到通讯录中的群聊,若想获取会话列表中的所有群聊,需要通过消息订阅做二次处理。 原因:当未获取的群有成员在群内发消息的话会有消息回调, 开发者此刻调用获取群详情接口查询群信息入库保存即可, 比如说手机上三年前不说话的群,侧滑删除了,用户手机上也不会看到被删除的群聊的 ,但是有群成员说了话他会显示, 原理就是各个终端(Android、IOS、桌面版微信)取得了消息回调,又去获取群详情信息,本地数据库缓存了下来,显示的手机群聊,让用户感知的。 ''' api_url = f"{self.base_url}/v2/api/contacts/fetchContactsList" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) def fetch_contacts_list_cache(self, token_id, app_id): ''' 获取通讯录列表缓存 通讯录列表数据缓存10分钟,超时则需要重新调用获取通讯录列表接口 ''' api_url = f"{self.base_url}/v2/api/contacts/fetchContactsListCache" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_data = response.json() print(response_data) return response_data.get('data') def get_brief_info(self,token_id, app_id,wxids): ''' 获取群/好友简要信息 1<= wxids <=100 ''' api_url = f"{self.base_url}/v2/api/contacts/getBriefInfo" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "wxids":wxids # list 1<= wxids <=100 } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_data = response.json() # print(response_data) return response_data.get('data') def get_detail_info(self,token_id, app_id,wxids): ''' 获取群/好友详细信息 1<= wxids <=20 ''' api_url = f"{self.base_url}/v2/api/contacts/getDetailInfo" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "wxids":wxids # list 1<= wxids <=20 } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_data = response.json() print(response_data) return response_data.get('data') def delete_friend(self,token_id, app_id,friend_wxid): ''' 删除好友 ''' api_url = f"{self.base_url}/v2/api/contacts/deleteFriend" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "wxid":friend_wxid } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) def set_friend_remark(self,token_id, app_id,friend_wxid,remark): ''' 设置好友备注 ''' api_url = f"{self.base_url}/v2/api/contacts/setFriendRemark" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "wxid":friend_wxid, "remark":remark } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) ############################### 消息模块 ############################### def post_text(self,token_id,app_id,to_wxid,content): api_url = f"{self.base_url}/v2/api/message/postText" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "toWxid": to_wxid, "content": content } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) def post_image(self,token_id,app_id,to_wxid,img_url): api_url = f"{self.base_url}/v2/api/message/postImage" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "toWxid": to_wxid, "imgUrl": img_url } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) def post_voice(self,token_id,app_id,to_wxid,voice_url,voice_duration): api_url = f"{self.base_url}/v2/api/message/postVoice" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "toWxid": to_wxid, "voiceUrl": voice_url, "voiceDuration":voice_duration } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) def post_video(self,token_id,app_id,to_wxid,video_url,video_thumb_url,video_duration): api_url = f"{self.base_url}/v2/api/message/postVideo" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "toWxid": to_wxid, "videoUrl": video_url, "videoDuration":video_duration, "videoThumbUrl":video_thumb_url } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) def forward_image(self,token_id,app_id,to_wxid,aeskey,cdnthumburl,cdnthumblength,cdnthumbheight,cdnthumbwidth,length,md5): api_url = f"{self.base_url}/v2/api/message/forwardImage" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "toWxid": to_wxid, "xml": f"\n\n\t\n\t\n\t\n" } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() return response_object.get('data',None),response_object.get('ret',None),response_object.get('msg',None) def forward_video(self,token_id,app_id,to_wxid,aeskey,cdnvideourl,length): api_url = f"{self.base_url}/v2/api/message/forwardVideo" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "toWxid": to_wxid, "xml": f"\n\n\t\n" } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() return response_object.get('data',None),response_object.get('ret',None),response_object.get('msg',None) def add_contacts(self,token_id:str,app_id:str,scene:int,option:int,v3:str,v4:str,content:str): api_url = f"{self.base_url}/v2/api/contacts/addContacts" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "scene": scene, "option": option, "v3":v3, "v4":v4, "content":content } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() print(response_object) return response_object.get('ret',None),response_object.get('msg',None) def check_relation(self,token_id, app_id,wxids:list): ''' 检查好友关系 ''' api_url = f"{self.base_url}/v2/api/contacts/checkRelation" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "wxids":wxids # list 1<= wxids <=20 } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) ############################### 下载模块 ############################### def download_audio_msg(self,token_id:str,app_id:str,msg_id: int, xml: str): data = { "appId": app_id, "msgId": msg_id, "xml": xml } print(json.dumps(data)) headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } # http://api.geweapi.com/gewe/v2/api/gewe/v2/api/message/downloadVoice # response = requests.post(f"{self.base_url}/v2/api/gewe/v2/api/message/downloadVoice", json=data, headers=headers) url='http://api.geweapi.com/gewe/v2/api/message/downloadVoice' # url='http://api.geweapi.com/gewe/v2/api/gewe/v2/api/message/downloadVoice' response = requests.post(f"{url}", json=data, headers=headers) if response.ok: data = response.json() print(data) if data['ret'] == 200: print("Gewe download audio msg successfully.") print(data['data']['fileUrl']) return data['data']['fileUrl'] else: print("Gewe download audio msg in error.") return False else: return False def download_image_msg(self,token_id:str,app_id:str,xml: str): data = { "appId": app_id, "type": 2, "xml": xml } print(json.dumps(data)) headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } response = requests.post(f"{self.base_url}/v2/api/message/downloadImage", json=data, headers=headers) if response.ok: data = response.json() # print(data) if data['ret'] == 200: print("Gewe download image msg successfully.") print(data['data']['fileUrl']) return data['data']['fileUrl'] else: print("Gewe download image msg in error.") return False else: return False def download_audio_file(fileUrl: str, file_name: str): # 定义保存文件的本地路径和文件名 local_filename = f'./silk/{file_name}.silk' # 使用requests库的get方法获取文件内容 response = requests.get(fileUrl, stream=True) # 检查请求是否成功 if response.status_code == 200: # 打开文件以二进制写入模式 with open(local_filename, 'wb') as f: # 逐块写入文件,通常使用1024字节的块大小 for chunk in response.iter_content(1024): f.write(chunk) print(f"文件已成功下载到 {local_filename}") else: print(f"请求失败,状态码: {response.status_code}") ############################### 群模块 ############################### def get_chatroom_info(self, token_id, app_id, chatroom_id): ''' 获取群信息 ''' api_url = f"{self.base_url}/v2/api/group/getChatroomInfo" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "chatroomId": chatroom_id } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) def add_group_member_as_friend(self, token_id, app_id, chatroom_id, member_wxid, content): ''' 添加群成员为好友 ''' api_url = f"{self.base_url}/v2/api/group/addGroupMemberAsFriend" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "chatroomId": chatroom_id, "content": content, "memberWxid": member_wxid, } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) def save_contract_list(self, token_id, app_id, chatroom_id,oper_type): ''' 群保存到通讯录 操作类型 3保存到通讯录 2从通讯录移除 ''' api_url = f"{self.base_url}/v2/api/group/saveContractList" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "chatroomId": chatroom_id, "operType": oper_type } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) ############################### 朋友圈模块 ################################### # 在新设备登录后的1-3天内,您将无法使用朋友圈发布、点赞、评论等功能。在此期间,如果尝试进行这些操作,您将收到来自微信团队的提醒。请注意遵守相关规定。 def sns_visible_scope(self, token_id, app_id,option): ''' 朋友圈可见范围 option 可选项 1:全部 2:最近半年 3:最近一个月 4:最近三天 ''' api_url = f"{self.base_url}/v2/api/sns/snsVisibleScope" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "option": option, } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) def stranger_visibility_enabled(self, token_id, app_id,enabled:bool): ''' 是否允许陌生人查看朋友圈 ''' api_url = f"{self.base_url}/v2/api/sns/strangerVisibilityEnabled" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "enabled": enabled } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) def send_text_sns(self, token_id, app_id,content): ''' 发送文字朋友圈 ''' api_url = f"{self.base_url}/v2/api/sns/sendTextSns" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "content": content } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) def send_image_sns(self, token_id, app_id,content,img_infos:list): ''' 发送图片朋友圈 ''' api_url = f"{self.base_url}/v2/api/sns/sendImgSns" print(api_url) headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "allowWxIds": [], "atWxIds": [], "disableWxIds": [], "content":content, "imgInfos": img_infos, # 通过上传朋友圈图片接口获取 "privacy": False } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() print(response_object) return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) def send_video_sns(self, token_id, app_id,content:str,video_info:object): ''' 发送视频朋友圈 ''' api_url = f"{self.base_url}/v2/api/sns/sendVideoSns" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "content":content, "allowWxIds": [], "atWxIds": [], "disableWxIds": [], "videoInfo":video_info, "privacy": False } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) def upload_sns_image(self, token_id, app_id,img_urls:list): ''' 上传朋友圈图片 ''' api_url = f"{self.base_url}/v2/api/sns/uploadSnsImage" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "imgUrls": img_urls } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) def upload_sns_video(self, token_id, app_id,video_url:str,video_thumb_url:str): ''' 上传朋友圈视频 ''' api_url = f"{self.base_url}/v2/api/sns/uploadSnsVideo" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, "thumbUrl": video_thumb_url, "videoUrl": video_url, } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_object = response.json() return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) ############################### 其他 ############################### def save_session_messages_to_cache(self, hash_key,item:object)->list: ''' 对话列表 ''' messages=redis_helper.redis_helper.get_hash(hash_key) wxid=hash_key.split(':')[-1] if not messages: messages=[{"role": "system", "content": ""}] messages.append(item) redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},600) else: messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data") messages = json.loads(messages_str) if messages_str else [] #判断是否含有图片 last_message = messages[-1] content = last_message.get("content", []) if isinstance(content, list) and content: last_content_type = content[-1].get("type") if last_content_type == 'image_url': content.append(item['content'][0]) messages[-1]['content']=content else: messages.append(item) else: if last_message!= item: messages.append(item) redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},600) return messages def get_contacts_brief_from_cache(self, wxid)->list: """ 获取联系人信息保存到 Redis 缓存。 """ hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}" cache_str = redis_helper.redis_helper.get_hash_field(hash_key, "data") return json.loads(cache_str) if cache_str else [] def save_contacts_brief_to_cache(self, token_id, app_id, wxid, contacts_wxids: list)->list: """ 将联系人信息保存到 Redis 缓存。 """ # Redis 缓存的 key hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}" # 获取缓存中的数据 # cache_str = redis_helper.redis_helper.get_hash_field(hash_key, "data") # cache = json.loads(cache_str) if cache_str else [] cache=[] # 缓存为空,分批处理 contacts_wxids batch_size = 100 for i in range(0, len(contacts_wxids), batch_size): batch = contacts_wxids[i:i + batch_size] friends_brief = self.get_brief_info(token_id, app_id, batch) friends_no_brief_wxid = [f['userName'] for f in friends_brief if not f["nickName"]] cache.extend(f for f in friends_brief if f["nickName"]) if friends_no_brief_wxid: detailed_info = self.get_detail_info(token_id, app_id, friends_no_brief_wxid) cache.extend(detailed_info) # 更新缓存 redis_helper.redis_helper.update_hash_field(hash_key, "data", json.dumps(cache, ensure_ascii=False)) return cache def delete_contacts_brief_from_cache(self, wxid, contacts_wxids: list): """ 删除联系人信息保存到 Redis 缓存。 """ hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}" cache_str = redis_helper.redis_helper.get_hash_field(hash_key, "data") cache = json.loads(cache_str) if cache_str else [] # 将 contacts_wxids 转换为集合,提高查找效率 wxids_set = set(contacts_wxids) # 过滤 cache:保留 userName 不在集合中的对象 filtered_cache = [contact for contact in cache if contact["userName"] not in wxids_set] # # 如果需要原地修改原 cache 列表: # cache[:] = filtered_cache redis_helper.redis_helper.update_hash_field(hash_key, "data", json.dumps(filtered_cache, ensure_ascii=False)) def save_contacts_brief_to_cache_prev(self, token_id, app_id, wxid, contacts_wxids: list): """ 将联系人信息保存到 Redis 缓存。 """ # Redis 缓存的 key hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}" # 获取缓存中的数据 cache_str = redis_helper.redis_helper.get_hash_field(hash_key, "data") cache = json.loads(cache_str) if cache_str else [] if not cache: # 缓存为空,分批处理 contacts_wxids batch_size = 100 for i in range(0, len(contacts_wxids), batch_size): batch = contacts_wxids[i:i + batch_size] friends_brief = self.get_brief_info(token_id, app_id, batch) friends_no_brief_wxid = [f['userName'] for f in friends_brief if not f["nickName"]] cache.extend(f for f in friends_brief if f["nickName"]) if friends_no_brief_wxid: detailed_info = self.get_detail_info(token_id, app_id, friends_no_brief_wxid) cache.extend(detailed_info) # 更新缓存 redis_helper.redis_helper.update_hash_field(hash_key, "data", json.dumps(cache, ensure_ascii=False)) return # 缓存已存在,检查新联系人 existing_usernames = {contact['userName'] for contact in cache} new_contacts_wxids = [wxid for wxid in contacts_wxids if wxid not in existing_usernames] # 如果有新联系人,分批获取详细信息并更新缓存 if new_contacts_wxids: batch_size = 20 for i in range(0, len(new_contacts_wxids), batch_size): batch = new_contacts_wxids[i:i + batch_size] detailed_info = self.get_detail_info(token_id, app_id, batch) cache.extend(detailed_info) redis_helper.redis_helper.update_hash_field(hash_key, "data", json.dumps(cache, ensure_ascii=False)) def save_groups_info_to_cache(self, token_id, app_id, wxid, chatroom_ids: list): """ 将群信息保存到 Redis 缓存。 """ # Redis 缓存的 key hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}" # 获取当前缓存中所有的 chatroom_id existing_chatrooms = redis_helper.redis_helper.get_hash(hash_key) # 找出需要删除的 chatroom_ids chatrooms_to_delete = set(existing_chatrooms.keys()) - set(chatroom_ids) # 删除缓存中不再需要的 chatroom_id 数据 for chatroom_id in chatrooms_to_delete: redis_helper.redis_helper.delete_hash_field(hash_key, chatroom_id) for chatroom_id in chatroom_ids: # 获取群信息 ret, msg, data = self.get_chatroom_info(token_id, app_id, chatroom_id) if ret != 200: continue # 更新缓存 redis_helper.redis_helper.update_hash_field(hash_key, chatroom_id, json.dumps(data, ensure_ascii=False)) time.sleep(1) def update_group_info_to_cache(self, token_id, app_id, wxid, chatroom_id: str): """ 更新将群信息保存到 Redis 缓存。 """ # Redis 缓存的 key hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}" # 获取群信息 ret, msg, data = self.get_chatroom_info(token_id, app_id, chatroom_id) redis_helper.redis_helper.update_hash_field(hash_key, chatroom_id, json.dumps(data, ensure_ascii=False)) def get_groups_info_from_cache(self, wxid)->list: """ 获取群信息保存到 Redis 缓存。 """ hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}" cache = redis_helper.redis_helper.get_hash(hash_key) groups=[json.loads(v) for v in cache.values()] return groups def get_group_info_from_cache(self, wxid,chatroom_id)->dict: """ 获取群信息保存到 Redis 缓存。 """ hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}" cache = redis_helper.redis_helper.get_hash_field(hash_key,chatroom_id) groups=json.loads(cache) if cache else {} return groups def get_wxchat_config_from_cache(self, wxid): """ 获取配置信息 """ hash_key = f"__AI_OPS_WX__:WXCHAT_CONFIG" config = redis_helper.redis_helper.get_hash_field(hash_key, wxid) return json.loads(config) if config else {} def save_wxchat_config(self, wxid, config): """ 保存配置信息 """ hash_key = f"__AI_OPS_WX__:WXCHAT_CONFIG" redis_helper.redis_helper.update_hash_field(hash_key, wxid, json.dumps(config, ensure_ascii=False)) def get_login_info_from_cache(self,tel): hash_key = f"__AI_OPS_WX__:LOGININFO:{tel}" cache = redis_helper.redis_helper.get_hash(hash_key) return cache def save_login_wx_captch_code_to_cache(self,token_id,captch_code): hash_key = f"__AI_OPS_WX__:WXCAPTCHCODE:{token_id}" redis_helper.redis_helper.set_hash(hash_key,{"data":captch_code},15) def get_login_wx_captch_code_to_cache(self,token_id)->str: hash_key = f"__AI_OPS_WX__:WXCAPTCHCODE:{token_id}" r=redis_helper.redis_helper.get_hash_field(hash_key,"data") return r def start(): global wxchat # base_url = "http://192.168.88.11:2531" # wxchat = GeWeChat(base_url) wxchat = GeWeChatCom('http://api.geweapi.com/gewe')