"""
KeycloakClient — calls the Keycloak REST API directly, without the python-keycloak package.

Manages creation and lookup of tenant realms and user accounts.
"""
import logging
import time
from typing import Optional

import httpx

from app.core.config import settings

logger = logging.getLogger(__name__)

# Per-request timeout, in seconds, applied to every Keycloak HTTP call.
TIMEOUT = 10.0
# Seconds subtracted from the token lifetime so the cached admin token is
# refreshed shortly before the server would start rejecting it.
TOKEN_EXPIRY_LEEWAY = 5.0


class KeycloakClient:
    """Thin synchronous client for the Keycloak admin REST endpoints used here.

    The admin access token is obtained lazily with the client-credentials
    grant and cached on the instance until its reported lifetime runs out.
    """

    def __init__(self):
        # Base URL without a trailing slash so paths can be appended with "/".
        self._base = settings.KEYCLOAK_URL.rstrip("/")
        self._admin_token: Optional[str] = None
        # time.monotonic() deadline after which the cached token is refreshed.
        # Stays at +inf unless the token response reports "expires_in".
        self._admin_token_expires_at: float = float("inf")

    def _get_admin_token(self) -> str:
        """Request an admin access token from the configured admin realm.

        Uses the client-credentials grant with the client id/secret from
        settings. As a side effect, records when the token expires (from the
        response's ``expires_in`` field, when present) so ``_headers`` can
        refresh it in time.

        Returns:
            The access token string.

        Raises:
            httpx.HTTPStatusError: if the token endpoint answers with a
                4xx/5xx status.
            KeyError: if the response body has no ``access_token`` field.
        """
        url = f"{self._base}/realms/{settings.KEYCLOAK_ADMIN_REALM}/protocol/openid-connect/token"
        resp = httpx.post(
            url,
            data={
                "grant_type": "client_credentials",
                "client_id": settings.KEYCLOAK_ADMIN_CLIENT_ID,
                "client_secret": settings.KEYCLOAK_ADMIN_CLIENT_SECRET,
            },
            timeout=TIMEOUT,
        )
        resp.raise_for_status()
        body = resp.json()
        token = body["access_token"]

        expires_in = body.get("expires_in")
        if isinstance(expires_in, (int, float)):
            # Monotonic clock: immune to wall-clock adjustments.
            lifetime = max(float(expires_in) - TOKEN_EXPIRY_LEEWAY, 0.0)
            self._admin_token_expires_at = time.monotonic() + lifetime
        else:
            # No lifetime reported: keep the token until replaced.
            self._admin_token_expires_at = float("inf")
        return token

    def _headers(self) -> dict:
        """Return the Authorization header, fetching or refreshing the token if needed."""
        token_expired = time.monotonic() >= self._admin_token_expires_at
        if not self._admin_token or token_expired:
            self._admin_token = self._get_admin_token()
        return {"Authorization": f"Bearer {self._admin_token}"}

    def _admin_url(self, path: str) -> str:
        """Build an admin REST URL for ``path`` below ``/admin/realms/``."""
        return f"{self._base}/admin/realms/{path}"

    def realm_exists(self, realm: str) -> bool:
        """Return True if the admin API answers 200 for ``realm``.

        Any error while contacting Keycloak (including failure to obtain the
        admin token) is logged and reported as False rather than raised.
        """
        try:
            resp = httpx.get(self._admin_url(realm), headers=self._headers(), timeout=TIMEOUT)
        except Exception:
            # Best-effort check: keep returning False, but leave a trace so an
            # outage is distinguishable from a realm that does not exist.
            logger.warning(
                "Keycloak realm lookup failed for realm %r; treating it as missing",
                realm,
                exc_info=True,
            )
            return False
        return resp.status_code == 200

    def create_realm(self, realm: str, display_name: str) -> bool:
        """Create an enabled realm with the given name and display name.

        Returns:
            True if Keycloak answered 201 or 204, otherwise False.
        """
        payload = {
            "realm": realm,
            "displayName": display_name,
            "enabled": True,
            "loginTheme": "keycloak",
        }
        resp = httpx.post(
            f"{self._base}/admin/realms",
            json=payload,
            headers=self._headers(),
            timeout=TIMEOUT,
        )
        created = resp.status_code in (201, 204)
        if not created:
            logger.warning(
                "Keycloak realm creation failed for realm %r: HTTP %s",
                realm,
                resp.status_code,
            )
        return created

    def get_user_uuid(self, realm: str, username: str) -> Optional[str]:
        """Look up a user's id in ``realm`` by exact username.

        Returns:
            The ``id`` of the first matching user, or None if the search
            returns no users.

        Raises:
            httpx.HTTPStatusError: if the search request answers with a
                4xx/5xx status.
        """
        resp = httpx.get(
            self._admin_url(f"{realm}/users"),
            params={"username": username, "exact": "true"},
            headers=self._headers(),
            timeout=TIMEOUT,
        )
        resp.raise_for_status()
        users = resp.json()
        return users[0]["id"] if users else None

    def create_user(self, realm: str, username: str, email: str, password: Optional[str]) -> Optional[str]:
        """Create an enabled, email-verified user in ``realm``.

        When ``password`` is given it is set as a temporary credential.

        Returns:
            The new user's id, taken from the last path segment of the
            ``Location`` response header, or None if the request did not
            answer 201 or the header carried no id.
        """
        payload = {
            "username": username,
            "email": email,
            "enabled": True,
            "emailVerified": True,
        }
        if password:
            payload["credentials"] = [{"type": "password", "value": password, "temporary": True}]
        resp = httpx.post(
            self._admin_url(f"{realm}/users"),
            json=payload,
            headers=self._headers(),
            timeout=TIMEOUT,
        )
        if resp.status_code != 201:
            logger.warning(
                "Keycloak user creation failed for %r in realm %r: HTTP %s",
                username,
                realm,
                resp.status_code,
            )
            return None

        location = resp.headers.get("Location", "")
        user_id = location.rstrip("/").split("/")[-1]
        if not user_id:
            # A missing/empty Location would otherwise yield "" and slip past
            # callers that check ``is None``.
            logger.warning(
                "Keycloak created user %r in realm %r but returned no Location header",
                username,
                realm,
            )
            return None
        return user_id