""" 角色管理 API (RBAC) - roles: 租戶層級角色 (不綁定部門) - role_rights: 角色對系統功能的 CRUD 權限 - user_role_assignments: 使用者角色分配 (直接對人,跨部門有效) """ from typing import List, Optional from fastapi import APIRouter, Depends, HTTPException, Query, status, Request from sqlalchemy.orm import Session from app.db.session import get_db from app.api.deps import get_tenant_id, get_current_tenant from app.models.role import UserRole, RoleRight, UserRoleAssignment from app.models.system_function_cache import SystemFunctionCache from app.schemas.response import MessageResponse from app.services.audit_service import audit_service router = APIRouter() # ======================== # 角色 CRUD # ======================== @router.get("/") def get_roles( db: Session = Depends(get_db), include_inactive: bool = False, ): """取得租戶的所有角色""" tenant_id = get_current_tenant_id() query = db.query(Role).filter(Role.tenant_id == tenant_id) if not include_inactive: query = query.filter(Role.is_active == True) roles = query.order_by(Role.id).all() return [ { "id": r.id, "role_code": r.role_code, "role_name": r.role_name, "description": r.description, "is_active": r.is_active, "rights_count": len(r.rights), } for r in roles ] @router.get("/{role_id}") def get_role( role_id: int, db: Session = Depends(get_db), ): """取得角色詳情(含功能權限)""" tenant_id = get_current_tenant_id() role = db.query(Role).filter( Role.id == role_id, Role.tenant_id == tenant_id, ).first() if not role: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Role with id {role_id} not found" ) return { "id": role.id, "role_code": role.role_code, "role_name": role.role_name, "description": role.description, "is_active": role.is_active, "rights": [ { "function_id": r.function_id, "function_code": r.function.function_code if r.function else None, "function_name": r.function.function_name if r.function else None, "service_code": r.function.service_code if r.function else None, "can_read": r.can_read, "can_create": r.can_create, "can_update": r.can_update, "can_delete": r.can_delete, } for r in role.rights ], } @router.post("/", status_code=status.HTTP_201_CREATED) def create_role( data: dict, request: Request, db: Session = Depends(get_db), ): """ 建立角色 Body: { "role_code": "WAREHOUSE_MANAGER", "role_name": "倉管角色", "description": "..." } """ tenant_id = get_current_tenant_id() role_code = data.get("role_code", "").upper() role_name = data.get("role_name", "") if not role_code or not role_name: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="role_code and role_name are required" ) existing = db.query(Role).filter( Role.tenant_id == tenant_id, Role.role_code == role_code, ).first() if existing: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Role code '{role_code}' already exists" ) role = Role( tenant_id=tenant_id, role_code=role_code, role_name=role_name, description=data.get("description"), ) db.add(role) db.commit() db.refresh(role) audit_service.log_action( request=request, db=db, action="create_role", resource_type="role", resource_id=role.id, details={"role_code": role_code, "role_name": role_name}, ) return {"id": role.id, "role_code": role.role_code, "role_name": role.role_name} @router.delete("/{role_id}", response_model=MessageResponse) def deactivate_role( role_id: int, request: Request, db: Session = Depends(get_db), ): """停用角色""" tenant_id = get_current_tenant_id() role = db.query(Role).filter( Role.id == role_id, Role.tenant_id == tenant_id, ).first() if not role: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Role with id {role_id} not found" ) role.is_active = False db.commit() return MessageResponse(message=f"Role '{role.role_name}' has been deactivated") # ======================== # 角色功能權限 # ======================== @router.put("/{role_id}/rights") def set_role_rights( role_id: int, rights: list, request: Request, db: Session = Depends(get_db), ): """ 設定角色的功能權限 (整體替換) Body: [ {"function_id": 1, "can_read": true, "can_create": false, "can_update": false, "can_delete": false}, ... ] """ tenant_id = get_current_tenant_id() role = db.query(Role).filter( Role.id == role_id, Role.tenant_id == tenant_id, ).first() if not role: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Role with id {role_id} not found" ) # 刪除舊的權限 db.query(RoleRight).filter(RoleRight.role_id == role_id).delete() # 新增新的權限 for r in rights: function_id = r.get("function_id") fn = db.query(SystemFunctionCache).filter( SystemFunctionCache.id == function_id ).first() if not fn: continue right = RoleRight( role_id=role_id, function_id=function_id, can_read=r.get("can_read", False), can_create=r.get("can_create", False), can_update=r.get("can_update", False), can_delete=r.get("can_delete", False), ) db.add(right) db.commit() audit_service.log_action( request=request, db=db, action="update_role_rights", resource_type="role", resource_id=role_id, details={"rights_count": len(rights)}, ) return {"message": f"Role rights updated", "rights_count": len(rights)} # ======================== # 使用者角色分配 # ======================== @router.get("/user-assignments/") def get_user_role_assignments( db: Session = Depends(get_db), keycloak_user_id: Optional[str] = Query(None, description="Keycloak User UUID"), ): """取得使用者角色分配""" tenant_id = get_current_tenant_id() query = db.query(UserRoleAssignment).filter( UserRoleAssignment.tenant_id == tenant_id, UserRoleAssignment.is_active == True, ) if keycloak_user_id: query = query.filter(UserRoleAssignment.keycloak_user_id == keycloak_user_id) assignments = query.all() return [ { "id": a.id, "keycloak_user_id": a.keycloak_user_id, "role_id": a.role_id, "role_code": a.role.role_code if a.role else None, "role_name": a.role.role_name if a.role else None, "assigned_at": a.assigned_at, } for a in assignments ] @router.post("/user-assignments/", status_code=status.HTTP_201_CREATED) def assign_role_to_user( data: dict, request: Request, db: Session = Depends(get_db), ): """ 分配角色給使用者 (直接對人,跨部門有效) Body: { "keycloak_user_id": "uuid", "role_id": 1 } """ tenant_id = get_current_tenant_id() keycloak_user_id = data.get("keycloak_user_id") role_id = data.get("role_id") if not keycloak_user_id or not role_id: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="keycloak_user_id and role_id are required" ) role = db.query(Role).filter( Role.id == role_id, Role.tenant_id == tenant_id, ).first() if not role: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Role with id {role_id} not found" ) existing = db.query(UserRoleAssignment).filter( UserRoleAssignment.keycloak_user_id == keycloak_user_id, UserRoleAssignment.role_id == role_id, ).first() if existing: if existing.is_active: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="User already has this role assigned" ) existing.is_active = True db.commit() return {"message": "Role assignment reactivated", "id": existing.id} assignment = UserRoleAssignment( tenant_id=tenant_id, keycloak_user_id=keycloak_user_id, role_id=role_id, ) db.add(assignment) db.commit() db.refresh(assignment) audit_service.log_action( request=request, db=db, action="assign_role", resource_type="user_role_assignment", resource_id=assignment.id, details={"keycloak_user_id": keycloak_user_id, "role_id": role_id, "role_code": role.role_code}, ) return {"id": assignment.id, "keycloak_user_id": keycloak_user_id, "role_id": role_id} @router.delete("/user-assignments/{assignment_id}", response_model=MessageResponse) def revoke_role_from_user( assignment_id: int, request: Request, db: Session = Depends(get_db), ): """撤銷使用者角色""" tenant_id = get_current_tenant_id() assignment = db.query(UserRoleAssignment).filter( UserRoleAssignment.id == assignment_id, UserRoleAssignment.tenant_id == tenant_id, ).first() if not assignment: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Assignment with id {assignment_id} not found" ) assignment.is_active = False db.commit() audit_service.log_action( request=request, db=db, action="revoke_role", resource_type="user_role_assignment", resource_id=assignment_id, details={"keycloak_user_id": assignment.keycloak_user_id, "role_id": assignment.role_id}, ) return MessageResponse(message="Role assignment revoked") # ======================== # 系統功能查詢 # ======================== @router.get("/system-functions/") def get_system_functions( db: Session = Depends(get_db), service_code: Optional[str] = Query(None, description="服務代碼篩選: hr/erp/mail/ai"), ): """取得系統功能清單 (從快取表)""" query = db.query(SystemFunctionCache).filter(SystemFunctionCache.is_active == True) if service_code: query = query.filter(SystemFunctionCache.service_code == service_code) functions = query.order_by(SystemFunctionCache.service_code, SystemFunctionCache.id).all() return [ { "id": f.id, "service_code": f.service_code, "function_code": f.function_code, "function_name": f.function_name, "function_category": f.function_category, } for f in functions ]