"""
Keycloak SSO integration service.

Wraps the python-keycloak admin and OpenID clients behind a small service
class and exposes a lazily created, module-level singleton.
"""
import traceback
from typing import Optional, Dict, Any

from keycloak import KeycloakAdmin, KeycloakOpenID
from keycloak.exceptions import KeycloakError

from app.core.config import settings


class KeycloakService:
    """Service wrapper around the Keycloak admin and OpenID clients."""

    def __init__(self):
        """Read connection settings; no client is created until first use."""
        self.server_url = settings.KEYCLOAK_URL
        self.realm_name = settings.KEYCLOAK_REALM
        self.client_id = settings.KEYCLOAK_CLIENT_ID
        self.client_secret = settings.KEYCLOAK_CLIENT_SECRET
        self._admin: Optional[KeycloakAdmin] = None
        self._openid: Optional[KeycloakOpenID] = None

    @property
    def admin(self) -> Optional[KeycloakAdmin]:
        """Lazily create and return the Keycloak admin client.

        Returns:
            The cached ``KeycloakAdmin`` instance, or ``None`` when the admin
            credentials are not configured or client creation failed. A
            failed creation is not cached, so it is retried on next access.
        """
        if (
            self._admin is None
            and settings.KEYCLOAK_ADMIN_USERNAME
            and settings.KEYCLOAK_ADMIN_PASSWORD
        ):
            try:
                # Keycloak 26.x needs the full server_url (without /auth).
                self._admin = KeycloakAdmin(
                    server_url=self.server_url,
                    username=settings.KEYCLOAK_ADMIN_USERNAME,
                    password=settings.KEYCLOAK_ADMIN_PASSWORD,
                    realm_name=self.realm_name,
                    # Realm the admin account logs into (usually master).
                    user_realm_name="master",
                    verify=True,
                    timeout=10,  # 10-second timeout
                )
            except Exception as e:
                # Best effort: report the failure and leave the client unset
                # so callers see ``None`` instead of an exception.
                print(f"Warning: Failed to initialize Keycloak Admin: {e}")
                traceback.print_exc()
        return self._admin

    @property
    def openid(self) -> KeycloakOpenID:
        """Lazily create and return the Keycloak OpenID client."""
        if self._openid is None:
            self._openid = KeycloakOpenID(
                server_url=self.server_url,
                client_id=self.client_id,
                realm_name=self.realm_name,
                client_secret_key=self.client_secret,
            )
        return self._openid

    def create_user(
        self,
        username: str,
        email: str,
        first_name: str,
        last_name: str,
        enabled: bool = True,
        email_verified: bool = False,
        temporary_password: Optional[str] = None,
    ) -> Optional[str]:
        """
        Create a Keycloak user.

        Args:
            username: User name (username_base@email_domain)
            email: Email address (same as username)
            first_name: First name
            last_name: Last name
            enabled: Whether the account is enabled
            email_verified: Whether the email is already verified
            temporary_password: Temporary password (the user must change it
                on first login)

        Returns:
            str: Keycloak user ID (UUID), or None on failure
        """
        if not self.admin:
            print("Error: Keycloak Admin not initialized")
            return None

        try:
            user_data = {
                "username": username,
                "email": email,
                "firstName": first_name,
                "lastName": last_name,
                "enabled": enabled,
                "emailVerified": email_verified,
            }

            # Attach the credential only when a temporary password is given.
            if temporary_password:
                user_data["credentials"] = [{
                    "type": "password",
                    "value": temporary_password,
                    "temporary": True,  # user must change it on first login
                }]

            user_id = self.admin.create_user(user_data)
            print(f"[OK] Keycloak user created: {username} (ID: {user_id})")
            return user_id

        except KeycloakError as e:
            print(f"[ERROR] Failed to create Keycloak user {username}: {e}")
            return None

    def get_user_by_username(self, username: str) -> Optional[Dict[str, Any]]:
        """
        Look up a user by exact username.

        Keycloak's ``username`` search is not an exact match by default, so
        the query asks for exact matching and the results are additionally
        filtered here; a lookup for "bob" must never return "bobby".

        Args:
            username: User name to look up (compared case-insensitively)

        Returns:
            dict: User representation, or None when no user has exactly this
            username, the admin client is unavailable, or the request failed
        """
        if not self.admin:
            return None

        try:
            # "true" is passed as a string so the query parameter is encoded
            # identically regardless of the underlying HTTP client.
            users = self.admin.get_users(
                {"username": username, "exact": "true"}
            )
        except KeycloakError as e:
            print(f"[ERROR] Failed to get user {username}: {e}")
            return None

        # Filter client-side as well, in case the server ignores ``exact``.
        wanted = username.lower()
        for user in users or []:
            if str(user.get("username", "")).lower() == wanted:
                return user
        return None

    def update_user(self, user_id: str, user_data: Dict[str, Any]) -> bool:
        """
        Update a user's attributes.

        Args:
            user_id: Keycloak user ID
            user_data: Fields to update

        Returns:
            bool: True on success
        """
        if not self.admin:
            return False

        try:
            self.admin.update_user(user_id, user_data)
            print(f"[OK] Keycloak user updated: {user_id}")
            return True
        except KeycloakError as e:
            print(f"[ERROR] Failed to update user {user_id}: {e}")
            return False

    def disable_user(self, user_id: str) -> bool:
        """
        Disable a user.

        Args:
            user_id: Keycloak user ID

        Returns:
            bool: True on success
        """
        return self.update_user(user_id, {"enabled": False})

    def enable_user(self, user_id: str) -> bool:
        """
        Enable a user.

        Args:
            user_id: Keycloak user ID

        Returns:
            bool: True on success
        """
        return self.update_user(user_id, {"enabled": True})

    def delete_user(self, user_id: str) -> bool:
        """
        Delete a user.

        Note: this is a real deletion; prefer ``disable_user`` as a soft
        delete.

        Args:
            user_id: Keycloak user ID

        Returns:
            bool: True on success
        """
        if not self.admin:
            return False

        try:
            self.admin.delete_user(user_id)
            print(f"[OK] Keycloak user deleted: {user_id}")
            return True
        except KeycloakError as e:
            print(f"[ERROR] Failed to delete user {user_id}: {e}")
            return False

    def reset_password(
        self,
        user_id: str,
        new_password: str,
        temporary: bool = True
    ) -> bool:
        """
        Reset a user's password.

        Args:
            user_id: Keycloak user ID
            new_password: New password
            temporary: Whether the password is temporary (the user must
                change it on first login)

        Returns:
            bool: True on success
        """
        if not self.admin:
            return False

        try:
            self.admin.set_user_password(
                user_id,
                new_password,
                temporary=temporary
            )
            print(f"[OK] Password reset for user: {user_id}")
            return True
        except KeycloakError as e:
            print(f"[ERROR] Failed to reset password for {user_id}: {e}")
            return False

    def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
        """
        Verify a JWT token.

        Args:
            token: JWT token

        Returns:
            dict: Token payload (including user information), or None when
            the token is invalid or verification raised any exception
        """
        try:
            # python-keycloak fetches the public key from Keycloak and
            # verifies the token automatically.
            token_info = self.openid.decode_token(
                token,
                validate=True  # verify signature and expiry
            )
            return token_info
        except Exception as e:
            print(f"[ERROR] Token verification failed: {e}")
            return None

    def get_user_info_from_token(self, token: str) -> Optional[Dict[str, Any]]:
        """
        Extract user information from a token.

        Args:
            token: JWT token

        Returns:
            dict: Selected claims from the verified token, or None when
            verification fails
        """
        token_info = self.verify_token(token)
        if not token_info:
            return None

        return {
            "username": token_info.get("preferred_username"),
            "email": token_info.get("email"),
            "first_name": token_info.get("given_name"),
            "last_name": token_info.get("family_name"),
            "sub": token_info.get("sub"),  # Keycloak user ID
            "iss": token_info.get("iss"),  # issuer (used for multi-tenancy)
            "realm_access": token_info.get("realm_access"),  # role info
        }

    def introspect_token(self, token: str) -> Optional[Dict[str, Any]]:
        """
        Introspect a token's state.

        Args:
            token: JWT token

        Returns:
            dict: Introspection result (includes the ``active`` flag), or
            None when introspection raised any exception
        """
        try:
            return self.openid.introspect(token)
        except Exception as e:
            print(f"[ERROR] Token introspection failed: {e}")
            return None

    def is_token_active(self, token: str) -> bool:
        """
        Check whether a token is active.

        Args:
            token: JWT token

        Returns:
            bool: True when introspection reports the token as active
        """
        introspection = self.introspect_token(token)
        if not introspection:
            return False
        # Coerce so the declared ``bool`` return type always holds.
        return bool(introspection.get("active", False))


# Global Keycloak service instance.
# The service is created lazily on first use rather than at import time.
_keycloak_service_instance: Optional[KeycloakService] = None


def get_keycloak_service() -> KeycloakService:
    """Return the Keycloak service instance (singleton)."""
    global _keycloak_service_instance
    if _keycloak_service_instance is None:
        _keycloak_service_instance = KeycloakService()
    return _keycloak_service_instance


# Proxy that emulates attribute access on the lazily created singleton.
class _KeycloakServiceProxy:
    """Forward attribute lookups to the lazily created service instance."""

    def __getattr__(self, name: str) -> Any:
        return getattr(get_keycloak_service(), name)


keycloak_service = _KeycloakServiceProxy()