Backend: - schedule_tenant: NC 新容器自動 pgsql 安裝 (_nc_db_check 全新容器處理) - schedule_tenant: NC 初始化加入 Redis + APCu memcache 設定 (修正 OIDC invalid_state) - schedule_tenant: 新租戶 KC realm 自動設定 accessCodeLifespan=600s (修正 authentication_expired) - schedule_account: NC Mail 帳號自動設定 (nc_mail_result/nc_mail_done_at) - schedule_account: NC 台灣國定假日行事曆自動訂閱 (CalDAV MKCALENDAR) - nextcloud_client: 新增 subscribe_calendar() CalDAV 訂閱方法 - settings: 新增系統設定 API (site_title/version/timezone/SSO/Keycloak) - models/result: 新增 nc_mail_result, nc_mail_done_at 欄位 - alembic: 遷移 002(system_settings) 003(keycloak_admin) 004(nc_mail_result) Frontend (Admin Portal): - 新增完整管理後台 (index/tenants/accounts/servers/schedules/logs/settings/system-status) - api.js: Keycloak JS Adapter SSO 整合 (PKCE/S256, fallback KC JS 來源, 自動 token 更新) - index.html: Promise.allSettled 取代 Promise.all,防止單一 API 失敗影響整頁 - 所有頁面加入 try/catch + toast 錯誤處理 - 新增品牌 LOGO 與 favicon Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
247 lines
8.1 KiB
Python
247 lines
8.1 KiB
Python
"""
|
||
KeycloakClient — 直接呼叫 Keycloak REST API,不使用 python-keycloak 套件。
|
||
使用 master realm admin 帳密取得管理 token,管理租戶 realm 及帳號。
|
||
"""
|
||
import logging
|
||
from typing import Optional
|
||
import httpx
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
TIMEOUT = 10.0
|
||
|
||
|
||
class KeycloakClient:
|
||
def __init__(self, base_url: str, admin_user: str, admin_pass: str):
|
||
self._base = base_url.rstrip("/")
|
||
self._admin_user = admin_user
|
||
self._admin_pass = admin_pass
|
||
self._admin_token: Optional[str] = None
|
||
|
||
def _get_admin_token(self) -> str:
|
||
"""取得 master realm 的 admin access token(Resource Owner Password)"""
|
||
url = f"{self._base}/realms/master/protocol/openid-connect/token"
|
||
resp = httpx.post(
|
||
url,
|
||
data={
|
||
"grant_type": "password",
|
||
"client_id": "admin-cli",
|
||
"username": self._admin_user,
|
||
"password": self._admin_pass,
|
||
},
|
||
timeout=TIMEOUT,
|
||
verify=False,
|
||
)
|
||
resp.raise_for_status()
|
||
return resp.json()["access_token"]
|
||
|
||
def _headers(self) -> dict:
|
||
if not self._admin_token:
|
||
self._admin_token = self._get_admin_token()
|
||
return {"Authorization": f"Bearer {self._admin_token}"}
|
||
|
||
def _admin_url(self, path: str) -> str:
|
||
return f"{self._base}/admin/realms/{path}"
|
||
|
||
def realm_exists(self, realm: str) -> bool:
|
||
try:
|
||
resp = httpx.get(
|
||
self._admin_url(realm),
|
||
headers=self._headers(),
|
||
timeout=TIMEOUT,
|
||
verify=False,
|
||
)
|
||
return resp.status_code == 200
|
||
except Exception:
|
||
return False
|
||
|
||
def create_realm(self, realm: str, display_name: str) -> bool:
|
||
payload = {
|
||
"realm": realm,
|
||
"displayName": display_name,
|
||
"enabled": True,
|
||
"loginTheme": "keycloak",
|
||
}
|
||
resp = httpx.post(
|
||
f"{self._base}/admin/realms",
|
||
json=payload,
|
||
headers=self._headers(),
|
||
timeout=TIMEOUT,
|
||
verify=False,
|
||
)
|
||
return resp.status_code in (201, 204)
|
||
|
||
def update_realm_token_settings(self, realm: str, access_code_lifespan: int = 600) -> bool:
|
||
"""設定 realm 的授權碼有效期(秒),預設 10 分鐘"""
|
||
resp = httpx.put(
|
||
self._admin_url(realm),
|
||
json={
|
||
"accessCodeLifespan": access_code_lifespan,
|
||
"actionTokenGeneratedByUserLifespan": access_code_lifespan,
|
||
},
|
||
headers=self._headers(),
|
||
timeout=TIMEOUT,
|
||
verify=False,
|
||
)
|
||
return resp.status_code in (200, 204)
|
||
|
||
def send_welcome_email(self, realm: str, user_id: str) -> bool:
|
||
"""寄送歡迎信(含設定密碼連結)給新使用者"""
|
||
try:
|
||
resp = httpx.put(
|
||
self._admin_url(f"{realm}/users/{user_id}/execute-actions-email"),
|
||
json=["UPDATE_PASSWORD"],
|
||
headers=self._headers(),
|
||
timeout=TIMEOUT,
|
||
verify=False,
|
||
)
|
||
return resp.status_code in (200, 204)
|
||
except Exception as e:
|
||
logger.error(f"KC send_welcome_email({realm}/{user_id}) failed: {e}")
|
||
return False
|
||
|
||
def get_user_uuid(self, realm: str, username: str) -> Optional[str]:
|
||
resp = httpx.get(
|
||
self._admin_url(f"{realm}/users"),
|
||
params={"username": username, "exact": "true"},
|
||
headers=self._headers(),
|
||
timeout=TIMEOUT,
|
||
verify=False,
|
||
)
|
||
resp.raise_for_status()
|
||
users = resp.json()
|
||
return users[0]["id"] if users else None
|
||
|
||
def create_user(
|
||
self,
|
||
realm: str,
|
||
username: str,
|
||
email: str,
|
||
password: Optional[str],
|
||
) -> Optional[str]:
|
||
payload = {
|
||
"username": username,
|
||
"email": email,
|
||
"enabled": True,
|
||
"emailVerified": True,
|
||
}
|
||
if password:
|
||
payload["credentials"] = [
|
||
{"type": "password", "value": password, "temporary": True}
|
||
]
|
||
resp = httpx.post(
|
||
self._admin_url(f"{realm}/users"),
|
||
json=payload,
|
||
headers=self._headers(),
|
||
timeout=TIMEOUT,
|
||
verify=False,
|
||
)
|
||
if resp.status_code == 201:
|
||
location = resp.headers.get("Location", "")
|
||
return location.rstrip("/").split("/")[-1]
|
||
return None
|
||
|
||
def create_public_client(self, realm: str, client_id: str) -> str:
|
||
"""建立 Public Client。回傳 'exists' / 'created' / 'failed'"""
|
||
resp = httpx.get(
|
||
self._admin_url(f"{realm}/clients"),
|
||
params={"clientId": client_id},
|
||
headers=self._headers(),
|
||
timeout=TIMEOUT,
|
||
verify=False,
|
||
)
|
||
resp.raise_for_status()
|
||
if resp.json():
|
||
return "exists"
|
||
payload = {
|
||
"clientId": client_id,
|
||
"name": client_id,
|
||
"enabled": True,
|
||
"publicClient": True,
|
||
"standardFlowEnabled": True,
|
||
"directAccessGrantsEnabled": False,
|
||
"redirectUris": ["*"],
|
||
"webOrigins": ["*"],
|
||
}
|
||
resp = httpx.post(
|
||
self._admin_url(f"{realm}/clients"),
|
||
json=payload,
|
||
headers=self._headers(),
|
||
timeout=TIMEOUT,
|
||
verify=False,
|
||
)
|
||
return "created" if resp.status_code in (201, 204) else "failed"
|
||
|
||
def create_confidential_client(self, realm: str, client_id: str, redirect_uris: list[str]) -> str:
|
||
"""建立 Confidential Client(用於 NC OIDC)。回傳 'exists' / 'created' / 'failed'"""
|
||
resp = httpx.get(
|
||
self._admin_url(f"{realm}/clients"),
|
||
params={"clientId": client_id},
|
||
headers=self._headers(),
|
||
timeout=TIMEOUT,
|
||
verify=False,
|
||
)
|
||
resp.raise_for_status()
|
||
if resp.json():
|
||
return "exists"
|
||
origin = redirect_uris[0].rstrip("/*") if redirect_uris else "*"
|
||
payload = {
|
||
"clientId": client_id,
|
||
"name": client_id,
|
||
"enabled": True,
|
||
"publicClient": False,
|
||
"standardFlowEnabled": True,
|
||
"directAccessGrantsEnabled": False,
|
||
"redirectUris": redirect_uris,
|
||
"webOrigins": [origin],
|
||
"protocol": "openid-connect",
|
||
}
|
||
resp = httpx.post(
|
||
self._admin_url(f"{realm}/clients"),
|
||
json=payload,
|
||
headers=self._headers(),
|
||
timeout=TIMEOUT,
|
||
verify=False,
|
||
)
|
||
return "created" if resp.status_code in (201, 204) else "failed"
|
||
|
||
def get_client_secret(self, realm: str, client_id: str) -> Optional[str]:
|
||
"""取得 Confidential Client 的 Secret"""
|
||
resp = httpx.get(
|
||
self._admin_url(f"{realm}/clients"),
|
||
params={"clientId": client_id},
|
||
headers=self._headers(),
|
||
timeout=TIMEOUT,
|
||
verify=False,
|
||
)
|
||
resp.raise_for_status()
|
||
clients = resp.json()
|
||
if not clients:
|
||
return None
|
||
client_uuid = clients[0]["id"]
|
||
resp = httpx.get(
|
||
self._admin_url(f"{realm}/clients/{client_uuid}/client-secret"),
|
||
headers=self._headers(),
|
||
timeout=TIMEOUT,
|
||
verify=False,
|
||
)
|
||
if resp.status_code == 200:
|
||
return resp.json().get("value")
|
||
return None
|
||
|
||
|
||
def get_keycloak_client() -> KeycloakClient:
|
||
"""Factory: reads credentials from system settings DB."""
|
||
from app.core.database import SessionLocal
|
||
from app.models.settings import SystemSettings
|
||
|
||
db = SessionLocal()
|
||
try:
|
||
s = db.query(SystemSettings).first()
|
||
if s:
|
||
return KeycloakClient(s.keycloak_url, s.keycloak_admin_user, s.keycloak_admin_pass)
|
||
finally:
|
||
db.close()
|
||
# Fallback to defaults
|
||
return KeycloakClient("https://auth.lab.taipei", "admin", "")
|