import json import time from fastapi import FastAPI, Request,HTTPException from starlette.middleware.base import BaseHTTPMiddleware from starlette.responses import JSONResponse from pydantic import BaseModel from datetime import datetime import logging import uuid from common.log import logger class Result(BaseModel): code: int message: str status: str class ResponseData(BaseModel): data: dict|list|None result: Result timestamp: str # 设置最大请求大小(100MB) MAX_REQUEST_SIZE = 100 * 1024 * 1024 # 100MB async def http_context_v1(request: Request, call_next): # 记录请求信息 request_info = { "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "method": request.method, "url": str(request.url), "body": await request.body() if request.method in ["POST", "PUT", "PATCH"] else None, } logger.info(f"请求: {json.dumps(request_info, separators=(',', ':'), default=str, ensure_ascii=False)}") # 调用下一个中间件或路由处理程序 response = await call_next(request) # 如果响应状态码为 200,则格式化响应为统一格式 if response.status_code == 200: try: response_body = b"" async for chunk in response.body_iterator: response_body += chunk response_body_str = response_body.decode("utf-8") business_data = json.loads(response_body_str) except Exception as e: business_data = {"error": f"Unable to decode response body: {str(e)}"} if "code" in business_data: message=business_data.get("message","请求失败!") result = ResponseData( data=None, result=Result(code=business_data.get("code",500), message=message, status="failed"), timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") ) else: # 构造统一格式的响应 result = ResponseData( data=business_data, result=Result(code=200, message="请求成功!", status="succeed"), timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") ) response_info = { "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "status_code": response.status_code, "headers": dict(response.headers), "body": result.dict(), } logger.info(f"响应: {json.dumps(response_info, separators=(',', ':'), default=str, ensure_ascii=False)}") # 返回修改后的响应 return JSONResponse(content=result.model_dump()) else: message = "请求失败!" # 如果响应状态码不为 200,则记录响应信息 try: response_body = b"" async for chunk in response.body_iterator: response_body += chunk response_body_str = response_body.decode("utf-8") business_data = json.loads(response_body_str) except Exception as e: business_data = {"error": f"Unable to decode response body: {str(e)}"} # 根据不同状态码定制 message 字段 if response.status_code == 404: message = e.detail elif response.status_code == 400: message = "请求参数错误" elif response.status_code == 500: message = "服务器内部错误" # 你可以根据不同的状态码设置更详细的错误消息 # 构造统一格式的响应 result = ResponseData( data={}, # 返回空数据 result=Result( code=response.status_code, message=message, # 根据状态码返回详细信息 status="failed" ), timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") ) response_info = { "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "status_code": response.status_code, "headers": dict(response.headers), "body": result.dict(), } logger.info(f"响应: {json.dumps(response_info, separators=(',', ':'), default=str, ensure_ascii=False)}") # 返回修改后的响应 return JSONResponse(content=result.model_dump(), status_code=response.status_code) async def http_context(request: Request, call_next): request.state.max_request_size = MAX_REQUEST_SIZE # 生成唯一ID request_id = str(uuid.uuid4()) # 检查请求大小 content_length = request.headers.get("content-length") if content_length and int(content_length) > MAX_REQUEST_SIZE: return JSONResponse( content={ "data": None, "result": { "code": 413, "message": "请求体过大,最大支持 100MB", "status": "failed", }, "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"), }, status_code=413, # 413 Payload Too Large ) # 记录请求信息(只读取 body,防止重复读取导致错误) request_info = { "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "method": request.method, "url": str(request.url), "body": None, # 只在 POST/PUT/PATCH 时读取 body } if request.method in ["POST", "PUT", "PATCH"]: try: request_info["body"] = await request.body() except Exception as e: request_info["body"] = f"Error reading body: {str(e)}" logger.info(f"{request_id} 请求: {json.dumps(request_info, separators=(',', ':'), default=str, ensure_ascii=False)}") # 调用下一个中间件或路由 response = await call_next(request) # 处理响应 response_body = b"" async for chunk in response.body_iterator: response_body += chunk response_body_str = response_body.decode("utf-8") try: business_data = json.loads(response_body_str) except Exception as e: business_data = {"error": f"无法解析响应体: {str(e)}"} # 构造统一格式的响应 if response.status_code == 200: # result = ResponseData( # data=business_data, # result=Result(code=200, message="请求成功!", status="succeed"), # timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"), # ) try: business_data = json.loads(response_body_str) except Exception as e: business_data = {"error": f"Unable to decode response body: {str(e)}"} if "code" in business_data: message=business_data.get("message","请求失败!") result = ResponseData( data=None, result=Result(code=business_data.get("code",500), message=message, status="failed"), timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") ) else: # 构造统一格式的响应 result = ResponseData( data=business_data, result=Result(code=200, message="请求成功!", status="succeed"), timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") ) response_info = { "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "status_code": response.status_code, "headers": dict(response.headers), "body": result.model_dump(), } logger.info(f"{request_id} 响应: {json.dumps(response_info, separators=(',', ':'), default=str, ensure_ascii=False)}") # 返回修改后的响应 return JSONResponse(content=result.model_dump()) else: message = "请求失败!" if response.status_code == 404: message = "资源未找到" elif response.status_code == 400: message = "请求参数错误" elif response.status_code == 500: message = "服务器内部错误" result = ResponseData( data=None, result=Result(code=response.status_code, message=message, status="failed"), timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"), ) response_info = { "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "status_code": response.status_code, "headers": dict(response.headers), "body": result.model_dump(), } logger.info(f"{request_id} 响应: {json.dumps(response_info, separators=(',', ':'), default=str, ensure_ascii=False)}") return JSONResponse(content=result.model_dump(), status_code=response.status_code)