diff --git a/app/main.py b/app/main.py index 56334a2..93a8911 100644 --- a/app/main.py +++ b/app/main.py @@ -12,6 +12,7 @@ import uvicorn import logging from logging.handlers import TimedRotatingFileHandler,RotatingFileHandler from starlette.middleware.base import BaseHTTPMiddleware +from starlette.middleware.gzip import GZipMiddleware from services.gewe_service import GeWeService # 导入 GeWeChatCom from common.log import logger from app.endpoints.config_endpoint import config_router @@ -203,7 +204,7 @@ async def lifespan(app: FastAPI): print('应用关闭') await kafka_service.stop() -app = FastAPI(lifespan=lifespan) +app = FastAPI(lifespan=lifespan,max_request_size=100 * 1024 * 1024) # 配置日志:输出到文件,文件最大 10MB,保留 5 个备份文件 # log_handler = RotatingFileHandler( @@ -228,6 +229,7 @@ app = FastAPI(lifespan=lifespan) app.add_middleware(BaseHTTPMiddleware, dispatch=http_context) +app.add_middleware(GZipMiddleware, minimum_size=1000) app.include_router(config_router) app.include_router(contacts_router) diff --git a/app/middleware.py b/app/middleware.py index 89afc6f..5df91c4 100644 --- a/app/middleware.py +++ b/app/middleware.py @@ -23,10 +23,11 @@ class ResponseData(BaseModel): result: Result timestamp: str +# 设置最大请求大小(100MB) +MAX_REQUEST_SIZE = 100 * 1024 * 1024 # 100MB - -async def http_context(request: Request, call_next): +async def http_context_v1(request: Request, call_next): # 记录请求信息 request_info = { "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), @@ -76,8 +77,6 @@ async def http_context(request: Request, call_next): return JSONResponse(content=result.model_dump()) else: - print(response) - print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~') message = "请求失败!" # 如果响应状态码不为 200,则记录响应信息 try: @@ -120,61 +119,81 @@ async def http_context(request: Request, call_next): # 返回修改后的响应 return JSONResponse(content=result.model_dump(), status_code=response.status_code) - -async def http_context_2(request: Request, call_next): - # 记录请求信息 - request_body = None - if request.method in ["POST", "PUT", "PATCH"]: - try: - request_body = await request.json() # 使用 .json(),避免影响 FastAPI 解析 - except Exception: - request_body = "无法解析 JSON" - + +async def http_context(request: Request, call_next): + # 检查请求大小 + content_length = request.headers.get("content-length") + if content_length and int(content_length) > MAX_REQUEST_SIZE: + return JSONResponse( + content={ + "data": None, + "result": { + "code": 413, + "message": "请求体过大,最大支持 100MB", + "status": "failed", + }, + "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"), + }, + status_code=413, # 413 Payload Too Large + ) + + # 记录请求信息(只读取 body,防止重复读取导致错误) request_info = { "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "method": request.method, "url": str(request.url), - "body": request_body, + "body": None, # 只在 POST/PUT/PATCH 时读取 body } - logger.info(f"请求: {json.dumps(request_info, separators=(',', ':'), ensure_ascii=False)}") + if request.method in ["POST", "PUT", "PATCH"]: + try: + request_info["body"] = await request.body() + except Exception as e: + request_info["body"] = f"Error reading body: {str(e)}" - # 继续处理请求 + logger.info(f"请求: {json.dumps(request_info, separators=(',', ':'), default=str, ensure_ascii=False)}") + + # 调用下一个中间件或路由 response = await call_next(request) - # 如果是 422 错误,直接返回,避免 Pydantic 解析错误 - if response.status_code == 422: - return response + # 处理响应 + response_body = b"" + async for chunk in response.body_iterator: + response_body += chunk + response_body_str = response_body.decode("utf-8") - # 处理正常请求 try: - response_body = b"" - async for chunk in response.body_iterator: - response_body += chunk - response_body_str = response_body.decode("utf-8") business_data = json.loads(response_body_str) except Exception as e: business_data = {"error": f"无法解析响应体: {str(e)}"} - if "code" in business_data: - message = business_data.get("message", "请求失败!") + # 构造统一格式的响应 + if response.status_code == 200: result = ResponseData( - data=None, - result=Result(code=business_data.get("code", 500), message=message, status="failed"), - timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") + data=business_data, + result=Result(code=200, message="请求成功!", status="succeed"), + timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"), ) else: + message = "请求失败!" + if response.status_code == 404: + message = "资源未找到" + elif response.status_code == 400: + message = "请求参数错误" + elif response.status_code == 500: + message = "服务器内部错误" + result = ResponseData( - data=business_data, - result=Result(code=200, message="请求成功!", status="succeed"), - timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") + data=None, + result=Result(code=response.status_code, message=message, status="failed"), + timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"), ) response_info = { "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "status_code": response.status_code, "headers": dict(response.headers), - "body": result.dict(), + "body": result.model_dump(), } - logger.info(f"响应: {json.dumps(response_info, separators=(',', ':'), ensure_ascii=False)}") + logger.info(f"响应: {json.dumps(response_info, separators=(',', ':'), default=str, ensure_ascii=False)}") - return JSONResponse(content=result.model_dump()) + return JSONResponse(content=result.model_dump(), status_code=response.status_code) \ No newline at end of file diff --git a/services/kafka_service.py b/services/kafka_service.py index 7846db8..ae94253 100644 --- a/services/kafka_service.py +++ b/services/kafka_service.py @@ -69,7 +69,8 @@ class KafkaService: target_topic = topic or self.producer_topic print(f'生产者topic:{target_topic}') - logger.info(f"生产者topic:{target_topic}\n生产者消息:{json.dumps(json.loads(message), separators=(',', ':'), default=str, ensure_ascii=False)}") + #logger.info(f"生产者topic:{target_topic}\n生产者消息:{json.dumps(json.loads(message), separators=(',', ':'), default=str, ensure_ascii=False)}") + print(f"生产者topic:{target_topic}\n生产者消息:{json.dumps(json.loads(message), separators=(',', ':'), default=str, ensure_ascii=False)}") try: await self.producer.send_and_wait( target_topic,