Major Features: - ✅ Multi-tenant architecture (tenant isolation) - ✅ Employee CRUD with lifecycle management (onboarding/offboarding) - ✅ Department tree structure with email domain management - ✅ Company info management (single-record editing) - ✅ System functions CRUD (permission management) - ✅ Email account management (multi-account per employee) - ✅ Keycloak SSO integration (auth.lab.taipei) - ✅ Redis session storage (10.1.0.254:6379) - Solves Cookie 4KB limitation - Cross-system session sharing - Sliding expiration (8 hours) - Automatic token refresh Technical Stack: Backend: - FastAPI + SQLAlchemy - PostgreSQL 16 (10.1.0.20:5433) - Keycloak Admin API integration - Docker Mailserver integration (SSH) - Alembic migrations Frontend: - Next.js 14 (App Router) - NextAuth 4 with Keycloak Provider - Redis session storage (ioredis) - Tailwind CSS Infrastructure: - Redis 7 (10.1.0.254:6379) - Session + Cache - Keycloak 26.1.0 (auth.lab.taipei) - Docker Mailserver (10.1.0.254) Architecture Highlights: - Session管理由 Keycloak + Redis 統一控制 - 支援多系統 (HR/WebMail/Calendar/Drive/Office) 共享 session - Token 自動刷新,異質服務整合 - 未來可無縫遷移到雲端 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
604 lines
20 KiB
Python
604 lines
20 KiB
Python
"""
|
||
租戶管理 API
|
||
用於管理多租戶資訊(僅系統管理公司可存取)
|
||
"""
|
||
from typing import List
|
||
from datetime import datetime
|
||
import secrets
|
||
import string
|
||
from fastapi import APIRouter, Depends, HTTPException, status
|
||
from sqlalchemy.orm import Session
|
||
from sqlalchemy.exc import IntegrityError
|
||
|
||
from app.api.deps import get_db, require_auth, get_current_tenant
|
||
from app.models import Tenant, Employee
|
||
from app.schemas.tenant import (
|
||
TenantCreateRequest,
|
||
TenantCreateResponse,
|
||
TenantUpdateRequest,
|
||
TenantUpdateResponse,
|
||
TenantResponse,
|
||
InitializationRequest,
|
||
InitializationResponse
|
||
)
|
||
from app.services.keycloak_admin_client import get_keycloak_admin_client
|
||
|
||
router = APIRouter()
|
||
|
||
|
||
@router.get("/current", summary="取得當前租戶資訊")
def get_current_tenant_info(
    tenant: Tenant = Depends(get_current_tenant),
):
    """
    Return the profile of the tenant bound to the current request.

    The tenant object is injected by the ``get_current_tenant`` dependency
    (per the original note, it is identified from the realm of the JWT
    token); this handler only serializes it.

    Returns:
        dict: The tenant's identity, contact, plan and status fields, with
        ``created_at`` / ``updated_at`` rendered as ISO-8601 strings or
        ``None`` when unset.
    """
    # Columns copied verbatim, in the order they appear in the response.
    plain_columns = (
        "id",
        "code",
        "name",
        "name_eng",
        "tax_id",
        "prefix",
        "tel",
        "add",
        "url",
        "keycloak_realm",
        "is_sysmana",
        "plan_id",
        "max_users",
        "storage_quota_gb",
        "status",
        "is_active",
        "edit_by",
    )
    payload = {column: getattr(tenant, column) for column in plain_columns}

    # Timestamps may be unset, so they are converted only when present.
    for stamp in ("created_at", "updated_at"):
        moment = getattr(tenant, stamp)
        payload[stamp] = moment.isoformat() if moment else None

    return payload
|
||
|
||
|
||
@router.patch("/current", summary="更新當前租戶資訊")
def update_current_tenant_info(
    request: TenantUpdateRequest,
    db: Session = Depends(get_db),
    tenant: Tenant = Depends(get_current_tenant),
):
    """
    Update the basic company profile of the current tenant.

    Only these fields are writable, and only when the request supplies a
    non-``None`` value for them:

    - name: company name
    - name_eng: company name in English
    - tax_id: tax identification number
    - tel: company phone
    - add: company address
    - url: company website

    Core fields such as the tenant code, prefix and plan are never touched.

    Returns:
        dict: A confirmation message plus the refreshed tenant profile.

    Raises:
        HTTPException: 400 when the database reports an integrity error,
            500 for any other failure. The session is rolled back in both
            cases.
    """
    editable_fields = ("name", "name_eng", "tax_id", "tel", "add", "url")

    try:
        # Copy over only the fields the caller actually provided.
        for field in editable_fields:
            incoming = getattr(request, field)
            if incoming is not None:
                setattr(tenant, field, incoming)

        # Record who made the edit.
        tenant.edit_by = "current_user"  # TODO: take the real user name from the JWT token

        db.commit()
        db.refresh(tenant)

        profile = {
            key: getattr(tenant, key) for key in ("id", "code", *editable_fields)
        }
        return {
            "message": "公司資料已成功更新",
            "tenant": profile,
        }

    except IntegrityError as e:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"更新失敗: {str(e)}",
        )
    except Exception as e:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"更新失敗: {str(e)}",
        )
|
||
|
||
|
||
@router.get("/", summary="列出所有租戶(僅系統管理公司)")
def list_tenants(
    db: Session = Depends(get_db),
    current_tenant: Tenant = Depends(get_current_tenant),
):
    """
    List every active tenant.

    Permission: the calling tenant must be the system-management company
    (``is_sysmana`` is truthy).

    Returns:
        dict: ``total`` (number of rows) and ``items`` (one dict per active
        tenant, timestamps rendered as ISO-8601 strings or ``None``).

    Raises:
        HTTPException: 403 when the caller is not the system-management
            company.
    """
    # Permission gate.
    if not current_tenant.is_sysmana:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only system management company can access this resource",
        )

    def _iso(moment):
        """Render an optional datetime as ISO-8601 text."""
        return moment.isoformat() if moment else None

    # Columns copied verbatim, in response order, up to the first timestamp.
    leading_columns = (
        "id",
        "code",
        "name",
        "name_eng",
        "keycloak_realm",
        "tax_id",
        "prefix",
        "domain_set",
        "tel",
        "add",
        "url",
        "plan_id",
        "max_users",
        "storage_quota_gb",
        "status",
        "is_sysmana",
        "is_active",
        "is_initialized",
    )

    def _as_row(record):
        """Serialize one tenant record into the response item shape."""
        row = {column: getattr(record, column) for column in leading_columns}
        row["initialized_at"] = _iso(record.initialized_at)
        row["initialized_by"] = record.initialized_by
        row["edit_by"] = record.edit_by
        row["created_at"] = _iso(record.created_at)
        row["updated_at"] = _iso(record.updated_at)
        return row

    # "== True" is intentional: it builds a SQL expression, not a Python bool.
    active_filter = Tenant.is_active == True  # noqa: E712
    tenants = db.query(Tenant).filter(active_filter).all()

    return {
        "total": len(tenants),
        "items": [_as_row(t) for t in tenants],
    }
|
||
|
||
|
||
@router.get("/{tenant_id}", summary="取得指定租戶資訊(僅系統管理公司)")
def get_tenant(
    tenant_id: int,
    db: Session = Depends(get_db),
    current_tenant: Tenant = Depends(get_current_tenant),
):
    """
    Return the details of one tenant looked up by primary key.

    Permission: the calling tenant must be the system-management company
    (``is_sysmana`` is truthy).

    Args:
        tenant_id: Primary key of the tenant to fetch.

    Returns:
        dict: The tenant's fields. Timestamps are returned as stored (not
        pre-formatted), unlike the list endpoint.

    Raises:
        HTTPException: 403 when the caller is not the system-management
            company, 404 when no tenant has the given id.
    """
    # Permission gate.
    if not current_tenant.is_sysmana:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only system management company can access this resource",
        )

    lookup = db.query(Tenant).filter(Tenant.id == tenant_id)
    tenant = lookup.first()

    if not tenant:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Tenant {tenant_id} not found",
        )

    payload = {
        column: getattr(tenant, column)
        for column in ("id", "code", "name", "name_eng", "tax_id", "prefix")
    }

    # NOTE(review): this reads ``tenant.domains`` while list_tenants reads
    # ``t.domain_set`` for the same "domain_set" key - confirm which
    # attribute the Tenant model actually exposes.
    payload["domain_set"] = tenant.domains

    trailing_columns = (
        "tel",
        "add",
        "url",
        "keycloak_realm",
        "is_sysmana",
        "plan_id",
        "max_users",
        "storage_quota_gb",
        "status",
        "is_active",
        "is_initialized",
        "initialized_at",
        "initialized_by",
        "created_at",
        "updated_at",
    )
    for column in trailing_columns:
        payload[column] = getattr(tenant, column)

    return payload
|
||
|
||
|
||
def _generate_temp_password(length: int = 12) -> str:
    """
    Generate a random temporary password.

    Characters are drawn independently with ``secrets.choice`` from ASCII
    letters, digits and the symbols ``!@#$%``. No particular mix of
    character classes is guaranteed.

    Args:
        length: Number of characters to produce; a value of 0 or less
            yields an empty string.

    Returns:
        The generated password.
    """
    alphabet = string.ascii_letters + string.digits + "!@#$%"
    picked = [secrets.choice(alphabet) for _ in range(length)]
    return "".join(picked)
|
||
|
||
|
||
@router.post("/", response_model=TenantCreateResponse, summary="建立新租戶(僅 Superuser)")
def create_tenant(
    request: TenantCreateRequest,
    db: Session = Depends(get_db),
    current_tenant: Tenant = Depends(get_current_tenant),
):
    """
    Create a new tenant together with its Keycloak realm and first admin.

    Permission: the calling tenant must be the system-management company
    (``is_sysmana`` is truthy).

    Flow:
        1. Verify the tenant code is not already used.
        2. Create the Keycloak realm (``porscheworld-<code in lowercase>``).
        3. Create the ``tenant-admin`` realm role.
        4. Insert the tenant row and flush to obtain its id.
        5. Create the admin user in Keycloak, set a temporary password and
           assign the ``tenant-admin`` role (a failed role assignment is
           only reported as a warning).
        6. Insert the admin's Employee row and commit.
        7. Return the tenant summary and the admin login details.

    If anything fails after the realm has been created, the database
    session is rolled back and deletion of the realm is attempted on a
    best-effort basis.

    Returns:
        TenantCreateResponse: Tenant summary, admin user info, realm name
        and the temporary password.

    Raises:
        HTTPException: 403 when the caller is not the system-management
            company; 400 when the tenant code already exists or the
            database reports an integrity error; 500 when the realm or the
            Keycloak user cannot be created, or on any other failure.
    """
    # ========== Permission check ==========
    if not current_tenant.is_sysmana:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only system management company can create tenants"
        )

    # ========== Step 1: tenant code must be unique ==========
    existing_tenant = db.query(Tenant).filter(Tenant.code == request.code).first()
    if existing_tenant:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Tenant code '{request.code}' already exists"
        )

    # Keycloak realm name, e.g. "porscheworld-pwd".
    realm_name = f"porscheworld-{request.code.lower()}"

    # ========== Step 2: create the Keycloak realm ==========
    keycloak_client = get_keycloak_admin_client()
    realm_config = keycloak_client.create_realm(
        realm_name=realm_name,
        display_name=request.name
    )

    if not realm_config:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to create Keycloak Realm"
        )

    def _undo_partial_creation() -> None:
        """Roll back the session and try to remove the realm created above."""
        db.rollback()
        try:
            keycloak_client.delete_realm(realm_name)
        except Exception as cleanup_error:
            # Cleanup is best effort: a failure here must not hide the
            # error that triggered it.
            print(f"⚠️ Warning: Failed to delete Keycloak realm {realm_name} during cleanup: {cleanup_error}")

    try:
        # ========== Step 3: create the tenant-admin realm role ==========
        keycloak_client.create_realm_role(
            realm_name=realm_name,
            role_name="tenant-admin",
            description="租戶管理員 - 可管理公司內所有資源"
        )

        # ========== Step 4: insert the tenant row ==========
        new_tenant = Tenant(
            code=request.code,
            name=request.name,
            name_eng=request.name_eng,
            tax_id=request.tax_id,
            prefix=request.prefix,
            tel=request.tel,
            add=request.add,
            url=request.url,
            keycloak_realm=realm_name,
            plan_id=request.plan_id,
            max_users=request.max_users,
            storage_quota_gb=request.storage_quota_gb,
            status="trial",
            is_sysmana=False,
            is_active=True,
            is_initialized=False,  # the initialization wizard has not run yet
            edit_by="system"
        )

        db.add(new_tenant)
        db.flush()  # assigns new_tenant.id without committing

        # ========== Step 5: create the tenant admin in Keycloak ==========
        # Use the supplied temporary password, or generate one when the
        # request did not provide it.
        temp_password = request.admin_temp_password or _generate_temp_password()

        # Split the display name on whitespace: the first token becomes the
        # last name and the rest the first name. A name without whitespace
        # (e.g. "陳保時") is not split and is used whole as the first name.
        name_parts = request.admin_name.split()
        if len(name_parts) >= 2:
            first_name = " ".join(name_parts[1:])
            last_name = name_parts[0]
        else:
            first_name = request.admin_name
            last_name = ""

        # NOTE(review): create_user and reset_password are called without a
        # realm argument, unlike the role calls - confirm the client targets
        # the new realm here.
        keycloak_user_id = keycloak_client.create_user(
            username=request.admin_username,
            email=request.admin_email,
            first_name=first_name,
            last_name=last_name,
            enabled=True,
            email_verified=False
        )

        if not keycloak_user_id:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to create Keycloak user"
            )

        # Temporary password: flagged as temporary in Keycloak.
        keycloak_client.reset_password(
            user_id=keycloak_user_id,
            password=temp_password,
            temporary=True
        )

        # Grant the tenant-admin role to the new user.
        role_assigned = keycloak_client.assign_realm_role_to_user(
            realm_name=realm_name,
            user_id=keycloak_user_id,
            role_name="tenant-admin"
        )

        if not role_assigned:
            # Deliberately non-fatal: report it and carry on.
            print(f"⚠️ Warning: Failed to assign tenant-admin role to user {keycloak_user_id}")

        # ========== Step 6: insert the Employee row ==========
        admin_employee = Employee(
            tenant_id=new_tenant.id,
            seq_no=1,  # employee number one
            tenant_emp_code=f"{request.prefix}0001",
            name=request.admin_name,
            name_eng=name_parts[0] if len(name_parts) >= 2 else request.admin_name,
            keycloak_username=request.admin_username,
            keycloak_user_id=keycloak_user_id,
            storage_quota_gb=100,  # default quota for the admin
            email_quota_mb=10240,  # 10 GB
            employment_status="active",
            is_active=True,
            edit_by="system"
        )

        db.add(admin_employee)
        db.commit()

        # ========== Step 7: build the response ==========
        return TenantCreateResponse(
            message="Tenant created successfully",
            tenant={
                "id": new_tenant.id,
                "code": new_tenant.code,
                "name": new_tenant.name,
                "keycloak_realm": realm_name,
                "status": new_tenant.status,
            },
            admin_user={
                "username": request.admin_username,
                "email": request.admin_email,
                "keycloak_user_id": keycloak_user_id,
            },
            keycloak_realm=realm_name,
            temporary_password=temp_password
        )

    except HTTPException:
        # Already a well-formed API error: clean up and pass it through
        # unchanged instead of re-wrapping it below.
        _undo_partial_creation()
        raise
    except IntegrityError as e:
        _undo_partial_creation()
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Database integrity error: {str(e)}"
        ) from e
    except Exception as e:
        _undo_partial_creation()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create tenant: {str(e)}"
        ) from e
|
||
|
||
|
||
@router.post(
    "/{tenant_id}/initialize",
    response_model=InitializationResponse,
    summary="完成租戶初始化(僅 Tenant Admin)",
)
def initialize_tenant(
    tenant_id: int,
    request: InitializationRequest,
    db: Session = Depends(get_db),
    current_tenant: Tenant = Depends(get_current_tenant),
):
    """
    Complete the one-time initialization of a tenant.

    Permission:
        - The caller must belong to the tenant being initialized.
        - The tenant must not be initialized yet.
        - The tenant-admin role check is NOT implemented yet (see the TODO
          in the body).

    Flow:
        1. Verify ownership and initialization state.
        2. Apply the company profile fields present in the request.
        3. Create the departments.
        4. Create the roles in the database and in the Keycloak realm.
        5. Read the default settings (currently only recorded in the audit
           log details; persistence is a TODO).
        6. Mark the tenant as initialized.
        7. Write an audit log entry and commit.

    Returns:
        InitializationResponse: A message plus a summary of what was
        created.

    Raises:
        HTTPException: 403 when initializing another tenant, 404 when the
            tenant does not exist, 400 when it is already initialized or
            the database reports an integrity error, 500 on any other
            failure. The session is rolled back on 400/500 raised from the
            main body.
    """
    from app.models import Department, UserRole, AuditLog

    # ========== Step 1: permission checks ==========
    # The caller may only initialize the tenant it belongs to.
    if current_tenant.id != tenant_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You can only initialize your own tenant",
        )

    # Load the tenant row.
    tenant = db.query(Tenant).filter(Tenant.id == tenant_id).first()
    if not tenant:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Tenant {tenant_id} not found",
        )

    # Initialization is allowed exactly once.
    if tenant.is_initialized:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Tenant has already been initialized. Initialization wizard is locked.",
        )

    # TODO: verify the caller holds the tenant-admin role (from the JWT
    # token or the Keycloak API). Skipped for now until JWT validation is
    # implemented.

    try:
        # ========== Step 2: company profile ==========
        company_info = request.company_info
        for field in ("name", "name_eng", "tax_id", "tel", "add", "url"):
            if field in company_info:
                setattr(tenant, field, company_info[field])

        # ========== Step 3: departments ==========
        department_names = []
        for dept_data in request.departments:
            # The code defaults to the first 10 characters of the name.
            department = Department(
                tenant_id=tenant_id,
                code=dept_data.get("code", dept_data["name"][:10]),
                name=dept_data["name"],
                name_eng=dept_data.get("name_eng"),
                parent_id=dept_data.get("parent_id"),
                is_active=True,
                edit_by="system",
            )
            db.add(department)
            department_names.append(dept_data["name"])

        db.flush()  # assigns department ids

        # ========== Step 4: roles ==========
        keycloak_client = get_keycloak_admin_client()
        role_names = []
        for role_data in request.roles:
            # Database record for the role.
            role_record = UserRole(
                tenant_id=tenant_id,
                role_code=role_data["code"],
                role_name=role_data["name"],
                description=role_data.get("description", ""),
                is_active=True,
                edit_by="system",
            )
            db.add(role_record)

            # Matching role in the tenant's Keycloak realm.
            created_in_keycloak = keycloak_client.create_realm_role(
                realm_name=tenant.keycloak_realm,
                role_name=role_data["code"],
                description=role_data.get("description", role_data["name"]),
            )
            if not created_in_keycloak:
                # Non-fatal: the role is left out of the summary only.
                print(f"⚠️ Warning: Failed to create role {role_data['code']} in Keycloak")
                continue
            role_names.append(role_data["name"])

        # ========== Step 5: default quota / service settings ==========
        # TODO: persist the default settings (needs a tenant_settings table
        # or a metadata column on the tenant).
        default_settings = request.default_settings

        # ========== Step 6: mark as initialized ==========
        tenant.is_initialized = True
        tenant.initialized_at = datetime.utcnow()
        # TODO: take current_user.username from the JWT token.
        tenant.initialized_by = "admin"  # hard-coded for now

        # ========== Step 7: audit log ==========
        department_count = len(department_names)
        role_count = len(role_names)
        db.add(
            AuditLog(
                tenant_id=tenant_id,
                user_id=None,  # TODO: take from current_user
                action="tenant.initialized",
                resource_type="tenant",
                resource_id=str(tenant_id),
                details={
                    "departments_created": department_count,
                    "roles_created": role_count,
                    "department_names": department_names,
                    "role_names": role_names,
                    "default_settings": default_settings,
                },
                ip_address=None,  # TODO: take from the HTTP request
                user_agent=None,
            )
        )

        # Persist everything in one transaction.
        db.commit()

        # ========== Step 8: response ==========
        summary = {
            "tenant_id": tenant_id,
            "tenant_name": tenant.name,
            "departments_created": department_count,
            "roles_created": role_count,
            "initialized_at": tenant.initialized_at.isoformat(),
            "initialized_by": tenant.initialized_by,
        }
        return InitializationResponse(
            message="Tenant initialization completed successfully",
            summary=summary,
        )

    except IntegrityError as e:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Database integrity error: {str(e)}",
        )
    except Exception as e:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Initialization failed: {str(e)}",
        )
|