Major Features: - ✅ Multi-tenant architecture (tenant isolation) - ✅ Employee CRUD with lifecycle management (onboarding/offboarding) - ✅ Department tree structure with email domain management - ✅ Company info management (single-record editing) - ✅ System functions CRUD (permission management) - ✅ Email account management (multi-account per employee) - ✅ Keycloak SSO integration (auth.lab.taipei) - ✅ Redis session storage (10.1.0.254:6379) - Solves Cookie 4KB limitation - Cross-system session sharing - Sliding expiration (8 hours) - Automatic token refresh Technical Stack: Backend: - FastAPI + SQLAlchemy - PostgreSQL 16 (10.1.0.20:5433) - Keycloak Admin API integration - Docker Mailserver integration (SSH) - Alembic migrations Frontend: - Next.js 14 (App Router) - NextAuth 4 with Keycloak Provider - Redis session storage (ioredis) - Tailwind CSS Infrastructure: - Redis 7 (10.1.0.254:6379) - Session + Cache - Keycloak 26.1.0 (auth.lab.taipei) - Docker Mailserver (10.1.0.254) Architecture Highlights: - Session管理由 Keycloak + Redis 統一控制 - 支援多系統 (HR/WebMail/Calendar/Drive/Office) 共享 session - Token 自動刷新,異質服務整合 - 未來可無縫遷移到雲端 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
390 lines
11 KiB
Python
390 lines
11 KiB
Python
"""
|
|
角色管理 API (RBAC)
|
|
- roles: 租戶層級角色 (不綁定部門)
|
|
- role_rights: 角色對系統功能的 CRUD 權限
|
|
- user_role_assignments: 使用者角色分配 (直接對人,跨部門有效)
|
|
"""
|
|
from typing import List, Optional
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, status, Request
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.db.session import get_db
|
|
from app.api.deps import get_tenant_id, get_current_tenant
|
|
from app.models.role import UserRole, RoleRight, UserRoleAssignment
|
|
from app.models.system_function_cache import SystemFunctionCache
|
|
from app.schemas.response import MessageResponse
|
|
from app.services.audit_service import audit_service
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
# ========================
|
|
# 角色 CRUD
|
|
# ========================
|
|
|
|
@router.get("/")
|
|
def get_roles(
|
|
db: Session = Depends(get_db),
|
|
include_inactive: bool = False,
|
|
):
|
|
"""取得租戶的所有角色"""
|
|
tenant_id = get_current_tenant_id()
|
|
query = db.query(Role).filter(Role.tenant_id == tenant_id)
|
|
|
|
if not include_inactive:
|
|
query = query.filter(Role.is_active == True)
|
|
|
|
roles = query.order_by(Role.id).all()
|
|
|
|
return [
|
|
{
|
|
"id": r.id,
|
|
"role_code": r.role_code,
|
|
"role_name": r.role_name,
|
|
"description": r.description,
|
|
"is_active": r.is_active,
|
|
"rights_count": len(r.rights),
|
|
}
|
|
for r in roles
|
|
]
|
|
|
|
|
|
@router.get("/{role_id}")
|
|
def get_role(
|
|
role_id: int,
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""取得角色詳情(含功能權限)"""
|
|
tenant_id = get_current_tenant_id()
|
|
role = db.query(Role).filter(
|
|
Role.id == role_id,
|
|
Role.tenant_id == tenant_id,
|
|
).first()
|
|
|
|
if not role:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Role with id {role_id} not found"
|
|
)
|
|
|
|
return {
|
|
"id": role.id,
|
|
"role_code": role.role_code,
|
|
"role_name": role.role_name,
|
|
"description": role.description,
|
|
"is_active": role.is_active,
|
|
"rights": [
|
|
{
|
|
"function_id": r.function_id,
|
|
"function_code": r.function.function_code if r.function else None,
|
|
"function_name": r.function.function_name if r.function else None,
|
|
"service_code": r.function.service_code if r.function else None,
|
|
"can_read": r.can_read,
|
|
"can_create": r.can_create,
|
|
"can_update": r.can_update,
|
|
"can_delete": r.can_delete,
|
|
}
|
|
for r in role.rights
|
|
],
|
|
}
|
|
|
|
|
|
@router.post("/", status_code=status.HTTP_201_CREATED)
|
|
def create_role(
|
|
data: dict,
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""
|
|
建立角色
|
|
|
|
Body: { "role_code": "WAREHOUSE_MANAGER", "role_name": "倉管角色", "description": "..." }
|
|
"""
|
|
tenant_id = get_current_tenant_id()
|
|
role_code = data.get("role_code", "").upper()
|
|
role_name = data.get("role_name", "")
|
|
|
|
if not role_code or not role_name:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
|
detail="role_code and role_name are required"
|
|
)
|
|
|
|
existing = db.query(Role).filter(
|
|
Role.tenant_id == tenant_id,
|
|
Role.role_code == role_code,
|
|
).first()
|
|
|
|
if existing:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Role code '{role_code}' already exists"
|
|
)
|
|
|
|
role = Role(
|
|
tenant_id=tenant_id,
|
|
role_code=role_code,
|
|
role_name=role_name,
|
|
description=data.get("description"),
|
|
)
|
|
db.add(role)
|
|
db.commit()
|
|
db.refresh(role)
|
|
|
|
audit_service.log_action(
|
|
request=request, db=db,
|
|
action="create_role", resource_type="role", resource_id=role.id,
|
|
details={"role_code": role_code, "role_name": role_name},
|
|
)
|
|
|
|
return {"id": role.id, "role_code": role.role_code, "role_name": role.role_name}
|
|
|
|
|
|
@router.delete("/{role_id}", response_model=MessageResponse)
|
|
def deactivate_role(
|
|
role_id: int,
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""停用角色"""
|
|
tenant_id = get_current_tenant_id()
|
|
role = db.query(Role).filter(
|
|
Role.id == role_id,
|
|
Role.tenant_id == tenant_id,
|
|
).first()
|
|
|
|
if not role:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Role with id {role_id} not found"
|
|
)
|
|
|
|
role.is_active = False
|
|
db.commit()
|
|
|
|
return MessageResponse(message=f"Role '{role.role_name}' has been deactivated")
|
|
|
|
|
|
# ========================
|
|
# 角色功能權限
|
|
# ========================
|
|
|
|
@router.put("/{role_id}/rights")
|
|
def set_role_rights(
|
|
role_id: int,
|
|
rights: list,
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""
|
|
設定角色的功能權限 (整體替換)
|
|
|
|
Body: [
|
|
{"function_id": 1, "can_read": true, "can_create": false, "can_update": false, "can_delete": false},
|
|
...
|
|
]
|
|
"""
|
|
tenant_id = get_current_tenant_id()
|
|
role = db.query(Role).filter(
|
|
Role.id == role_id,
|
|
Role.tenant_id == tenant_id,
|
|
).first()
|
|
|
|
if not role:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Role with id {role_id} not found"
|
|
)
|
|
|
|
# 刪除舊的權限
|
|
db.query(RoleRight).filter(RoleRight.role_id == role_id).delete()
|
|
|
|
# 新增新的權限
|
|
for r in rights:
|
|
function_id = r.get("function_id")
|
|
fn = db.query(SystemFunctionCache).filter(
|
|
SystemFunctionCache.id == function_id
|
|
).first()
|
|
|
|
if not fn:
|
|
continue
|
|
|
|
right = RoleRight(
|
|
role_id=role_id,
|
|
function_id=function_id,
|
|
can_read=r.get("can_read", False),
|
|
can_create=r.get("can_create", False),
|
|
can_update=r.get("can_update", False),
|
|
can_delete=r.get("can_delete", False),
|
|
)
|
|
db.add(right)
|
|
|
|
db.commit()
|
|
|
|
audit_service.log_action(
|
|
request=request, db=db,
|
|
action="update_role_rights", resource_type="role", resource_id=role_id,
|
|
details={"rights_count": len(rights)},
|
|
)
|
|
|
|
return {"message": f"Role rights updated", "rights_count": len(rights)}
|
|
|
|
|
|
# ========================
|
|
# 使用者角色分配
|
|
# ========================
|
|
|
|
@router.get("/user-assignments/")
|
|
def get_user_role_assignments(
|
|
db: Session = Depends(get_db),
|
|
keycloak_user_id: Optional[str] = Query(None, description="Keycloak User UUID"),
|
|
):
|
|
"""取得使用者角色分配"""
|
|
tenant_id = get_current_tenant_id()
|
|
query = db.query(UserRoleAssignment).filter(
|
|
UserRoleAssignment.tenant_id == tenant_id,
|
|
UserRoleAssignment.is_active == True,
|
|
)
|
|
|
|
if keycloak_user_id:
|
|
query = query.filter(UserRoleAssignment.keycloak_user_id == keycloak_user_id)
|
|
|
|
assignments = query.all()
|
|
|
|
return [
|
|
{
|
|
"id": a.id,
|
|
"keycloak_user_id": a.keycloak_user_id,
|
|
"role_id": a.role_id,
|
|
"role_code": a.role.role_code if a.role else None,
|
|
"role_name": a.role.role_name if a.role else None,
|
|
"assigned_at": a.assigned_at,
|
|
}
|
|
for a in assignments
|
|
]
|
|
|
|
|
|
@router.post("/user-assignments/", status_code=status.HTTP_201_CREATED)
|
|
def assign_role_to_user(
|
|
data: dict,
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""
|
|
分配角色給使用者 (直接對人,跨部門有效)
|
|
|
|
Body: { "keycloak_user_id": "uuid", "role_id": 1 }
|
|
"""
|
|
tenant_id = get_current_tenant_id()
|
|
keycloak_user_id = data.get("keycloak_user_id")
|
|
role_id = data.get("role_id")
|
|
|
|
if not keycloak_user_id or not role_id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
|
detail="keycloak_user_id and role_id are required"
|
|
)
|
|
|
|
role = db.query(Role).filter(
|
|
Role.id == role_id,
|
|
Role.tenant_id == tenant_id,
|
|
).first()
|
|
|
|
if not role:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Role with id {role_id} not found"
|
|
)
|
|
|
|
existing = db.query(UserRoleAssignment).filter(
|
|
UserRoleAssignment.keycloak_user_id == keycloak_user_id,
|
|
UserRoleAssignment.role_id == role_id,
|
|
).first()
|
|
|
|
if existing:
|
|
if existing.is_active:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="User already has this role assigned"
|
|
)
|
|
existing.is_active = True
|
|
db.commit()
|
|
return {"message": "Role assignment reactivated", "id": existing.id}
|
|
|
|
assignment = UserRoleAssignment(
|
|
tenant_id=tenant_id,
|
|
keycloak_user_id=keycloak_user_id,
|
|
role_id=role_id,
|
|
)
|
|
db.add(assignment)
|
|
db.commit()
|
|
db.refresh(assignment)
|
|
|
|
audit_service.log_action(
|
|
request=request, db=db,
|
|
action="assign_role", resource_type="user_role_assignment", resource_id=assignment.id,
|
|
details={"keycloak_user_id": keycloak_user_id, "role_id": role_id, "role_code": role.role_code},
|
|
)
|
|
|
|
return {"id": assignment.id, "keycloak_user_id": keycloak_user_id, "role_id": role_id}
|
|
|
|
|
|
@router.delete("/user-assignments/{assignment_id}", response_model=MessageResponse)
|
|
def revoke_role_from_user(
|
|
assignment_id: int,
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""撤銷使用者角色"""
|
|
tenant_id = get_current_tenant_id()
|
|
assignment = db.query(UserRoleAssignment).filter(
|
|
UserRoleAssignment.id == assignment_id,
|
|
UserRoleAssignment.tenant_id == tenant_id,
|
|
).first()
|
|
|
|
if not assignment:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Assignment with id {assignment_id} not found"
|
|
)
|
|
|
|
assignment.is_active = False
|
|
db.commit()
|
|
|
|
audit_service.log_action(
|
|
request=request, db=db,
|
|
action="revoke_role", resource_type="user_role_assignment", resource_id=assignment_id,
|
|
details={"keycloak_user_id": assignment.keycloak_user_id, "role_id": assignment.role_id},
|
|
)
|
|
|
|
return MessageResponse(message="Role assignment revoked")
|
|
|
|
|
|
# ========================
|
|
# 系統功能查詢
|
|
# ========================
|
|
|
|
@router.get("/system-functions/")
|
|
def get_system_functions(
|
|
db: Session = Depends(get_db),
|
|
service_code: Optional[str] = Query(None, description="服務代碼篩選: hr/erp/mail/ai"),
|
|
):
|
|
"""取得系統功能清單 (從快取表)"""
|
|
query = db.query(SystemFunctionCache).filter(SystemFunctionCache.is_active == True)
|
|
|
|
if service_code:
|
|
query = query.filter(SystemFunctionCache.service_code == service_code)
|
|
|
|
functions = query.order_by(SystemFunctionCache.service_code, SystemFunctionCache.id).all()
|
|
|
|
return [
|
|
{
|
|
"id": f.id,
|
|
"service_code": f.service_code,
|
|
"function_code": f.function_code,
|
|
"function_name": f.function_name,
|
|
"function_category": f.function_category,
|
|
}
|
|
for f in functions
|
|
]
|