Files
hr-portal/backend/app/services/installation_service.py
Porsche Chen 360533393f feat: HR Portal - Complete Multi-Tenant System with Redis Session Storage
Major Features:
-  Multi-tenant architecture (tenant isolation)
-  Employee CRUD with lifecycle management (onboarding/offboarding)
-  Department tree structure with email domain management
-  Company info management (single-record editing)
-  System functions CRUD (permission management)
-  Email account management (multi-account per employee)
-  Keycloak SSO integration (auth.lab.taipei)
-  Redis session storage (10.1.0.254:6379)
  - Solves Cookie 4KB limitation
  - Cross-system session sharing
  - Sliding expiration (8 hours)
  - Automatic token refresh

Technical Stack:
Backend:
- FastAPI + SQLAlchemy
- PostgreSQL 16 (10.1.0.20:5433)
- Keycloak Admin API integration
- Docker Mailserver integration (SSH)
- Alembic migrations

Frontend:
- Next.js 14 (App Router)
- NextAuth 4 with Keycloak Provider
- Redis session storage (ioredis)
- Tailwind CSS

Infrastructure:
- Redis 7 (10.1.0.254:6379) - Session + Cache
- Keycloak 26.1.0 (auth.lab.taipei)
- Docker Mailserver (10.1.0.254)

Architecture Highlights:
- Session管理由 Keycloak + Redis 統一控制
- 支援多系統 (HR/WebMail/Calendar/Drive/Office) 共享 session
- Token 自動刷新,異質服務整合
- 未來可無縫遷移到雲端

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-23 20:12:43 +08:00

790 lines
29 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
初始化系統服務
負責初始化流程的業務邏輯
"""
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List
from sqlalchemy.orm import Session
from app.models import (
InstallationSession,
InstallationTenantInfo,
InstallationDepartmentSetup,
TemporaryPassword,
InstallationAccessLog,
Tenant,
Department,
UserRole
)
from app.models.emp_resume import EmpResume
from app.models.emp_setting import EmpSetting
from app.utils.password_generator import generate_secure_password, hash_password
from app.services.keycloak_service import KeycloakService
class InstallationService:
"""Service holding the business logic of the system initialization (installation) workflow."""
def __init__(self, db: Session):
# SQLAlchemy session used for every query and commit issued by this service.
self.db = db
# Keycloak client; execute_initialization() uses it to create the administrator's SSO user.
self.keycloak_service = KeycloakService()
# ==================== Phase 0: 建立安裝會話 ====================
def create_session(
    self,
    tenant_id: int,
    environment: str,
    executed_by: str,
    session_name: Optional[str] = None
) -> InstallationSession:
    """
    Open a new installation session and write the matching audit entry.

    Args:
        tenant_id: Tenant ID the session belongs to.
        environment: Target environment (development/testing/production).
        executed_by: Name of the operator running the installation.
        session_name: Optional display name; when empty a default derived
            from ``environment`` is used.

    Returns:
        The persisted and refreshed InstallationSession.
    """
    # Fall back to an environment-based label when no (or an empty) name is given.
    if session_name:
        display_name = session_name
    else:
        display_name = f"{environment} 環境初始化"

    session_fields = {
        'tenant_id': tenant_id,
        'session_name': display_name,
        'environment': environment,
        'status': 'in_progress',
        'executed_by': executed_by,
        'started_at': datetime.now(),
    }
    new_session = InstallationSession(**session_fields)

    db = self.db
    db.add(new_session)
    db.commit()
    # Refresh so database-generated values (the ID in particular) are loaded.
    db.refresh(new_session)

    # Audit trail: record that the session was created through the API.
    self._log_access(
        session_id=new_session.id,
        action='create_session',
        action_by=executed_by,
        action_method='api',
        access_granted=True
    )
    return new_session
# ==================== Phase 2: 公司資訊設定 ====================
def save_tenant_info(
    self,
    session_id: int,
    tenant_info_data: Dict[str, Any]
) -> InstallationTenantInfo:
    """
    Create or update the tenant initialization record of a session.

    The record is looked up by ``session_id`` first. For legacy data it is
    then looked up by the session's ``tenant_id`` — but only when the session
    actually has a tenant. A session without a tenant must not match on
    ``tenant_id``: the comparison would become ``tenant_id IS NULL`` and could
    pick up (and re-link) the record of an unrelated session.

    Args:
        session_id: Installation session ID.
        tenant_info_data: Field name -> value mapping to store. On update,
            keys that are not attributes of the record are ignored.

    Returns:
        The persisted and refreshed InstallationTenantInfo.

    Raises:
        ValueError: If the installation session does not exist.
    """
    session = self.db.query(InstallationSession).get(session_id)
    if not session:
        raise ValueError(f"找不到安裝會話 ID: {session_id}")

    # Preferred lookup: the record already linked to this session.
    tenant_info = self.db.query(InstallationTenantInfo).filter_by(
        session_id=session_id
    ).first()

    # Legacy fallback: match on tenant_id, but never when it is None
    # (filter_by(tenant_id=None) would match every unassigned record).
    if not tenant_info and session.tenant_id is not None:
        tenant_info = self.db.query(InstallationTenantInfo).filter_by(
            tenant_id=session.tenant_id
        ).first()

    if tenant_info:
        # Update in place and re-link the record to the current session.
        tenant_info.session_id = session_id
        for key, value in tenant_info_data.items():
            if hasattr(tenant_info, key):
                setattr(tenant_info, key, value)
        tenant_info.updated_at = datetime.now()
    else:
        # No record yet: create one bound to this session.
        tenant_info = InstallationTenantInfo(
            tenant_id=session.tenant_id,
            session_id=session_id,
            **tenant_info_data
        )
        self.db.add(tenant_info)

    self.db.commit()
    self.db.refresh(tenant_info)
    return tenant_info
def setup_admin_credentials(
    self,
    session_id: int,
    admin_data: Dict[str, Any],
    password_method: str = 'auto',
    manual_password: Optional[str] = None
) -> tuple[InstallationTenantInfo, str]:
    """
    Store the administrator's details and issue the initial password.

    Args:
        session_id: Installation session ID.
        admin_data: Administrator fields; saved through save_tenant_info().
        password_method: 'auto' generates a 16-character password; any other
            value takes the manual path.
        manual_password: Password to use on the manual path (required there).

    Returns:
        (tenant info record, plaintext initial password)

    Raises:
        ValueError: If the session does not exist, the manual password is
            missing, or it fails validation.
    """
    from app.utils.password_generator import validate_password_for_user

    install_session = self.db.query(InstallationSession).get(session_id)
    if not install_session:
        raise ValueError(f"找不到安裝會話 ID: {session_id}")

    if password_method != 'auto':
        # Manual path: the caller must supply a password that passes validation.
        if not manual_password:
            raise ValueError("手動設定密碼時必須提供 manual_password")
        is_valid, errors = validate_password_for_user(
            manual_password,
            username=admin_data.get('admin_username'),
            name=admin_data.get('admin_legal_name'),
            email=admin_data.get('admin_email')
        )
        if not is_valid:
            raise ValueError(f"密碼驗證失敗: {', '.join(errors)}")
        plain_text = manual_password
    else:
        plain_text = generate_secure_password(16)

    hashed = hash_password(plain_text)

    # Persist the administrator fields (this call commits on its own).
    tenant_info = self.save_tenant_info(session_id, admin_data)

    # NOTE(review): calling this method again for the same session adds another
    # unused TemporaryPassword row; other code in this file looks rows up with
    # an unordered .first() — confirm which row is meant to win.
    credential_row = TemporaryPassword(
        tenant_id=install_session.tenant_id,
        # The SSO account name is the admin's English name ('admin' if the key is absent).
        username=admin_data.get('admin_english_name', 'admin'),
        session_id=session_id,
        password_hash=hashed,
        # The plaintext is stored alongside the hash at this stage.
        plain_password=plain_text,
        password_method=password_method,
        is_temporary=True,
        must_change_on_login=True,
        created_at=datetime.now(),
        # The record expires after 7 days ...
        expires_at=datetime.now() + timedelta(days=7),
        is_viewable=True,
        # ... and is flagged viewable for 1 hour.
        viewable_until=datetime.now() + timedelta(hours=1)
    )
    self.db.add(credential_row)
    self.db.commit()

    return tenant_info, plain_text
# ==================== Phase 3: 組織架構設定 ====================
def setup_departments(
    self,
    session_id: int,
    departments_data: List[Dict[str, Any]]
) -> List[InstallationDepartmentSetup]:
    """
    Stage the department structure for a session.

    One InstallationDepartmentSetup row is added per entry; all rows are
    committed together at the end.

    Args:
        session_id: Installation session ID.
        departments_data: One dict of constructor fields per department.

    Returns:
        The staged department setup objects, in input order.

    Raises:
        ValueError: If the installation session does not exist.
    """
    install_session = self.db.query(InstallationSession).get(session_id)
    if not install_session:
        raise ValueError(f"找不到安裝會話 ID: {session_id}")

    staged: List[InstallationDepartmentSetup] = []
    for fields in departments_data:
        row = InstallationDepartmentSetup(
            tenant_id=install_session.tenant_id,
            session_id=session_id,
            **fields
        )
        self.db.add(row)
        staged.append(row)

    # Single commit for the whole batch.
    self.db.commit()
    return staged
# ==================== Phase 4: 執行初始化 ====================
def execute_initialization(
self,
session_id: int
) -> Dict[str, Any]:
"""
Run the complete initialization flow for an installation session.

Steps, in order: create or update the tenant, ensure the 'INIT' department
exists, create the user-defined departments, create the first administrator
(EmpResume + EmpSetting), create the Keycloak user, create the mailbox
(best effort), assign the SYS_ADMIN role, lock the session and switch the
system status to 'operational'. Several steps commit on their own, so a
late failure does not undo the steps that were already committed.

Args:
    session_id: Installation session ID.

Returns:
    Dict of per-step result flags and counters.

Raises:
    ValueError: If the session or its tenant info record cannot be found
        (raised before the try block, so it is not wrapped).
    Exception: If any later step fails; the session status is set to
        'failed' before the exception is raised.
"""
session = self.db.query(InstallationSession).get(session_id)
if not session:
raise ValueError(f"找不到安裝會話 ID: {session_id}")
tenant_info = self.db.query(InstallationTenantInfo).filter_by(
session_id=session_id
).first()
if not tenant_info:
# Debug aid: list every tenant_info record currently in the database.
# NOTE(review): this places the IDs of all sessions/tenants into the error message — confirm that exposure is acceptable.
all_tenant_infos = self.db.query(InstallationTenantInfo).all()
tenant_info_list = [f"ID:{t.id}, SessionID:{t.session_id}, TenantID:{t.tenant_id}" for t in all_tenant_infos]
raise ValueError(
f"找不到租戶初始化資訊 (session_id={session_id})。"
f"資料庫中現有記錄: {tenant_info_list}"
)
# NOTE: 'resumes_created', 'emp_settings_created' and 'mailbox_error' are added later and are not pre-declared here.
results = {
'tenant_updated': False,
'departments_created': 0,
'admin_created': False,
'keycloak_user_created': False,
'mailbox_created': False,
'roles_assigned': False
}
try:
# Step 1: create or update the tenant's base record
if session.tenant_id:
# Existing tenant: load it for update
tenant = self.db.query(Tenant).get(session.tenant_id)
if not tenant:
raise ValueError(f"找不到租戶 ID: {session.tenant_id}")
else:
# New tenant (first-time initialization)
# The tenant code and the Keycloak realm must be lowercase
tenant_code_lower = tenant_info.tenant_code.lower()
tenant = Tenant(
name=tenant_info.company_name,
name_eng=tenant_info.company_name_en,
code=tenant_code_lower,
keycloak_realm=tenant_code_lower, # Keycloak realm = tenant_code (lowercase)
prefix=tenant_info.tenant_prefix,
tax_id=tenant_info.tax_id,
tel=tenant_info.tel,
add=tenant_info.add,
domain_set=tenant_info.domain_set,
domain=tenant_info.domain,
is_sysmana=True, # the first tenant created by initialization is the system-management company
is_active=True
)
self.db.add(tenant)
self.db.flush() # populate tenant.id
# Point the session and the tenant info at the new tenant
session.tenant_id = tenant.id
tenant_info.tenant_id = tenant.id
# Copy the non-empty tenant_info fields onto the tenant (update mode)
# NOTE(review): the original nesting is not visible here. If this block runs after the
# if/else above, it is also true for a freshly created tenant, and tenant.code is then
# overwritten with the non-lowercased tenant_code — confirm.
if session.tenant_id and tenant:
if tenant_info.company_name:
tenant.name = tenant_info.company_name
if tenant_info.company_name_en:
tenant.name_eng = tenant_info.company_name_en
if tenant_info.tenant_code:
tenant.code = tenant_info.tenant_code
if tenant_info.tenant_prefix:
tenant.prefix = tenant_info.tenant_prefix
if tenant_info.tax_id:
tenant.tax_id = tenant_info.tax_id
if tenant_info.tel:
tenant.tel = tenant_info.tel
if tenant_info.add:
tenant.add = tenant_info.add
if tenant_info.domain_set:
tenant.domain_set = tenant_info.domain_set
if tenant_info.domain:
tenant.domain = tenant_info.domain
tenant.is_initialized = True
tenant.initialized_at = datetime.now()
tenant.initialized_by = session.executed_by
self.db.commit()
results['tenant_updated'] = True
# Step 2: ensure the 'INIT' department exists (required for every tenant)
init_dept_exists = self.db.query(Department).filter_by(
tenant_id=session.tenant_id,
code='INIT'
).first()
init_dept = None
if not init_dept_exists:
# Choose the email_domain of the INIT department:
# domain_set=1 (organization-wide domain): use the tenant's domain
# domain_set=2 (per-department domain): tenant_info.domain is the INIT department's default domain
# Both branches assign the same value; only the error message differs.
if tenant_info.domain_set == 1:
# Organization-domain mode: a domain is mandatory
if not tenant_info.domain:
raise ValueError("組織網域模式必須設定網域")
init_dept_domain = tenant_info.domain
else:
# Department-domain mode: domain is the INIT department's default domain
if not tenant_info.domain:
raise ValueError("請設定初始化部門的預設網域")
init_dept_domain = tenant_info.domain
init_dept = Department(
tenant_id=session.tenant_id,
seq_no=1, # the first department gets sequence number 1
code='INIT',
name='初始化部門',
name_en='Initialization Department',
email_domain=init_dept_domain,
depth=0,
description='系統初始化專用部門,待組織架構建立完成後可刪除或保留',
is_active=True
)
self.db.add(init_dept)
self.db.commit()
self.db.refresh(init_dept)
results['departments_created'] += 1
# Step 2.1: create the additional user-defined departments, if any
dept_setups = self.db.query(InstallationDepartmentSetup).filter_by(
session_id=session_id,
is_created=False
).all()
for dept_setup in dept_setups:
dept = Department(
tenant_id=session.tenant_id,
code=dept_setup.department_code,
name=dept_setup.department_name,
name_en=dept_setup.department_name_en,
email_domain=dept_setup.email_domain,
depth=dept_setup.depth,
is_active=True
)
self.db.add(dept)
dept_setup.is_created = True
results['departments_created'] += 1
self.db.commit()
# Step 3: create the system administrator employee (assigned to the INIT department)
# Resolve the INIT department if it was not created in this run
if not init_dept:
init_dept = self.db.query(Department).filter_by(
tenant_id=session.tenant_id,
code='INIT'
).first()
if not init_dept:
raise ValueError("找不到初始化部門,無法建立管理員")
# SSO account name: the administrator's English name
sso_username = tenant_info.admin_english_name
if not sso_username:
raise ValueError("管理員英文名稱為必填項")
# Check whether the first employee already exists (tenant_id + seq_no)
admin_exists = self.db.query(EmpSetting).filter_by(
tenant_id=session.tenant_id,
seq_no=1 # first employee
).first()
# NOTE(review): emp_setting is only assigned below; if the admin already exists and the
# later steps are not nested under this branch, emp_setting would be unbound — confirm.
if not admin_exists:
# Step 3-1: create the personal record (EmpResume)
resume = EmpResume(
tenant_id=session.tenant_id,
seq_no=1, # the first employee gets sequence number 1
legal_name=tenant_info.admin_legal_name,
english_name=tenant_info.admin_english_name,
id_number=f"INIT{session.tenant_id}001", # placeholder ID number used for initialization
mobile=tenant_info.admin_phone,
personal_email=tenant_info.admin_email,
is_active=True
)
self.db.add(resume)
self.db.commit()
self.db.refresh(resume)
results['resumes_created'] = 1
# Step 3-2: create the employment record (EmpSetting)
emp_setting = EmpSetting(
tenant_id=session.tenant_id,
seq_no=1, # first employee
tenant_resume_id=resume.id,
tenant_emp_code=f"{tenant_info.tenant_prefix}0001", # tenant prefix + 0001
hire_at=datetime.now().date(),
employment_type='full_time',
employment_status='active',
primary_dept_id=init_dept.id,
storage_quota_gb=100, # administrator default: 100 GB
email_quota_mb=10240, # administrator default: 10 GB
tenant_keycloak_username=sso_username, # taken from admin_english_name above
is_active=True
)
self.db.add(emp_setting)
self.db.commit()
self.db.refresh(emp_setting)
results['emp_settings_created'] = 1
# Mail domain: taken from the INIT department
if not init_dept.email_domain:
raise ValueError("初始化部門未設定郵件網域,請檢查設定")
# Administrator mail address: SSO account name + domain
admin_email_address = f"{sso_username}@{init_dept.email_domain}"
# Step 4: create the Keycloak user
temp_password = self.db.query(TemporaryPassword).filter_by(
session_id=session_id,
is_used=False
).first()
if temp_password and temp_password.plain_password:
# Create the Keycloak user and pass the temporary password along
# NOTE(review): the last_name expression calls admin_legal_name.split() without the
# None guard that first_name has — a missing legal name would raise here.
user_id = self.keycloak_service.create_user(
username=sso_username, # English name is used as the SSO account
email=admin_email_address,
first_name=tenant_info.admin_legal_name.split()[0] if tenant_info.admin_legal_name else '',
last_name=tenant_info.admin_legal_name.split()[-1] if len(tenant_info.admin_legal_name.split()) > 1 else '',
enabled=True,
temporary_password=temp_password.plain_password # passed as the temporary password
)
# Abort if Keycloak did not return a user ID
if not user_id:
raise ValueError(f"Keycloak 用戶建立失敗: {sso_username}")
# Store the Keycloak user ID on the employment record
emp_setting.tenant_keycloak_user_id = user_id
# Mark the temporary password as used (the plaintext is kept for the user's records)
temp_password.is_used = True
temp_password.used_at = datetime.now()
# The plaintext is intentionally NOT cleared here:
# temp_password.plain_password = None
# temp_password.plain_password_cleared_at = datetime.now()
# temp_password.cleared_reason = 'keycloak_created'
self.db.commit()
results['keycloak_user_created'] = True
# Step 4.5: create the mail account (Docker Mailserver)
try:
from app.services.mailserver_service import MailserverService
from app.utils.password_generator import generate_secure_password
mailserver = MailserverService()
# Create the administrator's mailbox at admin_email_address (computed above)
# Mail password: reuse the temporary plaintext password if still present, otherwise generate one
mail_password = temp_password.plain_password if (temp_password and temp_password.plain_password) else generate_secure_password()
mail_result = mailserver.create_email_account(
email=admin_email_address,
password=mail_password,
quota_mb=emp_setting.email_quota_mb
)
if mail_result.get('success'):
results['mailbox_created'] = True
print(f"[OK] 郵件帳號建立成功: {admin_email_address}")
else:
# A failed mailbox creation is only reported; it does not abort initialization
print(f"[WARNING] 郵件帳號建立失敗: {mail_result.get('error', 'Unknown error')}")
results['mailbox_created'] = False
results['mailbox_error'] = mail_result.get('error')
except Exception as mail_error:
# Mail system errors must not abort initialization
print(f"[WARNING] 郵件系統整合失敗: {str(mail_error)}")
results['mailbox_created'] = False
results['mailbox_error'] = str(mail_error)
# Step 5: assign the system administrator role
sys_admin_role = self.db.query(UserRole).filter_by(
tenant_id=session.tenant_id,
role_code='SYS_ADMIN'
).first()
if sys_admin_role:
from app.models import UserRoleAssignment
# NOTE(review): if Step 4 was skipped (no usable temporary password), the Keycloak
# user ID was never set by this method — confirm the assignment is still wanted then.
role_assignment = UserRoleAssignment(
tenant_id=session.tenant_id,
keycloak_user_id=emp_setting.tenant_keycloak_user_id,
role_id=sys_admin_role.id,
is_active=True
)
self.db.add(role_assignment)
self.db.commit()
results['roles_assigned'] = True
# Mark the session completed and lock it automatically
session.status = 'completed'
session.completed_at = datetime.now()
session.completed_steps = 5
session.is_locked = True
session.locked_at = datetime.now()
session.locked_by = 'system'
session.lock_reason = '初始化完成自動鎖定'
tenant_info.is_completed = True
tenant_info.completed_at = datetime.now()
tenant_info.completed_by = session.executed_by
# Move the system status from initialization to operational
from app.models.installation import InstallationSystemStatus
system_status = self.db.query(InstallationSystemStatus).filter_by(id=1).first()
if system_status:
system_status.previous_phase = system_status.current_phase
system_status.current_phase = 'operational'
system_status.phase_changed_at = datetime.now()
system_status.phase_changed_by = session.executed_by or 'installer'
system_status.phase_change_reason = '初始化完成,系統進入正式運作階段'
system_status.initialization_completed = True
system_status.initialized_at = datetime.now()
system_status.initialized_by = session.executed_by or 'installer'
system_status.operational_since = datetime.now()
self.db.commit()
# Audit trail: record the automatic lock
self._log_access(
session_id=session_id,
action='lock',
action_by='system',
action_method='auto',
access_granted=True
)
return results
except Exception as e:
# Roll back uncommitted work only; steps committed earlier stay in the database.
self.db.rollback()
session.status = 'failed'
self.db.commit()
# NOTE(review): the new exception is raised without "from e", so only the message text is carried over.
raise Exception(f"初始化執行失敗: {str(e)}")
# ==================== 明文密碼管理 ====================
def clear_plain_password(
    self,
    session_id: int,
    reason: str = 'user_confirmed'
) -> bool:
    """
    Erase the stored plaintext of a session's temporary passwords.

    Every record of the session that still holds a plaintext is cleared,
    whether or not it has been used. execute_initialization() marks the
    password as used while deliberately keeping the plaintext, so filtering
    on ``is_used=False`` would make the plaintext impossible to clear after
    a successful initialization.

    Args:
        session_id: Installation session ID.
        reason: Reason stored in ``cleared_reason``.

    Returns:
        True if at least one plaintext was cleared, False otherwise.
    """
    holders = (
        self.db.query(TemporaryPassword)
        .filter_by(session_id=session_id)
        .filter(TemporaryPassword.plain_password.isnot(None))
        .all()
    )

    for temp_pwd in holders:
        temp_pwd.plain_password = None
        temp_pwd.plain_password_cleared_at = datetime.now()
        temp_pwd.cleared_reason = reason

    # Committed unconditionally, as before, even when nothing matched.
    self.db.commit()
    return len(holders) > 0
# ==================== 存取控制 ====================
def check_session_access(
    self,
    session_id: int,
    action: str,
    action_by: str
) -> tuple[bool, Optional[str]]:
    """
    Decide whether a session's sensitive information may be accessed.

    Access is granted when the session is not locked, or when it is locked
    but has a temporary unlock that has not expired yet.

    Args:
        session_id: Installation session ID.
        action: Requested action (view/download_pdf); not used in the decision.
        action_by: Operator name; not used in the decision.

    Returns:
        (allowed, denial reason) — the reason is None when access is allowed.
    """
    target = self.db.query(InstallationSession).get(session_id)
    if not target:
        return False, "安裝會話不存在"

    # Unlocked sessions are always accessible.
    if not target.is_locked:
        return True, None

    # Locked: only a still-valid temporary unlock opens the session.
    unlock_deadline = target.unlock_expires_at
    if unlock_deadline and unlock_deadline > datetime.now():
        return True, None

    return False, "會話已鎖定"
def _log_access(
    self,
    session_id: int,
    action: str,
    action_by: str,
    action_method: str,
    access_granted: bool,
    deny_reason: Optional[str] = None,
    sensitive_data_accessed: Optional[List[str]] = None,
    ip_address: Optional[str] = None,
    user_agent: Optional[str] = None
) -> InstallationAccessLog:
    """
    Persist one access-log entry for an installation session.

    Args:
        session_id: Installation session ID.
        action: Action that was performed.
        action_by: Operator who performed it.
        action_method: How the action was triggered.
        access_granted: Whether access was granted.
        deny_reason: Reason for denial, if any.
        sensitive_data_accessed: Names of the sensitive items accessed, if any.
        ip_address: Client IP address, if known.
        user_agent: Client user agent, if known.

    Returns:
        The committed InstallationAccessLog entry.
    """
    entry_fields = {
        'session_id': session_id,
        'action': action,
        'action_by': action_by,
        'action_method': action_method,
        'access_granted': access_granted,
        'deny_reason': deny_reason,
        'sensitive_data_accessed': sensitive_data_accessed,
        'ip_address': ip_address,
        'user_agent': user_agent,
    }
    entry = InstallationAccessLog(**entry_fields)

    # Each entry is committed immediately.
    self.db.add(entry)
    self.db.commit()
    return entry
# ==================== 查詢功能 ====================
def get_session_details(
    self,
    session_id: int,
    include_sensitive: bool = False
) -> Dict[str, Any]:
    """
    Collect the details of an installation session.

    Args:
        session_id: Installation session ID.
        include_sensitive: When True, the temporary password record is
            included under ``credentials`` — provided the session is not
            locked, or is locked but has an unexpired temporary unlock. This
            is the same rule check_session_access() applies; a completed
            session is locked automatically, so without honouring the unlock
            window the credentials could never be returned afterwards.

    Returns:
        Dict with the keys ``session``, ``tenant_info``, ``departments`` and
        ``credentials`` (None when not requested, not permitted or absent).

    Raises:
        ValueError: If the installation session does not exist.
    """
    session = self.db.query(InstallationSession).get(session_id)
    if not session:
        raise ValueError(f"找不到安裝會話 ID: {session_id}")

    result: Dict[str, Any] = {
        "session": {
            "id": session.id,
            "session_name": session.session_name,
            "environment": session.environment,
            "status": session.status,
            "started_at": session.started_at,
            "completed_at": session.completed_at,
            "is_locked": session.is_locked,
            "locked_at": session.locked_at,
            "lock_reason": session.lock_reason,
            "unlock_expires_at": session.unlock_expires_at
        },
        "tenant_info": None,
        "departments": [],
        "credentials": None
    }

    # Tenant information
    tenant_info = session.tenant_info
    if tenant_info:
        result["tenant_info"] = {
            "company_name": tenant_info.company_name,
            "company_name_en": tenant_info.company_name_en,
            "tax_id": tenant_info.tax_id,
            "admin_username": tenant_info.admin_username,
            "admin_email": tenant_info.admin_email,
            "admin_legal_name": tenant_info.admin_legal_name
        }

    # Department setups
    result["departments"] = [
        {
            "code": d.department_code,
            "name": d.department_name,
            "name_en": d.department_name_en,
            "email_domain": d.email_domain,
            "is_created": d.is_created
        }
        for d in session.department_setups
    ]

    # A locked session is still readable while a temporary unlock is in effect.
    unlock_deadline = session.unlock_expires_at
    temporarily_unlocked = bool(unlock_deadline and unlock_deadline > datetime.now())
    access_open = (not session.is_locked) or temporarily_unlocked

    # Sensitive information (password record)
    if include_sensitive and access_open:
        temp_password = self.db.query(TemporaryPassword).filter_by(
            session_id=session_id
        ).first()
        if temp_password:
            result["credentials"] = {
                "password_visible": temp_password.plain_password is not None,
                "plain_password": temp_password.plain_password,
                "password_hash": temp_password.password_hash,
                "created_at": temp_password.created_at,
                "expires_at": temp_password.expires_at,
                "view_count": temp_password.view_count,
                "cleared_at": temp_password.plain_password_cleared_at,
                "cleared_reason": temp_password.cleared_reason
            }

    return result