Coverage for core/middleware/auth_middleware.py: 62.07%
29 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 18:58 +0000
1"""
2认证中间件
3"""
import logging
import os

from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from core.models.user import User
from core.services.auth_service import AuthService
from infrastructure.config.settings import settings
# Bearer-credentials dependency. auto_error=False so that it does not raise
# on its own; get_current_user decides how to respond when nothing is sent.
security: HTTPBearer = HTTPBearer(auto_error=False)

# Module-level service instance that get_current_user uses to turn a token
# into a user.
auth_service: AuthService = AuthService()
async def get_current_user(
    request: Request, credentials: HTTPAuthorizationCredentials = Depends(security)
) -> User:
    """Resolve the user for the current request from a bearer token.

    The token is taken from ``credentials`` when they are present. When no
    token was supplied and ``settings.environment`` is not ``"production"``,
    ``settings.authorization`` is used as a fallback; a leading ``"Bearer "``
    prefix on that value is removed.

    Args:
        request: The incoming request. It is not read here; it is kept so the
            dependency signature stays unchanged for callers.
        credentials: Result of the ``security`` dependency; falsy when the
            request carried no bearer credentials.

    Returns:
        The object returned by ``auth_service.get_current_user`` for the token.

    Raises:
        HTTPException: 401 when no token is available, and 401 when
            ``auth_service.get_current_user`` returns a falsy value.
    """
    # 1. Prefer the token sent with the request.
    token = credentials.credentials if credentials else None

    # 2. Outside production, fall back to the configured AUTHORIZATION value.
    if not token and settings.environment != "production":
        env_auth = settings.authorization
        if env_auth:
            # Accept both "Bearer <token>" and a bare "<token>".
            prefix = "Bearer "
            if env_auth.startswith(prefix):
                token = env_auth[len(prefix):]
            else:
                token = env_auth

            if token:
                # Announce the fallback without including any part of the
                # token: credentials must not end up in stdout or log files.
                logging.getLogger(__name__).warning(
                    "🔧 使用环境变量AUTHORIZATION进行认证"
                )

    if not token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="未提供认证令牌",
            headers={"WWW-Authenticate": "Bearer"},
        )

    user = auth_service.get_current_user(token)

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的认证令牌",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return user
async def get_admin_user(current_user: User = Depends(get_current_user)) -> User:
    """Return the authenticated user only when they are an administrator.

    Args:
        current_user: The user produced by the ``get_current_user`` dependency.

    Returns:
        ``current_user`` unchanged, when its ``user_type`` equals ``"admin"``.

    Raises:
        HTTPException: 403 when ``user_type`` is anything other than
            ``"admin"``.
    """
    is_admin = current_user.user_type == "admin"
    if is_admin:
        return current_user

    # Authenticated, but not allowed to use admin-only endpoints.
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="需要管理员权限",
    )