""" 員工管理 API """ from typing import List, Optional from fastapi import APIRouter, Depends, HTTPException, Query, status, Request from sqlalchemy.orm import Session from sqlalchemy import or_ from app.db.session import get_db from app.models.employee import Employee, EmployeeStatus from app.models.emp_setting import EmpSetting from app.models.emp_resume import EmpResume from app.models.department import Department from app.models.department_member import DepartmentMember from app.schemas.employee import ( EmployeeCreate, EmployeeUpdate, EmployeeResponse, EmployeeListItem, ) from app.schemas.base import PaginationParams, PaginatedResponse from app.schemas.response import SuccessResponse, MessageResponse from app.api.deps import get_pagination_params from app.services.audit_service import audit_service router = APIRouter() @router.get("/", response_model=PaginatedResponse) def get_employees( db: Session = Depends(get_db), pagination: PaginationParams = Depends(get_pagination_params), status_filter: Optional[EmployeeStatus] = Query(None, description="員工狀態篩選"), search: Optional[str] = Query(None, description="搜尋關鍵字 (姓名或帳號)"), ): """ 獲取員工列表 支援: - 分頁 - 狀態篩選 - 關鍵字搜尋 (姓名、帳號) """ # ⚠️ 暫時改為查詢 EmpSetting (因為 Employee model 對應的 tenant_employees 表不存在) query = db.query(EmpSetting).join(EmpResume, EmpSetting.tenant_resume_id == EmpResume.id) # 狀態篩選 if status_filter: query = query.filter(EmpSetting.employment_status == status_filter) # 搜尋 (在 EmpResume 中搜尋) if search: search_pattern = f"%{search}%" query = query.filter( or_( EmpResume.legal_name.ilike(search_pattern), EmpResume.english_name.ilike(search_pattern), EmpSetting.tenant_emp_code.ilike(search_pattern), ) ) # 總數 total = query.count() # 分頁 offset = (pagination.page - 1) * pagination.page_size emp_settings = query.offset(offset).limit(pagination.page_size).all() # 計算總頁數 total_pages = (total + pagination.page_size - 1) // pagination.page_size # 轉換為回應格式 (暫時簡化,不使用 EmployeeListItem) items = [] for emp_setting in emp_settings: resume = emp_setting.resume items.append({ "id": emp_setting.id if hasattr(emp_setting, 'id') else emp_setting.tenant_id * 10000 + emp_setting.seq_no, "employee_id": emp_setting.tenant_emp_code, "legal_name": resume.legal_name if resume else "", "english_name": resume.english_name if resume else "", "status": emp_setting.employment_status, "hire_date": emp_setting.hire_at.isoformat() if emp_setting.hire_at else None, "is_active": emp_setting.is_active, }) return PaginatedResponse( total=total, page=pagination.page, page_size=pagination.page_size, total_pages=total_pages, items=items, ) @router.get("/{employee_id}", response_model=EmployeeResponse) def get_employee( employee_id: int, db: Session = Depends(get_db), ): """ 獲取員工詳情 (Phase 2.4: 包含主要身份完整資訊) """ employee = db.query(Employee).filter(Employee.id == employee_id).first() if not employee: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Employee with id {employee_id} not found" ) # 組建回應 response = EmployeeResponse.model_validate(employee) response.has_network_drive = employee.network_drive is not None response.department_count = sum(1 for m in employee.department_memberships if m.is_active) return response @router.post("/", response_model=EmployeeResponse, status_code=status.HTTP_201_CREATED) def create_employee( employee_data: EmployeeCreate, request: Request, db: Session = Depends(get_db), ): """ 創建員工 (Phase 2.3: 同時創建第一個員工身份) 自動生成員工編號 (EMP001, EMP002, ...) 同時創建第一個 employee_identity 記錄 (主要身份) """ # 檢查 username_base 是否已存在 existing = db.query(Employee).filter( Employee.username_base == employee_data.username_base ).first() if existing: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Username '{employee_data.username_base}' already exists" ) # 檢查部門是否存在 (如果有提供) department = None if employee_data.department_id: department = db.query(Department).filter( Department.id == employee_data.department_id, Department.is_active == True ).first() if not department: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Department with id {employee_data.department_id} not found or inactive" ) # 生成員工編號 last_employee = db.query(Employee).order_by(Employee.id.desc()).first() if last_employee and last_employee.employee_id.startswith("EMP"): try: last_number = int(last_employee.employee_id[3:]) new_number = last_number + 1 except ValueError: new_number = 1 else: new_number = 1 employee_id = f"EMP{new_number:03d}" # 創建員工 # TODO: 從 JWT token 或 session 獲取 tenant_id,目前暫時使用預設值 1 tenant_id = 1 # 預設租戶 ID employee = Employee( tenant_id=tenant_id, employee_id=employee_id, username_base=employee_data.username_base, legal_name=employee_data.legal_name, english_name=employee_data.english_name, phone=employee_data.phone, mobile=employee_data.mobile, hire_date=employee_data.hire_date, status=EmployeeStatus.ACTIVE, ) db.add(employee) db.flush() # 先 flush 以取得 employee.id # 若有指定部門,建立 department_member 紀錄 if department: membership = DepartmentMember( tenant_id=tenant_id, employee_id=employee.id, department_id=department.id, position=employee_data.job_title, membership_type="permanent", is_active=True, ) db.add(membership) db.commit() db.refresh(employee) # 創建審計日誌 audit_service.log_create( db=db, resource_type="employee", resource_id=employee.id, performed_by="system@porscheworld.tw", # TODO: 從 JWT Token 獲取 details={ "employee_id": employee.employee_id, "username_base": employee.username_base, "legal_name": employee.legal_name, "department_id": employee_data.department_id, "job_title": employee_data.job_title, }, ip_address=audit_service.get_client_ip(request), ) response = EmployeeResponse.model_validate(employee) response.has_network_drive = False response.department_count = 1 if department else 0 return response @router.put("/{employee_id}", response_model=EmployeeResponse) def update_employee( employee_id: int, employee_data: EmployeeUpdate, request: Request, db: Session = Depends(get_db), ): """ 更新員工資料 """ employee = db.query(Employee).filter(Employee.id == employee_id).first() if not employee: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Employee with id {employee_id} not found" ) # 記錄舊值 (用於審計日誌) old_values = audit_service.model_to_dict(employee) # 更新欄位 (只更新提供的欄位) update_data = employee_data.model_dump(exclude_unset=True) for field, value in update_data.items(): setattr(employee, field, value) db.commit() db.refresh(employee) # 創建審計日誌 if update_data: # 只有實際有更新時才記錄 audit_service.log_update( db=db, resource_type="employee", resource_id=employee.id, performed_by="system@porscheworld.tw", # TODO: 從 JWT Token 獲取 old_values={k: old_values[k] for k in update_data.keys() if k in old_values}, new_values=update_data, ip_address=audit_service.get_client_ip(request), ) response = EmployeeResponse.model_validate(employee) response.has_network_drive = employee.network_drive is not None response.department_count = sum(1 for m in employee.department_memberships if m.is_active) return response @router.delete("/{employee_id}", response_model=MessageResponse) def delete_employee( employee_id: int, request: Request, db: Session = Depends(get_db), ): """ 停用員工 注意: 這是軟刪除,只將狀態設為 terminated """ employee = db.query(Employee).filter(Employee.id == employee_id).first() if not employee: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Employee with id {employee_id} not found" ) # 軟刪除 employee.status = EmployeeStatus.TERMINATED # 停用所有部門成員資格 from datetime import datetime memberships_deactivated = 0 for membership in employee.department_memberships: if membership.is_active: membership.is_active = False membership.ended_at = datetime.utcnow() memberships_deactivated += 1 # 停用 NAS 帳號 has_nas = employee.network_drive is not None if employee.network_drive: employee.network_drive.is_active = False db.commit() # 創建審計日誌 audit_service.log_delete( db=db, resource_type="employee", resource_id=employee.id, performed_by="system@porscheworld.tw", # TODO: 從 JWT Token 獲取 details={ "employee_id": employee.employee_id, "username_base": employee.username_base, "legal_name": employee.legal_name, "memberships_deactivated": memberships_deactivated, "nas_deactivated": has_nas, }, ip_address=audit_service.get_client_ip(request), ) # TODO: 停用 Keycloak 帳號 return MessageResponse( message=f"Employee {employee.employee_id} ({employee.legal_name}) has been terminated" ) @router.post("/{employee_id}/activate", response_model=MessageResponse) def activate_employee( employee_id: int, db: Session = Depends(get_db), ): """ 重新啟用員工 """ employee = db.query(Employee).filter(Employee.id == employee_id).first() if not employee: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Employee with id {employee_id} not found" ) employee.status = EmployeeStatus.ACTIVE db.commit() # TODO: 創建審計日誌 return MessageResponse( message=f"Employee {employee.employee_id} ({employee.legal_name}) has been activated" ) @router.get("/{employee_id}/identities", response_model=List, deprecated=True) def get_employee_identities( employee_id: int, db: Session = Depends(get_db), ): """ [已廢棄] 獲取員工的所有身份 此端點已廢棄,請使用 GET /department-members/?employee_id={id} 取代 """ employee = db.query(Employee).filter(Employee.id == employee_id).first() if not employee: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Employee with id {employee_id} not found" ) # 廢棄端點: 回傳空列表,請改用 /department-members/?employee_id={id} return []