Major Features: - ✅ Multi-tenant architecture (tenant isolation) - ✅ Employee CRUD with lifecycle management (onboarding/offboarding) - ✅ Department tree structure with email domain management - ✅ Company info management (single-record editing) - ✅ System functions CRUD (permission management) - ✅ Email account management (multi-account per employee) - ✅ Keycloak SSO integration (auth.lab.taipei) - ✅ Redis session storage (10.1.0.254:6379) - Solves Cookie 4KB limitation - Cross-system session sharing - Sliding expiration (8 hours) - Automatic token refresh Technical Stack: Backend: - FastAPI + SQLAlchemy - PostgreSQL 16 (10.1.0.20:5433) - Keycloak Admin API integration - Docker Mailserver integration (SSH) - Alembic migrations Frontend: - Next.js 14 (App Router) - NextAuth 4 with Keycloak Provider - Redis session storage (ioredis) - Tailwind CSS Infrastructure: - Redis 7 (10.1.0.254:6379) - Session + Cache - Keycloak 26.1.0 (auth.lab.taipei) - Docker Mailserver (10.1.0.254) Architecture Highlights: - Session管理由 Keycloak + Redis 統一控制 - 支援多系統 (HR/WebMail/Calendar/Drive/Office) 共享 session - Token 自動刷新,異質服務整合 - 未來可無縫遷移到雲端 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
543 lines
16 KiB
Python
543 lines
16 KiB
Python
"""
|
|
系統權限管理 API
|
|
管理員工對各系統 (Gitea, Portainer, Traefik, Keycloak) 的存取權限
|
|
"""
|
|
from typing import List, Optional
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, status, Request
|
|
from sqlalchemy.orm import Session, joinedload
|
|
from sqlalchemy import and_
|
|
|
|
from app.db.session import get_db
|
|
from app.models.employee import Employee
|
|
from app.models.permission import Permission
|
|
from app.schemas.permission import (
|
|
PermissionCreate,
|
|
PermissionUpdate,
|
|
PermissionResponse,
|
|
PermissionListItem,
|
|
PermissionBatchCreate,
|
|
PermissionFilter,
|
|
VALID_SYSTEMS,
|
|
VALID_ACCESS_LEVELS,
|
|
)
|
|
from app.schemas.base import PaginationParams, PaginatedResponse
|
|
from app.schemas.response import SuccessResponse, MessageResponse
|
|
from app.api.deps import get_pagination_params
|
|
from app.services.audit_service import audit_service
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
def get_current_tenant_id() -> int:
    """Return the tenant ID used to scope every query in this module.

    The value is temporarily hard-coded to 1; the intent is to read it
    from the JWT token in the future.
    """
    hardcoded_tenant_id = 1
    return hardcoded_tenant_id
|
|
|
|
|
|
@router.get("/", response_model=PaginatedResponse)
def get_permissions(
    db: Session = Depends(get_db),
    pagination: PaginationParams = Depends(get_pagination_params),
    filter_params: PermissionFilter = Depends(),
):
    """List permissions of the current tenant, one page at a time.

    Supports:
    - pagination (``pagination.page`` / ``pagination.page_size``)
    - filtering by employee
    - filtering by system name
    - filtering by access level

    Returns:
        A ``PaginatedResponse`` carrying the total row count, the requested
        page coordinates, the computed page count and the page items. Each
        item is enriched with the owning employee's name and number when the
        employee relationship is populated.
    """
    tenant_id = get_current_tenant_id()
    query = db.query(Permission).filter(Permission.tenant_id == tenant_id)

    # Optional filters: each one is applied only when a truthy value is given.
    if filter_params.employee_id:
        query = query.filter(Permission.employee_id == filter_params.employee_id)

    if filter_params.system_name:
        query = query.filter(Permission.system_name == filter_params.system_name)

    if filter_params.access_level:
        query = query.filter(Permission.access_level == filter_params.access_level)

    # Total number of rows matching the filters (before pagination).
    total = query.count()

    # OFFSET/LIMIT without ORDER BY gives no guarantee about which rows land
    # on which page, so rows could repeat or go missing across pages. Ordering
    # by the primary key makes the paging deterministic.
    offset = (pagination.page - 1) * pagination.page_size
    permissions = (
        query.options(joinedload(Permission.employee))
        .order_by(Permission.id)
        .offset(offset)
        .limit(pagination.page_size)
        .all()
    )

    # Ceiling division: number of pages needed to hold ``total`` rows.
    total_pages = (total + pagination.page_size - 1) // pagination.page_size

    # Build the response items, attaching the employee display fields.
    items = []
    for perm in permissions:
        item = PermissionListItem.model_validate(perm)
        item.employee_name = perm.employee.legal_name if perm.employee else None
        item.employee_number = perm.employee.employee_id if perm.employee else None
        items.append(item)

    return PaginatedResponse(
        total=total,
        page=pagination.page,
        page_size=pagination.page_size,
        total_pages=total_pages,
        items=items,
    )
|
|
|
|
|
|
@router.get("/systems", response_model=dict)
def get_available_systems_route():
    """Return the systems and access levels that can be granted.

    This route must be registered before ``/{permission_id}`` so that the
    literal path ``/systems`` is not captured by the parameterised route.

    The payload contains the valid system names, the valid access levels and
    a human-readable description for each of them.
    """
    system_descriptions = {
        "gitea": "Git 程式碼託管系統",
        "portainer": "Docker 容器管理系統",
        "traefik": "反向代理與路由系統",
        "keycloak": "SSO 身份認證系統",
    }
    access_level_descriptions = {
        "admin": "完整管理權限",
        "user": "一般使用者權限",
        "readonly": "唯讀權限",
    }

    payload = {
        "systems": VALID_SYSTEMS,
        "access_levels": VALID_ACCESS_LEVELS,
        "system_descriptions": system_descriptions,
        "access_level_descriptions": access_level_descriptions,
    }
    return payload
|
|
|
|
|
|
@router.get("/{permission_id}", response_model=PermissionResponse)
def get_permission(
    permission_id: int,
    db: Session = Depends(get_db),
):
    """Fetch a single permission of the current tenant by its id.

    Raises:
        HTTPException: 404 when no permission with ``permission_id`` exists
            for the current tenant.
    """
    tenant_id = get_current_tenant_id()

    # Load the employee and the granter together with the permission row.
    eager_relations = (
        joinedload(Permission.employee),
        joinedload(Permission.granter),
    )
    lookup = db.query(Permission).options(*eager_relations).filter(
        Permission.id == permission_id,
        Permission.tenant_id == tenant_id,
    )
    permission = lookup.first()

    if not permission:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Permission with id {permission_id} not found",
        )

    owner = permission.employee
    granter = permission.granter

    # Assemble the response, adding display fields from the related employees.
    response = PermissionResponse.model_validate(permission)
    response.employee_name = owner.legal_name if owner else None
    response.employee_number = owner.employee_id if owner else None
    response.granted_by_name = granter.legal_name if granter else None
    return response
|
|
|
|
|
|
@router.post("/", response_model=PermissionResponse, status_code=status.HTTP_201_CREATED)
def create_permission(
    permission_data: PermissionCreate,
    request: Request,
    db: Session = Depends(get_db),
):
    """Create a system permission for an employee.

    Notes:
    - The employee must exist in the current tenant.
    - An employee can hold only one permission per system (unique constraint).
    - System names: gitea, portainer, traefik, keycloak
    - Access levels: admin, user, readonly

    Raises:
        HTTPException: 404 when the employee or the granter does not exist in
            the current tenant; 400 when the employee already has a permission
            for the requested system, or when the insert is rejected by a
            database integrity constraint.
    """
    # Imported locally so this handler is self-contained with respect to the
    # module's top-level imports.
    from sqlalchemy.exc import IntegrityError

    tenant_id = get_current_tenant_id()

    # The target employee must exist within the current tenant.
    employee = db.query(Employee).filter(
        Employee.id == permission_data.employee_id,
        Employee.tenant_id == tenant_id,
    ).first()

    if not employee:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Employee with id {permission_data.employee_id} not found",
        )

    # Reject the request if the employee already has a permission for this system.
    existing = db.query(Permission).filter(
        and_(
            Permission.employee_id == permission_data.employee_id,
            Permission.system_name == permission_data.system_name,
            Permission.tenant_id == tenant_id,
        )
    ).first()

    if existing:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Permission for system '{permission_data.system_name}' already exists for this employee",
        )

    # Validate the granter whenever one is supplied. ``is not None`` (rather
    # than truthiness) ensures a falsy id cannot skip validation and still be
    # persisted below.
    if permission_data.granted_by is not None:
        granter = db.query(Employee).filter(
            Employee.id == permission_data.granted_by,
            Employee.tenant_id == tenant_id,
        ).first()
        if not granter:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Granter with id {permission_data.granted_by} not found",
            )

    permission = Permission(
        tenant_id=tenant_id,
        employee_id=permission_data.employee_id,
        system_name=permission_data.system_name,
        access_level=permission_data.access_level,
        granted_by=permission_data.granted_by,
    )

    db.add(permission)
    try:
        db.commit()
    except IntegrityError as exc:
        # The existence check above is not atomic with the insert: a concurrent
        # request can create the same (employee, system) row first. Roll back
        # so the session stays usable and report a client error instead of
        # letting the exception escape as a 500.
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=(
                f"Permission for system '{permission_data.system_name}' could not be created: "
                "it conflicts with an existing record"
            ),
        ) from exc
    db.refresh(permission)

    # Re-query so the employee and granter relationships are eagerly loaded.
    permission = (
        db.query(Permission)
        .options(
            joinedload(Permission.employee),
            joinedload(Permission.granter),
        )
        .filter(Permission.id == permission.id)
        .first()
    )

    # Record the creation in the audit log.
    audit_service.log_action(
        request=request,
        db=db,
        action="create_permission",
        resource_type="permission",
        resource_id=permission.id,
        details={
            "employee_id": permission.employee_id,
            "system_name": permission.system_name,
            "access_level": permission.access_level,
            "granted_by": permission.granted_by,
        },
    )

    # Assemble the response, adding display fields from the related employees.
    response = PermissionResponse.model_validate(permission)
    response.employee_name = employee.legal_name
    response.employee_number = employee.employee_id
    response.granted_by_name = permission.granter.legal_name if permission.granter else None

    return response
|
|
|
|
|
|
@router.put("/{permission_id}", response_model=PermissionResponse)
def update_permission(
    permission_id: int,
    permission_data: PermissionUpdate,
    request: Request,
    db: Session = Depends(get_db),
):
    """Update an existing permission of the current tenant.

    Updatable fields:
    - access level (admin, user, readonly) - always overwritten with the
      value from the request payload
    - granter - changed only when ``granted_by`` is provided

    Raises:
        HTTPException: 404 when the permission, or the supplied granter, does
            not exist in the current tenant.
    """
    tenant_id = get_current_tenant_id()
    permission = (
        db.query(Permission)
        .options(
            joinedload(Permission.employee),
            joinedload(Permission.granter),
        )
        .filter(
            Permission.id == permission_id,
            Permission.tenant_id == tenant_id,
        )
        .first()
    )

    if not permission:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Permission with id {permission_id} not found",
        )

    # Validate the granter whenever one is supplied. This uses the same
    # ``is not None`` condition as the assignment below, so every value that
    # gets written has been validated first.
    if permission_data.granted_by is not None:
        granter = db.query(Employee).filter(
            Employee.id == permission_data.granted_by,
            Employee.tenant_id == tenant_id,
        ).first()
        if not granter:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Granter with id {permission_data.granted_by} not found",
            )

    # Remember the previous values for the audit trail.
    old_access_level = permission.access_level
    old_granted_by = permission.granted_by

    # Apply the changes.
    permission.access_level = permission_data.access_level
    if permission_data.granted_by is not None:
        permission.granted_by = permission_data.granted_by

    db.commit()
    db.refresh(permission)

    # Record the update, including before/after values, in the audit log.
    audit_service.log_action(
        request=request,
        db=db,
        action="update_permission",
        resource_type="permission",
        resource_id=permission.id,
        details={
            "employee_id": permission.employee_id,
            "system_name": permission.system_name,
            "changes": {
                "access_level": {"from": old_access_level, "to": permission.access_level},
                "granted_by": {"from": old_granted_by, "to": permission.granted_by},
            },
        },
    )

    # Assemble the response, adding display fields from the related employees.
    response = PermissionResponse.model_validate(permission)
    response.employee_name = permission.employee.legal_name if permission.employee else None
    response.employee_number = permission.employee.employee_id if permission.employee else None
    response.granted_by_name = permission.granter.legal_name if permission.granter else None

    return response
|
|
|
|
|
|
@router.delete("/{permission_id}", response_model=MessageResponse)
def delete_permission(
    permission_id: int,
    request: Request,
    db: Session = Depends(get_db),
):
    """Delete a permission, revoking the employee's access to that system.

    Raises:
        HTTPException: 404 when no permission with ``permission_id`` exists
            for the current tenant.
    """
    tenant_id = get_current_tenant_id()
    permission = db.query(Permission).filter(
        Permission.id == permission_id,
        Permission.tenant_id == tenant_id,
    ).first()

    if not permission:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Permission with id {permission_id} not found",
        )

    # Build the confirmation message while the row still exists, so the
    # response does not depend on reading attributes from an instance that
    # has already been deleted and committed.
    revoked_message = f"Permission for system {permission.system_name} has been revoked"

    # Record the audit entry before the row is removed.
    audit_service.log_action(
        request=request,
        db=db,
        action="delete_permission",
        resource_type="permission",
        resource_id=permission.id,
        details={
            "employee_id": permission.employee_id,
            "system_name": permission.system_name,
            "access_level": permission.access_level,
        },
    )

    # Remove the permission.
    db.delete(permission)
    db.commit()

    return MessageResponse(message=revoked_message)
|
|
|
|
|
|
@router.get("/employees/{employee_id}/permissions", response_model=List[PermissionResponse])
def get_employee_permissions(
    employee_id: int,
    db: Session = Depends(get_db),
):
    """List every system permission held by one employee.

    Returns one ``PermissionResponse`` per permission row of the employee in
    the current tenant, each carrying the employee's name and number and the
    granter's name when a granter is set.

    Raises:
        HTTPException: 404 when the employee does not exist in the current
            tenant.
    """
    tenant_id = get_current_tenant_id()

    # Make sure the employee exists before looking up the permissions.
    employee_lookup = db.query(Employee).filter(
        Employee.id == employee_id,
        Employee.tenant_id == tenant_id,
    )
    employee = employee_lookup.first()

    if not employee:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Employee with id {employee_id} not found",
        )

    # Fetch all permissions of the employee, with the granter eagerly loaded.
    permission_lookup = (
        db.query(Permission)
        .options(joinedload(Permission.granter))
        .filter(
            Permission.employee_id == employee_id,
            Permission.tenant_id == tenant_id,
        )
    )
    permissions = permission_lookup.all()

    # Assemble the response entries.
    result = []
    for granted in permissions:
        entry = PermissionResponse.model_validate(granted)
        entry.employee_name = employee.legal_name
        entry.employee_number = employee.employee_id
        entry.granted_by_name = granted.granter.legal_name if granted.granter else None
        result.append(entry)

    return result
|
|
|
|
|
|
@router.post("/batch", response_model=List[PermissionResponse], status_code=status.HTTP_201_CREATED)
def create_permissions_batch(
    batch_data: PermissionBatchCreate,
    request: Request,
    db: Session = Depends(get_db),
):
    """Grant one employee permissions on several systems in a single request.

    Systems for which the employee already has a permission are skipped
    silently, as are repeated entries for the same system within the request
    (the first occurrence wins). Only the permissions actually created are
    returned.

    Raises:
        HTTPException: 404 when the employee or the supplied granter does not
            exist in the current tenant.
    """
    tenant_id = get_current_tenant_id()

    # The target employee must exist within the current tenant.
    employee = db.query(Employee).filter(
        Employee.id == batch_data.employee_id,
        Employee.tenant_id == tenant_id,
    ).first()

    if not employee:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Employee with id {batch_data.employee_id} not found",
        )

    # Validate the granter whenever one is supplied. ``is not None`` (rather
    # than truthiness) ensures a falsy id cannot skip validation and still be
    # persisted below.
    granter = None
    if batch_data.granted_by is not None:
        granter = db.query(Employee).filter(
            Employee.id == batch_data.granted_by,
            Employee.tenant_id == tenant_id,
        ).first()
        if not granter:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Granter with id {batch_data.granted_by} not found",
            )

    # Fetch the systems the employee already has a permission for in a single
    # query instead of issuing one lookup per requested permission.
    taken_systems = {
        system_name
        for (system_name,) in db.query(Permission.system_name).filter(
            Permission.employee_id == batch_data.employee_id,
            Permission.tenant_id == tenant_id,
        )
    }

    created_permissions = []
    for perm_data in batch_data.permissions:
        # Skip systems that already exist in the database or that appeared
        # earlier in this same request; inserting either would violate the
        # one-permission-per-system rule.
        if perm_data.system_name in taken_systems:
            continue
        taken_systems.add(perm_data.system_name)

        permission = Permission(
            tenant_id=tenant_id,
            employee_id=batch_data.employee_id,
            system_name=perm_data.system_name,
            access_level=perm_data.access_level,
            granted_by=batch_data.granted_by,
        )

        db.add(permission)
        created_permissions.append(permission)

    db.commit()

    # Reload every created row so database-populated fields are available.
    for perm in created_permissions:
        db.refresh(perm)

    # Record the batch creation in the audit log.
    audit_service.log_action(
        request=request,
        db=db,
        action="create_permissions_batch",
        resource_type="permission",
        resource_id=batch_data.employee_id,
        details={
            "employee_id": batch_data.employee_id,
            "granted_by": batch_data.granted_by,
            "permissions": [
                {
                    "system_name": perm.system_name,
                    "access_level": perm.access_level,
                }
                for perm in created_permissions
            ],
        },
    )

    # Assemble the response entries.
    result = []
    for perm in created_permissions:
        response = PermissionResponse.model_validate(perm)
        response.employee_name = employee.legal_name
        response.employee_number = employee.employee_id
        response.granted_by_name = granter.legal_name if granter else None
        result.append(response)

    return result
|
|
|
|
|