Major Features:
- ✅ Multi-tenant architecture (tenant isolation)
- ✅ Employee CRUD with lifecycle management (onboarding/offboarding)
- ✅ Department tree structure with email domain management
- ✅ Company info management (single-record editing)
- ✅ System functions CRUD (permission management)
- ✅ Email account management (multi-account per employee)
- ✅ Keycloak SSO integration (auth.lab.taipei)
- ✅ Redis session storage (10.1.0.254:6379)
  - Solves Cookie 4KB limitation
  - Cross-system session sharing
  - Sliding expiration (8 hours)
  - Automatic token refresh

Technical Stack:

Backend:
- FastAPI + SQLAlchemy
- PostgreSQL 16 (10.1.0.20:5433)
- Keycloak Admin API integration
- Docker Mailserver integration (SSH)
- Alembic migrations

Frontend:
- Next.js 14 (App Router)
- NextAuth 4 with Keycloak Provider
- Redis session storage (ioredis)
- Tailwind CSS

Infrastructure:
- Redis 7 (10.1.0.254:6379) - Session + Cache
- Keycloak 26.1.0 (auth.lab.taipei)
- Docker Mailserver (10.1.0.254)

Architecture Highlights:
- Session 管理由 Keycloak + Redis 統一控制
- 支援多系統 (HR/WebMail/Calendar/Drive/Office) 共享 session
- Token 自動刷新,異質服務整合
- 未來可無縫遷移到雲端

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
817 lines
24 KiB
Python
817 lines
24 KiB
Python
"""
|
|
Keycloak Admin REST API client.

Calls the REST API directly to avoid the version-compatibility problems of the python-keycloak package.
|
|
"""
|
|
import requests
|
|
from typing import Optional, Dict, Any, List
|
|
from app.core.config import settings
|
|
|
|
|
|
class KeycloakAdminClient:
    """Client for the Keycloak Admin REST API.

    The client calls the REST endpoints directly through ``requests`` rather
    than through the python-keycloak package. It authenticates against the
    ``master`` realm with the ``admin-cli`` client (password grant) and caches
    the resulting access token on the instance.

    Error contract: public methods do not raise. On any failure they print a
    diagnostic line and return a sentinel value (``None``, ``False`` or ``[]``).
    User-level methods operate on ``self.realm``; realm and role methods take
    the realm name explicitly.
    """

    # Timeout, in seconds, applied to every HTTP request issued by this client.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        """Read the connection settings; no network traffic happens here."""
        self.server_url = settings.KEYCLOAK_URL
        self.realm = settings.KEYCLOAK_REALM
        self.admin_username = settings.KEYCLOAK_ADMIN_USERNAME
        self.admin_password = settings.KEYCLOAK_ADMIN_PASSWORD
        # Cached admin access token; fetched lazily by _get_headers().
        self._access_token: Optional[str] = None

    # ==================== Internal helpers ====================

    def _get_admin_token(self) -> Optional[str]:
        """Fetch a fresh admin access token and cache it on the instance.

        Returns:
            The access token, or ``None`` if the token request failed. On
            failure the previously cached token (if any) is left untouched.
        """
        try:
            token_url = f"{self.server_url}/realms/master/protocol/openid-connect/token"
            data = {
                "client_id": "admin-cli",
                "username": self.admin_username,
                "password": self.admin_password,
                "grant_type": "password",
            }
            response = requests.post(token_url, data=data, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            self._access_token = response.json().get("access_token")
            return self._access_token
        except Exception as e:
            print(f"✗ Failed to get admin token: {e}")
            return None

    def _get_headers(self) -> Dict[str, str]:
        """Build the request headers, fetching a token first if none is cached.

        Returns:
            Headers containing the bearer ``Authorization`` and JSON
            ``Content-Type``.

        Raises:
            RuntimeError: if no access token could be obtained. Without this
                check the header would carry the literal text ``Bearer None``.
        """
        if not self._access_token:
            self._get_admin_token()
        if not self._access_token:
            raise RuntimeError("Keycloak admin access token is unavailable")
        return {
            "Authorization": f"Bearer {self._access_token}",
            "Content-Type": "application/json",
        }

    def _realm_url(self, realm_name: str) -> str:
        """Return the Admin API base URL for ``realm_name``."""
        return f"{self.server_url}/admin/realms/{realm_name}"

    def _request(self, method: str, url: str, **kwargs: Any) -> "requests.Response":
        """Send an authenticated request, refreshing the token once on 401.

        Args:
            method: HTTP verb, e.g. ``"GET"``.
            url: Absolute request URL.
            **kwargs: Extra arguments forwarded to ``requests.request``
                (``params``, ``json``, ...).

        Returns:
            The successful response.

        Raises:
            Exception: whatever ``requests`` raises, including ``HTTPError``
                from ``raise_for_status()`` for any non-success status.
        """
        def send():
            # Headers are rebuilt on every attempt so a refreshed token is used.
            return requests.request(
                method,
                url,
                headers=self._get_headers(),
                timeout=self.REQUEST_TIMEOUT,
                **kwargs,
            )

        response = send()
        if response.status_code == 401:
            # On 401, fetch a new token and retry exactly once.
            self._get_admin_token()
            response = send()
        response.raise_for_status()
        return response

    @staticmethod
    def _print_response_body(exc: Exception) -> None:
        """Print the HTTP response body attached to ``exc``, if there is one."""
        response = getattr(exc, "response", None)
        if response is not None:
            print(f" Response: {response.text}")

    # ==================== User Management ====================

    def get_users(self, query: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """List users of ``self.realm``.

        Args:
            query: Optional query parameters (username, email, first, max, ...).

        Returns:
            The decoded JSON user list, or ``[]`` on failure.
        """
        try:
            url = f"{self._realm_url(self.realm)}/users"
            return self._request("GET", url, params=query or {}).json()
        except Exception as e:
            print(f"✗ Failed to get users: {e}")
            return []

    def get_user_by_username(self, username: str) -> Optional[Dict[str, Any]]:
        """Look up a single user by exact username.

        Args:
            username: Username to search for.

        Returns:
            The first matching user, or ``None`` if there is none (or the
            lookup failed).
        """
        # Send the flag as an explicit lowercase string instead of relying on
        # how a Python bool gets serialised into the query string.
        users = self.get_users({"username": username, "exact": "true"})
        return users[0] if users else None

    def get_user_by_id(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Fetch a user by Keycloak user ID.

        Args:
            user_id: Keycloak user ID.

        Returns:
            The decoded JSON user, or ``None`` on failure.
        """
        try:
            url = f"{self._realm_url(self.realm)}/users/{user_id}"
            return self._request("GET", url).json()
        except Exception as e:
            print(f"✗ Failed to get user {user_id}: {e}")
            return None

    def create_user(
        self,
        username: str,
        email: str,
        first_name: str,
        last_name: str,
        enabled: bool = True,
        email_verified: bool = False,
    ) -> Optional[str]:
        """Create a user in ``self.realm``.

        Args:
            username: Username.
            email: Email address.
            first_name: Given name.
            last_name: Family name.
            enabled: Whether the account is enabled.
            email_verified: Whether the email is marked as verified.

        Returns:
            The new user's ID on success, ``None`` on failure.
        """
        try:
            url = f"{self._realm_url(self.realm)}/users"
            user_data = {
                "username": username,
                "email": email,
                "firstName": first_name,
                "lastName": last_name,
                "enabled": enabled,
                "emailVerified": email_verified,
            }
            response = self._request("POST", url, json=user_data)

            # The new user's URL is read from the Location header; its last
            # path segment is taken as the user ID.
            location = response.headers.get("Location", "")
            user_id = location.rstrip("/").rsplit("/", 1)[-1] or None
            if user_id is None:
                # The request succeeded but carried no usable Location header.
                # Look the user up so a successful creation is not reported
                # to the caller as a failure.
                created = self.get_user_by_username(username)
                user_id = created.get("id") if created else None

            print(f"✓ Created user: {username} (ID: {user_id})")
            return user_id
        except Exception as e:
            print(f"✗ Failed to create user {username}: {e}")
            self._print_response_body(e)
            return None

    def update_user(self, user_id: str, user_data: Dict[str, Any]) -> bool:
        """Update a user.

        Args:
            user_id: Keycloak user ID.
            user_data: Fields to send in the update body.

        Returns:
            ``True`` on success, ``False`` on failure.
        """
        try:
            url = f"{self._realm_url(self.realm)}/users/{user_id}"
            self._request("PUT", url, json=user_data)
            print(f"✓ Updated user: {user_id}")
            return True
        except Exception as e:
            print(f"✗ Failed to update user {user_id}: {e}")
            return False

    def enable_user(self, user_id: str) -> bool:
        """Enable a user account."""
        return self.update_user(user_id, {"enabled": True})

    def disable_user(self, user_id: str) -> bool:
        """Disable a user account."""
        return self.update_user(user_id, {"enabled": False})

    def delete_user(self, user_id: str) -> bool:
        """Delete a user.

        Args:
            user_id: Keycloak user ID.

        Returns:
            ``True`` on success, ``False`` on failure.
        """
        try:
            url = f"{self._realm_url(self.realm)}/users/{user_id}"
            self._request("DELETE", url)
            print(f"✓ Deleted user: {user_id}")
            return True
        except Exception as e:
            print(f"✗ Failed to delete user {user_id}: {e}")
            return False

    def reset_password(
        self,
        user_id: str,
        password: str,
        temporary: bool = True
    ) -> bool:
        """Reset a user's password.

        Args:
            user_id: Keycloak user ID.
            password: New password.
            temporary: Whether the password is flagged as temporary.

        Returns:
            ``True`` on success, ``False`` on failure.
        """
        try:
            url = f"{self._realm_url(self.realm)}/users/{user_id}/reset-password"
            credential = {
                "type": "password",
                "value": password,
                "temporary": temporary,
            }
            self._request("PUT", url, json=credential)
            print(f"✓ Reset password for user: {user_id}")
            return True
        except Exception as e:
            print(f"✗ Failed to reset password for {user_id}: {e}")
            return False

    # ==================== Realm Management ====================

    def create_realm(
        self,
        realm_name: str,
        display_name: str,
        enabled: bool = True
    ) -> Optional[Dict[str, Any]]:
        """Create a new Keycloak realm (superuser only).

        Args:
            realm_name: Realm identifier (e.g. porscheworld-pwd).
            display_name: Display name (e.g. Porsche World).
            enabled: Whether the realm is enabled.

        Returns:
            The realm configuration that was submitted, or ``None`` on failure.
        """
        try:
            url = f"{self.server_url}/admin/realms"
            realm_config = {
                "realm": realm_name,
                "displayName": display_name,
                "enabled": enabled,
                "sslRequired": "external",
                "registrationAllowed": False,  # no self-registration
                "loginWithEmailAllowed": True,
                "duplicateEmailsAllowed": False,
                "resetPasswordAllowed": True,
                "editUsernameAllowed": False,
                "bruteForceProtected": True,
                "permanentLockout": False,
                "maxFailureWaitSeconds": 900,
                "minimumQuickLoginWaitSeconds": 60,
                "waitIncrementSeconds": 60,
                "quickLoginCheckMilliSeconds": 1000,
                "maxDeltaTimeSeconds": 43200,
                "failureFactor": 5,
                # Token settings
                "accessTokenLifespan": 1800,  # 30 minutes
                "ssoSessionIdleTimeout": 3600,  # 1 hour
                "ssoSessionMaxLifespan": 36000,  # 10 hours
                "offlineSessionIdleTimeout": 2592000,  # 30 days
                # Internationalization settings
                "internationalizationEnabled": True,
                "supportedLocales": ["zh-TW", "en"],
                "defaultLocale": "zh-TW",
            }
            self._request("POST", url, json=realm_config)
            print(f"✓ Created realm: {realm_name}")
            return realm_config
        except Exception as e:
            print(f"✗ Failed to create realm {realm_name}: {e}")
            self._print_response_body(e)
            return None

    def get_realm(self, realm_name: str) -> Optional[Dict[str, Any]]:
        """Fetch a realm's configuration.

        Args:
            realm_name: Realm name.

        Returns:
            The decoded JSON realm configuration, or ``None`` on failure.
        """
        try:
            return self._request("GET", self._realm_url(realm_name)).json()
        except Exception as e:
            print(f"✗ Failed to get realm {realm_name}: {e}")
            return None

    def update_realm(self, realm_name: str, config: Dict[str, Any]) -> bool:
        """Update a realm's configuration.

        Args:
            realm_name: Realm name.
            config: Configuration to send in the update body.

        Returns:
            ``True`` on success, ``False`` on failure.
        """
        try:
            self._request("PUT", self._realm_url(realm_name), json=config)
            print(f"✓ Updated realm: {realm_name}")
            return True
        except Exception as e:
            print(f"✗ Failed to update realm {realm_name}: {e}")
            return False

    def delete_realm(self, realm_name: str) -> bool:
        """Delete a realm (dangerous operation, superuser only).

        WARNING: this removes all users, roles, clients and other data that
        belong to the realm.

        Args:
            realm_name: Realm name.

        Returns:
            ``True`` on success, ``False`` on failure.
        """
        try:
            self._request("DELETE", self._realm_url(realm_name))
            print(f"✓ Deleted realm: {realm_name}")
            return True
        except Exception as e:
            print(f"✗ Failed to delete realm {realm_name}: {e}")
            return False

    # ==================== Realm Role Management ====================

    def create_realm_role(
        self,
        realm_name: str,
        role_name: str,
        description: Optional[str] = None
    ) -> bool:
        """Create a role in the given realm.

        Args:
            realm_name: Realm name.
            role_name: Role name.
            description: Role description; defaults to ``"Role: <role_name>"``.

        Returns:
            ``True`` on success, ``False`` on failure.
        """
        try:
            url = f"{self._realm_url(realm_name)}/roles"
            role_data = {
                "name": role_name,
                "description": description or f"Role: {role_name}",
            }
            self._request("POST", url, json=role_data)
            print(f"✓ Created realm role: {role_name} in {realm_name}")
            return True
        except Exception as e:
            print(f"✗ Failed to create realm role {role_name}: {e}")
            return False

    def get_realm_roles(self, realm_name: str) -> List[Dict[str, Any]]:
        """List all roles of a realm.

        Args:
            realm_name: Realm name.

        Returns:
            The decoded JSON role list, or ``[]`` on failure.
        """
        try:
            url = f"{self._realm_url(realm_name)}/roles"
            return self._request("GET", url).json()
        except Exception as e:
            print(f"✗ Failed to get realm roles for {realm_name}: {e}")
            return []

    def delete_realm_role(self, realm_name: str, role_name: str) -> bool:
        """Delete a realm role.

        Args:
            realm_name: Realm name.
            role_name: Role name.

        Returns:
            ``True`` on success, ``False`` on failure.
        """
        try:
            url = f"{self._realm_url(realm_name)}/roles/{role_name}"
            self._request("DELETE", url)
            print(f"✓ Deleted realm role: {role_name} from {realm_name}")
            return True
        except Exception as e:
            print(f"✗ Failed to delete realm role {role_name}: {e}")
            return False

    def get_realm_role_by_name(self, realm_name: str, role_name: str) -> Optional[Dict[str, Any]]:
        """Fetch the details of one realm role.

        Args:
            realm_name: Realm name.
            role_name: Role name.

        Returns:
            The decoded JSON role (the role-mapping methods read its ``id``
            and ``name`` keys), or ``None`` on failure.
        """
        try:
            url = f"{self._realm_url(realm_name)}/roles/{role_name}"
            return self._request("GET", url).json()
        except Exception as e:
            print(f"✗ Failed to get realm role {role_name}: {e}")
            return None

    def assign_realm_role_to_user(
        self,
        realm_name: str,
        user_id: str,
        role_name: str
    ) -> bool:
        """Assign a realm role to a user.

        Args:
            realm_name: Realm name.
            user_id: Keycloak user ID.
            role_name: Role name.

        Returns:
            ``True`` on success, ``False`` if the role is unknown or the
            request failed.
        """
        try:
            # Step 1: resolve the role, because the mapping payload needs its id.
            role = self.get_realm_role_by_name(realm_name, role_name)
            if not role:
                print(f"✗ Role {role_name} not found in realm {realm_name}")
                return False

            # Step 2: post the mapping (role id and name) for the user.
            url = f"{self._realm_url(realm_name)}/users/{user_id}/role-mappings/realm"
            role_mapping = [{
                "id": role["id"],
                "name": role["name"],
            }]
            self._request("POST", url, json=role_mapping)
            print(f"✓ Assigned role '{role_name}' to user {user_id} in realm {realm_name}")
            return True
        except Exception as e:
            print(f"✗ Failed to assign role {role_name} to user {user_id}: {e}")
            self._print_response_body(e)
            return False

    def remove_realm_role_from_user(
        self,
        realm_name: str,
        user_id: str,
        role_name: str
    ) -> bool:
        """Remove a realm role from a user.

        Args:
            realm_name: Realm name.
            user_id: Keycloak user ID.
            role_name: Role name.

        Returns:
            ``True`` on success, ``False`` if the role is unknown or the
            request failed.
        """
        try:
            # Step 1: resolve the role, because the mapping payload needs its id.
            role = self.get_realm_role_by_name(realm_name, role_name)
            if not role:
                print(f"✗ Role {role_name} not found in realm {realm_name}")
                return False

            # Step 2: send the same mapping payload with DELETE.
            url = f"{self._realm_url(realm_name)}/users/{user_id}/role-mappings/realm"
            role_mapping = [{
                "id": role["id"],
                "name": role["name"],
            }]
            self._request("DELETE", url, json=role_mapping)
            print(f"✓ Removed role '{role_name}' from user {user_id} in realm {realm_name}")
            return True
        except Exception as e:
            print(f"✗ Failed to remove role {role_name} from user {user_id}: {e}")
            return False

    def get_user_realm_roles(self, realm_name: str, user_id: str) -> List[Dict[str, Any]]:
        """List the realm roles mapped to a user.

        Args:
            realm_name: Realm name.
            user_id: Keycloak user ID.

        Returns:
            The decoded JSON role list, or ``[]`` on failure.
        """
        try:
            url = f"{self._realm_url(realm_name)}/users/{user_id}/role-mappings/realm"
            return self._request("GET", url).json()
        except Exception as e:
            print(f"✗ Failed to get user roles for {user_id}: {e}")
            return []
|
|
|
|
|
|
# Module-wide client instance; stays None until the accessor below is first called.
_keycloak_admin_client: Optional[KeycloakAdminClient] = None


def get_keycloak_admin_client() -> KeycloakAdminClient:
    """Return the shared Keycloak Admin client, creating it on first use."""
    global _keycloak_admin_client
    if _keycloak_admin_client is not None:
        return _keycloak_admin_client
    _keycloak_admin_client = KeycloakAdminClient()
    return _keycloak_admin_client
|