from fastapi import APIRouter,Request,FastAPI from pydantic import BaseModel from fastapi import APIRouter, Depends from pydantic import BaseModel, ValidationError from common.log import logger from model.models import AgentConfig,validate_wxid from services.gewe_service import GeWeService,get_gewe_service from services.redis_service import RedisService from model.models import AgentConfig,validate_wxid from common.utils import * import time,asyncio agent_router = APIRouter(prefix="/api/agent") class GetAgentLoginRequest(BaseModel): tel: str class GetWxQRCodeRequest(BaseModel): tel: str tokenId:str regionId:str agentTokenId:str class LogincCaptchCode(BaseModel): tel: str captchCode:str @agent_router.post("/getlogin", response_model=None) async def get_login(request: Request, body: GetAgentLoginRequest, ): tel = body.tel return await request.app.state.gewe_service.get_login_info_from_cache_async(tel) @agent_router.post("/getwxqrcode", response_model=None) async def get_wx_qrcode(request: Request, body: GetWxQRCodeRequest, ): tel = body.tel token_id =body.tokenId region_id= body.regionId agent_token_id= body.agentTokenId loginfo=await request.app.state.gewe_service.get_login_info_from_cache_async(tel) status=loginfo.get('status','0') if status=='1': msg=f'手机号{tel},wx_token{token_id} 已经微信登录,终止登录流程' logger.info(msg) return {'code': 501, 'message': msg} now=time.time() expried_time=int(now)+150 flag=await request.app.state.gewe_service.acquire_login_lock_async(token_id,150) if not flag: msg=f'手机号{tel}, wx_token{token_id} 登录进行中,稍后再试' logger.info(msg) return {'code': 501, 'message': msg} app_id=loginfo.get('appId','') qr_code = await request.app.state.gewe_service.get_login_qr_code_async(token_id, app_id,region_id) if not qr_code: msg=f"获取二维码失败,qr_code: {qr_code}" await request.app.state.gewe_service.release_login_lock_async(token_id) logger.info(msg) return {'code': 501, 'message': msg} uuid = qr_code.get('uuid',None) if not uuid: msg=f"uuid获取二维码失败,uuid: {uuid}" await request.app.state.gewe_service.release_login_lock_async(token_id) logger.info(msg) return {'code': 501, 'message': msg} app_id = app_id or qr_code.get('appId') base64_string = qr_code.get('qrImgBase64',None) await request.app.state.gewe_service.qrCallback(uuid,base64_string) hash_key = f"__AI_OPS_WX__:LOGININFO:{tel}" print(hash_key) # thread = threading.Thread(target=waitting_login_result, args=(gewe_chat.wxchat,token_id, app_id,region_id, agent_token_id,hash_key, uuid,now)) # thread.daemon = True # thread.start() loop = asyncio.get_event_loop() future = asyncio.run_coroutine_threadsafe( waitting_login_result(request,token_id, app_id,region_id, agent_token_id,hash_key, uuid,now), loop ) return { "tokenId": token_id, "tel": tel, "base64Img": base64_string, "expiredTime": expried_time, } async def waitting_login_result(request: Request, token_id, app_id,region_id, agent_token_id,hash_key, uuid,start_time): agent_tel=hash_key.split(":")[-1] try: while True: now = time.time() if now - start_time > 150: logger.info(f'{token_id} 使用 {app_id} 扫二维码登录超时') break logger.info(f"{token_id} 使用 {app_id},等待扫码登录,二维码有效时间 {150 - int(now - start_time)} 秒") captch_code = await request.app.state.gewe_service.get_login_wx_captch_code_from_cache_async(agent_tel) captch_code= captch_code if captch_code else '' logger.info(f"{token_id} 使用 {app_id} 的验证码 {captch_code}") ret,msg,res = await request.app.state.gewe_service.check_login_async(token_id, app_id, uuid,captch_code) if ret == 200: flag = res.get('status') if flag == 2: logger.info(f"登录成功: {res}") head_img_url=res.get('headImgUrl','') login_info = res.get('loginInfo', {}) wxid=login_info.get('wxid',agent_tel) cache_login_info=await request.app.state.gewe_service.get_login_info_from_cache_async(agent_tel) cache_wxid=cache_login_info.get('wxid','') if not cache_login_info and cache_wxid!=wxid and cache_wxid!='': logger.warning(f"agent_tel {agent_tel} , wxid {wxid} 与 cache_wxid {cache_wxid} 不匹配,登录失败") await request.app.state.gewe_service.logout_async(token_id,app_id) # k_message=utils.login_result_message(token_id,agent_tel,region_id,agent_token_id,'') # kafka_helper.kafka_client.produce_message(k_message) break login_info.update({'appId': app_id, 'uuid': uuid, 'tokenId': token_id,'status': 1,'headImgUrl':head_img_url,'regionId':region_id}) cache_login_info=await request.app.state.redis_service.get_hash(hash_key) if 'appId' not in cache_login_info: login_info.update({"create_at":int(time.time()),"modify_at":int(time.time())}) # 默认配置 config=AgentConfig.model_validate({ "chatroomIdWhiteList": [], "agentTokenId": agent_token_id, "agentEnabled": True, "addContactsFromChatroomIdWhiteList": [], "chatWaitingMsgEnabled": True, "privateGroupChatEnabled": False }) else: login_info.update({"modify_at":int(time.time())}) # 已有配置 config_cache=await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid) config=AgentConfig.model_validate(config_cache) cleaned_login_info = {k: (v if v is not None else '') for k, v in login_info.items()} #wxid=cleaned_login_info.get('wxid',agent_tel) # 保存配置信息 config_dict=config.model_dump() await request.app.state.gewe_service.save_wxchat_config_async(wxid,config_dict) # 保存登录信息 await request.app.state.redis_service.set_hash(hash_key, cleaned_login_info) await request.app.state.gewe_service.release_login_lock_async(token_id) # 登录结果推送到kafka k_message=login_result_message(token_id,agent_tel,region_id,agent_token_id,wxid) await request.app.state.kafka_service.send_message_async(k_message) # 同步联系人列表 ret,msg,contacts_list=await request.app.state.gewe_service.fetch_contacts_list_async(token_id,app_id) if ret!=200: logger.warning(f"同步联系人列表失败: {ret}-{msg}") break friend_wxids = [c for c in contacts_list['friends'] if c not in ['fmessage', 'medianote','weixin','weixingongzhong']] # 可以调整截取范围 data=await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id, app_id, wxid, friend_wxids) chatrooms=contacts_list['chatrooms'] # 同步群列表 logger.info(f'{wxid} 的群数量 {len(chatrooms)}') logger.info(f'{wxid} 同步群列表') await request.app.state.gewe_service.save_groups_info_to_cache_async(token_id, app_id, wxid, chatrooms) logger.info(f'{wxid} 同步群成员') # 同步群成员 await request.app.state.gewe_service.save_groups_members_to_cache_async(token_id, app_id, wxid, chatrooms) logger.info(f'{wxid} 全量好友信息推送到kafka') # 联系人推送到kafka #k_message = wx_all_contacts_message(wxid, data) k_message = wx_all_contacts_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) # 全量群信息推送到kafka logger.info(f'{wxid} 全量群信息推送到kafka') #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) # 消息锁 request.app.state.message_locks[app_id]=asyncio.Lock() break else: logger.info(f"登录检查中: {ret}-{msg}-{res}") await asyncio.sleep(5) finally: await request.app.state.gewe_service.release_login_lock_async(token_id) @agent_router.post("/logincaptchcode", response_model=None) async def login_captch_code(request: Request, body: LogincCaptchCode, ): tel = body.tel captch_code=body.captchCode res=await request.app.state.gewe_service.save_login_wx_captch_code_to_cache_async(tel,captch_code) return {'message': '操作成功'}