""" 審計日誌 API """ from typing import List, Optional from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlalchemy.orm import Session from app.db.session import get_db from app.models.audit_log import AuditLog from app.schemas.audit_log import ( AuditLogResponse, AuditLogListItem, AuditLogFilter, ) from app.schemas.base import PaginationParams, PaginatedResponse from app.api.deps import get_pagination_params router = APIRouter() @router.get("/", response_model=PaginatedResponse) def get_audit_logs( db: Session = Depends(get_db), pagination: PaginationParams = Depends(get_pagination_params), action: Optional[str] = Query(None, description="操作類型篩選"), resource_type: Optional[str] = Query(None, description="資源類型篩選"), resource_id: Optional[int] = Query(None, description="資源 ID 篩選"), performed_by: Optional[str] = Query(None, description="操作者篩選"), start_date: Optional[datetime] = Query(None, description="開始日期"), end_date: Optional[datetime] = Query(None, description="結束日期"), ): """ 獲取審計日誌列表 支援: - 分頁 - 多種篩選條件 - 時間範圍篩選 """ query = db.query(AuditLog) # 操作類型篩選 if action: query = query.filter(AuditLog.action == action) # 資源類型篩選 if resource_type: query = query.filter(AuditLog.resource_type == resource_type) # 資源 ID 篩選 if resource_id is not None: query = query.filter(AuditLog.resource_id == resource_id) # 操作者篩選 if performed_by: query = query.filter(AuditLog.performed_by.ilike(f"%{performed_by}%")) # 時間範圍篩選 if start_date: query = query.filter(AuditLog.performed_at >= start_date) if end_date: query = query.filter(AuditLog.performed_at <= end_date) # 總數 total = query.count() # 分頁 (按時間倒序) offset = (pagination.page - 1) * pagination.page_size audit_logs = query.order_by( AuditLog.performed_at.desc() ).offset(offset).limit(pagination.page_size).all() # 計算總頁數 total_pages = (total + pagination.page_size - 1) // pagination.page_size return PaginatedResponse( total=total, page=pagination.page, page_size=pagination.page_size, total_pages=total_pages, items=[AuditLogListItem.model_validate(log) for log in audit_logs], ) @router.get("/{audit_log_id}", response_model=AuditLogResponse) def get_audit_log( audit_log_id: int, db: Session = Depends(get_db), ): """ 獲取審計日誌詳情 """ audit_log = db.query(AuditLog).filter( AuditLog.id == audit_log_id ).first() if not audit_log: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Audit log with id {audit_log_id} not found" ) return AuditLogResponse.model_validate(audit_log) @router.get("/resource/{resource_type}/{resource_id}", response_model=List[AuditLogListItem]) def get_resource_audit_logs( resource_type: str, resource_id: int, db: Session = Depends(get_db), ): """ 獲取特定資源的所有審計日誌 Args: resource_type: 資源類型 (employee, identity, department, etc.) resource_id: 資源 ID Returns: 該資源的所有操作記錄 (按時間倒序) """ audit_logs = db.query(AuditLog).filter( AuditLog.resource_type == resource_type, AuditLog.resource_id == resource_id ).order_by(AuditLog.performed_at.desc()).all() return [AuditLogListItem.model_validate(log) for log in audit_logs] @router.get("/user/{username}", response_model=List[AuditLogListItem]) def get_user_audit_logs( username: str, db: Session = Depends(get_db), limit: int = Query(100, le=1000, description="限制返回數量"), ): """ 獲取特定用戶的操作記錄 Args: username: 操作者 SSO 帳號 limit: 限制返回數量 (預設 100,最大 1000) Returns: 該用戶的操作記錄 (按時間倒序) """ audit_logs = db.query(AuditLog).filter( AuditLog.performed_by == username ).order_by(AuditLog.performed_at.desc()).limit(limit).all() return [AuditLogListItem.model_validate(log) for log in audit_logs] @router.get("/stats/summary") def get_audit_stats( db: Session = Depends(get_db), start_date: Optional[datetime] = Query(None, description="開始日期"), end_date: Optional[datetime] = Query(None, description="結束日期"), ): """ 獲取審計日誌統計 返回: - 按操作類型分組的統計 - 按資源類型分組的統計 - 操作頻率最高的用戶 """ query = db.query(AuditLog) if start_date: query = query.filter(AuditLog.performed_at >= start_date) if end_date: query = query.filter(AuditLog.performed_at <= end_date) # 總操作數 total_operations = query.count() # 按操作類型統計 from sqlalchemy import func action_stats = db.query( AuditLog.action, func.count(AuditLog.id).label('count') ).group_by(AuditLog.action).all() # 按資源類型統計 resource_stats = db.query( AuditLog.resource_type, func.count(AuditLog.id).label('count') ).group_by(AuditLog.resource_type).all() # 操作最多的用戶 (Top 10) top_users = db.query( AuditLog.performed_by, func.count(AuditLog.id).label('count') ).group_by(AuditLog.performed_by).order_by( func.count(AuditLog.id).desc() ).limit(10).all() return { "total_operations": total_operations, "by_action": {action: count for action, count in action_stats}, "by_resource_type": {resource: count for resource, count in resource_stats}, "top_users": [ {"username": user, "operations": count} for user, count in top_users ] }