""" Keycloak 同步批次 (5.2) 執行時間: 每日 03:00 批次名稱: sync_keycloak_users 同步 Keycloak 使用者狀態到 HR Portal 以 HR Portal 為準 (Single Source of Truth) """ import logging from datetime import datetime from app.batch.base import log_batch_execution logger = logging.getLogger(__name__) def run_sync_keycloak_users() -> dict: """ 執行 Keycloak 同步批次 以 HR Portal 員工狀態為準,同步到 Keycloak: - active → Keycloak enabled = True - terminated/on_leave → Keycloak enabled = False Returns: 執行結果摘要 """ started_at = datetime.utcnow() summary = { "total_checked": 0, "synced": 0, "not_found_in_keycloak": 0, "no_keycloak_id": 0, "errors": 0, } issues = [] logger.info("=== 開始 Keycloak 同步批次 ===") from app.db.session import get_db from app.models.employee import Employee from app.services.keycloak_admin_client import get_keycloak_admin_client db = next(get_db()) try: # 1. 取得所有員工 employees = db.query(Employee).all() keycloak_client = get_keycloak_admin_client() logger.info(f"共 {len(employees)} 位員工待檢查") for emp in employees: summary["total_checked"] += 1 # 跳過沒有 Keycloak ID 的員工 (尚未執行到職流程) # 以 username_base 查詢 Keycloak username = emp.username_base if not username: summary["no_keycloak_id"] += 1 continue try: # 2. 查詢 Keycloak 使用者 kc_user = keycloak_client.get_user_by_username(username) if not kc_user: # Keycloak 使用者不存在,可能尚未建立 summary["not_found_in_keycloak"] += 1 logger.debug(f"員工 {emp.employee_id} ({username}) 在 Keycloak 中不存在,跳過") continue kc_user_id = kc_user.get("id") kc_enabled = kc_user.get("enabled", False) # 3. 判斷應有的 enabled 狀態 should_be_enabled = (emp.status == "active") # 4. 狀態不一致時,以 HR Portal 為準同步到 Keycloak if kc_enabled != should_be_enabled: success = keycloak_client.update_user( kc_user_id, {"enabled": should_be_enabled} ) if success: summary["synced"] += 1 logger.info( f"✓ 同步 {emp.employee_id} ({username}): " f"Keycloak enabled {kc_enabled} → {should_be_enabled} " f"(HR 狀態: {emp.status})" ) else: summary["errors"] += 1 issues.append(f"{emp.employee_id}: 同步失敗") logger.warning(f"✗ 同步 {emp.employee_id} ({username}) 失敗") except Exception as e: summary["errors"] += 1 issues.append(f"{emp.employee_id}: {str(e)}") logger.error(f"處理員工 {emp.employee_id} 時發生錯誤: {e}") # 5. 記錄批次執行日誌 finished_at = datetime.utcnow() message = ( f"檢查: {summary['total_checked']}, " f"同步: {summary['synced']}, " f"Keycloak 無帳號: {summary['not_found_in_keycloak']}, " f"錯誤: {summary['errors']}" ) if issues: message += f"\n問題清單: {'; '.join(issues[:10])}" if len(issues) > 10: message += f" ... 共 {len(issues)} 個問題" status = "failed" if summary["errors"] > 0 else "success" log_batch_execution( batch_name="sync_keycloak_users", status=status, message=message, started_at=started_at, finished_at=finished_at, ) logger.info(f"=== Keycloak 同步批次完成 === {message}") return {"status": status, "summary": summary} except Exception as e: error_msg = f"Keycloak 同步批次失敗: {str(e)}" logger.error(error_msg) log_batch_execution( batch_name="sync_keycloak_users", status="failed", message=error_msg, started_at=started_at, ) return {"status": "failed", "error": str(e)} finally: db.close() if __name__ == "__main__": import sys import os sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../..")) logging.basicConfig(level=logging.INFO) result = run_sync_keycloak_users() print(f"執行結果: {result}")