Coverage for core/middleware/auth_middleware.py: 62.07%

29 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-13 18:58 +0000

1""" 

2认证中间件 

3""" 

4 

5import os 

6 

7from fastapi import Depends, HTTPException, Request, status 

8from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer 

9 

10from core.models.user import User 

11from core.services.auth_service import AuthService 

12from infrastructure.config.settings import settings 

13 

14security = HTTPBearer(auto_error=False) # 设置为不自动抛出错误 

15 

16auth_service = AuthService() 

17 

18 

19async def get_current_user( 

20 request: Request, credentials: HTTPAuthorizationCredentials = Depends(security) 

21) -> User: 

22 """获取当前用户""" 

23 token = None 

24 

25 # 1. 首先尝试从请求头获取token 

26 if credentials: 

27 token = credentials.credentials 

28 

29 # 2. 如果非生产环境且配置中有AUTHORIZATION,则使用配置作为fallback 

30 if not token and settings.environment != "production": 

31 env_auth = settings.authorization 

32 if env_auth: 

33 # 处理 "Bearer token" 格式 

34 if env_auth.startswith("Bearer "): 

35 token = env_auth[7:] # 移除 "Bearer " 前缀 

36 else: 

37 token = env_auth 

38 print(f"🔧 使用环境变量AUTHORIZATION进行认证: {token[:20]}...") 

39 

40 if not token: 

41 raise HTTPException( 

42 status_code=status.HTTP_401_UNAUTHORIZED, 

43 detail="未提供认证令牌", 

44 headers={"WWW-Authenticate": "Bearer"}, 

45 ) 

46 

47 user = auth_service.get_current_user(token) 

48 

49 if not user: 

50 raise HTTPException( 

51 status_code=status.HTTP_401_UNAUTHORIZED, 

52 detail="无效的认证令牌", 

53 headers={"WWW-Authenticate": "Bearer"}, 

54 ) 

55 

56 return user 

57 

58 

59async def get_admin_user(current_user: User = Depends(get_current_user)) -> User: 

60 """获取管理员用户""" 

61 if current_user.user_type != "admin": 

62 raise HTTPException( 

63 status_code=status.HTTP_403_FORBIDDEN, detail="需要管理员权限" 

64 ) 

65 

66 return current_user