No puede seleccionar más de 25 temas Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.

239 líneas
11KB

  1. from fastapi import APIRouter,Request,FastAPI
  2. from pydantic import BaseModel
  3. from fastapi import APIRouter, Depends
  4. from pydantic import BaseModel, ValidationError
  5. from common.log import logger
  6. from model.models import AgentConfig,validate_wxid
  7. from services.gewe_service import GeWeService,get_gewe_service
  8. from services.redis_service import RedisService
  9. from model.models import AgentConfig,validate_wxid
  10. from common.utils import *
  11. import time,asyncio
  12. agent_router = APIRouter(prefix="/api/agent")
  13. class GetAgentLoginRequest(BaseModel):
  14. tel: str
  15. class GetWxQRCodeRequest(BaseModel):
  16. tel: str
  17. tokenId:str
  18. regionId:str
  19. agentTokenId:str
  20. class LogincCaptchCode(BaseModel):
  21. tel: str
  22. captchCode:str
  23. class DeleteAgentRequest(BaseModel):
  24. tel: str
  25. @agent_router.post("/getlogin", response_model=None)
  26. async def get_login(request: Request, body: GetAgentLoginRequest, ):
  27. tel = body.tel
  28. return await request.app.state.gewe_service.get_login_info_from_cache_async(tel)
  29. @agent_router.post("/getwxqrcode", response_model=None)
  30. async def get_wx_qrcode(request: Request, body: GetWxQRCodeRequest, ):
  31. tel = body.tel
  32. token_id =body.tokenId
  33. region_id= body.regionId
  34. agent_token_id= body.agentTokenId
  35. loginfo=await request.app.state.gewe_service.get_login_info_from_cache_async(tel)
  36. status=loginfo.get('status','0')
  37. if status=='1':
  38. msg=f'手机号{tel},wx_token{token_id} 已经微信登录,终止登录流程'
  39. logger.warning(msg)
  40. return {'code': 501, 'message': msg}
  41. now=time.time()
  42. expried_time=int(now)+150
  43. flag=await request.app.state.gewe_service.acquire_login_lock_async(tel,150)
  44. if not flag:
  45. msg=f'手机号{tel}, wx_token{token_id} 登录进行中,稍后再试'
  46. logger.warning(msg)
  47. return {'code': 501, 'message': msg}
  48. app_id=loginfo.get('appId','')
  49. qr_code = await request.app.state.gewe_service.get_login_qr_code_async(token_id, app_id,region_id)
  50. if not qr_code:
  51. msg=f"获取二维码失败,qr_code: {qr_code}"
  52. await request.app.state.gewe_service.release_login_lock_async(tel)
  53. logger.warning(msg)
  54. return {'code': 501, 'message': msg}
  55. uuid = qr_code.get('uuid',None)
  56. if not uuid:
  57. msg=f"uuid获取二维码失败,uuid: {uuid}"
  58. await request.app.state.gewe_service.release_login_lock_async(tel)
  59. logger.warning(msg)
  60. return {'code': 501, 'message': msg}
  61. app_id = app_id or qr_code.get('appId')
  62. base64_string = qr_code.get('qrImgBase64',None)
  63. await request.app.state.gewe_service.qrCallback(uuid,base64_string)
  64. hash_key = f"__AI_OPS_WX__:LOGININFO:{tel}"
  65. print(hash_key)
  66. # thread = threading.Thread(target=waitting_login_result, args=(gewe_chat.wxchat,token_id, app_id,region_id, agent_token_id,hash_key, uuid,now))
  67. # thread.daemon = True
  68. # thread.start()
  69. loop = asyncio.get_event_loop()
  70. future = asyncio.run_coroutine_threadsafe(
  71. waitting_login_result(request,token_id, app_id,region_id, agent_token_id,hash_key, uuid,now),
  72. loop
  73. )
  74. return {
  75. "tokenId": token_id,
  76. "tel": tel,
  77. "base64Img": base64_string,
  78. "expiredTime": expried_time,
  79. }
  80. async def waitting_login_result(request: Request, token_id, app_id,region_id, agent_token_id,hash_key, uuid,start_time):
  81. agent_tel=hash_key.split(":")[-1]
  82. try:
  83. while True:
  84. now = time.time()
  85. if now - start_time > 150:
  86. logger.info(f'{token_id} 使用 {app_id} 扫二维码登录超时')
  87. break
  88. logger.info(f"{token_id} 使用 {app_id},等待扫码登录,二维码有效时间 {150 - int(now - start_time)} 秒")
  89. captch_code = await request.app.state.gewe_service.get_login_wx_captch_code_from_cache_async(agent_tel)
  90. captch_code= captch_code if captch_code else ''
  91. logger.info(f"{token_id} 使用 {app_id} 的验证码 {captch_code}")
  92. ret,msg,res = await request.app.state.gewe_service.check_login_async(token_id, app_id, uuid,captch_code)
  93. if ret == 200:
  94. flag = res.get('status')
  95. if flag == 2:
  96. logger.info(f"登录成功: {res}")
  97. head_img_url=res.get('headImgUrl','')
  98. login_info = res.get('loginInfo', {})
  99. wxid=login_info.get('wxid',agent_tel)
  100. cache_login_info=await request.app.state.gewe_service.get_login_info_from_cache_async(agent_tel)
  101. cache_wxid=cache_login_info.get('wxid','')
  102. if not cache_login_info and cache_wxid!=wxid and cache_wxid!='':
  103. logger.warning(f"agent_tel {agent_tel} , wxid {wxid} 与 cache_wxid {cache_wxid} 不匹配,登录失败")
  104. await request.app.state.gewe_service.logout_async(token_id,app_id)
  105. # k_message=utils.login_result_message(token_id,agent_tel,region_id,agent_token_id,'')
  106. # kafka_helper.kafka_client.produce_message(k_message)
  107. break
  108. login_info.update({'appId': app_id, 'uuid': uuid, 'tokenId': token_id,'status': 1,'headImgUrl':head_img_url,'regionId':region_id})
  109. cache_login_info=await request.app.state.redis_service.get_hash(hash_key)
  110. if 'appId' not in cache_login_info:
  111. login_info.update({"create_at":int(time.time()),"modify_at":int(time.time())})
  112. # 默认配置
  113. config=AgentConfig.model_validate({
  114. "chatroomIdWhiteList": [],
  115. "agentTokenId": agent_token_id,
  116. "agentEnabled": True,
  117. "addContactsFromChatroomIdWhiteList": [],
  118. "chatWaitingMsgEnabled": True,
  119. "privateGroupChatEnabled": False
  120. })
  121. else:
  122. login_info.update({"modify_at":int(time.time())})
  123. # 已有配置
  124. config_cache=await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
  125. config=AgentConfig.model_validate(config_cache)
  126. cleaned_login_info = {k: (v if v is not None else '') for k, v in login_info.items()}
  127. #wxid=cleaned_login_info.get('wxid',agent_tel)
  128. # 保存配置信息
  129. config_dict=config.model_dump()
  130. await request.app.state.gewe_service.save_wxchat_config_async(wxid,config_dict)
  131. # 保存登录信息
  132. await request.app.state.redis_service.set_hash(hash_key, cleaned_login_info)
  133. await request.app.state.gewe_service.release_login_lock_async(agent_tel)
  134. # 登录结果推送到kafka
  135. k_message=login_result_message(token_id,agent_tel,region_id,agent_token_id,wxid)
  136. await request.app.state.kafka_service.send_message_async(k_message)
  137. # 同步联系人列表
  138. ret,msg,contacts_list=await request.app.state.gewe_service.fetch_contacts_list_async(token_id,app_id)
  139. if ret!=200:
  140. logger.warning(f"同步联系人列表失败: {ret}-{msg}")
  141. break
  142. friend_wxids = [c for c in contacts_list['friends'] if c not in ['fmessage', 'medianote','weixin','weixingongzhong']] # 可以调整截取范围
  143. data=await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id, app_id, wxid, friend_wxids)
  144. chatrooms=contacts_list['chatrooms']
  145. # 同步群列表
  146. logger.info(f'{wxid} 的群数量 {len(chatrooms)}')
  147. logger.info(f'{wxid} 同步群列表')
  148. await request.app.state.gewe_service.save_groups_info_to_cache_async(token_id, app_id, wxid, chatrooms)
  149. logger.info(f'{wxid} 同步群成员')
  150. # 同步群成员
  151. await request.app.state.gewe_service.save_groups_members_to_cache_async(token_id, app_id, wxid, chatrooms)
  152. logger.info(f'{wxid} 全量好友信息推送到kafka')
  153. # 联系人推送到kafka
  154. #k_message = wx_all_contacts_message(wxid, data)
  155. k_message = wx_all_contacts_key_message(wxid)
  156. await request.app.state.kafka_service.send_message_async(k_message)
  157. # 全量群信息推送到kafka
  158. logger.info(f'{wxid} 全量群信息推送到kafka')
  159. #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
  160. #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
  161. k_message=wx_groups_info_members_key_message(wxid)
  162. await request.app.state.kafka_service.send_message_async(k_message)
  163. # 消息锁
  164. request.app.state.message_locks[app_id]=asyncio.Lock()
  165. break
  166. else:
  167. logger.info(f"登录检查中: {ret}-{msg}-{res}")
  168. await asyncio.sleep(5)
  169. finally:
  170. await request.app.state.gewe_service.release_login_lock_async(agent_tel)
  171. @agent_router.post("/logincaptchcode", response_model=None)
  172. async def login_captch_code(request: Request, body: LogincCaptchCode, ):
  173. tel = body.tel
  174. captch_code=body.captchCode
  175. res=await request.app.state.gewe_service.save_login_wx_captch_code_to_cache_async(tel,captch_code)
  176. return {'message': '操作成功'}
  177. @agent_router.post("/delete", response_model=None)
  178. async def delete_agent(request: Request, body: DeleteAgentRequest):
  179. tel = body.tel
  180. loginfo = await request.app.state.gewe_service.get_login_info_from_cache_async(tel)
  181. wxid = loginfo.get('wxid', '')
  182. # 获取所有匹配的 Redis 键
  183. ai_ops_wx_keys = [key async for key in await request.app.state.redis_service.client.scan_iter(match='__AI_OPS_WX__:*')]
  184. # 定义正则表达式模式
  185. pattern = re.compile(rf'^__AI_OPS_WX__.*{wxid}.*')
  186. # 过滤掉符合模式的键
  187. filtered_keys = [key for key in ai_ops_wx_keys if not pattern.match(key)]
  188. # 并发删除过滤后的键
  189. await asyncio.gather(*[request.app.state.redis_service.delete_hash(key) for key in filtered_keys])
  190. # 删除登录信息
  191. await request.app.state.gewe_service.delete_hash(f'__AI_OPS_WX__:LOGININFO:{tel}')
  192. return {'message': '操作成功'}