Major Features: - ✅ Multi-tenant architecture (tenant isolation) - ✅ Employee CRUD with lifecycle management (onboarding/offboarding) - ✅ Department tree structure with email domain management - ✅ Company info management (single-record editing) - ✅ System functions CRUD (permission management) - ✅ Email account management (multi-account per employee) - ✅ Keycloak SSO integration (auth.lab.taipei) - ✅ Redis session storage (10.1.0.254:6379) - Solves Cookie 4KB limitation - Cross-system session sharing - Sliding expiration (8 hours) - Automatic token refresh Technical Stack: Backend: - FastAPI + SQLAlchemy - PostgreSQL 16 (10.1.0.20:5433) - Keycloak Admin API integration - Docker Mailserver integration (SSH) - Alembic migrations Frontend: - Next.js 14 (App Router) - NextAuth 4 with Keycloak Provider - Redis session storage (ioredis) - Tailwind CSS Infrastructure: - Redis 7 (10.1.0.254:6379) - Session + Cache - Keycloak 26.1.0 (auth.lab.taipei) - Docker Mailserver (10.1.0.254) Architecture Highlights: - Session管理由 Keycloak + Redis 統一控制 - 支援多系統 (HR/WebMail/Calendar/Drive/Office) 共享 session - Token 自動刷新,異質服務整合 - 未來可無縫遷移到雲端 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
363 lines
10 KiB
Python
363 lines
10 KiB
Python
"""
|
|
認證 API
|
|
處理登入、登出、Token 管理
|
|
"""
|
|
from typing import Dict, Any
|
|
from fastapi import APIRouter, Depends, HTTPException, status, Request
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.db.session import get_db
|
|
from app.schemas.auth import LoginRequest, TokenResponse, UserInfo
|
|
from app.schemas.response import MessageResponse
|
|
from app.api.deps import get_current_user, require_auth
|
|
from app.services.keycloak_service import keycloak_service
|
|
from app.services.audit_service import audit_service
|
|
|
|
# Router instance that the endpoint functions in this module attach to via @router decorators.
router = APIRouter()
|
|
|
|
|
|
@router.post("/login", response_model=TokenResponse)
def login(
    login_data: LoginRequest,
    request: Request,
    db: Session = Depends(get_db),
):
    """
    Authenticate a user against Keycloak and return the issued tokens.

    Calls ``keycloak_service.openid.token`` with the supplied username and
    password, and writes a login audit record for both the success and the
    failure outcome.

    Args:
        login_data: Login credentials (username, password).
        request: Incoming HTTP request; passed to
            ``audit_service.get_client_ip`` for the audit record.
        db: Database session used when writing the audit record.

    Returns:
        TokenResponse: Access token, token type, lifetime and, when Keycloak
        returned one, the refresh token.

    Raises:
        HTTPException: 401 when the Keycloak token call raises.
    """
    # Only the Keycloak call is guarded. Failures while auditing or building
    # the response after a successful authentication must not be reported as
    # "invalid credentials" nor recorded as a failed login.
    try:
        token_response = keycloak_service.openid.token(
            login_data.username,
            login_data.password,
        )
    except Exception as exc:
        # Any error from the token call is treated as a failed login.
        audit_service.log_login(
            db=db,
            username=login_data.username,
            ip_address=audit_service.get_client_ip(request),
            success=False,
        )
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid username or password",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

    # Record the successful login.
    audit_service.log_login(
        db=db,
        username=login_data.username,
        ip_address=audit_service.get_client_ip(request),
        success=True,
    )

    return TokenResponse(
        access_token=token_response["access_token"],
        token_type=token_response["token_type"],
        expires_in=token_response["expires_in"],
        refresh_token=token_response.get("refresh_token"),
    )
|
|
|
|
|
|
@router.post("/logout", response_model=MessageResponse)
def logout(
    request: Request,
    current_user: Dict[str, Any] = Depends(require_auth),
    db: Session = Depends(get_db),
):
    """
    Log the current user out.

    Writes a logout audit record and returns a confirmation message. No
    token is revoked on the Keycloak side by this function.

    Args:
        request: Incoming HTTP request; passed to
            ``audit_service.get_client_ip`` for the audit record.
        current_user: Information about the authenticated user; the
            ``"username"`` key is read.
        db: Database session used when writing the audit record.

    Returns:
        MessageResponse: Confirmation message naming the user.
    """
    username = current_user["username"]
    client_ip = audit_service.get_client_ip(request)

    # Record the logout in the audit log.
    audit_service.log_logout(db=db, username=username, ip_address=client_ip)

    # TODO: optionally revoke the token on the Keycloak side
    # (e.g. keycloak_service.openid.logout with the refresh token).

    return MessageResponse(message=f"User {username} logged out successfully")
|
|
|
|
|
|
@router.post("/refresh", response_model=TokenResponse)
def refresh_token(
    refresh_token: str,
):
    """
    Exchange a refresh token for a new access token.

    Passes the refresh token to ``keycloak_service.openid.refresh_token`` and
    wraps the result in a ``TokenResponse``.

    Args:
        refresh_token: The refresh token to exchange.

    Returns:
        TokenResponse: New access token, token type, lifetime and, when
        Keycloak returned one, a new refresh token.

    Raises:
        HTTPException: 401 when the exchange or the response construction
            raises.
    """
    try:
        renewed = keycloak_service.openid.refresh_token(refresh_token)

        # Collect the response fields first, then build the schema object.
        fields = {
            "access_token": renewed["access_token"],
            "token_type": renewed["token_type"],
            "expires_in": renewed["expires_in"],
            "refresh_token": renewed.get("refresh_token"),
        }
        return TokenResponse(**fields)
    except Exception:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired refresh token",
            headers={"WWW-Authenticate": "Bearer"},
        )
|
|
|
|
|
|
@router.get("/me", response_model=UserInfo)
def get_current_user_info(
    current_user: Dict[str, Any] = Depends(require_auth),
    db: Session = Depends(get_db)
):
    """
    Return the current user's profile together with tenant information.

    The user fields come from ``current_user``. The tenant is resolved by
    looking up an ``Employee`` with the user's email and then the ``Tenant``
    referenced by that employee's ``tenant_id``.

    Args:
        current_user: Information about the authenticated user.
        db: Database session used for the employee and tenant lookups.

    Returns:
        UserInfo: User details; ``tenant`` is ``None`` when no email,
        employee, or tenant could be resolved.
    """
    # Imported inside the function, as in the original code.
    from app.models.tenant import Tenant
    from app.models.employee import Employee

    # Step 1: find the employee by email (skipped when there is no email).
    lookup_email = current_user.get("email")
    employee = None
    if lookup_email:
        employee = db.query(Employee).filter(Employee.email == lookup_email).first()

    # Step 2: find the tenant the employee belongs to.
    tenant = None
    if employee and employee.tenant_id:
        tenant = db.query(Tenant).filter(Tenant.id == employee.tenant_id).first()

    # Step 3: project the tenant onto the fields exposed in the response.
    tenant_info = None
    if tenant:
        tenant_info = dict(
            id=tenant.id,
            code=tenant.code,
            name=tenant.name,
            is_sysmana=tenant.is_sysmana,
        )

    return UserInfo(
        sub=current_user.get("sub", ""),
        username=current_user.get("username", ""),
        email=current_user.get("email", ""),
        first_name=current_user.get("first_name"),
        last_name=current_user.get("last_name"),
        email_verified=current_user.get("email_verified", False),
        tenant=tenant_info,
    )
|
|
|
|
|
|
@router.post("/change-password", response_model=MessageResponse)
def change_password(
    old_password: str,
    new_password: str,
    current_user: Dict[str, Any] = Depends(require_auth),
    db: Session = Depends(get_db),
    request: Request = None,
):
    """
    Change the authenticated user's own password.

    The old password is verified by requesting a token from Keycloak with
    it; the user is then looked up by username and the password is reset as
    a non-temporary password. A ``change_password`` audit record is written
    on success.

    Args:
        old_password: The user's current password.
        new_password: The password to set.
        current_user: Information about the authenticated user; the
            ``"username"`` key is read.
        db: Database session used when writing the audit record.
        request: Incoming HTTP request; when present, passed to
            ``audit_service.get_client_ip`` for the audit record.

    Returns:
        MessageResponse: Success message.

    Raises:
        HTTPException: 400 when the old password check fails, 404 when the
            user is not found in Keycloak, 500 when the reset reports
            failure or any other error occurs.
    """
    # NOTE(review): old_password / new_password are plain ``str`` parameters;
    # confirm how they are transported so they do not end up in URLs or logs.
    username = current_user["username"]

    try:
        # 1. Verify the old password by attempting a token request.
        try:
            keycloak_service.openid.token(username, old_password)
        except Exception as exc:
            # ``except Exception`` (not a bare ``except``) so that
            # KeyboardInterrupt / SystemExit are not turned into a 400.
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Old password is incorrect"
            ) from exc

        # 2. Look up the user's Keycloak ID.
        user = keycloak_service.get_user_by_username(username)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User not found in Keycloak"
            )

        user_id = user["id"]

        # 3. Set the new password (not a temporary one).
        success = keycloak_service.reset_password(
            user_id=user_id,
            new_password=new_password,
            temporary=False
        )

        if not success:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to change password"
            )

        # 4. Write the audit record.
        audit_service.log(
            db=db,
            action="change_password",
            resource_type="authentication",
            performed_by=username,
            details={"success": True},
            ip_address=audit_service.get_client_ip(request) if request else None
        )

        return MessageResponse(
            message="Password changed successfully"
        )

    except HTTPException:
        # Deliberate HTTP errors raised above pass through unchanged.
        raise
    except Exception as exc:
        # NOTE(review): the response detail includes the exception text,
        # which may expose internal information to the client.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to change password: {exc}"
        ) from exc
|
|
|
|
|
|
@router.post("/reset-password/{username}", response_model=MessageResponse)
def reset_user_password(
    username: str,
    new_password: str,
    temporary: bool = True,
    current_user: Dict[str, Any] = Depends(require_auth),
    db: Session = Depends(get_db),
    request: Request = None,
):
    """
    Reset another user's password (intended as an administrator function).

    Looks the target user up in Keycloak by username, resets the password,
    and writes a ``reset_password`` audit record on success.

    NOTE(review): no administrator/role check is performed in this function;
    it only depends on ``require_auth``. The admin check is still a TODO.

    Args:
        username: Username of the target user.
        new_password: The password to set.
        temporary: Passed through to ``keycloak_service.reset_password`` as
            the ``temporary`` flag.
        current_user: Information about the caller; the ``"username"`` key
            is recorded as the performer in the audit record.
        db: Database session used when writing the audit record.
        request: Incoming HTTP request; when present, passed to
            ``audit_service.get_client_ip`` for the audit record.

    Returns:
        MessageResponse: Success message naming the target user.

    Raises:
        HTTPException: 404 when the target user is not found, 500 when the
            reset reports failure or any other error occurs.
    """
    # TODO: verify that the caller is an administrator.
    # For now every authenticated user is allowed through.

    try:
        # 1. Resolve the target user.
        target = keycloak_service.get_user_by_username(username)
        if not target:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"User {username} not found",
            )

        # 2. Reset the password.
        reset_ok = keycloak_service.reset_password(
            user_id=target["id"],
            new_password=new_password,
            temporary=temporary,
        )
        if not reset_ok:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to reset password",
            )

        # 3. Write the audit record.
        audit_service.log(
            db=db,
            action="reset_password",
            resource_type="authentication",
            performed_by=current_user["username"],
            details={
                "target_user": username,
                "temporary": temporary,
                "success": True,
            },
            ip_address=audit_service.get_client_ip(request) if request else None,
        )

        return MessageResponse(message=f"Password reset for user {username}")

    except HTTPException:
        # Deliberate HTTP errors raised above pass through unchanged.
        raise
    except Exception as err:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to reset password: {str(err)}",
        )
|