""" KeycloakClient — 直接呼叫 Keycloak REST API,不使用 python-keycloak 套件。 使用 master realm admin 帳密取得管理 token,管理租戶 realm 及帳號。 """ import logging from typing import Optional import httpx logger = logging.getLogger(__name__) TIMEOUT = 10.0 class KeycloakClient: def __init__(self, base_url: str, admin_user: str, admin_pass: str): self._base = base_url.rstrip("/") self._admin_user = admin_user self._admin_pass = admin_pass self._admin_token: Optional[str] = None def _get_admin_token(self) -> str: """取得 master realm 的 admin access token(Resource Owner Password)""" url = f"{self._base}/realms/master/protocol/openid-connect/token" resp = httpx.post( url, data={ "grant_type": "password", "client_id": "admin-cli", "username": self._admin_user, "password": self._admin_pass, }, timeout=TIMEOUT, verify=False, ) resp.raise_for_status() return resp.json()["access_token"] def _headers(self) -> dict: if not self._admin_token: self._admin_token = self._get_admin_token() return {"Authorization": f"Bearer {self._admin_token}"} def _admin_url(self, path: str) -> str: return f"{self._base}/admin/realms/{path}" def realm_exists(self, realm: str) -> bool: try: resp = httpx.get( self._admin_url(realm), headers=self._headers(), timeout=TIMEOUT, verify=False, ) return resp.status_code == 200 except Exception: return False def create_realm(self, realm: str, display_name: str) -> bool: payload = { "realm": realm, "displayName": display_name, "enabled": True, "loginTheme": "keycloak", } resp = httpx.post( f"{self._base}/admin/realms", json=payload, headers=self._headers(), timeout=TIMEOUT, verify=False, ) return resp.status_code in (201, 204) def update_realm_token_settings(self, realm: str, access_code_lifespan: int = 600) -> bool: """設定 realm 的授權碼有效期(秒),預設 10 分鐘""" resp = httpx.put( self._admin_url(realm), json={ "accessCodeLifespan": access_code_lifespan, "actionTokenGeneratedByUserLifespan": access_code_lifespan, }, headers=self._headers(), timeout=TIMEOUT, verify=False, ) return resp.status_code in (200, 204) def send_welcome_email(self, realm: str, user_id: str) -> bool: """寄送歡迎信(含設定密碼連結)給新使用者""" try: resp = httpx.put( self._admin_url(f"{realm}/users/{user_id}/execute-actions-email"), json=["UPDATE_PASSWORD"], headers=self._headers(), timeout=TIMEOUT, verify=False, ) return resp.status_code in (200, 204) except Exception as e: logger.error(f"KC send_welcome_email({realm}/{user_id}) failed: {e}") return False def get_user_uuid(self, realm: str, username: str) -> Optional[str]: resp = httpx.get( self._admin_url(f"{realm}/users"), params={"username": username, "exact": "true"}, headers=self._headers(), timeout=TIMEOUT, verify=False, ) resp.raise_for_status() users = resp.json() return users[0]["id"] if users else None def create_user( self, realm: str, username: str, email: str, password: Optional[str], ) -> Optional[str]: payload = { "username": username, "email": email, "enabled": True, "emailVerified": True, } if password: payload["credentials"] = [ {"type": "password", "value": password, "temporary": True} ] resp = httpx.post( self._admin_url(f"{realm}/users"), json=payload, headers=self._headers(), timeout=TIMEOUT, verify=False, ) if resp.status_code == 201: location = resp.headers.get("Location", "") return location.rstrip("/").split("/")[-1] return None def create_public_client(self, realm: str, client_id: str) -> str: """建立 Public Client。回傳 'exists' / 'created' / 'failed'""" resp = httpx.get( self._admin_url(f"{realm}/clients"), params={"clientId": client_id}, headers=self._headers(), timeout=TIMEOUT, verify=False, ) resp.raise_for_status() if resp.json(): return "exists" payload = { "clientId": client_id, "name": client_id, "enabled": True, "publicClient": True, "standardFlowEnabled": True, "directAccessGrantsEnabled": False, "redirectUris": ["*"], "webOrigins": ["*"], } resp = httpx.post( self._admin_url(f"{realm}/clients"), json=payload, headers=self._headers(), timeout=TIMEOUT, verify=False, ) return "created" if resp.status_code in (201, 204) else "failed" def create_confidential_client(self, realm: str, client_id: str, redirect_uris: list[str]) -> str: """建立 Confidential Client(用於 NC OIDC)。回傳 'exists' / 'created' / 'failed'""" resp = httpx.get( self._admin_url(f"{realm}/clients"), params={"clientId": client_id}, headers=self._headers(), timeout=TIMEOUT, verify=False, ) resp.raise_for_status() if resp.json(): return "exists" origin = redirect_uris[0].rstrip("/*") if redirect_uris else "*" payload = { "clientId": client_id, "name": client_id, "enabled": True, "publicClient": False, "standardFlowEnabled": True, "directAccessGrantsEnabled": False, "redirectUris": redirect_uris, "webOrigins": [origin], "protocol": "openid-connect", } resp = httpx.post( self._admin_url(f"{realm}/clients"), json=payload, headers=self._headers(), timeout=TIMEOUT, verify=False, ) return "created" if resp.status_code in (201, 204) else "failed" def get_client_secret(self, realm: str, client_id: str) -> Optional[str]: """取得 Confidential Client 的 Secret""" resp = httpx.get( self._admin_url(f"{realm}/clients"), params={"clientId": client_id}, headers=self._headers(), timeout=TIMEOUT, verify=False, ) resp.raise_for_status() clients = resp.json() if not clients: return None client_uuid = clients[0]["id"] resp = httpx.get( self._admin_url(f"{realm}/clients/{client_uuid}/client-secret"), headers=self._headers(), timeout=TIMEOUT, verify=False, ) if resp.status_code == 200: return resp.json().get("value") return None def get_keycloak_client() -> KeycloakClient: """Factory: reads credentials from system settings DB.""" from app.core.database import SessionLocal from app.models.settings import SystemSettings db = SessionLocal() try: s = db.query(SystemSettings).first() if s: return KeycloakClient(s.keycloak_url, s.keycloak_admin_user, s.keycloak_admin_pass) finally: db.close() # Fallback to defaults return KeycloakClient("https://auth.lab.taipei", "admin", "")