Files
hr-portal/backend/app/api/v1/identities.py
Porsche Chen 360533393f feat: HR Portal - Complete Multi-Tenant System with Redis Session Storage
Major Features:
-  Multi-tenant architecture (tenant isolation)
-  Employee CRUD with lifecycle management (onboarding/offboarding)
-  Department tree structure with email domain management
-  Company info management (single-record editing)
-  System functions CRUD (permission management)
-  Email account management (multi-account per employee)
-  Keycloak SSO integration (auth.lab.taipei)
-  Redis session storage (10.1.0.254:6379)
  - Solves Cookie 4KB limitation
  - Cross-system session sharing
  - Sliding expiration (8 hours)
  - Automatic token refresh

Technical Stack:
Backend:
- FastAPI + SQLAlchemy
- PostgreSQL 16 (10.1.0.20:5433)
- Keycloak Admin API integration
- Docker Mailserver integration (SSH)
- Alembic migrations

Frontend:
- Next.js 14 (App Router)
- NextAuth 4 with Keycloak Provider
- Redis session storage (ioredis)
- Tailwind CSS

Infrastructure:
- Redis 7 (10.1.0.254:6379) - Session + Cache
- Keycloak 26.1.0 (auth.lab.taipei)
- Docker Mailserver (10.1.0.254)

Architecture Highlights:
- Session管理由 Keycloak + Redis 統一控制
- 支援多系統 (HR/WebMail/Calendar/Drive/Office) 共享 session
- Token 自動刷新,異質服務整合
- 未來可無縫遷移到雲端

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-23 20:12:43 +08:00

365 lines
11 KiB
Python

"""
員工身份管理 API
"""
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.models.employee import Employee
from app.models.employee_identity import EmployeeIdentity
from app.models.business_unit import BusinessUnit
from app.models.department import Department
from app.schemas.employee_identity import (
EmployeeIdentityCreate,
EmployeeIdentityUpdate,
EmployeeIdentityResponse,
EmployeeIdentityListItem,
)
from app.schemas.response import MessageResponse
router = APIRouter()
@router.get("/", response_model=List[EmployeeIdentityListItem])
def get_identities(
db: Session = Depends(get_db),
employee_id: Optional[int] = Query(None, description="員工 ID 篩選"),
business_unit_id: Optional[int] = Query(None, description="事業部 ID 篩選"),
department_id: Optional[int] = Query(None, description="部門 ID 篩選"),
is_active: Optional[bool] = Query(None, description="是否活躍"),
):
"""
獲取員工身份列表
支援多種篩選條件
"""
query = db.query(EmployeeIdentity)
if employee_id:
query = query.filter(EmployeeIdentity.employee_id == employee_id)
if business_unit_id:
query = query.filter(EmployeeIdentity.business_unit_id == business_unit_id)
if department_id:
query = query.filter(EmployeeIdentity.department_id == department_id)
if is_active is not None:
query = query.filter(EmployeeIdentity.is_active == is_active)
identities = query.order_by(
EmployeeIdentity.employee_id,
EmployeeIdentity.is_primary.desc()
).all()
return [EmployeeIdentityListItem.model_validate(identity) for identity in identities]
@router.get("/{identity_id}", response_model=EmployeeIdentityResponse)
def get_identity(
identity_id: int,
db: Session = Depends(get_db),
):
"""
獲取員工身份詳情
"""
identity = db.query(EmployeeIdentity).filter(
EmployeeIdentity.id == identity_id
).first()
if not identity:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Identity with id {identity_id} not found"
)
response = EmployeeIdentityResponse.model_validate(identity)
response.employee_name = identity.employee.legal_name
response.business_unit_name = identity.business_unit.name
response.email_domain = identity.business_unit.email_domain
if identity.department:
response.department_name = identity.department.name
return response
@router.post("/", response_model=EmployeeIdentityResponse, status_code=status.HTTP_201_CREATED)
def create_identity(
identity_data: EmployeeIdentityCreate,
db: Session = Depends(get_db),
):
"""
創建員工身份
自動生成 SSO 帳號:
- 格式: {username_base}@{email_domain}
- 需要生成 Keycloak UUID (TODO)
檢查:
- 員工是否存在
- 事業部是否存在
- 部門是否存在 (如果指定)
- 同一員工在同一事業部只能有一個身份
"""
# 檢查員工是否存在
employee = db.query(Employee).filter(
Employee.id == identity_data.employee_id
).first()
if not employee:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Employee with id {identity_data.employee_id} not found"
)
# 檢查事業部是否存在
business_unit = db.query(BusinessUnit).filter(
BusinessUnit.id == identity_data.business_unit_id
).first()
if not business_unit:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Business unit with id {identity_data.business_unit_id} not found"
)
# 檢查部門是否存在 (如果指定)
if identity_data.department_id:
department = db.query(Department).filter(
Department.id == identity_data.department_id,
Department.business_unit_id == identity_data.business_unit_id
).first()
if not department:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Department with id {identity_data.department_id} not found in this business unit"
)
# 檢查同一員工在同一事業部是否已有身份
existing = db.query(EmployeeIdentity).filter(
EmployeeIdentity.employee_id == identity_data.employee_id,
EmployeeIdentity.business_unit_id == identity_data.business_unit_id
).first()
if existing:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Employee already has an identity in this business unit"
)
# 生成 SSO 帳號
username = f"{employee.username_base}@{business_unit.email_domain}"
# 檢查 SSO 帳號是否已存在
existing_username = db.query(EmployeeIdentity).filter(
EmployeeIdentity.username == username
).first()
if existing_username:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Username '{username}' already exists"
)
# TODO: 從 Keycloak 創建帳號並獲取 UUID
keycloak_id = f"temp-uuid-{employee.id}-{business_unit.id}"
# 創建身份
identity = EmployeeIdentity(
employee_id=identity_data.employee_id,
username=username,
keycloak_id=keycloak_id,
business_unit_id=identity_data.business_unit_id,
department_id=identity_data.department_id,
job_title=identity_data.job_title,
job_level=identity_data.job_level,
is_primary=identity_data.is_primary,
email_quota_mb=identity_data.email_quota_mb,
started_at=identity_data.started_at,
)
# 如果設為主要身份,取消其他主要身份
if identity_data.is_primary:
db.query(EmployeeIdentity).filter(
EmployeeIdentity.employee_id == identity_data.employee_id
).update({"is_primary": False})
db.add(identity)
db.commit()
db.refresh(identity)
# TODO: 創建審計日誌
# TODO: 創建 Keycloak 帳號
# TODO: 創建郵件帳號
response = EmployeeIdentityResponse.model_validate(identity)
response.employee_name = employee.legal_name
response.business_unit_name = business_unit.name
response.email_domain = business_unit.email_domain
if identity.department:
response.department_name = identity.department.name
return response
@router.put("/{identity_id}", response_model=EmployeeIdentityResponse)
def update_identity(
identity_id: int,
identity_data: EmployeeIdentityUpdate,
db: Session = Depends(get_db),
):
"""
更新員工身份
"""
identity = db.query(EmployeeIdentity).filter(
EmployeeIdentity.id == identity_id
).first()
if not identity:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Identity with id {identity_id} not found"
)
# 檢查部門是否屬於同一事業部 (如果更新部門)
if identity_data.department_id:
department = db.query(Department).filter(
Department.id == identity_data.department_id,
Department.business_unit_id == identity.business_unit_id
).first()
if not department:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Department does not belong to this business unit"
)
# 更新欄位
update_data = identity_data.model_dump(exclude_unset=True)
# 如果設為主要身份,取消其他主要身份
if update_data.get("is_primary"):
db.query(EmployeeIdentity).filter(
EmployeeIdentity.employee_id == identity.employee_id,
EmployeeIdentity.id != identity_id
).update({"is_primary": False})
for field, value in update_data.items():
setattr(identity, field, value)
db.commit()
db.refresh(identity)
# TODO: 創建審計日誌
# TODO: 更新 NAS 配額 (如果職級變更)
response = EmployeeIdentityResponse.model_validate(identity)
response.employee_name = identity.employee.legal_name
response.business_unit_name = identity.business_unit.name
response.email_domain = identity.business_unit.email_domain
if identity.department:
response.department_name = identity.department.name
return response
@router.delete("/{identity_id}", response_model=MessageResponse)
def delete_identity(
identity_id: int,
db: Session = Depends(get_db),
):
"""
刪除員工身份
注意:
- 如果是員工的最後一個身份,無法刪除
- 刪除後會停用對應的 Keycloak 和郵件帳號
"""
identity = db.query(EmployeeIdentity).filter(
EmployeeIdentity.id == identity_id
).first()
if not identity:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Identity with id {identity_id} not found"
)
# 檢查是否為員工的最後一個身份
total_identities = db.query(EmployeeIdentity).filter(
EmployeeIdentity.employee_id == identity.employee_id
).count()
if total_identities == 1:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot delete employee's last identity. Please terminate the employee instead."
)
# 軟刪除 (停用)
identity.is_active = False
identity.ended_at = db.func.current_date()
db.commit()
# TODO: 創建審計日誌
# TODO: 停用 Keycloak 帳號
# TODO: 停用郵件帳號
return MessageResponse(
message=f"Identity '{identity.username}' has been deactivated"
)
@router.post("/{identity_id}/set-primary", response_model=EmployeeIdentityResponse)
def set_primary_identity(
identity_id: int,
db: Session = Depends(get_db),
):
"""
設定為主要身份
將指定的身份設為員工的主要身份,並取消其他身份的主要狀態
"""
identity = db.query(EmployeeIdentity).filter(
EmployeeIdentity.id == identity_id
).first()
if not identity:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Identity with id {identity_id} not found"
)
# 檢查身份是否已停用
if not identity.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot set inactive identity as primary"
)
# 取消同一員工的其他主要身份
db.query(EmployeeIdentity).filter(
EmployeeIdentity.employee_id == identity.employee_id,
EmployeeIdentity.id != identity_id
).update({"is_primary": False})
# 設為主要身份
identity.is_primary = True
db.commit()
db.refresh(identity)
# TODO: 創建審計日誌
response = EmployeeIdentityResponse.model_validate(identity)
response.employee_name = identity.employee.legal_name
response.business_unit_name = identity.business_unit.name
response.email_domain = identity.business_unit.email_domain
if identity.department:
response.department_name = identity.department.name
return response