""" 部門管理 API (統一樹狀結構) 設計原則: - depth=0, parent_id=NULL: 第一層部門 (原事業部),可設定 email_domain - depth>=1: 子部門,email_domain 繼承第一層祖先 - 取代舊的 business_units API """ from typing import List, Optional, Any, Dict from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlalchemy.orm import Session from app.db.session import get_db from app.api.deps import get_tenant_id, get_current_tenant from app.models.department import Department from app.models.department_member import DepartmentMember from app.schemas.department import ( DepartmentCreate, DepartmentUpdate, DepartmentResponse, DepartmentListItem, DepartmentTreeNode, ) from app.schemas.response import MessageResponse router = APIRouter() def get_effective_email_domain(department: Department, db: Session) -> str | None: """取得部門的有效郵件網域 (第一層自身,子層向上追溯)""" if department.depth == 0: return department.email_domain if department.parent_id: parent = db.query(Department).filter(Department.id == department.parent_id).first() if parent: return get_effective_email_domain(parent, db) return None def build_tree(departments: List[Department], parent_id: int | None, db: Session) -> List[Dict]: """遞迴建立部門樹狀結構""" nodes = [] for dept in departments: if dept.parent_id == parent_id: children = build_tree(departments, dept.id, db) node = { "id": dept.id, "code": dept.code, "name": dept.name, "name_en": dept.name_en, "depth": dept.depth, "parent_id": dept.parent_id, "email_domain": dept.email_domain, "effective_email_domain": get_effective_email_domain(dept, db), "email_address": dept.email_address, "email_quota_mb": dept.email_quota_mb, "description": dept.description, "is_active": dept.is_active, "is_top_level": dept.depth == 0 and dept.parent_id is None, "member_count": dept.members.filter_by(is_active=True).count(), "children": children, } nodes.append(node) return nodes @router.get("/tree") def get_departments_tree( db: Session = Depends(get_db), tenant_id: int = Depends(get_tenant_id), include_inactive: bool = False, ): """ 取得完整部門樹狀結構 回傳格式: [ { "id": 1, "code": "BD", "name": "業務發展部", "depth": 0, "email_domain": "ease.taipei", "children": [ {"id": 4, "code": "WIND", "name": "玄鐵風能部", "depth": 1, ...} ] }, ... ] """ query = db.query(Department).filter(Department.tenant_id == tenant_id) if not include_inactive: query = query.filter(Department.is_active == True) all_departments = query.order_by(Department.depth, Department.id).all() tree = build_tree(all_departments, None, db) return tree @router.get("/", response_model=List[DepartmentListItem]) def get_departments( db: Session = Depends(get_db), tenant_id: int = Depends(get_tenant_id), parent_id: Optional[int] = Query(None, description="上層部門 ID 篩選 (0=取得第一層)"), depth: Optional[int] = Query(None, description="層次深度篩選 (0=第一層,1=第二層)"), include_inactive: bool = False, ): """ 獲取部門列表 Args: parent_id: 上層部門 ID 篩選 depth: 層次深度篩選 (0=第一層即原事業部,1=第二層子部門) include_inactive: 是否包含停用的部門 """ query = db.query(Department).filter(Department.tenant_id == tenant_id) if depth is not None: query = query.filter(Department.depth == depth) if parent_id is not None: if parent_id == 0: query = query.filter(Department.parent_id == None) else: query = query.filter(Department.parent_id == parent_id) if not include_inactive: query = query.filter(Department.is_active == True) departments = query.order_by(Department.depth, Department.id).all() result = [] for dept in departments: item = DepartmentListItem.model_validate(dept) item.effective_email_domain = get_effective_email_domain(dept, db) item.member_count = dept.members.filter_by(is_active=True).count() result.append(item) return result @router.get("/{department_id}", response_model=DepartmentResponse) def get_department( department_id: int, db: Session = Depends(get_db), tenant_id: int = Depends(get_tenant_id), ): """取得部門詳情""" department = db.query(Department).filter( Department.id == department_id, Department.tenant_id == tenant_id, ).first() if not department: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Department with id {department_id} not found" ) response = DepartmentResponse.model_validate(department) response.effective_email_domain = get_effective_email_domain(department, db) response.member_count = department.members.filter_by(is_active=True).count() if department.parent: response.parent_name = department.parent.name return response @router.post("/", response_model=DepartmentResponse, status_code=status.HTTP_201_CREATED) def create_department( department_data: DepartmentCreate, db: Session = Depends(get_db), tenant_id: int = Depends(get_tenant_id), ): """ 創建部門 規則: - parent_id=NULL: 建立第一層部門 (depth=0),可設定 email_domain - parent_id=有值: 建立子部門 (depth=parent.depth+1),不可設定 email_domain (繼承) """ depth = 0 parent = None if department_data.parent_id: # 檢查上層部門是否存在 parent = db.query(Department).filter( Department.id == department_data.parent_id, Department.tenant_id == tenant_id, ).first() if not parent: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Parent department with id {department_data.parent_id} not found" ) depth = parent.depth + 1 # 子部門不可設定 email_domain if hasattr(department_data, 'email_domain') and department_data.email_domain: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="email_domain can only be set on top-level departments (parent_id=NULL)" ) # 檢查同層內 code 是否已存在 existing = db.query(Department).filter( Department.tenant_id == tenant_id, Department.parent_id == department_data.parent_id, Department.code == department_data.code, ).first() if existing: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Department code '{department_data.code}' already exists at this level" ) data = department_data.model_dump() data['tenant_id'] = tenant_id data['depth'] = depth department = Department(**data) db.add(department) db.commit() db.refresh(department) response = DepartmentResponse.model_validate(department) response.effective_email_domain = get_effective_email_domain(department, db) response.member_count = 0 if parent: response.parent_name = parent.name return response @router.put("/{department_id}", response_model=DepartmentResponse) def update_department( department_id: int, department_data: DepartmentUpdate, db: Session = Depends(get_db), tenant_id: int = Depends(get_tenant_id), ): """ 更新部門 注意: code 和 parent_id 建立後不可修改 第一層部門可更新 email_domain,子部門不可更新 email_domain """ department = db.query(Department).filter( Department.id == department_id, Department.tenant_id == tenant_id, ).first() if not department: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Department with id {department_id} not found" ) update_data = department_data.model_dump(exclude_unset=True) # 子部門不可更新 email_domain if 'email_domain' in update_data and department.depth > 0: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="email_domain can only be set on top-level departments (depth=0)" ) for field, value in update_data.items(): setattr(department, field, value) db.commit() db.refresh(department) response = DepartmentResponse.model_validate(department) response.effective_email_domain = get_effective_email_domain(department, db) response.member_count = department.members.filter_by(is_active=True).count() if department.parent: response.parent_name = department.parent.name return response @router.delete("/{department_id}", response_model=MessageResponse) def delete_department( department_id: int, db: Session = Depends(get_db), tenant_id: int = Depends(get_tenant_id), ): """ 停用部門 (軟刪除) 注意: 有活躍成員的部門不可停用 """ department = db.query(Department).filter( Department.id == department_id, Department.tenant_id == tenant_id, ).first() if not department: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Department with id {department_id} not found" ) # 檢查是否有活躍的成員 active_members = department.members.filter_by(is_active=True).count() if active_members > 0: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Cannot deactivate department with {active_members} active members" ) # 檢查是否有活躍的子部門 active_children = db.query(Department).filter( Department.parent_id == department_id, Department.is_active == True, ).count() if active_children > 0: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Cannot deactivate department with {active_children} active sub-departments" ) department.is_active = False db.commit() return MessageResponse( message=f"Department '{department.name}' has been deactivated" ) @router.get("/{department_id}/children") def get_department_children( department_id: int, db: Session = Depends(get_db), tenant_id: int = Depends(get_tenant_id), include_inactive: bool = False, ): """取得部門的直接子部門列表""" # 確認父部門存在 parent = db.query(Department).filter( Department.id == department_id, Department.tenant_id == tenant_id, ).first() if not parent: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Department with id {department_id} not found" ) query = db.query(Department).filter( Department.parent_id == department_id, Department.tenant_id == tenant_id, ) if not include_inactive: query = query.filter(Department.is_active == True) children = query.order_by(Department.id).all() effective_domain = get_effective_email_domain(parent, db) result = [] for dept in children: item = DepartmentListItem.model_validate(dept) item.effective_email_domain = effective_domain item.member_count = dept.members.filter_by(is_active=True).count() result.append(item) return result