Major Features: - ✅ Multi-tenant architecture (tenant isolation) - ✅ Employee CRUD with lifecycle management (onboarding/offboarding) - ✅ Department tree structure with email domain management - ✅ Company info management (single-record editing) - ✅ System functions CRUD (permission management) - ✅ Email account management (multi-account per employee) - ✅ Keycloak SSO integration (auth.lab.taipei) - ✅ Redis session storage (10.1.0.254:6379) - Solves Cookie 4KB limitation - Cross-system session sharing - Sliding expiration (8 hours) - Automatic token refresh Technical Stack: Backend: - FastAPI + SQLAlchemy - PostgreSQL 16 (10.1.0.20:5433) - Keycloak Admin API integration - Docker Mailserver integration (SSH) - Alembic migrations Frontend: - Next.js 14 (App Router) - NextAuth 4 with Keycloak Provider - Redis session storage (ioredis) - Tailwind CSS Infrastructure: - Redis 7 (10.1.0.254:6379) - Session + Cache - Keycloak 26.1.0 (auth.lab.taipei) - Docker Mailserver (10.1.0.254) Architecture Highlights: - Session管理由 Keycloak + Redis 統一控制 - 支援多系統 (HR/WebMail/Calendar/Drive/Office) 共享 session - Token 自動刷新,異質服務整合 - 未來可無縫遷移到雲端 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
790 lines
29 KiB
Python
790 lines
29 KiB
Python
"""
|
||
初始化系統服務
|
||
負責初始化流程的業務邏輯
|
||
"""
|
||
from datetime import datetime, timedelta
|
||
from typing import Optional, Dict, Any, List
|
||
from sqlalchemy.orm import Session
|
||
|
||
from app.models import (
|
||
InstallationSession,
|
||
InstallationTenantInfo,
|
||
InstallationDepartmentSetup,
|
||
TemporaryPassword,
|
||
InstallationAccessLog,
|
||
Tenant,
|
||
Department,
|
||
UserRole
|
||
)
|
||
from app.models.emp_resume import EmpResume
|
||
from app.models.emp_setting import EmpSetting
|
||
from app.utils.password_generator import generate_secure_password, hash_password
|
||
from app.services.keycloak_service import KeycloakService
|
||
|
||
|
||
class InstallationService:
    """Business logic for the system installation / initialization workflow."""

    def __init__(self, db: Session):
        """Bind the service to a database session and build a Keycloak client.

        Args:
            db: SQLAlchemy session used for every query and commit issued by
                this service.
        """
        # The two attributes are independent of each other.
        self.keycloak_service = KeycloakService()
        self.db = db
|
||
|
||
# ==================== Phase 0: 建立安裝會話 ====================
|
||
|
||
def create_session(
    self,
    tenant_id: int,
    environment: str,
    executed_by: str,
    session_name: Optional[str] = None
) -> InstallationSession:
    """
    Open a new installation session and write an audit-log entry for it.

    Args:
        tenant_id: ID of the tenant being installed.
        environment: Target environment (development/testing/production).
        executed_by: Operator running the installation.
        session_name: Optional display name; when empty, a default derived
            from ``environment`` is used.

    Returns:
        The persisted InstallationSession, refreshed after the commit.
    """
    # Fall back to a generated name when the caller did not supply one.
    if session_name:
        display_name = session_name
    else:
        display_name = f"{environment} 環境初始化"

    new_session = InstallationSession(
        tenant_id=tenant_id,
        session_name=display_name,
        environment=environment,
        status='in_progress',
        executed_by=executed_by,
        started_at=datetime.now(),
    )

    db = self.db
    db.add(new_session)
    db.commit()
    # Reload so the generated primary key is available for the audit entry.
    db.refresh(new_session)

    # Audit trail: the session was created through the API.
    self._log_access(
        session_id=new_session.id,
        action='create_session',
        action_by=executed_by,
        action_method='api',
        access_granted=True,
    )

    return new_session
|
||
|
||
# ==================== Phase 2: 公司資訊設定 ====================
|
||
|
||
def save_tenant_info(
    self,
    session_id: int,
    tenant_info_data: Dict[str, Any]
) -> InstallationTenantInfo:
    """
    Create or update the tenant initialization record of a session.

    The record is looked up by ``session_id`` first and, if none is found,
    by the session's ``tenant_id`` (to pick up older data). An existing
    record is re-attached to this session and updated field by field;
    otherwise a new record is created from ``tenant_info_data``.

    Args:
        session_id: Installation session ID.
        tenant_info_data: Field name -> value mapping. On update, keys that
            are not attributes of the record are ignored.

    Returns:
        The persisted InstallationTenantInfo, refreshed after the commit.

    Raises:
        ValueError: If the installation session does not exist.
    """
    owner = self.db.query(InstallationSession).get(session_id)
    if not owner:
        raise ValueError(f"找不到安裝會話 ID: {session_id}")

    # Prefer the record bound to this session; otherwise fall back to the
    # record of the same tenant (legacy rows).
    record = (
        self.db.query(InstallationTenantInfo)
        .filter_by(session_id=session_id)
        .first()
        or self.db.query(InstallationTenantInfo)
        .filter_by(tenant_id=owner.tenant_id)
        .first()
    )

    if not record:
        # Nothing stored yet: create a fresh record.
        record = InstallationTenantInfo(
            tenant_id=owner.tenant_id,
            session_id=session_id,
            **tenant_info_data
        )
        self.db.add(record)
    else:
        # Re-bind the record to this session, then copy the known fields.
        record.session_id = session_id
        for field_name in tenant_info_data:
            if hasattr(record, field_name):
                setattr(record, field_name, tenant_info_data[field_name])
        record.updated_at = datetime.now()

    self.db.commit()
    self.db.refresh(record)

    return record
|
||
|
||
def setup_admin_credentials(
    self,
    session_id: int,
    admin_data: Dict[str, Any],
    password_method: str = 'auto',
    manual_password: Optional[str] = None
) -> tuple[InstallationTenantInfo, str]:
    """
    Store the administrator's details and issue the initial password.

    Args:
        session_id: Installation session ID.
        admin_data: Administrator fields; saved through ``save_tenant_info``.
        password_method: ``'auto'`` generates a 16-character password; any
            other value uses ``manual_password``.
        manual_password: Password supplied by the operator (required when
            ``password_method`` is not ``'auto'``).

    Returns:
        ``(tenant info record, plaintext initial password)``.

    Raises:
        ValueError: If the session does not exist, the manual password is
            missing, or it fails validation.
    """
    from app.utils.password_generator import validate_password_for_user

    owner = self.db.query(InstallationSession).get(session_id)
    if not owner:
        raise ValueError(f"找不到安裝會話 ID: {session_id}")

    # Generate the password, or validate the one supplied by the operator.
    if password_method == 'auto':
        initial_password = generate_secure_password(16)
    elif not manual_password:
        raise ValueError("手動設定密碼時必須提供 manual_password")
    else:
        is_valid, errors = validate_password_for_user(
            manual_password,
            username=admin_data.get('admin_username'),
            name=admin_data.get('admin_legal_name'),
            email=admin_data.get('admin_email')
        )
        if not is_valid:
            raise ValueError(f"密碼驗證失敗: {', '.join(errors)}")
        initial_password = manual_password

    hashed = hash_password(initial_password)

    # Persist the administrator fields (commits internally).
    tenant_info = self.save_tenant_info(session_id, admin_data)

    # The SSO account name is the admin's English name, defaulting to 'admin'.
    login_name = admin_data.get('admin_english_name', 'admin')

    record_fields = dict(
        tenant_id=owner.tenant_id,
        username=login_name,
        session_id=session_id,
        password_hash=hashed,
        plain_password=initial_password,  # plaintext is kept at this stage
        password_method=password_method,
        is_temporary=True,
        must_change_on_login=True,
        created_at=datetime.now(),
        expires_at=datetime.now() + timedelta(days=7),  # valid for 7 days
        is_viewable=True,
        viewable_until=datetime.now() + timedelta(hours=1),  # viewable for 1 hour
    )
    self.db.add(TemporaryPassword(**record_fields))
    self.db.commit()

    return tenant_info, initial_password
|
||
|
||
# ==================== Phase 3: 組織架構設定 ====================
|
||
|
||
def setup_departments(
    self,
    session_id: int,
    departments_data: List[Dict[str, Any]]
) -> List[InstallationDepartmentSetup]:
    """
    Stage the department structure requested for a session.

    Args:
        session_id: Installation session ID.
        departments_data: One field mapping per department; each mapping is
            passed as keyword arguments to InstallationDepartmentSetup.

    Returns:
        The staged InstallationDepartmentSetup objects, in input order.

    Raises:
        ValueError: If the installation session does not exist.
    """
    owner = self.db.query(InstallationSession).get(session_id)
    if not owner:
        raise ValueError(f"找不到安裝會話 ID: {session_id}")

    staged: List[InstallationDepartmentSetup] = []
    for spec in departments_data:
        row = InstallationDepartmentSetup(
            tenant_id=owner.tenant_id,
            session_id=session_id,
            **spec
        )
        self.db.add(row)
        staged.append(row)

    # One commit for the whole batch.
    self.db.commit()

    return staged
|
||
|
||
# ==================== Phase 4: 執行初始化 ====================
|
||
|
||
def execute_initialization(
    self,
    session_id: int
) -> Dict[str, Any]:
    """
    Run the complete initialization workflow for an installation session.

    Steps (each one commits its own work):
      1. Load the session's tenant, or create it when the session has no
         tenant yet; copy the non-empty company fields from the stored
         tenant info onto it and mark it initialized.
      2. Ensure the mandatory ``INIT`` department exists, then create the
         user-defined departments that are still pending.
      3. If no employee with ``seq_no=1`` exists for the tenant, create the
         administrator's resume and employment records, then
         4. create the Keycloak user from the unused temporary password,
         4.5 try to create the mailbox (best effort, never aborts), and
         5. assign the ``SYS_ADMIN`` role when that role exists.
    Finally the session is marked completed and locked, and the system
    status row (id=1), if present, is switched to ``operational``.

    Args:
        session_id: Installation session ID.

    Returns:
        Progress dictionary with the flags/counters ``tenant_updated``,
        ``departments_created``, ``admin_created``,
        ``keycloak_user_created``, ``mailbox_created``, ``roles_assigned``
        and, when applicable, ``resumes_created``, ``emp_settings_created``
        and ``mailbox_error``.

    Raises:
        ValueError: If the session or its tenant info cannot be found
            (raised before any work starts).
        Exception: If any step fails. Uncommitted work is rolled back, the
            session is marked ``failed`` and the original error is chained
            as ``__cause__``. Steps that already committed are not undone.
    """
    session = self.db.query(InstallationSession).get(session_id)
    if not session:
        raise ValueError(f"找不到安裝會話 ID: {session_id}")

    tenant_info = self.db.query(InstallationTenantInfo).filter_by(
        session_id=session_id
    ).first()
    if not tenant_info:
        # Debug aid: list the stored records so a session/tenant mismatch
        # is visible in the error message.
        all_tenant_infos = self.db.query(InstallationTenantInfo).all()
        tenant_info_list = [f"ID:{t.id}, SessionID:{t.session_id}, TenantID:{t.tenant_id}" for t in all_tenant_infos]
        raise ValueError(
            f"找不到租戶初始化資訊 (session_id={session_id})。"
            f"資料庫中現有記錄: {tenant_info_list}"
        )

    results = {
        'tenant_updated': False,
        'departments_created': 0,
        'admin_created': False,
        'keycloak_user_created': False,
        'mailbox_created': False,
        'roles_assigned': False
    }

    try:
        # Step 1: create or update the tenant master record.
        if session.tenant_id:
            # Update mode: the tenant already exists.
            tenant = self.db.query(Tenant).get(session.tenant_id)
            if not tenant:
                raise ValueError(f"找不到租戶 ID: {session.tenant_id}")
        else:
            # Initialization mode: create the tenant.
            # The tenant code and the Keycloak realm must be lowercase.
            tenant_code_lower = tenant_info.tenant_code.lower()

            tenant = Tenant(
                name=tenant_info.company_name,
                name_eng=tenant_info.company_name_en,
                code=tenant_code_lower,
                keycloak_realm=tenant_code_lower,  # realm name == tenant code
                prefix=tenant_info.tenant_prefix,
                tax_id=tenant_info.tax_id,
                tel=tenant_info.tel,
                add=tenant_info.add,
                domain_set=tenant_info.domain_set,
                domain=tenant_info.domain,
                is_sysmana=True,  # the first tenant is the system-management company
                is_active=True
            )
            self.db.add(tenant)
            self.db.flush()  # obtain tenant.id

            # Link the session and the tenant info to the new tenant.
            session.tenant_id = tenant.id
            tenant_info.tenant_id = tenant.id

        # Copy the non-empty company fields onto the tenant. This also runs
        # right after a tenant was created above, because tenant_id is set
        # by then.
        if session.tenant_id and tenant:
            if tenant_info.company_name:
                tenant.name = tenant_info.company_name
            if tenant_info.company_name_en:
                tenant.name_eng = tenant_info.company_name_en
            if tenant_info.tenant_code:
                # Keep the code lowercase so it never diverges from the
                # lower-cased value used for a newly created tenant/realm.
                tenant.code = tenant_info.tenant_code.lower()
            if tenant_info.tenant_prefix:
                tenant.prefix = tenant_info.tenant_prefix
            if tenant_info.tax_id:
                tenant.tax_id = tenant_info.tax_id
            if tenant_info.tel:
                tenant.tel = tenant_info.tel
            if tenant_info.add:
                tenant.add = tenant_info.add
            if tenant_info.domain_set:
                tenant.domain_set = tenant_info.domain_set
            if tenant_info.domain:
                tenant.domain = tenant_info.domain

        tenant.is_initialized = True
        tenant.initialized_at = datetime.now()
        tenant.initialized_by = session.executed_by
        self.db.commit()
        results['tenant_updated'] = True

        # Step 2: every tenant needs the "INIT" department.
        init_dept_exists = self.db.query(Department).filter_by(
            tenant_id=session.tenant_id,
            code='INIT'
        ).first()

        init_dept = None
        if not init_dept_exists:
            # In both domain modes the INIT department takes its mail
            # domain from tenant_info.domain; only the error text differs
            # (domain_set == 1 is the organization-wide domain mode).
            if not tenant_info.domain:
                if tenant_info.domain_set == 1:
                    raise ValueError("組織網域模式必須設定網域")
                raise ValueError("請設定初始化部門的預設網域")
            init_dept_domain = tenant_info.domain

            init_dept = Department(
                tenant_id=session.tenant_id,
                seq_no=1,  # first department of the tenant
                code='INIT',
                name='初始化部門',
                name_en='Initialization Department',
                email_domain=init_dept_domain,
                depth=0,
                description='系統初始化專用部門,待組織架構建立完成後可刪除或保留',
                is_active=True
            )
            self.db.add(init_dept)
            self.db.commit()
            self.db.refresh(init_dept)
            results['departments_created'] += 1

        # Step 2.1: create the user-defined departments not created yet.
        dept_setups = self.db.query(InstallationDepartmentSetup).filter_by(
            session_id=session_id,
            is_created=False
        ).all()

        for dept_setup in dept_setups:
            dept = Department(
                tenant_id=session.tenant_id,
                code=dept_setup.department_code,
                name=dept_setup.department_name,
                name_en=dept_setup.department_name_en,
                email_domain=dept_setup.email_domain,
                depth=dept_setup.depth,
                is_active=True
            )
            self.db.add(dept)
            dept_setup.is_created = True
            results['departments_created'] += 1

        self.db.commit()

        # Step 3: create the administrator employee in the INIT department.
        if not init_dept:
            init_dept = self.db.query(Department).filter_by(
                tenant_id=session.tenant_id,
                code='INIT'
            ).first()

        if not init_dept:
            raise ValueError("找不到初始化部門,無法建立管理員")

        # The SSO account name is the admin's English name.
        sso_username = tenant_info.admin_english_name
        if not sso_username:
            raise ValueError("管理員英文名稱為必填項")

        # Employee #1 of the tenant is the administrator.
        admin_exists = self.db.query(EmpSetting).filter_by(
            tenant_id=session.tenant_id,
            seq_no=1
        ).first()

        if not admin_exists:
            # NOTE(review): the flattened source no longer shows how deep
            # steps 4-5 were nested. They are kept inside this branch
            # because each of them reads emp_setting / admin_email_address,
            # which are only bound here - confirm against the repository.

            # Step 3-1: personal master data (EmpResume).
            resume = EmpResume(
                tenant_id=session.tenant_id,
                seq_no=1,
                legal_name=tenant_info.admin_legal_name,
                english_name=tenant_info.admin_english_name,
                id_number=f"INIT{session.tenant_id}001",  # placeholder ID number
                mobile=tenant_info.admin_phone,
                personal_email=tenant_info.admin_email,
                is_active=True
            )
            self.db.add(resume)
            self.db.commit()
            self.db.refresh(resume)
            results['resumes_created'] = 1

            # Step 3-2: employment record (EmpSetting).
            emp_setting = EmpSetting(
                tenant_id=session.tenant_id,
                seq_no=1,
                tenant_resume_id=resume.id,
                tenant_emp_code=f"{tenant_info.tenant_prefix}0001",
                hire_at=datetime.now().date(),
                employment_type='full_time',
                employment_status='active',
                primary_dept_id=init_dept.id,
                storage_quota_gb=100,  # admin default: 100 GB
                email_quota_mb=10240,  # admin default: 10 GB
                tenant_keycloak_username=sso_username,
                is_active=True
            )
            self.db.add(emp_setting)
            self.db.commit()
            self.db.refresh(emp_setting)
            results['emp_settings_created'] = 1

            # The admin mailbox lives in the INIT department's mail domain.
            if not init_dept.email_domain:
                raise ValueError("初始化部門未設定郵件網域,請檢查設定")

            admin_email_address = f"{sso_username}@{init_dept.email_domain}"

            # Step 4: create the Keycloak user.
            temp_password = self.db.query(TemporaryPassword).filter_by(
                session_id=session_id,
                is_used=False
            ).first()

            if temp_password and temp_password.plain_password:
                # Split the legal name defensively: a missing or blank name
                # yields empty first/last names instead of raising.
                name_parts = (tenant_info.admin_legal_name or '').split()
                first_name = name_parts[0] if name_parts else ''
                last_name = name_parts[-1] if len(name_parts) > 1 else ''

                user_id = self.keycloak_service.create_user(
                    username=sso_username,
                    email=admin_email_address,
                    first_name=first_name,
                    last_name=last_name,
                    enabled=True,
                    temporary_password=temp_password.plain_password
                )

                # A falsy return value is treated as a failed creation.
                if not user_id:
                    raise ValueError(f"Keycloak 用戶建立失敗: {sso_username}")

                emp_setting.tenant_keycloak_user_id = user_id

                # Mark the temporary password as used. The plaintext is
                # intentionally kept so the operator can still record it.
                temp_password.is_used = True
                temp_password.used_at = datetime.now()

                self.db.commit()
                results['keycloak_user_created'] = True

            # Step 4.5: create the mailbox (Docker Mailserver). Failures
            # here are reported in the results but never abort the run.
            try:
                from app.services.mailserver_service import MailserverService

                mailserver = MailserverService()

                # Reuse the temporary password while its plaintext is still
                # available; otherwise generate a new one.
                mail_password = temp_password.plain_password if (temp_password and temp_password.plain_password) else generate_secure_password()

                mail_result = mailserver.create_email_account(
                    email=admin_email_address,
                    password=mail_password,
                    quota_mb=emp_setting.email_quota_mb
                )

                if mail_result.get('success'):
                    results['mailbox_created'] = True
                    print(f"[OK] 郵件帳號建立成功: {admin_email_address}")
                else:
                    print(f"[WARNING] 郵件帳號建立失敗: {mail_result.get('error', 'Unknown error')}")
                    results['mailbox_created'] = False
                    results['mailbox_error'] = mail_result.get('error')

            except Exception as mail_error:
                print(f"[WARNING] 郵件系統整合失敗: {str(mail_error)}")
                results['mailbox_created'] = False
                results['mailbox_error'] = str(mail_error)

            # Step 5: assign the system administrator role, if defined.
            sys_admin_role = self.db.query(UserRole).filter_by(
                tenant_id=session.tenant_id,
                role_code='SYS_ADMIN'
            ).first()

            if sys_admin_role:
                from app.models import UserRoleAssignment
                role_assignment = UserRoleAssignment(
                    tenant_id=session.tenant_id,
                    keycloak_user_id=emp_setting.tenant_keycloak_user_id,
                    role_id=sys_admin_role.id,
                    is_active=True
                )
                self.db.add(role_assignment)
                self.db.commit()
                results['roles_assigned'] = True

        # Mark the session completed and lock it automatically.
        session.status = 'completed'
        session.completed_at = datetime.now()
        session.completed_steps = 5
        session.is_locked = True
        session.locked_at = datetime.now()
        session.locked_by = 'system'
        session.lock_reason = '初始化完成自動鎖定'

        tenant_info.is_completed = True
        tenant_info.completed_at = datetime.now()
        tenant_info.completed_by = session.executed_by

        # Move the system status from its current phase to "operational".
        from app.models.installation import InstallationSystemStatus
        system_status = self.db.query(InstallationSystemStatus).filter_by(id=1).first()
        if system_status:
            system_status.previous_phase = system_status.current_phase
            system_status.current_phase = 'operational'
            system_status.phase_changed_at = datetime.now()
            system_status.phase_changed_by = session.executed_by or 'installer'
            system_status.phase_change_reason = '初始化完成,系統進入正式運作階段'
            system_status.initialization_completed = True
            system_status.initialized_at = datetime.now()
            system_status.initialized_by = session.executed_by or 'installer'
            system_status.operational_since = datetime.now()

        self.db.commit()

        # Audit trail for the automatic lock.
        self._log_access(
            session_id=session_id,
            action='lock',
            action_by='system',
            action_method='auto',
            access_granted=True
        )

        return results

    except Exception as e:
        # Discard whatever was not committed yet, flag the session as
        # failed, and re-raise with the original error chained as cause.
        self.db.rollback()
        session.status = 'failed'
        self.db.commit()
        raise Exception(f"初始化執行失敗: {str(e)}") from e
|
||
|
||
# ==================== 明文密碼管理 ====================
|
||
|
||
def clear_plain_password(
    self,
    session_id: int,
    reason: str = 'user_confirmed'
) -> bool:
    """
    Erase the stored plaintext of a session's temporary passwords.

    Every temporary password of the session that still holds a plaintext
    is cleared, whether or not it has already been used. The used ones
    must be included: ``execute_initialization`` marks the password as
    used while deliberately keeping its plaintext, so filtering on
    ``is_used=False`` would leave that plaintext stored forever.

    Args:
        session_id: Installation session ID.
        reason: Reason recorded in ``cleared_reason``.

    Returns:
        True if at least one plaintext was cleared, False otherwise.
    """
    temp_passwords = self.db.query(TemporaryPassword).filter_by(
        session_id=session_id
    ).filter(
        TemporaryPassword.plain_password.isnot(None)
    ).all()

    for temp_pwd in temp_passwords:
        temp_pwd.plain_password = None
        temp_pwd.plain_password_cleared_at = datetime.now()
        temp_pwd.cleared_reason = reason

    self.db.commit()

    return len(temp_passwords) > 0
|
||
|
||
# ==================== 存取控制 ====================
|
||
|
||
def check_session_access(
    self,
    session_id: int,
    action: str,
    action_by: str
) -> tuple[bool, Optional[str]]:
    """
    Decide whether a session's sensitive information may be accessed.

    Access is granted when the session is not locked, or when it is locked
    but a temporary unlock window has not expired yet.

    Args:
        session_id: Installation session ID.
        action: Requested action (view/download_pdf); not used in the
            decision.
        action_by: Operator requesting access; not used in the decision.

    Returns:
        ``(allowed, deny_reason)``; ``deny_reason`` is None when allowed.
    """
    target = self.db.query(InstallationSession).get(session_id)
    if not target:
        return False, "安裝會話不存在"

    # Unlocked sessions are always accessible.
    if not target.is_locked:
        return True, None

    # Locked: only a still-valid temporary unlock grants access.
    if target.unlock_expires_at and target.unlock_expires_at > datetime.now():
        return True, None

    return False, "會話已鎖定"
|
||
|
||
def _log_access(
    self,
    session_id: int,
    action: str,
    action_by: str,
    action_method: str,
    access_granted: bool,
    deny_reason: Optional[str] = None,
    sensitive_data_accessed: Optional[List[str]] = None,
    ip_address: Optional[str] = None,
    user_agent: Optional[str] = None
) -> InstallationAccessLog:
    """
    Persist one access-log entry for an installation session.

    Args:
        session_id: Installation session ID.
        action: Action that was performed.
        action_by: Operator who performed it.
        action_method: How the action was triggered.
        access_granted: Whether access was allowed.
        deny_reason: Reason for denial, if any.
        sensitive_data_accessed: Names of the sensitive items accessed.
        ip_address: Client IP address.
        user_agent: Client user agent.

    Returns:
        The committed InstallationAccessLog object.
    """
    entry_fields = {
        'session_id': session_id,
        'action': action,
        'action_by': action_by,
        'action_method': action_method,
        'access_granted': access_granted,
        'deny_reason': deny_reason,
        'sensitive_data_accessed': sensitive_data_accessed,
        'ip_address': ip_address,
        'user_agent': user_agent,
    }
    entry = InstallationAccessLog(**entry_fields)

    self.db.add(entry)
    self.db.commit()
    return entry
|
||
|
||
# ==================== 查詢功能 ====================
|
||
|
||
def get_session_details(
    self,
    session_id: int,
    include_sensitive: bool = False
) -> Dict[str, Any]:
    """
    Collect the details of an installation session.

    Args:
        session_id: Installation session ID.
        include_sensitive: When True, and the session is not locked, the
            first temporary-password record of the session is included
            under ``credentials``.

    Returns:
        Dictionary with the keys ``session``, ``tenant_info`` (None when
        absent), ``departments`` and ``credentials`` (None unless
        released).

    Raises:
        ValueError: If the installation session does not exist.
    """
    record = self.db.query(InstallationSession).get(session_id)
    if not record:
        raise ValueError(f"找不到安裝會話 ID: {session_id}")

    session_view = {
        "id": record.id,
        "session_name": record.session_name,
        "environment": record.environment,
        "status": record.status,
        "started_at": record.started_at,
        "completed_at": record.completed_at,
        "is_locked": record.is_locked,
        "locked_at": record.locked_at,
        "lock_reason": record.lock_reason,
        "unlock_expires_at": record.unlock_expires_at,
    }

    # Tenant information (only when a record is attached to the session).
    tenant_view = None
    profile = record.tenant_info
    if profile:
        tenant_view = {
            "company_name": profile.company_name,
            "company_name_en": profile.company_name_en,
            "tax_id": profile.tax_id,
            "admin_username": profile.admin_username,
            "admin_email": profile.admin_email,
            "admin_legal_name": profile.admin_legal_name,
        }

    # Department setups staged for this session.
    department_views = []
    for setup in record.department_setups:
        department_views.append({
            "code": setup.department_code,
            "name": setup.department_name,
            "name_en": setup.department_name_en,
            "email_domain": setup.email_domain,
            "is_created": setup.is_created,
        })

    # Sensitive part (password): released only for unlocked sessions.
    # NOTE(review): a temporary unlock (unlock_expires_at) is not taken
    # into account here, unlike check_session_access - confirm intent.
    credentials_view = None
    if include_sensitive and not record.is_locked:
        secret = self.db.query(TemporaryPassword).filter_by(
            session_id=session_id
        ).first()

        if secret:
            credentials_view = {
                "password_visible": secret.plain_password is not None,
                "plain_password": secret.plain_password,
                "password_hash": secret.password_hash,
                "created_at": secret.created_at,
                "expires_at": secret.expires_at,
                "view_count": secret.view_count,
                "cleared_at": secret.plain_password_cleared_at,
                "cleared_reason": secret.cleared_reason,
            }

    return {
        "session": session_view,
        "tenant_info": tenant_view,
        "departments": department_views,
        "credentials": credentials_view,
    }
|