"""Keycloak Admin REST API client.

Calls the Keycloak Admin REST API directly instead of going through the
python-keycloak package, to avoid that package's version-compatibility
problems.
"""

from typing import Any, Callable, Dict, List, Optional
from urllib.parse import quote

import requests

from app.core.config import settings


class KeycloakAdminClient:
    """Thin client for the Keycloak Admin REST API.

    The client authenticates against the ``master`` realm with the
    ``admin-cli`` client using the admin username/password taken from
    ``settings`` and caches the access token on the instance.

    Public methods wrap their HTTP calls in ``try/except Exception``: a
    failure is printed and reported through the return value (``None``,
    ``False`` or ``[]``) instead of being raised.
    """

    # Timeout (seconds) passed to every ``requests`` call made by the client.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        """Read connection settings; no network call is made here."""
        self.server_url = settings.KEYCLOAK_URL
        self.realm = settings.KEYCLOAK_REALM
        self.admin_username = settings.KEYCLOAK_ADMIN_USERNAME
        self.admin_password = settings.KEYCLOAK_ADMIN_PASSWORD
        self._access_token: Optional[str] = None

    # ==================== Internal helpers ====================

    def _get_admin_token(self) -> Optional[str]:
        """Fetch a new admin access token and cache it on the instance.

        Returns:
            The access token, or ``None`` if the request failed (the error is
            printed and the previously cached token is left untouched).
        """
        try:
            token_url = f"{self.server_url}/realms/master/protocol/openid-connect/token"
            data = {
                "client_id": "admin-cli",
                "username": self.admin_username,
                "password": self.admin_password,
                "grant_type": "password",
            }
            response = requests.post(
                token_url, data=data, timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            token_data = response.json()
            self._access_token = token_data.get("access_token")
            return self._access_token
        except Exception as e:
            print(f"✗ Failed to get admin token: {e}")
            return None

    def _get_headers(self) -> Dict[str, str]:
        """Build the request headers, fetching a token first if none is cached.

        Returns:
            Headers containing ``Authorization`` and ``Content-Type``.
        """
        if not self._access_token:
            self._get_admin_token()
        return {
            "Authorization": f"Bearer {self._access_token}",
            "Content-Type": "application/json",
        }

    def _admin_url(self, realm_name: str, *segments: str) -> str:
        """Build ``<server>/admin/realms/<realm>[/<segment>...]``.

        Every path segment is percent-encoded so that identifiers containing
        reserved characters (``/``, ``?``, ``#``, spaces, ...) cannot change
        which endpoint the request addresses.
        """
        encoded = "/".join(
            quote(str(part), safe="") for part in (realm_name, *segments)
        )
        return f"{self.server_url}/admin/realms/{encoded}"

    def _request(
        self, send: Callable[..., requests.Response], url: str, **kwargs: Any
    ) -> requests.Response:
        """Send an authenticated request, retrying once after a 401.

        Args:
            send: The ``requests`` function to call (``requests.get``,
                ``requests.post``, ``requests.put`` or ``requests.delete``).
            url: Target URL.
            **kwargs: Extra keyword arguments forwarded to ``send``
                (``params``, ``json``, ...).

        Returns:
            The final response, guaranteed not to carry an error status.

        Raises:
            requests.HTTPError: If the final response has an error status.
        """
        response = send(
            url, headers=self._get_headers(), timeout=self.REQUEST_TIMEOUT, **kwargs
        )
        if response.status_code == 401:
            # The cached token was rejected: refresh it and retry exactly once.
            self._get_admin_token()
            response = send(
                url,
                headers=self._get_headers(),
                timeout=self.REQUEST_TIMEOUT,
                **kwargs,
            )
        response.raise_for_status()
        return response

    @staticmethod
    def _print_response_body(error: Exception) -> None:
        """Print the HTTP response body attached to ``error``, if there is one."""
        response = getattr(error, "response", None)
        if response is not None:
            print(f" Response: {response.text}")

    # ==================== User Management ====================

    def get_users(self, query: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """List users of the configured realm.

        Args:
            query: Query parameters forwarded to Keycloak
                (username, email, first, max, etc.).

        Returns:
            The decoded JSON response, or ``[]`` on failure.
        """
        try:
            url = self._admin_url(self.realm, "users")
            return self._request(requests.get, url, params=query or {}).json()
        except Exception as e:
            print(f"✗ Failed to get users: {e}")
            return []

    def get_user_by_username(self, username: str) -> Optional[Dict[str, Any]]:
        """Look a user up by username (exact match requested).

        Args:
            username: Username to search for.

        Returns:
            The first matching user, or ``None`` when nothing was returned.
        """
        users = self.get_users({"username": username, "exact": True})
        return users[0] if users else None

    def get_user_by_id(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Fetch a user of the configured realm by Keycloak user ID.

        Args:
            user_id: Keycloak user ID.

        Returns:
            The decoded JSON response, or ``None`` on failure.
        """
        try:
            url = self._admin_url(self.realm, "users", user_id)
            return self._request(requests.get, url).json()
        except Exception as e:
            print(f"✗ Failed to get user {user_id}: {e}")
            return None

    def create_user(
        self,
        username: str,
        email: str,
        first_name: str,
        last_name: str,
        enabled: bool = True,
        email_verified: bool = False,
    ) -> Optional[str]:
        """Create a user in the configured realm.

        Args:
            username: Username.
            email: E-mail address.
            first_name: First name.
            last_name: Last name.
            enabled: Whether the account is enabled.
            email_verified: Whether the e-mail address counts as verified.

        Returns:
            The new user's ID on success, ``None`` on failure.
        """
        try:
            url = self._admin_url(self.realm, "users")
            user_data = {
                "username": username,
                "email": email,
                "firstName": first_name,
                "lastName": last_name,
                "enabled": enabled,
                "emailVerified": email_verified,
            }
            response = self._request(requests.post, url, json=user_data)

            # The new user's URL is read from the Location header; its last
            # path segment is the user ID.
            location = response.headers.get("Location", "")
            user_id = location.rstrip("/").split("/")[-1] or None
            if user_id is None:
                # The user exists but the header was missing or unusable:
                # look the ID up rather than report a failure to the caller.
                created = self.get_user_by_username(username)
                user_id = created.get("id") if created else None

            print(f"✓ Created user: {username} (ID: {user_id})")
            return user_id
        except Exception as e:
            print(f"✗ Failed to create user {username}: {e}")
            self._print_response_body(e)
            return None

    def update_user(self, user_id: str, user_data: Dict[str, Any]) -> bool:
        """Update a user of the configured realm.

        Args:
            user_id: Keycloak user ID.
            user_data: Fields to update, sent as the JSON body.

        Returns:
            ``True`` on success, ``False`` on failure.
        """
        try:
            url = self._admin_url(self.realm, "users", user_id)
            self._request(requests.put, url, json=user_data)
            print(f"✓ Updated user: {user_id}")
            return True
        except Exception as e:
            print(f"✗ Failed to update user {user_id}: {e}")
            return False

    def enable_user(self, user_id: str) -> bool:
        """Enable a user."""
        return self.update_user(user_id, {"enabled": True})

    def disable_user(self, user_id: str) -> bool:
        """Disable a user."""
        return self.update_user(user_id, {"enabled": False})

    def delete_user(self, user_id: str) -> bool:
        """Delete a user of the configured realm.

        Args:
            user_id: Keycloak user ID.

        Returns:
            ``True`` on success, ``False`` on failure.
        """
        try:
            url = self._admin_url(self.realm, "users", user_id)
            self._request(requests.delete, url)
            print(f"✓ Deleted user: {user_id}")
            return True
        except Exception as e:
            print(f"✗ Failed to delete user {user_id}: {e}")
            return False

    def reset_password(
        self, user_id: str, password: str, temporary: bool = True
    ) -> bool:
        """Reset a user's password.

        Args:
            user_id: Keycloak user ID.
            password: New password.
            temporary: Whether the password is flagged as temporary.

        Returns:
            ``True`` on success, ``False`` on failure.
        """
        try:
            url = self._admin_url(self.realm, "users", user_id, "reset-password")
            credential = {
                "type": "password",
                "value": password,
                "temporary": temporary,
            }
            self._request(requests.put, url, json=credential)
            print(f"✓ Reset password for user: {user_id}")
            return True
        except Exception as e:
            print(f"✗ Failed to reset password for {user_id}: {e}")
            return False

    # ==================== Realm Management ====================

    def create_realm(
        self, realm_name: str, display_name: str, enabled: bool = True
    ) -> Optional[Dict[str, Any]]:
        """Create a new Keycloak realm.

        NOTE(review): the original documentation says this is for superusers
        only; this method itself performs no permission check.

        Args:
            realm_name: Realm identifier (e.g. ``porscheworld-pwd``).
            display_name: Display name (e.g. ``Porsche World``).
            enabled: Whether the realm is enabled.

        Returns:
            The realm configuration that was submitted, or ``None`` on failure.
        """
        try:
            url = f"{self.server_url}/admin/realms"
            realm_config = {
                "realm": realm_name,
                "displayName": display_name,
                "enabled": enabled,
                "sslRequired": "external",
                "registrationAllowed": False,  # self-registration disabled
                "loginWithEmailAllowed": True,
                "duplicateEmailsAllowed": False,
                "resetPasswordAllowed": True,
                "editUsernameAllowed": False,
                "bruteForceProtected": True,
                "permanentLockout": False,
                "maxFailureWaitSeconds": 900,
                "minimumQuickLoginWaitSeconds": 60,
                "waitIncrementSeconds": 60,
                "quickLoginCheckMilliSeconds": 1000,
                "maxDeltaTimeSeconds": 43200,
                "failureFactor": 5,
                # Token settings
                "accessTokenLifespan": 1800,  # 30 minutes
                "ssoSessionIdleTimeout": 3600,  # 1 hour
                "ssoSessionMaxLifespan": 36000,  # 10 hours
                "offlineSessionIdleTimeout": 2592000,  # 30 days
                # Internationalization settings
                "internationalizationEnabled": True,
                "supportedLocales": ["zh-TW", "en"],
                "defaultLocale": "zh-TW",
            }
            self._request(requests.post, url, json=realm_config)
            print(f"✓ Created realm: {realm_name}")
            return realm_config
        except Exception as e:
            print(f"✗ Failed to create realm {realm_name}: {e}")
            self._print_response_body(e)
            return None

    def get_realm(self, realm_name: str) -> Optional[Dict[str, Any]]:
        """Fetch a realm's configuration.

        Args:
            realm_name: Realm name.

        Returns:
            The decoded JSON response, or ``None`` on failure.
        """
        try:
            url = self._admin_url(realm_name)
            return self._request(requests.get, url).json()
        except Exception as e:
            print(f"✗ Failed to get realm {realm_name}: {e}")
            return None

    def update_realm(self, realm_name: str, config: Dict[str, Any]) -> bool:
        """Update a realm's configuration.

        Args:
            realm_name: Realm name.
            config: Configuration to apply, sent as the JSON body.

        Returns:
            ``True`` on success, ``False`` on failure.
        """
        try:
            url = self._admin_url(realm_name)
            self._request(requests.put, url, json=config)
            print(f"✓ Updated realm: {realm_name}")
            return True
        except Exception as e:
            print(f"✗ Failed to update realm {realm_name}: {e}")
            return False

    def delete_realm(self, realm_name: str) -> bool:
        """Delete a realm (dangerous operation).

        WARNING: per the original documentation this removes every user,
        role, client, etc. that lives in the realm, and is meant for
        superusers only. This method itself performs no permission check.

        Args:
            realm_name: Realm name.

        Returns:
            ``True`` on success, ``False`` on failure.
        """
        try:
            url = self._admin_url(realm_name)
            self._request(requests.delete, url)
            print(f"✓ Deleted realm: {realm_name}")
            return True
        except Exception as e:
            print(f"✗ Failed to delete realm {realm_name}: {e}")
            return False

    # ==================== Realm Role Management ====================

    def create_realm_role(
        self, realm_name: str, role_name: str, description: Optional[str] = None
    ) -> bool:
        """Create a role in the given realm.

        Args:
            realm_name: Realm name.
            role_name: Role name.
            description: Role description; defaults to ``"Role: <role_name>"``
                when omitted or empty.

        Returns:
            ``True`` on success, ``False`` on failure.
        """
        try:
            url = self._admin_url(realm_name, "roles")
            role_data = {
                "name": role_name,
                "description": description or f"Role: {role_name}",
            }
            self._request(requests.post, url, json=role_data)
            print(f"✓ Created realm role: {role_name} in {realm_name}")
            return True
        except Exception as e:
            print(f"✗ Failed to create realm role {role_name}: {e}")
            return False

    def get_realm_roles(self, realm_name: str) -> List[Dict[str, Any]]:
        """List all roles of a realm.

        Args:
            realm_name: Realm name.

        Returns:
            The decoded JSON response, or ``[]`` on failure.
        """
        try:
            url = self._admin_url(realm_name, "roles")
            return self._request(requests.get, url).json()
        except Exception as e:
            print(f"✗ Failed to get realm roles for {realm_name}: {e}")
            return []

    def delete_realm_role(self, realm_name: str, role_name: str) -> bool:
        """Delete a realm role.

        Args:
            realm_name: Realm name.
            role_name: Role name.

        Returns:
            ``True`` on success, ``False`` on failure.
        """
        try:
            url = self._admin_url(realm_name, "roles", role_name)
            self._request(requests.delete, url)
            print(f"✓ Deleted realm role: {role_name} from {realm_name}")
            return True
        except Exception as e:
            print(f"✗ Failed to delete realm role {role_name}: {e}")
            return False

    def get_realm_role_by_name(self, realm_name: str, role_name: str) -> Optional[Dict[str, Any]]:
        """Fetch the details of one realm role.

        Args:
            realm_name: Realm name.
            role_name: Role name.

        Returns:
            The decoded JSON response (callers in this class read its ``id``
            and ``name`` keys), or ``None`` on failure.
        """
        try:
            url = self._admin_url(realm_name, "roles", role_name)
            return self._request(requests.get, url).json()
        except Exception as e:
            print(f"✗ Failed to get realm role {role_name}: {e}")
            return None

    def assign_realm_role_to_user(
        self, realm_name: str, user_id: str, role_name: str
    ) -> bool:
        """Assign a realm role to a user.

        Args:
            realm_name: Realm name.
            user_id: Keycloak user ID.
            role_name: Role name.

        Returns:
            ``True`` on success, ``False`` if the role was not found or the
            request failed.
        """
        try:
            # Step 1: fetch the role details (the role id is required).
            role = self.get_realm_role_by_name(realm_name, role_name)
            if not role:
                print(f"✗ Role {role_name} not found in realm {realm_name}")
                return False

            # Step 2: map the role onto the user. The request body carries
            # the role representation (id and name), not just the name.
            url = self._admin_url(
                realm_name, "users", user_id, "role-mappings", "realm"
            )
            role_mapping = [{
                "id": role["id"],
                "name": role["name"],
            }]
            self._request(requests.post, url, json=role_mapping)
            print(f"✓ Assigned role '{role_name}' to user {user_id} in realm {realm_name}")
            return True
        except Exception as e:
            print(f"✗ Failed to assign role {role_name} to user {user_id}: {e}")
            self._print_response_body(e)
            return False

    def remove_realm_role_from_user(
        self, realm_name: str, user_id: str, role_name: str
    ) -> bool:
        """Remove a realm role from a user.

        Args:
            realm_name: Realm name.
            user_id: Keycloak user ID.
            role_name: Role name.

        Returns:
            ``True`` on success, ``False`` if the role was not found or the
            request failed.
        """
        try:
            # Step 1: fetch the role details.
            role = self.get_realm_role_by_name(realm_name, role_name)
            if not role:
                print(f"✗ Role {role_name} not found in realm {realm_name}")
                return False

            # Step 2: remove the mapping from the user.
            url = self._admin_url(
                realm_name, "users", user_id, "role-mappings", "realm"
            )
            role_mapping = [{
                "id": role["id"],
                "name": role["name"],
            }]
            self._request(requests.delete, url, json=role_mapping)
            print(f"✓ Removed role '{role_name}' from user {user_id} in realm {realm_name}")
            return True
        except Exception as e:
            print(f"✗ Failed to remove role {role_name} from user {user_id}: {e}")
            return False

    def get_user_realm_roles(self, realm_name: str, user_id: str) -> List[Dict[str, Any]]:
        """List the realm roles mapped to a user.

        Args:
            realm_name: Realm name.
            user_id: Keycloak user ID.

        Returns:
            The decoded JSON response, or ``[]`` on failure.
        """
        try:
            url = self._admin_url(
                realm_name, "users", user_id, "role-mappings", "realm"
            )
            return self._request(requests.get, url).json()
        except Exception as e:
            print(f"✗ Failed to get user roles for {user_id}: {e}")
            return []


# Module-level instance (created lazily on first use).
_keycloak_admin_client: Optional[KeycloakAdminClient] = None


def get_keycloak_admin_client() -> KeycloakAdminClient:
    """Return the shared Keycloak admin client, creating it on first call."""
    global _keycloak_admin_client
    if _keycloak_admin_client is None:
        _keycloak_admin_client = KeycloakAdminClient()
    return _keycloak_admin_client