Files
vmis/backend/app/services/keycloak_client.py
VMIS Developer 62baadb06f feat(vmis): 租戶自動開通完整流程 + Admin Portal SSO + NC 行事曆訂閱
Backend:
- schedule_tenant: NC 新容器自動 pgsql 安裝 (_nc_db_check 全新容器處理)
- schedule_tenant: NC 初始化加入 Redis + APCu memcache 設定 (修正 OIDC invalid_state)
- schedule_tenant: 新租戶 KC realm 自動設定 accessCodeLifespan=600s (修正 authentication_expired)
- schedule_account: NC Mail 帳號自動設定 (nc_mail_result/nc_mail_done_at)
- schedule_account: NC 台灣國定假日行事曆自動訂閱 (CalDAV MKCALENDAR)
- nextcloud_client: 新增 subscribe_calendar() CalDAV 訂閱方法
- settings: 新增系統設定 API (site_title/version/timezone/SSO/Keycloak)
- models/result: 新增 nc_mail_result, nc_mail_done_at 欄位
- alembic: 遷移 002(system_settings) 003(keycloak_admin) 004(nc_mail_result)

Frontend (Admin Portal):
- 新增完整管理後台 (index/tenants/accounts/servers/schedules/logs/settings/system-status)
- api.js: Keycloak JS Adapter SSO 整合 (PKCE/S256, fallback KC JS 來源, 自動 token 更新)
- index.html: Promise.allSettled 取代 Promise.all,防止單一 API 失敗影響整頁
- 所有頁面加入 try/catch + toast 錯誤處理
- 新增品牌 LOGO 與 favicon

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-15 15:31:37 +08:00

247 lines
8.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
KeycloakClient — 直接呼叫 Keycloak REST API不使用 python-keycloak 套件。
使用 master realm admin 帳密取得管理 token管理租戶 realm 及帳號。
"""
import logging
from typing import Optional
from urllib.parse import urlsplit

import httpx
logger = logging.getLogger(__name__)
TIMEOUT = 10.0
class KeycloakClient:
    """Thin synchronous client for the Keycloak Admin REST API.

    Obtains an access token from the ``master`` realm with the ``admin-cli``
    client (password grant) and sends it as a bearer token on every admin
    request. The token is cached on the instance and re-fetched once when a
    request is answered with HTTP 401.
    """

    def __init__(
        self,
        base_url: str,
        admin_user: str,
        admin_pass: str,
        *,
        verify: bool = False,
    ):
        """Store connection settings; no network call is made here.

        Args:
            base_url: Keycloak base URL; trailing slashes are stripped.
            admin_user: Username used for the master-realm password grant.
            admin_pass: Password used for the master-realm password grant.
            verify: Passed to httpx as ``verify=`` on every request. Defaults
                to ``False`` to keep the previous behaviour.
                NOTE(security): ``False`` disables TLS certificate
                verification; pass ``True`` when the server certificate is
                trusted.
        """
        self._base = base_url.rstrip("/")
        self._admin_user = admin_user
        self._admin_pass = admin_pass
        self._verify = verify
        self._admin_token: Optional[str] = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _get_admin_token(self) -> str:
        """Fetch an admin access token from the master realm (password grant).

        Raises:
            httpx.HTTPStatusError: If the token endpoint answers 4xx/5xx.
        """
        url = f"{self._base}/realms/master/protocol/openid-connect/token"
        resp = httpx.post(
            url,
            data={
                "grant_type": "password",
                "client_id": "admin-cli",
                "username": self._admin_user,
                "password": self._admin_pass,
            },
            timeout=TIMEOUT,
            verify=self._verify,
        )
        resp.raise_for_status()
        return resp.json()["access_token"]

    def _headers(self) -> dict:
        """Return the Authorization header, fetching a token if none is cached."""
        if not self._admin_token:
            self._admin_token = self._get_admin_token()
        return {"Authorization": f"Bearer {self._admin_token}"}

    def _admin_url(self, path: str) -> str:
        """Build ``<base>/admin/realms/<path>``."""
        return f"{self._base}/admin/realms/{path}"

    def _request(self, method: str, url: str, **kwargs) -> "httpx.Response":
        """Send an authenticated admin request, retrying once on HTTP 401.

        The cached token is never refreshed proactively, so a 401 is treated
        as "token no longer valid": the cache is dropped, a new token is
        fetched and the request is replayed a single time.

        Args:
            method: HTTP method name, e.g. ``"GET"``.
            url: Absolute request URL.
            **kwargs: Extra arguments forwarded to ``httpx.request``
                (``params``, ``json``, ...).
        """
        resp = httpx.request(
            method,
            url,
            headers=self._headers(),
            timeout=TIMEOUT,
            verify=self._verify,
            **kwargs,
        )
        if resp.status_code == 401:
            self._admin_token = None
            resp = httpx.request(
                method,
                url,
                headers=self._headers(),
                timeout=TIMEOUT,
                verify=self._verify,
                **kwargs,
            )
        return resp

    @staticmethod
    def _succeeded(resp, action: str, ok_codes: tuple = (201, 204)) -> bool:
        """Return True if ``resp.status_code`` is in *ok_codes*; log otherwise."""
        if resp.status_code in ok_codes:
            return True
        logger.warning("KC %s failed: HTTP %s", action, resp.status_code)
        return False

    @staticmethod
    def _web_origin(redirect_uri: str) -> str:
        """Derive a ``scheme://host[:port]`` origin from a redirect URI.

        Falls back to stripping trailing ``/`` and ``*`` characters when the
        URI has no scheme/host, and to ``"*"`` when nothing is left, so a bare
        wildcard redirect does not produce an empty origin.
        """
        try:
            parts = urlsplit(redirect_uri)
        except ValueError:
            # e.g. malformed IPv6 literal; use the plain-text fallback below.
            parts = None
        if parts is not None and parts.scheme and parts.netloc:
            return f"{parts.scheme}://{parts.netloc}"
        return redirect_uri.rstrip("/*") or "*"

    def _find_clients(self, realm: str, client_id: str) -> list:
        """Return the realm's client list filtered by ``clientId``.

        Raises:
            httpx.HTTPStatusError: If the lookup answers 4xx/5xx.
        """
        resp = self._request(
            "GET",
            self._admin_url(f"{realm}/clients"),
            params={"clientId": client_id},
        )
        resp.raise_for_status()
        return resp.json()

    def _create_client(self, realm: str, payload: dict) -> str:
        """POST a client representation; return ``'created'`` or ``'failed'``."""
        resp = self._request("POST", self._admin_url(f"{realm}/clients"), json=payload)
        action = f"create client {realm}/{payload.get('clientId')}"
        return "created" if self._succeeded(resp, action) else "failed"

    # ------------------------------------------------------------------
    # Realms
    # ------------------------------------------------------------------
    def realm_exists(self, realm: str) -> bool:
        """Return True only if the realm lookup answers HTTP 200.

        Any exception (network error, token failure, ...) is logged and
        reported as ``False``; callers cannot distinguish "missing" from
        "unreachable" through the return value.
        """
        try:
            resp = self._request("GET", self._admin_url(realm))
        except Exception as e:
            logger.warning("KC realm_exists(%s) failed: %s", realm, e)
            return False
        return resp.status_code == 200

    def create_realm(self, realm: str, display_name: str) -> bool:
        """Create an enabled realm using the ``keycloak`` login theme.

        Returns:
            True if the server answered 201 or 204, else False.
        """
        payload = {
            "realm": realm,
            "displayName": display_name,
            "enabled": True,
            "loginTheme": "keycloak",
        }
        resp = self._request("POST", f"{self._base}/admin/realms", json=payload)
        return self._succeeded(resp, f"create realm {realm}")

    def update_realm_token_settings(self, realm: str, access_code_lifespan: int = 600) -> bool:
        """Set the realm's authorization-code lifespan in seconds (default 10 min).

        The same value is also written to
        ``actionTokenGeneratedByUserLifespan``.

        Returns:
            True if the server answered 200 or 204, else False.
        """
        resp = self._request(
            "PUT",
            self._admin_url(realm),
            json={
                "accessCodeLifespan": access_code_lifespan,
                "actionTokenGeneratedByUserLifespan": access_code_lifespan,
            },
        )
        return self._succeeded(resp, f"update realm {realm} token settings", (200, 204))

    # ------------------------------------------------------------------
    # Users
    # ------------------------------------------------------------------
    def send_welcome_email(self, realm: str, user_id: str) -> bool:
        """Send the new user a welcome mail containing a set-password link.

        Implemented as an ``execute-actions-email`` request with the
        ``UPDATE_PASSWORD`` action. Best-effort: exceptions are logged and
        reported as ``False``.
        """
        try:
            resp = self._request(
                "PUT",
                self._admin_url(f"{realm}/users/{user_id}/execute-actions-email"),
                json=["UPDATE_PASSWORD"],
            )
            return resp.status_code in (200, 204)
        except Exception as e:
            logger.error("KC send_welcome_email(%s/%s) failed: %s", realm, user_id, e)
            return False

    def get_user_uuid(self, realm: str, username: str) -> Optional[str]:
        """Return the id of the user with exactly this username, or None.

        Raises:
            httpx.HTTPStatusError: If the lookup answers 4xx/5xx.
        """
        resp = self._request(
            "GET",
            self._admin_url(f"{realm}/users"),
            params={"username": username, "exact": "true"},
        )
        resp.raise_for_status()
        users = resp.json()
        return users[0]["id"] if users else None

    def create_user(
        self,
        realm: str,
        username: str,
        email: str,
        password: Optional[str],
    ) -> Optional[str]:
        """Create an enabled user with a pre-verified e-mail address.

        Args:
            realm: Target realm name.
            username: Login name of the new user.
            email: E-mail address of the new user.
            password: Optional initial password; when given it is sent as a
                credential flagged ``temporary``.

        Returns:
            The new user's id (last path segment of the ``Location`` response
            header), or None if the server did not answer 201 or supplied no
            usable ``Location`` header.
        """
        payload = {
            "username": username,
            "email": email,
            "enabled": True,
            "emailVerified": True,
        }
        if password:
            payload["credentials"] = [
                {"type": "password", "value": password, "temporary": True}
            ]
        resp = self._request("POST", self._admin_url(f"{realm}/users"), json=payload)
        if not self._succeeded(resp, f"create user {realm}/{username}", (201,)):
            return None
        location = resp.headers.get("Location", "")
        # An absent/empty header would otherwise yield "" as the user id.
        return location.rstrip("/").rsplit("/", 1)[-1] or None

    # ------------------------------------------------------------------
    # Clients
    # ------------------------------------------------------------------
    def create_public_client(self, realm: str, client_id: str) -> str:
        """Create a public client. Returns 'exists' / 'created' / 'failed'.

        Raises:
            httpx.HTTPStatusError: If the existence lookup answers 4xx/5xx.
        """
        if self._find_clients(realm, client_id):
            return "exists"
        payload = {
            "clientId": client_id,
            "name": client_id,
            "enabled": True,
            "publicClient": True,
            "standardFlowEnabled": True,
            "directAccessGrantsEnabled": False,
            # NOTE(security): wildcard redirect URIs / origins accept any
            # target; review before exposing this client outside a lab setup.
            "redirectUris": ["*"],
            "webOrigins": ["*"],
        }
        return self._create_client(realm, payload)

    def create_confidential_client(self, realm: str, client_id: str, redirect_uris: list[str]) -> str:
        """Create a confidential client (used for NC OIDC).

        The single web origin is derived from the first redirect URI, or is
        ``"*"`` when *redirect_uris* is empty.

        Returns:
            'exists' / 'created' / 'failed'.

        Raises:
            httpx.HTTPStatusError: If the existence lookup answers 4xx/5xx.
        """
        if self._find_clients(realm, client_id):
            return "exists"
        origin = self._web_origin(redirect_uris[0]) if redirect_uris else "*"
        payload = {
            "clientId": client_id,
            "name": client_id,
            "enabled": True,
            "publicClient": False,
            "standardFlowEnabled": True,
            "directAccessGrantsEnabled": False,
            "redirectUris": redirect_uris,
            "webOrigins": [origin],
            "protocol": "openid-connect",
        }
        return self._create_client(realm, payload)

    def get_client_secret(self, realm: str, client_id: str) -> Optional[str]:
        """Return the secret of a confidential client, or None.

        None is returned when no client matches *client_id* or when the
        secret endpoint does not answer HTTP 200.

        Raises:
            httpx.HTTPStatusError: If the client lookup answers 4xx/5xx.
        """
        clients = self._find_clients(realm, client_id)
        if not clients:
            return None
        client_uuid = clients[0]["id"]
        resp = self._request(
            "GET",
            self._admin_url(f"{realm}/clients/{client_uuid}/client-secret"),
        )
        if resp.status_code == 200:
            return resp.json().get("value")
        return None
def get_keycloak_client() -> KeycloakClient:
    """Factory: build a KeycloakClient from the system settings table.

    Reads the first ``SystemSettings`` row and uses its ``keycloak_url``,
    ``keycloak_admin_user`` and ``keycloak_admin_pass``. When there is no row,
    or the row has no Keycloak URL, the hard-coded defaults below are used.
    The DB session is always closed before returning.
    """
    # Imported lazily, as in the original.
    # NOTE(review): presumably to avoid an import cycle — confirm.
    from app.core.database import SessionLocal
    from app.models.settings import SystemSettings

    db = SessionLocal()
    try:
        s = db.query(SystemSettings).first()
        # A row with an empty/NULL URL cannot produce a usable client (the
        # constructor calls .rstrip() on it), so treat it as "not configured"
        # and fall through to the defaults instead of crashing.
        if s and s.keycloak_url:
            return KeycloakClient(s.keycloak_url, s.keycloak_admin_user, s.keycloak_admin_pass)
    finally:
        db.close()
    # Fallback to defaults
    return KeycloakClient("https://auth.lab.taipei", "admin", "")