Files
hr-portal/backend/app/batch/archive_audit_logs.py
Porsche Chen 360533393f feat: HR Portal - Complete Multi-Tenant System with Redis Session Storage
Major Features:
-  Multi-tenant architecture (tenant isolation)
-  Employee CRUD with lifecycle management (onboarding/offboarding)
-  Department tree structure with email domain management
-  Company info management (single-record editing)
-  System functions CRUD (permission management)
-  Email account management (multi-account per employee)
-  Keycloak SSO integration (auth.lab.taipei)
-  Redis session storage (10.1.0.254:6379)
  - Solves Cookie 4KB limitation
  - Cross-system session sharing
  - Sliding expiration (8 hours)
  - Automatic token refresh

Technical Stack:
Backend:
- FastAPI + SQLAlchemy
- PostgreSQL 16 (10.1.0.20:5433)
- Keycloak Admin API integration
- Docker Mailserver integration (SSH)
- Alembic migrations

Frontend:
- Next.js 14 (App Router)
- NextAuth 4 with Keycloak Provider
- Redis session storage (ioredis)
- Tailwind CSS

Infrastructure:
- Redis 7 (10.1.0.254:6379) - Session + Cache
- Keycloak 26.1.0 (auth.lab.taipei)
- Docker Mailserver (10.1.0.254)

Architecture Highlights:
- Session管理由 Keycloak + Redis 統一控制
- 支援多系統 (HR/WebMail/Calendar/Drive/Office) 共享 session
- Token 自動刷新,異質服務整合
- 未來可無縫遷移到雲端

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-23 20:12:43 +08:00

161 lines
5.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
審計日誌歸檔批次 (5.3)
執行時間: 每月 1 日 01:00
批次名稱: archive_audit_logs
將 90 天前的審計日誌匯出為 CSV並從主資料庫刪除
歸檔目錄: /mnt/nas/working/audit_logs/
"""
import csv
import logging
import os
from datetime import datetime, timedelta
from typing import Optional
from app.batch.base import log_batch_execution
logger = logging.getLogger(__name__)
ARCHIVE_DAYS = 90 # 保留最近 90 天,超過的歸檔
ARCHIVE_BASE_DIR = "/mnt/nas/working/audit_logs"
def _get_archive_dir() -> str:
    """Return the archive directory path, creating it if it does not exist."""
    target_dir = ARCHIVE_BASE_DIR
    os.makedirs(target_dir, exist_ok=True)
    return target_dir
def run_archive_audit_logs(dry_run: bool = False) -> dict:
    """Archive audit logs older than ARCHIVE_DAYS to CSV, then delete them.

    Exports every AuditLog row whose performed_at is more than ARCHIVE_DAYS
    days in the past to a CSV file under ARCHIVE_BASE_DIR, then removes the
    exported rows from the primary database.

    Args:
        dry_run: When True, the CSV export still runs but no rows are
            deleted from the database.

    Returns:
        On success: {"status": "success", "archived": N, "deleted": M,
        "csv_path": path} (or a "message" key when nothing needed archiving).
        On failure: {"status": "failed", "error": str}.
    """
    started_at = datetime.utcnow()
    cutoff_date = datetime.utcnow() - timedelta(days=ARCHIVE_DAYS)
    logger.info(f"=== 開始審計日誌歸檔批次 === 截止日期: {cutoff_date.strftime('%Y-%m-%d')}")
    if dry_run:
        logger.info("[DRY RUN] 不會實際刪除資料")

    # Imported lazily so the module can be imported without a configured DB.
    from app.db.session import get_db
    from app.models.audit_log import AuditLog

    db = next(get_db())
    try:
        # 1. Fetch all logs past the retention window, oldest first.
        old_logs = (
            db.query(AuditLog)
            .filter(AuditLog.performed_at < cutoff_date)
            .order_by(AuditLog.performed_at)
            .all()
        )
        total_count = len(old_logs)
        logger.info(f"找到 {total_count} 筆待歸檔日誌")
        if total_count == 0:
            message = f"無需歸檔 (截止日期 {cutoff_date.strftime('%Y-%m-%d')} 前無記錄)"
            log_batch_execution(
                batch_name="archive_audit_logs",
                status="success",
                message=message,
                started_at=started_at,
                # Fix: finished_at was omitted here, unlike the main success path.
                finished_at=datetime.utcnow(),
            )
            return {"status": "success", "archived": 0, "message": message}

        # 2. Export to CSV.
        # NOTE(review): the file is named after the cutoff month, so a re-run
        # within the same month silently overwrites the previous archive file
        # — confirm this is acceptable for the retention policy.
        archive_month = cutoff_date.strftime("%Y%m")
        archive_dir = _get_archive_dir()
        csv_path = os.path.join(archive_dir, f"archive_{archive_month}.csv")
        logger.info(f"匯出至: {csv_path}")
        _export_logs_to_csv(old_logs, csv_path)
        logger.info(f"已匯出 {total_count} 筆至 {csv_path}")

        # 3. Delete the archived rows (skipped on dry_run). Per-object
        # deletes are kept (rather than a bulk DELETE statement) so
        # ORM-level cascades/events on AuditLog, if any, still fire.
        deleted_count = 0
        if not dry_run:
            for log in old_logs:
                db.delete(log)
            db.commit()
            deleted_count = total_count
            logger.info(f"已刪除 {deleted_count} 筆舊日誌")
        else:
            logger.info(f"[DRY RUN] 將刪除 {total_count} 筆 (未實際執行)")

        # 4. Record the batch execution.
        finished_at = datetime.utcnow()
        message = (
            f"歸檔 {total_count} 筆到 {csv_path}"
            + (f"; 已刪除 {deleted_count}" if not dry_run else " (DRY RUN)")
        )
        log_batch_execution(
            batch_name="archive_audit_logs",
            status="success",
            message=message,
            started_at=started_at,
            finished_at=finished_at,
        )
        logger.info(f"=== 審計日誌歸檔批次完成 === {message}")
        return {
            "status": "success",
            "archived": total_count,
            "deleted": deleted_count,
            "csv_path": csv_path,
        }
    except Exception as e:
        error_msg = f"審計日誌歸檔批次失敗: {str(e)}"
        # Fix: logger.exception keeps the traceback; logger.error dropped it.
        logger.exception(error_msg)
        try:
            db.rollback()
        except Exception:
            # Best effort: the session may already be unusable.
            pass
        log_batch_execution(
            batch_name="archive_audit_logs",
            status="failed",
            message=error_msg,
            started_at=started_at,
            # Fix: finished_at was omitted on the failure path.
            finished_at=datetime.utcnow(),
        )
        return {"status": "failed", "error": str(e)}
    finally:
        db.close()


def _export_logs_to_csv(old_logs, csv_path: str) -> None:
    """Write the given AuditLog rows to csv_path as UTF-8 CSV with a header."""
    fieldnames = [
        "id", "action", "resource_type", "resource_id",
        "performed_by", "ip_address",
        "details", "performed_at",
    ]
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for log in old_logs:
            writer.writerow({
                "id": log.id,
                "action": log.action,
                "resource_type": log.resource_type,
                "resource_id": log.resource_id,
                # Optional columns: fall back to "" if the model lacks them.
                "performed_by": getattr(log, "performed_by", ""),
                "ip_address": getattr(log, "ip_address", ""),
                "details": str(getattr(log, "details", "")),
                "performed_at": str(log.performed_at),
            })
if __name__ == "__main__":
import sys
import argparse
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../.."))
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true", help="只統計不實際刪除")
args = parser.parse_args()
result = run_archive_audit_logs(dry_run=args.dry_run)
print(f"執行結果: {result}")