|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734 |
- import aiohttp
- import asyncio
- import json
- import base64
- import io
- import json
- import os
- import threading
- import time,datetime
- import uuid
- from fastapi import FastAPI, Depends
- from common.singleton import singleton
- from typing import Optional
- from model.models import WxChatMessage
-
- from aiohttp import ClientError
- from json.decoder import JSONDecodeError
- from common.log import logger
- from common.utils import check_chatroom
- from model.models import AddGroupContactsHistory
- from services.redis_service import RedisService
-
- #@singleton
- class GeWeService:
- _instance = None
- _lock = asyncio.Lock() # 异步锁,确保单例初始化线程安全
-
- def __init__(self,redis_service:RedisService, base_url: str):
- if GeWeService._instance is not None:
- raise RuntimeError("请使用 get_instance() 获取单例!")
- self.base_url = base_url
- self.redis_service=redis_service
-
- @classmethod
- async def get_instance(cls, app:FastAPI,base_url: str = "http://api.geweapi.com/gewe"):
- """
- 获取 GeWeChatCom 单例,确保只初始化一次。
- """
- async with cls._lock: # 确保多个协程不会并发创建多个实例
- if cls._instance is None:
- cls._instance = cls(app,base_url)
- return cls._instance
-
-
-
- ############################### 登录模块 ###############################
- async def check_login_async(self, token_id: str, app_id: str, uuid: str, captch_code: str = ""):
- """
- 执行登录(步骤3)
- 获取到登录二维码后需每间隔5s调用本接口来判断是否登录成功。
- """
- api_url = f"{self.base_url}/v2/api/login/checkLogin"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {"appId": app_id, "uuid": uuid}
- if captch_code:
- data["captchCode"] = captch_code
-
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
-
- logger.info(f'登录验证码-请求参数 {json.dumps(data)}')
- logger.info(f'登录验证码-返回响应 {response_object}')
-
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
-
- async def qrCallback(self, uuid: str, base64_string: str):
- """
- 处理二维码回调,显示二维码图片,并提供多种二维码访问链接
- """
- try:
- from PIL import Image
- base64_string = base64_string.split(',')[1]
- img_data = base64.b64decode(base64_string)
- img = Image.open(io.BytesIO(img_data))
- _thread = threading.Thread(target=img.show, args=("QRCode",))
- _thread.setDaemon(True)
- _thread.start()
- except Exception as e:
- logger.error(f"处理二维码回调时发生错误: {e}")
-
- url = f"http://weixin.qq.com/x/{uuid}"
- qr_api1 = f"https://api.isoyu.com/qr/?m=1&e=L&p=20&url={url}"
- qr_api2 = f"https://api.qrserver.com/v1/create-qr-code/?size=400×400&data={url}"
- qr_api3 = f"https://api.pwmqr.com/qrcode/create/?url={url}"
- qr_api4 = f"https://my.tv.sohu.com/user/a/wvideo/getQRCode.do?text={url}"
- logger.info("您也可以通过以下网站扫描二维码:")
- logger.info(f"{qr_api3}\n{qr_api4}\n{qr_api2}\n{qr_api1}")
- return [qr_api1, qr_api2, qr_api3, qr_api4]
-
- async def get_login_qr_code_async(self, token_id: str, app_id: str = "", region_id: str = "440000"):
- """
- 获取登录二维码(步骤2)
- 第一次登录时传空,后续登录时需传appId。
- """
- api_url = f"{self.base_url}/v2/api/login/getLoginQrCode"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "regionId": region_id
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_data = await response.json()
- logger.info(f'{token_id} 的登录APP信息:{json.dumps(response_data, separators=(",", ":"), ensure_ascii=False)}')
- return response_data.get('data')
-
- ############################### 账号管理 ###############################
- async def reconnection(self, token_id, app_id):
- '''
- 断线重连
- 当系统返回账号已离线,但是手机顶部还显示ipad在线,可用此接口尝试重连,若返回错误/失败则必须重新调用步骤一登录
- 本接口非常用接口,可忽略
- '''
- api_url = f"{self.base_url}/v2/api/login/reconnection"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object
-
- async def logout_async(self, token_id, app_id):
- '''
- 退出
- '''
- api_url = f"{self.base_url}/v2/api/login/logout"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_data = await response.json()
- return response_data.get('data')
-
- async def check_online(self, token_id, app_id):
- '''
- 检查是否在线
- 响应结果的data=true则是在线,反之为离线
- '''
- api_url = f"{self.base_url}/v2/api/login/checkOnline"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_data = await response.json()
- return response_data.get('data')
-
- ############################### 联系人模块 ###############################
- async def fetch_contacts_list_async(self, token_id, app_id):
- '''
- 获取通讯录列表
-
- 本接口为长耗时接口,耗时时间根据好友数量递增,若接口返回超时可通过获取通讯录列表缓存接口获取响应结果
- 本接口返回的群聊仅为保存到通讯录中的群聊,若想获取会话列表中的所有群聊,需要通过消息订阅做二次处理。
- 原因:当未获取的群有成员在群内发消息的话会有消息回调, 开发者此刻调用获取群详情接口查询群信息入库保存即可,
- 比如说手机上三年前不说话的群,侧滑删除了,用户手机上也不会看到被删除的群聊的 ,但是有群成员说了话他会显示,
- 原理就是各个终端(Android、IOS、桌面版微信)取得了消息回调,又去获取群详情信息,本地数据库缓存了下来,显示的手机群聊,让用户感知的。
- '''
- api_url = f"{self.base_url}/v2/api/contacts/fetchContactsList"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def fetch_contacts_list_cache_async(self, token_id, app_id):
- '''
- 获取通讯录列表缓存
- 通讯录列表数据缓存10分钟,超时则需要重新调用获取通讯录列表接口
- '''
- api_url = f"{self.base_url}/v2/api/contacts/fetchContactsListCache"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_data = await response.json()
- return response_data.get('data')
-
- async def get_brief_info_async(self, token_id, app_id, wxids: list):
- '''
- 获取群/好友简要信息
- 1<= wxids <=100
- '''
- api_url = f"{self.base_url}/v2/api/contacts/getBriefInfo"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "wxids": wxids # list 1<= wxids <=100
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_data = await response.json()
- return response_data.get('data')
-
- async def get_detail_info_async(self, token_id, app_id, wxids):
- '''
- 获取群/好友详细信息
- 1<= wxids <=20
- '''
- api_url = f"{self.base_url}/v2/api/contacts/getDetailInfo"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "wxids": wxids # list 1<= wxids <=20
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_data = await response.json()
- return response_data.get('data')
-
- async def delete_friend_async(self, token_id, app_id, friend_wxid):
- '''
- 删除好友
- '''
- api_url = f"{self.base_url}/v2/api/contacts/deleteFriend"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "wxid": friend_wxid
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def set_friend_remark(self, token_id, app_id, friend_wxid, remark):
- '''
- 设置好友备注
- '''
- api_url = f"{self.base_url}/v2/api/contacts/setFriendRemark"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "wxid": friend_wxid,
- "remark": remark
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- ############################### 消息模块 ###############################
- async def post_text_async(self, token_id, app_id, to_wxid, content):
- api_url = f"{self.base_url}/v2/api/message/postText"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "content": content
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def post_image_async(self, token_id, app_id, to_wxid, img_url):
- api_url = f"{self.base_url}/v2/api/message/postImage"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "imgUrl": img_url
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers= headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def post_voice_async(self, token_id, app_id, to_wxid, voice_url, voice_duration):
- api_url = f"{self.base_url}/v2/api/message/postVoice"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "voiceUrl": voice_url,
- "voiceDuration": voice_duration
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def post_video_async(self, token_id, app_id, to_wxid, video_url, video_thumb_url, video_duration):
- api_url = f"{self.base_url}/v2/api/message/postVideo"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "videoUrl": video_url,
- "videoDuration": video_duration,
- "thumbUrl": video_thumb_url
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- # 发送文件消息
- async def post_file_async(self, token_id, app_id, to_wxid, file_url, file_name):
- api_url = f"{self.base_url}/v2/api/message/postFile"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "fileUrl": file_url,
- "fileName": file_name,
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def post_mini_app_aysnc(self, token_id, app_id,to_wxid, mini_app_id,display_name,page_path,cover_img_url,title,user_name):
- '''
- 发送小程序消息
- '''
- api_url = f"{self.base_url}/v2/api/message/postMiniApp"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "miniAppId": mini_app_id,
- "displayName": display_name,
- "pagePath": page_path,
- "coverImgUrl": cover_img_url,
- "title": title,
- "userName": user_name
- }
-
-
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def post_link_async(self,token_id, app_id,to_wxid, title,desc,link_url,thumb_url):
- '''
- 发送链接消息
- '''
- api_url = f"{self.base_url}/v2/api/message/postLink"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "title": title,
- "desc": desc,
- "linkUrl": link_url,
- "thumbUrl": thumb_url
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def post_name_card_async(self,token_id, app_id,to_wxid,nickName,nameCard_wxid):
- '''
- 发送名片消息
- '''
- api_url = f"{self.base_url}/v2/api/message/postNameCard"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "nickName": nickName,
- "nameCardWxid": nameCard_wxid
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- # 发送emoji消息
- async def post_emoji_async(self,token_id, app_id,to_wxid,emoji_md5,emoji_size):
- api_url = f"{self.base_url}/v2/api/message/postEmoji"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "emojiMd5": emoji_md5,
- "emojiSize": emoji_size
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- # 发送appmsg消息
- async def post_app_msg_async(self,token_id, app_id,to_wxid,appmsg):
- '''
- 本接口可用于发送所有包含节点的消息,例如:音乐分享、视频号、引用消息等等
- '''
- api_url = f"{self.base_url}/v2/api/message/postAppMsg"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "appmsg": appmsg
- }
-
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
-
-
- async def forward_image_async(self, token_id, app_id, to_wxid, aeskey, cdnthumburl, cdnthumblength, cdnthumbheight, cdnthumbwidth, length, md5):
-
- api_url = f"{self.base_url}/v2/api/message/forwardImage"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "xml": f"<?xml version=\"1.0\"?>\n<msg>\n\t<img aeskey=\"{aeskey}\" encryver=\"1\" cdnthumbaeskey=\"{aeskey}\" cdnthumburl=\"{cdnthumburl}\" cdnthumblength=\"{cdnthumblength}\" cdnthumbheight=\"{cdnthumbheight}\" cdnthumbwidth=\"{cdnthumbwidth}\" cdnmidheight=\"0\" cdnmidwidth=\"0\" cdnhdheight=\"0\" cdnhdwidth=\"0\" cdnmidimgurl=\"{cdnthumburl}\" length=\"{length}\" md5=\"{md5}\" />\n\t<platform_signature></platform_signature>\n\t<imgdatahash></imgdatahash>\n</msg>"
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('data', None), response_object.get('ret', None), response_object.get('msg', None)
-
- async def forward_video_async(self, token_id, app_id, to_wxid, aeskey, cdnvideourl, length,video_duration):
- api_url = f"{self.base_url}/v2/api/message/forwardVideo"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "xml": f"<?xml version=\"1.0\"?>\n<msg>\n\t<videomsg aeskey=\"{aeskey}\" cdnvideourl=\"{cdnvideourl}\" cdnthumbaeskey=\"{aeskey}\" cdnthumburl=\"{cdnvideourl}\" length=\"{length}\" playlength=\"{video_duration}\" cdnthumblength=\"8192\" cdnthumbwidth=\"135\" cdnthumbheight=\"240\" fromusername=\"zhangchuan2288\" md5=\"8804c121e9db91dd844f7a34035beb88\" newmd5=\"\" isplaceholder=\"0\" rawmd5=\"\" rawlength=\"0\" cdnrawvideourl=\"\" cdnrawvideoaeskey=\"\" overwritenewmsgid=\"0\" originsourcemd5=\"\" isad=\"0\" />\n</msg>"
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- # 转发文件
- async def forward_file_async(self, token_id, app_id, to_wxid, xml):
- api_url = f"{self.base_url}/v2/api/message/forwardFile"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "xml": f"<?xml version=\"1.0\"?>\n<msg>\n\t<appmsg appid=\"\" sdkver=\"0\">\n\t\t<title>info.json</title>\n\t\t<des />\n\t\t<action />\n\t\t<type>6</type>\n\t\t<showtype>0</showtype>\n\t\t<soundtype>0</soundtype>\n\t\t<mediatagname />\n\t\t<messageext />\n\t\t<messageaction />\n\t\t<content />\n\t\t<contentattr>0</contentattr>\n\t\t<url />\n\t\t<lowurl />\n\t\t<dataurl />\n\t\t<lowdataurl />\n\t\t<appattach>\n\t\t\t<totallen>63</totallen>\n\t\t\t<attachid>@cdn_3057020100044b304902010002043904752002032f7d6d02046bb5bade02046593760c042433653765306131612d646138622d346662322d383239362d3964343665623766323061370204051400050201000405004c53d900_f46be643aa0dc009ae5fb63bbc73335d_1</attachid>\n\t\t\t<emoticonmd5 />\n\t\t\t<fileext>json</fileext>\n\t\t\t<cdnattachurl>3057020100044b304902010002043904752002032f7d6d02046bb5bade02046593760c042433653765306131612d646138622d346662322d383239362d3964343665623766323061370204051400050201000405004c53d900</cdnattachurl>\n\t\t\t<aeskey>f46be643aa0dc009ae5fb63bbc73335d</aeskey>\n\t\t\t<encryver>0</encryver>\n\t\t\t<overwrite_newmsgid>594239960546299206</overwrite_newmsgid>\n\t\t\t<fileuploadtoken>v1_0bgfyCkUmoZYYyvXys0cCiJdd2R/pKPdD2TNi9IY6FOt+Tvlhp3ijUoupZHzyB2Lp7xYgdVFaUGL4iu3Pm9/YACCt20egPGpT+DKe+VymOzD7tJfsS8YW7JObTbN8eVoFEetU5HSRWTgS/48VVsPZMoDF6Gz1XJDLN/dWRxvzrbOzVGGNvmY4lpXb0kRwXkSxwL+dO4=</fileuploadtoken>\n\t\t</appattach>\n\t\t<extinfo />\n\t\t<sourceusername />\n\t\t<sourcedisplayname />\n\t\t<thumburl />\n\t\t<md5>d16070253eee7173e467dd7237d76f60</md5>\n\t\t<statextstr />\n\t</appmsg>\n\t<fromusername>zhangchuan2288</fromusername>\n\t<scene>0</scene>\n\t<appinfo>\n\t\t<version>1</version>\n\t\t<appname></appname>\n\t</appinfo>\n\t<commenturl></commenturl>\n</msg>"
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- # 转发链接
- async def forward_link_async(self, token_id, app_id, to_wxid, xml):
- api_url = f"{self.base_url}/v2/api/message/forwardLink"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "xml": xml
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- # 转发小程序
- async def forward_mini_app_async(self, token_id, app_id, to_wxid, xml,cover_img_url):
- api_url = f"{self.base_url}/v2/api/message/forwardMiniApp"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "toWxid": to_wxid,
- "xml": xml,
- "coverImgUrl":cover_img_url
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def add_contacts_async(self, token_id: str, app_id: str, scene: int, option: int, v3: str, v4: str, content: str):
- api_url = f"{self.base_url}/v2/api/contacts/addContacts"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "scene": scene,
- "option": option,
- "v3": v3,
- "v4": v4,
- "content": content
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None)
-
- async def check_relation(self, token_id, app_id, wxids: list):
- '''
- 检查好友关系
- '''
- api_url = f"{self.base_url}/v2/api/contacts/checkRelation"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "wxids": wxids # list 1<= wxids <=20
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
-
-
- ############################### 下载模块 ###############################
- async def download_audio_msg_async(self, token_id: str, app_id: str, msg_id: int, xml: str):
- data = {
- "appId": app_id,
- "msgId": msg_id,
- "xml": xml
- }
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- url = f'{self.base_url}/v2/api/message/downloadVoice'
- async with aiohttp.ClientSession() as session:
- async with session.post(url, json=data, headers=headers) as response:
- if response.ok:
- data = await response.json()
- if data['ret'] == 200:
- return data['data']['fileUrl']
- else:
- return False
- else:
- return False
-
- async def download_image_msg_async(self, token_id: str, app_id: str, xml: str):
- data = {
- "appId": app_id,
- "type": 2,
- "xml": xml
- }
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(f"{self.base_url}/v2/api/message/downloadImage", json=data, headers=headers) as response:
- if response.ok:
- data = await response.json()
- response_object = await response.json()
- if data['ret'] == 200:
- return data['data']['fileUrl']
- else:
- logger.warning(f'下载图片失败 {data.get("ret", None),data.get("msg", None), data.get("data", None)}')
- return ""
- else:
- logger.warning(f'下载图片失败 {response.status} {response.reason}')
- return ""
-
- async def download_cdn_msg_async(self, token_id:str,aeskey: str, file_id: str, type: str,total_size:str,suffix:str):
-
- api_url = f"{self.base_url}/v2/api/message/downloadCdn"
- data = {
- "aeskey": aeskey,
- "fileId":file_id,
- "type": type, #下载的文件类型 1:高清图片 2:常规图片 3:缩略图 4:视频 5:文件
- "totalSize":total_size, #文件大小
- "suffix": suffix #下载类型为文件时,传文件的后缀(例:doc)
- }
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
-
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def download_audio_file(self, fileUrl: str, file_name: str):
- local_filename = f'./silk/{file_name}.silk'
- async with aiohttp.ClientSession() as session:
- async with session.get(fileUrl) as response:
- if response.status == 200:
- with open(local_filename, 'wb') as f:
- while True:
- chunk = await response.content.read(1024)
- if not chunk:
- break
- f.write(chunk)
- return local_filename
- else:
- return None
-
- ############################### 群模块 ###############################
- async def get_chatroom_info_async(self, token_id, app_id, chatroom_id):
- '''
- 获取群信息
- '''
- api_url = f"{self.base_url}/v2/api/group/getChatroomInfo"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "chatroomId": chatroom_id
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def add_group_member_as_friend_async(self, token_id, app_id, chatroom_id, member_wxid, content):
- '''
- 添加群成员为好友
- '''
- api_url = f"{self.base_url}/v2/api/group/addGroupMemberAsFriend"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "chatroomId": chatroom_id,
- "content": content,
- "memberWxid": member_wxid,
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def save_contract_list_async(self, token_id, app_id, chatroom_id, oper_type):
- '''
- 群保存到通讯录
- 操作类型 3保存到通讯录 2从通讯录移除
- '''
- api_url = f"{self.base_url}/v2/api/group/saveContractList"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "chatroomId": chatroom_id,
- "operType": oper_type
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def get_group_memberlist_async(self, token_id, app_id, chatroom_id):
- '''
- 获取群成员列表
- '''
- api_url = f"{self.base_url}/v2/api/group/getChatroomMemberList"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "chatroomId": chatroom_id,
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def agree_join_room_async(self, token_id, app_id, url):
- '''
- 同意入群
- '''
- api_url = f"{self.base_url}/v2/api/group/agreeJoinRoom"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "url": url,
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
-
- ############################### 个人模块 ###############################
- async def get_profile_async(self, token_id, app_id):
- api_url = f"{self.base_url}/v2/api/personal/getProfile"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- ############################### 朋友圈模块 ###################################
- # 在新设备登录后的1-3天内,您将无法使用朋友圈发布、点赞、评论等功能。在此期间,如果尝试进行这些操作,您将收到来自微信团队的提醒。请注意遵守相关规定。
-
- async def sns_visible_scope(self, token_id, app_id, option):
- '''
- 朋友圈可见范围 option 可选项
- 1:全部
- 2:最近半年
- 3:最近一个月
- 4:最近三天
- '''
- api_url = f"{self.base_url}/v2/api/sns/snsVisibleScope"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "option": option,
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def stranger_visibility_enabled(self, token_id, app_id, enabled: bool):
- '''
- 是否允许陌生人查看朋友圈
- '''
- api_url = f"{self.base_url}/v2/api/sns/strangerVisibilityEnabled"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "enabled": enabled
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def send_text_sns_async(self, token_id, app_id, content):
- '''
- 发送文字朋友圈
- '''
- api_url = f"{self.base_url}/v2/api/sns/sendTextSns"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "content": content
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def send_image_sns_async(self, token_id, app_id, content, img_infos: list):
- '''
- 发送图片朋友圈
- '''
- api_url = f"{self.base_url}/v2/api/sns/sendImgSns"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "allowWxIds": [],
- "atWxIds": [],
- "disableWxIds": [],
- "content": content,
- "imgInfos": img_infos, # 通过上传朋友圈图片接口获取
- "privacy": False
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def upload_sns_image_async(self, token_id, app_id, img_urls: list):
- '''
- 上传朋友圈图片
- '''
- api_url = f"{self.base_url}/v2/api/sns/uploadSnsImage"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "imgUrls": img_urls
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def upload_send_image_sns_async(self, token_id, app_id, content, img_infos: list):
- '''
- 上传并发送图片朋友圈
- '''
- ret, msg, data = await self.upload_sns_image_async(token_id, app_id, img_infos)
- if ret != 200:
- logger.warning(f'上传图片失败 {ret} {msg} {data}')
- return ret, msg, data
- ret, msg, data = await self.send_image_sns_async(token_id, app_id, content, data)
- if ret != 200:
- logger.warning(f'发送图片失败 {ret} {msg} {data}')
- return ret, msg, data
-
-
- async def send_video_sns_async(self, token_id, app_id, content: str, video_info: dict):
- '''
- 发送视频朋友圈
- '''
- api_url = f"{self.base_url}/v2/api/sns/sendVideoSns"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "content": content,
- "allowWxIds": [],
- "atWxIds": [],
- "disableWxIds": [],
- "videoInfo": video_info,
- "privacy": False
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def upload_sns_video_async(self, token_id, app_id, video_url: str, video_thumb_url: str):
- '''
- 上传朋友圈视频
- '''
- api_url = f"{self.base_url}/v2/api/sns/uploadSnsVideo"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "thumbUrl": video_thumb_url,
- "videoUrl": video_url,
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def upload_send_video_sns_async(self, token_id, app_id, content: str, video_url: str, video_thumb_url: str):
- '''
- 上传并发送视频朋友圈
- '''
- ret, msg, data = self.upload_sns_video_async(token_id, app_id, video_url, video_thumb_url)
- if ret != 200:
- logger.warning(f'上传视频失败 {ret} {msg} {data}')
- return ret, msg, data
- ret,msg,data=self.send_video_sns_async(token_id, app_id, content, data)
- if ret != 200:
- logger.warning(f'发送视频失败 {ret} {msg} {data}')
- return ret,msg,data
-
- async def get_sns_list_async(self, token_id, app_id, sns_id: str):
- api_url = f"{self.base_url}/v2/api/sns/snsList"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "snsId": sns_id,
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
-
- ############################### 标签模块 ###############################
- async def label_add_async(self, token_id, app_id, label_name):
- api_url = f"{self.base_url}/v2/api/label/add"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "labelName": label_name,
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def label_delete_async(self, token_id, app_id, label_ids: list):
- api_url = f"{self.base_url}/v2/api/label/delete"
- label_ids_str = ','.join(map(str, label_ids))
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "labelIds": label_ids_str,
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- async def label_list_async(self, token_id, app_id):
- api_url = f"{self.base_url}/v2/api/label/list"
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- }
-
- try:
- async with aiohttp.ClientSession() as session:
- try:
- async with session.post(api_url, headers=headers, json=data) as response:
- # 检查响应状态码
- if response.status != 200:
- return response.status, f"HTTP Error: {response.status}", None
-
- try:
- response_object = await response.json()
- except JSONDecodeError:
- return 501, "Invalid JSON response", None
-
- # 检查返回的数据结构
- if not isinstance(response_object, dict):
- return 501, "Invalid response format", None
-
- # 返回处理后的数据
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
- except ClientError as e:
- return 501, f"Network error: {str(e)}", None
-
- except Exception as e:
- return None, f"Unexpected error: {str(e)}", None
-
- async def label_modify_members_async(self, token_id, app_id,label_ids: list,wx_ids: list):
- '''
- 注意
- 由于好友标签信息存储在用户客户端,因此每次在修改时都需要进行全量修改。举例来说,考虑好友A(wxid_asdfaihp123),该好友已经被标记为标签ID为1和2。
-
- 在添加标签ID为3时,传递的参数如下:labelIds:1,2,3,wxIds:[wxid_asdfaihp123]。这表示要给好友A添加标签ID为3,同时保留已有的标签ID 1和2。
-
- 而在删除标签ID为1时,传递的参数如下:labelIds:2,3 ,wxIds:[wxid_asdfaihp123]。这表示要将好友A的标签ID 1删除,而保留标签ID 2。
- '''
-
- api_url = f"{self.base_url}/v2/api/label/modifyMemberList"
- label_ids_str = ','.join(map(str, label_ids))
-
- headers = {
- 'X-GEWE-TOKEN': token_id,
- 'Content-Type': 'application/json'
- }
- data = {
- "appId": app_id,
- "wxIds": wx_ids,
- "labelIds": label_ids_str
- }
- async with aiohttp.ClientSession() as session:
- async with session.post(api_url, headers=headers, json=data) as response:
- response_object = await response.json()
- return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
-
-
- ############################### 其他 ###############################
-
- async def save_session_messages_to_cache_async(self, hash_key,item:object)->list:
- '''
- 对话列表
- '''
- messages = await self.redis_service.get_hash(hash_key)
- wxid=hash_key.split(':')[-1]
- if not messages:
- messages=[{"role": "system", "content": ""}]
- messages.append(item)
- await self.redis_service.set_hash(hash_key, {"data": json.dumps(messages, ensure_ascii=False)}, 600)
- else:
- messages_str = await self.redis_service.get_hash_field(hash_key, "data")
- messages = json.loads(messages_str) if messages_str else []
- #判断是否含有图片
- last_message = messages[-1]
- content = last_message.get("content", [])
- if isinstance(content, list) and content:
- last_content_type = content[-1].get("type")
- if last_content_type == 'image_url':
- content.append(item['content'][0])
- messages[-1]['content']=content
- else:
- messages.append(item)
- else:
- if last_message!= item:
- messages.append(item)
- await self.redis_service.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},600)
- return messages
-
- async def get_contacts_brief_from_cache_async(self, wxid)->list:
- """
- 获取联系人信息保存到 Redis 缓存。
- """
- hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}"
- cache_str = await self.redis_service.get_hash_field(hash_key, "data")
- return json.loads(cache_str) if cache_str else []
-
- async def save_contacts_brief_to_cache_async(self, token_id, app_id, wxid, contacts_wxids: list)->list:
- """
- 将联系人信息保存到 Redis 缓存。
- """
- # Redis 缓存的 key
- hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}"
-
- # 获取缓存中的数据
- cache_str = await self.redis_service.get_hash_field(hash_key, "data")
- cache = json.loads(cache_str) if cache_str else []
- # 回调处理
- if len(contacts_wxids) == 1:
- cache_wxids = [f['userName'] for f in cache]
- friends_brief = await self.get_brief_info_async(token_id, app_id, contacts_wxids)
- if contacts_wxids[0] in cache_wxids:
- # 替换已经存在的数据
- for i in range(len(cache)):
- if cache[i]['userName'] == contacts_wxids[0]:
- cache[i] = friends_brief[0]
- else:
- cache.extend(f for f in friends_brief if f["nickName"])
- friends_no_brief_wxid = [f['userName'] for f in friends_brief if not f["nickName"]]
- if friends_no_brief_wxid:
- detailed_info = await self.get_detail_info_async(token_id, app_id, friends_no_brief_wxid)
- cache.extend(detailed_info)
-
- # 分批处理
- else:
- cache=[]
- # 缓存为空,分批处理 contacts_wxids
- batch_size = 100
- for i in range(0, len(contacts_wxids), batch_size):
- batch = contacts_wxids[i:i + batch_size]
- friends_brief = await self.get_brief_info_async(token_id, app_id, batch)
-
- cache.extend(f for f in friends_brief if f["nickName"])
-
- friends_no_brief_wxid = [f['userName'] for f in friends_brief if not f["nickName"]]
- if friends_no_brief_wxid:
- detailed_info = await self.get_detail_info_async(token_id, app_id, friends_no_brief_wxid)
- cache.extend(detailed_info)
-
- # 更新缓存
-
- await self.redis_service.update_hash_field(hash_key, "data", json.dumps(cache, ensure_ascii=False))
- return cache
-
- async def delete_contacts_brief_from_cache_async(self, wxid, contacts_wxids: list):
- """
- 删除联系人信息保存到 Redis 缓存。
- """
- hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}"
- cache_str = await self.redis_service.get_hash_field(hash_key, "data")
- cache = json.loads(cache_str) if cache_str else []
-
-
- # 将 contacts_wxids 转换为集合,提高查找效率
- wxids_set = set(contacts_wxids)
-
- # 过滤 cache:保留 userName 不在集合中的对象
- filtered_cache_async = [contact for contact in cache if contact["userName"] not in wxids_set]
-
- # # 如果需要原地修改原 cache 列表:
- # cache[:] = filtered_cache_async
- await self.redis_service.update_hash_field(hash_key, "data", json.dumps(filtered_cache_async, ensure_ascii=False))
-
- async def save_groups_info_to_cache_async(self, token_id, app_id, wxid, chatroom_ids: list)->list:
- """
- 将群信息保存到 Redis 缓存。
- """
- # Redis 缓存的 key
- hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}"
-
- # 获取当前缓存中所有的 chatroom_id
- existing_chatrooms = await self.redis_service.get_hash(hash_key)
-
- # 找出需要删除的 chatroom_ids
- chatrooms_to_delete = set(existing_chatrooms.keys()) - set(chatroom_ids)
-
- # 删除缓存中不再需要的 chatroom_id 数据
- for chatroom_id in chatrooms_to_delete:
- await self.redis_service.delete_hash_field(hash_key, chatroom_id)
-
- for chatroom_id in chatroom_ids:
-
- if not check_chatroom(chatroom_id):
- await self.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
- await self.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id)
- logger.info(f'{chatroom_id} 不是有效的群,不处理')
-
- continue
-
- # 获取群信息
- ret, msg, data =await self.get_chatroom_info_async(token_id, app_id, chatroom_id)
- if ret != 200:
- continue
- # print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
- # print('群信息save_groups_info_to_cache_async')
- # print(data)
- # print(bool(not data.get('memberList', [])))
- # print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
- if not data.get('memberList', []):
- await self.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
- await self.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id)
- logger.info(f'通过 成员列表为空 清理 {chatroom_id}群成员信息')
- continue
- # 更新缓存
- await self.redis_service.update_hash_field(hash_key, chatroom_id, json.dumps(data, ensure_ascii=False))
- await asyncio.sleep(0.1)
-
- async def save_groups_members_to_cache_async(self, token_id, app_id, wxid, chatroom_ids: list):
- """
- 将群成员保存到 Redis 缓存。
- """
- # Redis 缓存的 key
- hash_key = f"__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}"
-
- # 获取当前缓存中所有的 chatroom_id
- existing_chatrooms = await self.redis_service.get_hash(hash_key)
-
- # 找出需要删除的 chatroom_ids
- chatrooms_to_delete = set(existing_chatrooms.keys()) - set(chatroom_ids)
-
- # 删除缓存中不再需要的 chatroom_id 数据
- for chatroom_id in chatrooms_to_delete:
- await self.redis_service.delete_hash_field(hash_key, chatroom_id)
-
- for chatroom_id in chatroom_ids:
- # 获取群信息
- ret, msg, data = await self.get_group_memberlist_async(token_id, app_id, chatroom_id)
- if ret != 200:
- continue
- # 更新缓存
- await self.redis_service.update_hash_field(hash_key, chatroom_id, json.dumps(data, ensure_ascii=False))
- await asyncio.sleep(0.1)
-
- async def update_group_members_to_cache_async(self, token_id, app_id, wxid, chatroom_id: str):
- """
- 更新将群信息保存到 Redis 缓存。
- """
-
- if not check_chatroom(chatroom_id):
- return
- # Redis 缓存的 key
- hash_key = f"__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}"
-
- # 获取群信息
- ret, msg, data = await self.get_group_memberlist_async(token_id, app_id, chatroom_id)
- if ret != 200:
- if msg in '获取群成员列表异常:null':
- await self.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
- await self.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id)
- logger.info(f'通过 [获取群成员列表异常:null] 清理 {chatroom_id}群成员信息')
- return
- else:
- logger.error(f"获取{chatroom_id}群成员信息失败,错误信息:{ret} {msg}")
- return
- await self.redis_service.update_hash_field(hash_key, chatroom_id, json.dumps(data, ensure_ascii=False))
-
- async def get_group_members_from_cache_async(self, wxid,chatroom_id)->dict:
- """
- 获取缓存中单个群成员。
- """
- hash_key = f"__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}"
- cache = await self.redis_service.get_hash_field(hash_key,chatroom_id)
- groups=json.loads(cache) if cache else {}
- return groups
-
- async def get_groups_members_from_cache_async(self, wxid)->list:
- """
- 获取缓存中所有群成员。
- """
- hash_key = f"__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}"
- cache = await self.redis_service.get_hash(hash_key)
- groups=[json.loads(v) for v in cache.values()]
- return groups
-
- async def get_groups_info_members_from_cache_async(self, wxid)->list:
- groups_info_hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}"
- groups_info_cache = await self.redis_service.get_hash(groups_info_hash_key)
-
- groups_menbers_hash_key= f"__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}"
- groups_menbers_cache = await self.redis_service.get_hash(groups_menbers_hash_key)
-
- groups_info_cache_list=[{"chatroom_id": field, "value": json.loads(value)} for field, value in groups_info_cache.items()]
- groups_menbers_cache_list=[{"chatroom_id": field, "value": json.loads(value)} for field, value in groups_menbers_cache.items()]
-
- # 合并逻辑
- merged_data = []
-
- # 遍历 group_info
- for info in groups_info_cache_list:
- chatroom_id = info['chatroom_id']
- group_data = info['value']
-
- # 查找对应的 group_members
- members = next((m['value'] for m in groups_menbers_cache_list if m['chatroom_id'] == chatroom_id), None)
-
- if members:
- # 合并数据
- merged_group = {
- "nickName": group_data['nickName'],
- "chatroomId": group_data['chatroomId'],
- "memberList": members['memberList'],
- "chatroomOwner": members.get('chatroomOwner', None),
- "adminWxid": members.get('adminWxid',None)
- }
- merged_data.append(merged_group)
-
- return merged_data
-
- async def get_group_info_members_from_cache_async(self, wxid,chatroom_id)->dict:
- group_info_cache= await self.get_group_info_from_cache_async(wxid,chatroom_id)
- group_menbers_cache= await self.get_group_members_from_cache_async(wxid,chatroom_id)
- group_info_members = {
- "nickName": group_info_cache['nickName'],
- "chatroomId": group_info_cache['chatroomId'],
- "memberList": group_menbers_cache['memberList'],
- "chatroomOwner": group_menbers_cache['chatroomOwner'],
- "adminWxid": group_menbers_cache['adminWxid']
- }
- return group_info_members
-
- async def update_group_info_to_cache_async(self, token_id, app_id, wxid, chatroom_id: str):
- """
- 更新将群信息保存到 Redis 缓存。
- """
- if not check_chatroom(chatroom_id):
- return
- # Redis 缓存的 key
- hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}"
- if not check_chatroom(chatroom_id):
- await self.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
- await self.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id)
- return
- # 获取群信息
- ret, msg, data =await self.get_chatroom_info_async(token_id, app_id, chatroom_id)
- if ret != 200:
- logger.error(f"获取{chatroom_id}群成员信息失败,错误信息:{ret} {msg}")
- return
-
- await self.redis_service.update_hash_field(hash_key, chatroom_id, json.dumps(data, ensure_ascii=False))
-
- async def get_groups_info_from_cache_async(self, wxid)->list:
- """
- 获取群信息保存到 Redis 缓存。
- """
- hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}"
- cache = await self.redis_service.get_hash(hash_key)
- groups=[json.loads(v) for v in cache.values()]
- return groups
-
- async def get_group_info_from_cache_async(self, wxid,chatroom_id)->dict:
- """
- 获取群信息保存到 Redis 缓存。
- """
- hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}"
- cache = await self.redis_service.get_hash_field(hash_key,chatroom_id)
- groups=json.loads(cache) if cache else {}
- return groups
-
- async def get_wxchat_config_from_cache_async(self, wxid):
- """
- 获取配置信息
- """
- hash_key = f"__AI_OPS_WX__:WXCHAT_CONFIG"
- config = await self.redis_service.get_hash_field(hash_key, wxid)
- return json.loads(config) if config else {}
-
- async def save_wxchat_config_async(self, wxid, config:dict):
- """
- 保存配置信息
- """
- hash_key = f"__AI_OPS_WX__:WXCHAT_CONFIG"
- await self.redis_service.update_hash_field(hash_key, wxid, json.dumps(config, ensure_ascii=False))
-
- async def get_global_config_from_cache_async(self):
- """
- 获取配置信息
- """
- hash_key = f"__AI_OPS_WX__:GLOBAL_CONFIG"
- config = await self.redis_service.get_hash_field(hash_key,"data")
- return json.loads(config) if config else {}
-
- async def save_global_config_async(self, config:dict):
- """
- 保存配置信息
- """
- hash_key = f"__AI_OPS_WX__:GLOBAL_CONFIG"
- await self.redis_service.update_hash_field(hash_key, "data,", json.dumps(config, ensure_ascii=False))
-
- async def get_login_info_from_cache_async(self,tel):
- hash_key = f"__AI_OPS_WX__:LOGININFO:{tel}"
- cache = await self.redis_service.get_hash(hash_key)
- return cache
-
- async def get_login_info_by_wxid_async(self,wxid:str)->tuple:
- cursor = 0
- while True:
- cursor, login_keys =await self.redis_service.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*')
- # 批量获取所有键的 hash 数据
- for k in login_keys:
- r = await self.redis_service.get_hash(k)
- if r.get("wxid") == wxid:
- return k,r
-
- # 如果游标为 0,则表示扫描完成
- if cursor == 0:
- break
-
- return None,None
-
- async def get_login_info_by_app_id_async(self,app_id:str)->tuple:
- cursor = 0
- while True:
- cursor, login_keys =await self.redis_service.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*')
- #print(f'login_keys:{login_keys}')
- # 批量获取所有键的 hash 数据
- for k in login_keys:
- r = await self.redis_service.get_hash(k)
- #print(r)
- if r.get("appId") == app_id:
- return k,r
-
- # 如果游标为 0,则表示扫描完成
- if cursor == 0:
- break
-
- return None,None
-
- async def get_token_id_by_app_id_async(self,app_id: str) -> str:
- # 使用 SCAN 避免一次性返回所有的匹配键,逐步扫描
- cursor = 0
- while True:
- cursor, login_keys =await self.redis_service.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*')
-
- # 批量获取所有键的 hash 数据
- for k in login_keys:
- r = await self.redis_service.get_hash(k)
- if r.get("appId") == app_id:
- return r.get("tokenId", "")
-
- # 如果游标为 0,则表示扫描完成
- if cursor == 0:
- break
-
- return ""
-
- async def save_login_wx_captch_code_to_cache_async(self,tel,captch_code):
- hash_key = f"__AI_OPS_WX__:WXCAPTCHCODE:{tel}"
- await self.redis_service.set_hash(hash_key,{"data":captch_code},30)
-
- # async def get_login_wx_captch_code_from_cache_async(self,token_id)->str:
- # hash_key = f"__AI_OPS_WX__:WXCAPTCHCODE:{token_id}"
- # r=await self.redis_service.get_hash_field(hash_key,"data")
- # return r
-
- async def get_login_wx_captch_code_from_cache_async(self,tel)->str:
- hash_key = f"__AI_OPS_WX__:WXCAPTCHCODE:{tel}"
- r=await self.redis_service.get_hash_field(hash_key,"data")
- return r
-
- async def acquire_login_lock_async(self, token_id, expire_time=10):
- hash_key = f"__AI_OPS_WX__:LOGINLOCK:{token_id}"
- identifier=str(uuid.uuid4())
- if await self.redis_service.client.setnx(hash_key, identifier):
- await self.redis_service.client.expire(hash_key, expire_time)
- return True
- return False
-
- async def release_login_lock_async(self, token_id):
- hash_key = f"__AI_OPS_WX__:LOGINLOCK:{token_id}"
- await self.redis_service.client.delete(hash_key)
-
- async def save_group_add_contacts_history_async(self,wxid,chatroom_id,contact_wxid,history:AddGroupContactsHistory):
- '''
- 保存群加好友历史
- '''
- hash_key = f"__AI_OPS_WX__:GROUPS_ADD_CONTACT_HISTORY:{wxid}:{chatroom_id}"
- data_str=await self.redis_service.get_hash_field(hash_key,contact_wxid)
- data=json.loads(data_str) if data_str else []
- data.append(history.model_dump())
- await self.redis_service.update_hash_field(hash_key, contact_wxid, json.dumps(data, ensure_ascii=False))
-
- async def get_group_add_contacts_history_async(self,wxid,chatroom_id,contact_wxid)->list[AddGroupContactsHistory]:
- '''
- 获取群加好友历史
- '''
- hash_key = f"__AI_OPS_WX__:GROUPS_ADD_CONTACT_HISTORY:{wxid}:{chatroom_id}"
- data_str=await self.redis_service.get_hash_field(hash_key,contact_wxid)
- data=json.loads(data_str) if data_str else []
- #TypeAdapter.validate_python(List[Models.AddGroupContactsHistory], data)
- return [AddGroupContactsHistory.model_validate(item) for item in data]
-
- async def check_wixd_group_add_contacts_history_async(self,wxid,chatroom_id):
- '''
- 返回群发送好友达到2次的wxid
- '''
- hash_key = f"__AI_OPS_WX__:GROUPS_ADD_CONTACT_HISTORY:{wxid}:{chatroom_id}"
- cache = await self.redis_service.get_hash(hash_key)
- wxids = [key for key, value in cache.items() if len(json.loads(value)) == 2]
- return wxids
-
- async def is_group_add_contacts_history_one_day_200_async(self, wxid, chatroom_id) -> bool:
- # 生成 hash_key
- hash_key = f"__AI_OPS_WX__:GROUPS_ADD_CONTACT_HISTORY:{wxid}:{chatroom_id}"
- cache = await self.redis_service.get_hash(hash_key)
- today_list = []
- today = datetime.datetime.now().date()
-
- for key, value in cache.items():
- value_data_list = json.loads(value)
- for value_data in value_data_list:
- add_time_date = datetime.datetime.fromtimestamp(value_data["addTime"]).date()
- if add_time_date == today:
- today_list.append(value_data)
- if len(today_list) == 200:
- return True
- return False
-
- async def is_group_add_contacts_history_one_day_90_async(self, wxid) -> bool:
-
- today_list = []
- today = datetime.datetime.now().date()
- cursor = 0
- hash_key = f"__AI_OPS_WX__:GROUPS_ADD_CONTACT_HISTORY:{wxid}:*"
- while True:
- cursor, history_keys =await self.redis_service.client.scan(cursor, match= hash_key)
- #print(f'login_keys:{login_keys}')
- # 批量获取所有键的 hash 数据
- for k in history_keys:
- cache = await self.redis_service.get_hash(k)
- for key, value in cache.items():
- value_data_list = json.loads(value)
- for value_data in value_data_list:
- add_time_date = datetime.datetime.fromtimestamp(value_data["addTime"]).date()
- if add_time_date == today:
- today_list.append(value_data)
- if len(today_list) == 90:
- return True
-
-
- # 如果游标为 0,则表示扫描完成
- if cursor == 0:
- break
-
-
- return False
-
- async def is_group_add_contacts_history_one_day_async(self, wxid,today_total=90) -> bool:
-
- today_list = []
- today = datetime.datetime.now().date()
- cursor = 0
- hash_key = f"__AI_OPS_WX__:GROUPS_ADD_CONTACT_HISTORY:{wxid}:*"
- while True:
- cursor, history_keys =await self.redis_service.client.scan(cursor, match= hash_key)
- #print(f'login_keys:{login_keys}')
- # 批量获取所有键的 hash 数据
- for k in history_keys:
- cache = await self.redis_service.get_hash(k)
- for key, value in cache.items():
- value_data_list = json.loads(value)
- for value_data in value_data_list:
- add_time_date = datetime.datetime.fromtimestamp(value_data["addTime"]).date()
- if add_time_date == today:
- today_list.append(value_data)
- if len(today_list) == today_total:
- return True
-
-
- # 如果游标为 0,则表示扫描完成
- if cursor == 0:
- break
-
-
- return False
-
- async def enqueue_to_add_contacts_async(self,wxid,scene:int,v3,v4):
- """
- 入列待添加好友
- """
- hash_key = f'__AI_OPS_WX__:TO_ADD_CONTACTS:{wxid}'
- data_str=json.dumps({"wxid":wxid,"scene":scene,"v3":v3,"v4":v4},ensure_ascii=False)
- await self.redis_service.enqueue(hash_key,data_str)
-
- async def dequeue_to_add_contacts_async(self,wxid)->dict:
- """
- 出列待添加好友
- """
- hash_key = f'__AI_OPS_WX__:TO_ADD_CONTACTS:{wxid}'
- data=await self.redis_service.dequeue(hash_key)
- return json.loads(data) if data else {}
-
-
- async def enqueue_wxchat_message_async(self,msg:WxChatMessage):
- """
- 入列消息
- """
- hash_key = f'__AI_OPS_WX__:WX_CHAT_MESSAGE:{msg.belongToWxid}'
- data_str=json.dumps(msg.model_dump(),ensure_ascii=False)
- await self.redis_service.enqueue(hash_key,data_str)
-
-
-
- async def dequeue_wxchat_message_async(self,wxid)->Optional[WxChatMessage]:
- """
- 出列消息
- """
- hash_key = f'__AI_OPS_WX__:WX_CHAT_MESSAGE:{wxid}'
- data = await self.redis_service.dequeue(hash_key)
-
- if not data:
- return None
- try:
- # 将 JSON 数据反序列化为 WxChatMessage 对象
- return WxChatMessage.model_validate(data)
- except Exception as e:
- # 处理反序列化错误
- print(f"Failed to deserialize message: {e}")
- return None
-
-
- async def save_task_run_time_async(self,task_name,log:list,expire_time=None):
- '''
- 任务运行锁
- '''
- hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{task_name}"
- await self.redis_service.set_hash(hash_key,{"data": json.dumps(log, ensure_ascii=False)}, expire_time)
-
- async def get_task_run_time_async(self,task_name)->list:
- hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{task_name}"
- cache= await self.redis_service.get_hash_field(hash_key,"data")
- return json.loads(cache) if cache else []
-
- async def save_task_run_time_by_wxid_async(self,wxid,task_name,log:list,expire_time=None):
- '''
- 任务运行锁
- '''
- hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{wxid}:{task_name}"
- await self.redis_service.set_hash(hash_key,{"data": json.dumps(log, ensure_ascii=False)}, expire_time)
-
- async def get_task_run_time_by_wxid_async(self,wxid,task_name)->list:
- hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{wxid}:{task_name}"
- cache= await self.redis_service.get_hash_field(hash_key,"data")
- return json.loads(cache) if cache else []
-
-
- async def save_wx_expection_async(self,wxid,api_name,exception:str,expire_time=None):
- hash_key = f"__AI_OPS_WX__:WX_EXCEPTION:{wxid}:{api_name}"
- await self.redis_service.set_hash(hash_key,{"data": json.dumps(exception, ensure_ascii=False)}, expire_time)
-
- async def get_wx_expection_async(self,wxid,api_name:str)->str:
- hash_key = f"__AI_OPS_WX__:WX_EXCEPTION:{wxid}:{api_name}"
- cache= await self.redis_service.get_hash_field(hash_key,"data")
- return json.loads(cache) if cache else ""
-
- async def wx_add_contacts_from_chatroom_task_status_async(self,wxid,chatroom_id)->int:
- history_hash_key = f'__AI_OPS_WX__:GROUPS_ADD_CONTACT_HISTORY:{wxid}:{chatroom_id}'
- cache=await self.redis_service.get_hash(history_hash_key)
-
-
- group=await self. get_group_members_from_cache_async(wxid,chatroom_id)
- chatroom_member_list = group.get('memberList', [])
- chatroom_owner_wxid = group.get('chatroomOwner', None)
-
- admin_wxids = group.get('adminWxid', [])
-
- admin_wxids = group.get('adminWxid')
- if admin_wxids is None:
- admin_wxids = []
- if chatroom_owner_wxid:
- admin_wxids.append(chatroom_owner_wxid)
-
- #unavailable_wixds=set(admin_wxids)
-
- available_members=[m["wxid"] for m in chatroom_member_list if m["wxid"] not in admin_wxids]
- if len(available_members) > len(cache.keys()):
- return 1
-
- for key, value in cache.items():
- value_data_list = json.loads(value)
- if len(value_data_list) <2:
- return 1
- return 2
-
- async def delete_group_add_contacts_history_async(self,wxid,chatroom_id)->dict:
- hash_key = f"__AI_OPS_WX__:GROUPS_ADD_CONTACT_HISTORY:{wxid}:{chatroom_id}"
- await self.redis_service.delete_hash(hash_key)
-
- async def is_human_handle_msg_async(self,wxid)->bool:
- hash_key = f"__AI_OPS_WX__:HUMAN_HANDLE_MSG:{wxid}"
- cache= await self.redis_service.get_hash_field(hash_key,"data")
- return True if cache else False
-
- async def set_human_handle_msg_async(self,wxid,expire_time=60*30):
- hash_key = f"__AI_OPS_WX__:HUMAN_HANDLE_MSG:{wxid}"
- await self.redis_service.set_hash(hash_key,{"data": json.dumps(int(time.time()), ensure_ascii=False)}, expire_time)
-
- async def is_human_handle_msg_with_contact_wxid_async(self,wxid,contact_wxid)->bool:
- hash_key = f"__AI_OPS_WX__:HUMAN_HANDLE_MSG_WTIH_CONTACT:{wxid}:{contact_wxid}"
- cache= await self.redis_service.get_hash_field(hash_key,"data")
- return True if cache else False
-
- async def set_human_handle_msg_with_contact_wxid_async(self,wxid,contact_wxid,expire_time=60*30)->bool:
- hash_key = f"__AI_OPS_WX__:HUMAN_HANDLE_MSG_WTIH_CONTACT:{wxid}:{contact_wxid}"
- cache= await self.redis_service.set_hash(hash_key,{"data": json.dumps(int(time.time()), ensure_ascii=False)}, expire_time)
- return True if cache else False
-
-
-
- # 依赖项:获取 GeWeChatCom 单例
- async def get_gewe_service(app: FastAPI = Depends()) -> GeWeService:
- return app.state.gewe_service
|