"""System settings API: read/update settings and bootstrap the Keycloak SSO realm."""

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session

from app.core.database import get_db
from app.core.utils import configure_timezone
from app.models.settings import SystemSettings
from app.models.tenant import Tenant
from app.models.account import Account
from app.schemas.settings import SettingsUpdate, SettingsResponse
from app.services.keycloak_client import KeycloakClient

router = APIRouter(prefix="/settings", tags=["settings"])


def _get_or_create(db: Session) -> SystemSettings:
    """Return the first SystemSettings row, inserting one with id=1 if none exists."""
    s = db.query(SystemSettings).first()
    if not s:
        s = SystemSettings(id=1)
        db.add(s)
        db.commit()
        db.refresh(s)
    return s


def _get_active_manager(db: Session):
    """Return the first tenant with is_manager and is_active set, or None."""
    return (
        db.query(Tenant)
        # `== True` is deliberate: SQLAlchemy turns it into a SQL comparison.
        .filter(Tenant.is_manager == True, Tenant.is_active == True)  # noqa: E712
        .first()
    )


def _require_keycloak_config(s: SystemSettings, detail: str) -> None:
    """Raise HTTP 400 with *detail* unless URL, admin user and admin password are all set."""
    if not all((s.keycloak_url, s.keycloak_admin_user, s.keycloak_admin_pass)):
        raise HTTPException(status_code=400, detail=detail)


@router.get("", response_model=SettingsResponse)
def get_settings(db: Session = Depends(get_db)):
    """Return the system settings row (created on first access)."""
    return _get_or_create(db)


@router.put("", response_model=SettingsResponse)
def update_settings(payload: SettingsUpdate, db: Session = Depends(get_db)):
    """Apply the non-None fields of *payload* to the system settings.

    Raises:
        HTTPException(422): when ``sso_enabled`` is being set to True but there
            is no active manager tenant, or that tenant has no active account.
    """
    s = _get_or_create(db)

    # Preconditions for enabling SSO.
    if payload.sso_enabled is True:
        manager = _get_active_manager(db)
        if not manager:
            raise HTTPException(
                status_code=422,
                detail="啟用 SSO 前必須先建立 is_manager=true 的管理租戶",
            )
        has_account = (
            db.query(Account)
            .filter(Account.tenant_id == manager.id, Account.is_active == True)  # noqa: E712
            .first()
        )
        if not has_account:
            raise HTTPException(
                status_code=422,
                detail="管理租戶必須至少有一個有效帳號才能啟用 SSO",
            )

    # exclude_none=True: fields sent as None are skipped, so this endpoint
    # cannot reset a setting back to NULL.
    for field, value in payload.model_dump(exclude_none=True).items():
        setattr(s, field, value)
    db.commit()
    db.refresh(s)

    # Re-apply the timezone from the freshly persisted settings.
    configure_timezone(s.timezone)
    return s


@router.post("/test-keycloak")
def test_keycloak(db: Session = Depends(get_db)):
    """Test whether the stored Keycloak master-realm admin credentials work.

    Returns:
        ``{"ok": True, "message": ...}`` when an admin token is obtained.

    Raises:
        HTTPException(400): when the connection settings are incomplete, the
            token request raises, or no token is returned.
    """
    s = _get_or_create(db)
    _require_keycloak_config(s, "請先設定 Keycloak URL 及管理帳密")

    kc = KeycloakClient(s.keycloak_url, s.keycloak_admin_user, s.keycloak_admin_pass)
    try:
        token = kc._get_admin_token()
    except Exception as e:
        raise HTTPException(
            status_code=400, detail=f"Keycloak 連線失敗:{str(e)}"
        ) from e

    # A falsy token without an exception used to fall through and return
    # None (HTTP 200 with a null body); report it as a failure instead.
    if not token:
        raise HTTPException(
            status_code=400, detail="Keycloak 連線失敗:未取得管理 token"
        )
    return {"ok": True, "message": f"連線成功:{s.keycloak_url}"}


@router.post("/init-sso-realm")
def init_sso_realm(db: Session = Depends(get_db)):
    """Set up the Admin Portal SSO environment in Keycloak.

    Steps:
        1. Pick the realm: the active manager tenant's ``keycloak_realm`` if
           set, otherwise ``system_settings.keycloak_realm``, otherwise
           ``"vmis-admin"``.
        2. Make sure that realm exists, creating it if necessary.
        3. Create the public client (``system_settings.keycloak_client`` or
           ``"vmis-portal"``) in that realm.
        4. Write the realm back to ``system_settings.keycloak_realm`` (per the
           original note, the frontend Keycloak JS adapter uses this value).

    Returns:
        ``{"ok": bool, "realm": str, "details": list[str]}``. ``ok`` is False
        when the client could not be created; ``details`` lists each step.

    Raises:
        HTTPException(400): when the Keycloak connection settings are incomplete.
        HTTPException(500): when the realm could not be created.
    """
    s = _get_or_create(db)
    _require_keycloak_config(s, "請先設定並儲存 Keycloak 連線資訊")

    # The manager tenant's keycloak_realm is the primary source of truth.
    manager = _get_active_manager(db)
    if manager and manager.keycloak_realm:
        realm = manager.keycloak_realm
    else:
        realm = s.keycloak_realm or "vmis-admin"
    client_id = s.keycloak_client or "vmis-portal"

    kc = KeycloakClient(s.keycloak_url, s.keycloak_admin_user, s.keycloak_admin_pass)
    results = []

    # Ensure the realm exists.
    if kc.realm_exists(realm):
        results.append(f"✓ Realm '{realm}' 已存在")
    else:
        display_name = manager.name if manager else "VMIS Admin Portal"
        if not kc.create_realm(realm, display_name):
            raise HTTPException(status_code=500, detail=f"Realm '{realm}' 建立失敗")
        results.append(f"✓ Realm '{realm}' 建立成功")

    # Ensure the public client exists.
    status = kc.create_public_client(realm, client_id)
    client_ok = status in ("exists", "created")
    if status == "exists":
        results.append(f"✓ Client '{client_id}' 已存在")
    elif status == "created":
        results.append(f"✓ Client '{client_id}' 建立成功")
    else:
        results.append(f"✗ Client '{client_id}' 建立失敗")

    # Keep system_settings.keycloak_realm in sync with the realm actually used.
    if s.keycloak_realm != realm:
        s.keycloak_realm = realm
        db.commit()
        results.append(f"✓ 系統設定 keycloak_realm 同步為 '{realm}'")

    # Previously "ok" was always True, hiding a failed client creation.
    return {"ok": client_ok, "realm": realm, "details": results}