"""
API dependencies.

Callables in this module are meant to be used with FastAPI's ``Depends``
for dependency injection.
"""
from typing import Generator, Optional, Dict, Any
from fastapi import Depends, HTTPException, status, Header
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session

from app.db.session import get_db
from app.schemas.base import PaginationParams
from app.services.keycloak_service import keycloak_service
from app.models import Tenant

# Bearer token scheme. auto_error=False lets a request without an
# Authorization header reach the dependency (credentials is then None)
# instead of being rejected by the scheme itself.
security = HTTPBearer(auto_error=False)


def get_pagination_params(
    page: int = 1,
    page_size: int = 20,
) -> PaginationParams:
    """
    Validate and bundle pagination parameters.

    Args:
        page: Page number (1-based).
        page_size: Number of items per page (1 to 100 inclusive).

    Returns:
        PaginationParams: The validated pagination parameters.

    Raises:
        HTTPException: 400 when ``page`` is below 1 or ``page_size`` is
            outside the 1..100 range.
    """
    page_is_valid = page >= 1
    if not page_is_valid:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Page must be greater than 0",
        )

    if not 1 <= page_size <= 100:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Page size must be between 1 and 100",
        )

    return PaginationParams(page=page, page_size=page_size)


def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
) -> Optional[Dict[str, Any]]:
    """
    Resolve the currently logged-in user from the bearer token.

    Args:
        credentials: HTTP bearer credentials, or None when the request
            carried no token.

    Returns:
        The user info returned by
        ``keycloak_service.get_user_info_from_token`` for the token, or
        None when no token was supplied.

    Raises:
        HTTPException: 401 when a token was supplied but the Keycloak
            service returned no user info for it.
    """
    if not credentials:
        return None

    # Ask the Keycloak service to validate the token and resolve the user.
    resolved_user = keycloak_service.get_user_info_from_token(
        credentials.credentials
    )
    if resolved_user:
        return resolved_user

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid or expired token",
        headers={"WWW-Authenticate": "Bearer"},
    )


def require_auth(
    current_user: Optional[Dict[str, Any]] = Depends(get_current_user),
) -> Dict[str, Any]:
    """
    Require an authenticated user.

    Args:
        current_user: User info resolved by ``get_current_user``.

    Returns:
        dict: The user info.

    Raises:
        HTTPException: 401 when there is no authenticated user.
    """
    if current_user:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication required",
        headers={"WWW-Authenticate": "Bearer"},
    )


def check_permission(required_roles: Optional[list] = None):
    """
    Build a dependency that checks the user's permissions (role based).

    Args:
        required_roles: Roles of which the user must hold at least one
            (for example ``["admin", "hr_manager"]``). None or an empty
            list means any authenticated user is accepted.

    Returns:
        function: The permission-checking dependency.

    Example:
        @router.get("/admin-only", dependencies=[Depends(check_permission(["admin"]))])
    """
    if required_roles is None:
        required_roles = []

    def permission_checker(
        current_user: Dict[str, Any] = Depends(require_auth)
    ) -> Dict[str, Any]:
        """Return the user if they hold one of the required roles, else raise 403."""
        # Roles are read from the "realm_access" -> "roles" entry of the user
        # info. NOTE(review): whether the user info produced by
        # keycloak_service carries this claim is not visible here - confirm.
        # "or" also covers a claim that is present but null, which would
        # otherwise raise AttributeError on the chained .get().
        realm_access = current_user.get("realm_access") or {}
        user_roles = realm_access.get("roles") or []

        if required_roles and not any(
            role in user_roles for role in required_roles
        ):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Required roles: {', '.join(required_roles)}"
            )

        return current_user

    return permission_checker


def _realm_from_issuer(iss: Any) -> Optional[str]:
    """Return the realm name following the last "/realms/" in iss, or None if it cannot be parsed."""
    if not isinstance(iss, str):
        return None
    # rpartition keeps the "text after the last marker" semantics of the
    # previous split(...)[-1], but also reports whether the marker was found.
    _, marker, realm_name = iss.rpartition("/realms/")
    if not marker or not realm_name:
        return None
    return realm_name


def get_current_tenant(
    current_user: Dict[str, Any] = Depends(require_auth),
    db: Session = Depends(get_db),
) -> Tenant:
    """
    Resolve the current tenant from the realm of the JWT token.

    Multi-tenant layout: each tenant corresponds to one Keycloak realm.
    - The realm a token was issued by determines the tenant.
    - The realm name is parsed from the ``iss`` (issuer) field.
    - The tenants table is queried for the matching active tenant.

    Args:
        current_user: Current user info (must contain ``iss``).
        db: Database session.

    Returns:
        Tenant: The tenant object.

    Raises:
        HTTPException: 401 when ``iss`` is missing or no realm name can be
            parsed from it; 403 when no active tenant matches the realm.

    Example:
        iss: "https://auth.lab.taipei/realms/porscheworld"
        -> realm_name: "porscheworld"
        -> tenant.keycloak_realm: "porscheworld"
    """
    # Expected iss format: "https://{domain}/realms/{realm_name}"
    iss = current_user.get("iss", "")

    if not iss:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token: missing issuer"
        )

    # Reject issuers without a realm segment up front, so a malformed issuer
    # is reported as an invalid token instead of being looked up as a realm.
    realm_name = _realm_from_issuer(iss)
    if realm_name is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token: cannot parse realm"
        )

    # Look up the active tenant bound to this realm.
    tenant = db.query(Tenant).filter(
        Tenant.keycloak_realm == realm_name,
        Tenant.is_active == True  # noqa: E712 - SQL expression, not a Python bool test
    ).first()

    if not tenant:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Tenant not found or inactive: {realm_name}"
        )

    return tenant


def get_tenant_id(
    tenant: Tenant = Depends(get_current_tenant)
) -> int:
    """
    Return the current tenant's ID (convenience dependency).

    For endpoints that only need the tenant ID rather than the full object.

    Args:
        tenant: The tenant object.

    Returns:
        int: The tenant ID.
    """
    return tenant.id