feat(vmis): 租戶自動開通完整流程 + Admin Portal SSO + NC 行事曆訂閱

Backend:
- schedule_tenant: NC 新容器自動 pgsql 安裝 (_nc_db_check 全新容器處理)
- schedule_tenant: NC 初始化加入 Redis + APCu memcache 設定 (修正 OIDC invalid_state)
- schedule_tenant: 新租戶 KC realm 自動設定 accessCodeLifespan=600s (修正 authentication_expired)
- schedule_account: NC Mail 帳號自動設定 (nc_mail_result/nc_mail_done_at)
- schedule_account: NC 台灣國定假日行事曆自動訂閱 (CalDAV MKCALENDAR)
- nextcloud_client: 新增 subscribe_calendar() CalDAV 訂閱方法
- settings: 新增系統設定 API (site_title/version/timezone/SSO/Keycloak)
- models/result: 新增 nc_mail_result, nc_mail_done_at 欄位
- alembic: 遷移 002(system_settings) 003(keycloak_admin) 004(nc_mail_result)

Frontend (Admin Portal):
- 新增完整管理後台 (index/tenants/accounts/servers/schedules/logs/settings/system-status)
- api.js: Keycloak JS Adapter SSO 整合 (PKCE/S256, fallback KC JS 來源, 自動 token 更新)
- index.html: Promise.allSettled 取代 Promise.all,防止單一 API 失敗影響整頁
- 所有頁面加入 try/catch + toast 錯誤處理
- 新增品牌 LOGO 與 favicon

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
VMIS Developer
2026-03-15 15:31:37 +08:00
parent 42d1420f9c
commit 62baadb06f
53 changed files with 5638 additions and 195 deletions

View File

@@ -1,34 +1,36 @@
"""
KeycloakClient — 直接呼叫 Keycloak REST API不使用 python-keycloak 套件。
管理租戶 realm 及帳號的建立/查詢
使用 master realm admin 帳密取得管理 token管理租戶 realm 及帳號。
"""
import logging
from typing import Optional
import httpx
from app.core.config import settings
logger = logging.getLogger(__name__)
TIMEOUT = 10.0
class KeycloakClient:
def __init__(self):
self._base = settings.KEYCLOAK_URL.rstrip("/")
def __init__(self, base_url: str, admin_user: str, admin_pass: str):
self._base = base_url.rstrip("/")
self._admin_user = admin_user
self._admin_pass = admin_pass
self._admin_token: Optional[str] = None
def _get_admin_token(self) -> str:
"""取得 vmis-admin realm 的 admin access token"""
url = f"{self._base}/realms/{settings.KEYCLOAK_ADMIN_REALM}/protocol/openid-connect/token"
"""取得 master realm 的 admin access tokenResource Owner Password"""
url = f"{self._base}/realms/master/protocol/openid-connect/token"
resp = httpx.post(
url,
data={
"grant_type": "client_credentials",
"client_id": settings.KEYCLOAK_ADMIN_CLIENT_ID,
"client_secret": settings.KEYCLOAK_ADMIN_CLIENT_SECRET,
"grant_type": "password",
"client_id": "admin-cli",
"username": self._admin_user,
"password": self._admin_pass,
},
timeout=TIMEOUT,
verify=False,
)
resp.raise_for_status()
return resp.json()["access_token"]
@@ -43,7 +45,12 @@ class KeycloakClient:
def realm_exists(self, realm: str) -> bool:
try:
resp = httpx.get(self._admin_url(realm), headers=self._headers(), timeout=TIMEOUT)
resp = httpx.get(
self._admin_url(realm),
headers=self._headers(),
timeout=TIMEOUT,
verify=False,
)
return resp.status_code == 200
except Exception:
return False
@@ -60,21 +67,58 @@ class KeycloakClient:
json=payload,
headers=self._headers(),
timeout=TIMEOUT,
verify=False,
)
return resp.status_code in (201, 204)
def update_realm_token_settings(self, realm: str, access_code_lifespan: int = 600) -> bool:
"""設定 realm 的授權碼有效期(秒),預設 10 分鐘"""
resp = httpx.put(
self._admin_url(realm),
json={
"accessCodeLifespan": access_code_lifespan,
"actionTokenGeneratedByUserLifespan": access_code_lifespan,
},
headers=self._headers(),
timeout=TIMEOUT,
verify=False,
)
return resp.status_code in (200, 204)
def send_welcome_email(self, realm: str, user_id: str) -> bool:
"""寄送歡迎信(含設定密碼連結)給新使用者"""
try:
resp = httpx.put(
self._admin_url(f"{realm}/users/{user_id}/execute-actions-email"),
json=["UPDATE_PASSWORD"],
headers=self._headers(),
timeout=TIMEOUT,
verify=False,
)
return resp.status_code in (200, 204)
except Exception as e:
logger.error(f"KC send_welcome_email({realm}/{user_id}) failed: {e}")
return False
def get_user_uuid(self, realm: str, username: str) -> Optional[str]:
resp = httpx.get(
self._admin_url(f"{realm}/users"),
params={"username": username, "exact": "true"},
headers=self._headers(),
timeout=TIMEOUT,
verify=False,
)
resp.raise_for_status()
users = resp.json()
return users[0]["id"] if users else None
def create_user(self, realm: str, username: str, email: str, password: Optional[str]) -> Optional[str]:
def create_user(
self,
realm: str,
username: str,
email: str,
password: Optional[str],
) -> Optional[str]:
payload = {
"username": username,
"email": email,
@@ -82,14 +126,121 @@ class KeycloakClient:
"emailVerified": True,
}
if password:
payload["credentials"] = [{"type": "password", "value": password, "temporary": True}]
payload["credentials"] = [
{"type": "password", "value": password, "temporary": True}
]
resp = httpx.post(
self._admin_url(f"{realm}/users"),
json=payload,
headers=self._headers(),
timeout=TIMEOUT,
verify=False,
)
if resp.status_code == 201:
location = resp.headers.get("Location", "")
return location.rstrip("/").split("/")[-1]
return None
def create_public_client(self, realm: str, client_id: str) -> str:
"""建立 Public Client。回傳 'exists' / 'created' / 'failed'"""
resp = httpx.get(
self._admin_url(f"{realm}/clients"),
params={"clientId": client_id},
headers=self._headers(),
timeout=TIMEOUT,
verify=False,
)
resp.raise_for_status()
if resp.json():
return "exists"
payload = {
"clientId": client_id,
"name": client_id,
"enabled": True,
"publicClient": True,
"standardFlowEnabled": True,
"directAccessGrantsEnabled": False,
"redirectUris": ["*"],
"webOrigins": ["*"],
}
resp = httpx.post(
self._admin_url(f"{realm}/clients"),
json=payload,
headers=self._headers(),
timeout=TIMEOUT,
verify=False,
)
return "created" if resp.status_code in (201, 204) else "failed"
def create_confidential_client(self, realm: str, client_id: str, redirect_uris: list[str]) -> str:
"""建立 Confidential Client用於 NC OIDC。回傳 'exists' / 'created' / 'failed'"""
resp = httpx.get(
self._admin_url(f"{realm}/clients"),
params={"clientId": client_id},
headers=self._headers(),
timeout=TIMEOUT,
verify=False,
)
resp.raise_for_status()
if resp.json():
return "exists"
origin = redirect_uris[0].rstrip("/*") if redirect_uris else "*"
payload = {
"clientId": client_id,
"name": client_id,
"enabled": True,
"publicClient": False,
"standardFlowEnabled": True,
"directAccessGrantsEnabled": False,
"redirectUris": redirect_uris,
"webOrigins": [origin],
"protocol": "openid-connect",
}
resp = httpx.post(
self._admin_url(f"{realm}/clients"),
json=payload,
headers=self._headers(),
timeout=TIMEOUT,
verify=False,
)
return "created" if resp.status_code in (201, 204) else "failed"
def get_client_secret(self, realm: str, client_id: str) -> Optional[str]:
"""取得 Confidential Client 的 Secret"""
resp = httpx.get(
self._admin_url(f"{realm}/clients"),
params={"clientId": client_id},
headers=self._headers(),
timeout=TIMEOUT,
verify=False,
)
resp.raise_for_status()
clients = resp.json()
if not clients:
return None
client_uuid = clients[0]["id"]
resp = httpx.get(
self._admin_url(f"{realm}/clients/{client_uuid}/client-secret"),
headers=self._headers(),
timeout=TIMEOUT,
verify=False,
)
if resp.status_code == 200:
return resp.json().get("value")
return None
def get_keycloak_client() -> KeycloakClient:
"""Factory: reads credentials from system settings DB."""
from app.core.database import SessionLocal
from app.models.settings import SystemSettings
db = SessionLocal()
try:
s = db.query(SystemSettings).first()
if s:
return KeycloakClient(s.keycloak_url, s.keycloak_admin_user, s.keycloak_admin_pass)
finally:
db.close()
# Fallback to defaults
return KeycloakClient("https://auth.lab.taipei", "admin", "")