""" 審計日誌歸檔批次 (5.3) 執行時間: 每月 1 日 01:00 批次名稱: archive_audit_logs 將 90 天前的審計日誌匯出為 CSV,並從主資料庫刪除 歸檔目錄: /mnt/nas/working/audit_logs/ """ import csv import logging import os from datetime import datetime, timedelta from typing import Optional from app.batch.base import log_batch_execution logger = logging.getLogger(__name__) ARCHIVE_DAYS = 90 # 保留最近 90 天,超過的歸檔 ARCHIVE_BASE_DIR = "/mnt/nas/working/audit_logs" def _get_archive_dir() -> str: """取得歸檔目錄,不存在時建立""" os.makedirs(ARCHIVE_BASE_DIR, exist_ok=True) return ARCHIVE_BASE_DIR def run_archive_audit_logs(dry_run: bool = False) -> dict: """ 執行審計日誌歸檔批次 Args: dry_run: True 時只統計不實際刪除 Returns: 執行結果摘要 """ started_at = datetime.utcnow() cutoff_date = datetime.utcnow() - timedelta(days=ARCHIVE_DAYS) logger.info(f"=== 開始審計日誌歸檔批次 === 截止日期: {cutoff_date.strftime('%Y-%m-%d')}") if dry_run: logger.info("[DRY RUN] 不會實際刪除資料") from app.db.session import get_db from app.models.audit_log import AuditLog db = next(get_db()) try: # 1. 查詢超過 90 天的日誌 old_logs = db.query(AuditLog).filter( AuditLog.performed_at < cutoff_date ).order_by(AuditLog.performed_at).all() total_count = len(old_logs) logger.info(f"找到 {total_count} 筆待歸檔日誌") if total_count == 0: message = f"無需歸檔 (截止日期 {cutoff_date.strftime('%Y-%m-%d')} 前無記錄)" log_batch_execution( batch_name="archive_audit_logs", status="success", message=message, started_at=started_at, ) return {"status": "success", "archived": 0, "message": message} # 2. 匯出到 CSV archive_month = cutoff_date.strftime("%Y%m") archive_dir = _get_archive_dir() csv_path = os.path.join(archive_dir, f"archive_{archive_month}.csv") fieldnames = [ "id", "action", "resource_type", "resource_id", "performed_by", "ip_address", "details", "performed_at" ] logger.info(f"匯出至: {csv_path}") with open(csv_path, "w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() for log in old_logs: writer.writerow({ "id": log.id, "action": log.action, "resource_type": log.resource_type, "resource_id": log.resource_id, "performed_by": getattr(log, "performed_by", ""), "ip_address": getattr(log, "ip_address", ""), "details": str(getattr(log, "details", "")), "performed_at": str(log.performed_at), }) logger.info(f"已匯出 {total_count} 筆至 {csv_path}") # 3. 刪除舊日誌 (非 dry_run 才執行) deleted_count = 0 if not dry_run: for log in old_logs: db.delete(log) db.commit() deleted_count = total_count logger.info(f"已刪除 {deleted_count} 筆舊日誌") else: logger.info(f"[DRY RUN] 將刪除 {total_count} 筆 (未實際執行)") # 4. 記錄批次執行日誌 finished_at = datetime.utcnow() message = ( f"歸檔 {total_count} 筆到 {csv_path}" + (f"; 已刪除 {deleted_count} 筆" if not dry_run else " (DRY RUN)") ) log_batch_execution( batch_name="archive_audit_logs", status="success", message=message, started_at=started_at, finished_at=finished_at, ) logger.info(f"=== 審計日誌歸檔批次完成 === {message}") return { "status": "success", "archived": total_count, "deleted": deleted_count, "csv_path": csv_path, } except Exception as e: error_msg = f"審計日誌歸檔批次失敗: {str(e)}" logger.error(error_msg) try: db.rollback() except Exception: pass log_batch_execution( batch_name="archive_audit_logs", status="failed", message=error_msg, started_at=started_at, ) return {"status": "failed", "error": str(e)} finally: db.close() if __name__ == "__main__": import sys import argparse sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../..")) logging.basicConfig(level=logging.INFO) parser = argparse.ArgumentParser() parser.add_argument("--dry-run", action="store_true", help="只統計不實際刪除") args = parser.parse_args() result = run_archive_audit_logs(dry_run=args.dry_run) print(f"執行結果: {result}")