""" 初始化系統服務 負責初始化流程的業務邏輯 """ from datetime import datetime, timedelta from typing import Optional, Dict, Any, List from sqlalchemy.orm import Session from app.models import ( InstallationSession, InstallationTenantInfo, InstallationDepartmentSetup, TemporaryPassword, InstallationAccessLog, Tenant, Department, UserRole ) from app.models.emp_resume import EmpResume from app.models.emp_setting import EmpSetting from app.utils.password_generator import generate_secure_password, hash_password from app.services.keycloak_service import KeycloakService class InstallationService: """初始化服務""" def __init__(self, db: Session): self.db = db self.keycloak_service = KeycloakService() # ==================== Phase 0: 建立安裝會話 ==================== def create_session( self, tenant_id: int, environment: str, executed_by: str, session_name: Optional[str] = None ) -> InstallationSession: """ 建立新的安裝會話 Args: tenant_id: 租戶 ID environment: 環境 (development/testing/production) executed_by: 執行人 session_name: 會話名稱(可選) Returns: 安裝會話物件 """ session = InstallationSession( tenant_id=tenant_id, session_name=session_name or f"{environment} 環境初始化", environment=environment, status='in_progress', executed_by=executed_by, started_at=datetime.now() ) self.db.add(session) self.db.commit() self.db.refresh(session) # 記錄審計日誌 self._log_access( session_id=session.id, action='create_session', action_by=executed_by, action_method='api', access_granted=True ) return session # ==================== Phase 2: 公司資訊設定 ==================== def save_tenant_info( self, session_id: int, tenant_info_data: Dict[str, Any] ) -> InstallationTenantInfo: """ 儲存租戶初始化資訊 Args: session_id: 安裝會話 ID tenant_info_data: 租戶資訊字典 Returns: 租戶初始化資訊物件 """ session = self.db.query(InstallationSession).get(session_id) if not session: raise ValueError(f"找不到安裝會話 ID: {session_id}") # 檢查是否已存在(優先用 session_id,找不到則用 tenant_id) tenant_info = self.db.query(InstallationTenantInfo).filter_by( session_id=session_id ).first() # 如果 session_id 找不到,嘗試用 tenant_id 查詢(處理舊數據) if not tenant_info: tenant_info = self.db.query(InstallationTenantInfo).filter_by( tenant_id=session.tenant_id ).first() if tenant_info: # 更新現有資料(同時更新 session_id 以保持一致性) tenant_info.session_id = session_id for key, value in tenant_info_data.items(): if hasattr(tenant_info, key): setattr(tenant_info, key, value) tenant_info.updated_at = datetime.now() else: # 建立新資料 tenant_info = InstallationTenantInfo( tenant_id=session.tenant_id, session_id=session_id, **tenant_info_data ) self.db.add(tenant_info) self.db.commit() self.db.refresh(tenant_info) return tenant_info def setup_admin_credentials( self, session_id: int, admin_data: Dict[str, Any], password_method: str = 'auto', manual_password: Optional[str] = None ) -> tuple[InstallationTenantInfo, str]: """ 設定系統管理員並產生初始密碼 Args: session_id: 安裝會話 ID admin_data: 管理員資訊 password_method: 密碼設定方式 (auto/manual) manual_password: 手動設定的密碼(如果 method='manual') Returns: (租戶資訊物件, 明文密碼) Raises: ValueError: 如果密碼驗證失敗 """ from app.utils.password_generator import validate_password_for_user session = self.db.query(InstallationSession).get(session_id) if not session: raise ValueError(f"找不到安裝會話 ID: {session_id}") # 產生或驗證密碼 if password_method == 'auto': initial_password = generate_secure_password(16) else: if not manual_password: raise ValueError("手動設定密碼時必須提供 manual_password") # 驗證密碼強度 is_valid, errors = validate_password_for_user( manual_password, username=admin_data.get('admin_username'), name=admin_data.get('admin_legal_name'), email=admin_data.get('admin_email') ) if not is_valid: raise ValueError(f"密碼驗證失敗: {', '.join(errors)}") initial_password = manual_password # 加密密碼 password_hash = hash_password(initial_password) # 儲存管理員資訊 tenant_info = self.save_tenant_info(session_id, admin_data) # 建立臨時密碼記錄 temp_password = TemporaryPassword( tenant_id=session.tenant_id, username=admin_data.get('admin_english_name', 'admin'), # ✅ 使用 admin_english_name (SSO 帳號) session_id=session_id, password_hash=password_hash, plain_password=initial_password, # 明文密碼(僅此階段保存) password_method=password_method, is_temporary=True, must_change_on_login=True, created_at=datetime.now(), expires_at=datetime.now() + timedelta(days=7), # 7 天有效期 is_viewable=True, viewable_until=datetime.now() + timedelta(hours=1) # 1 小時內可查看 ) self.db.add(temp_password) self.db.commit() return tenant_info, initial_password # ==================== Phase 3: 組織架構設定 ==================== def setup_departments( self, session_id: int, departments_data: List[Dict[str, Any]] ) -> List[InstallationDepartmentSetup]: """ 設定部門架構 Args: session_id: 安裝會話 ID departments_data: 部門資訊列表 Returns: 部門設定物件列表 """ session = self.db.query(InstallationSession).get(session_id) if not session: raise ValueError(f"找不到安裝會話 ID: {session_id}") dept_setups = [] for dept_data in departments_data: dept_setup = InstallationDepartmentSetup( tenant_id=session.tenant_id, session_id=session_id, **dept_data ) self.db.add(dept_setup) dept_setups.append(dept_setup) self.db.commit() return dept_setups # ==================== Phase 4: 執行初始化 ==================== def execute_initialization( self, session_id: int ) -> Dict[str, Any]: """ 執行完整的初始化流程 Args: session_id: 安裝會話 ID Returns: 執行結果 Raises: Exception: 如果任何步驟失敗 """ session = self.db.query(InstallationSession).get(session_id) if not session: raise ValueError(f"找不到安裝會話 ID: {session_id}") tenant_info = self.db.query(InstallationTenantInfo).filter_by( session_id=session_id ).first() if not tenant_info: # 調試:查看資料庫中所有的 tenant_info 記錄 all_tenant_infos = self.db.query(InstallationTenantInfo).all() tenant_info_list = [f"ID:{t.id}, SessionID:{t.session_id}, TenantID:{t.tenant_id}" for t in all_tenant_infos] raise ValueError( f"找不到租戶初始化資訊 (session_id={session_id})。" f"資料庫中現有記錄: {tenant_info_list}" ) results = { 'tenant_updated': False, 'departments_created': 0, 'admin_created': False, 'keycloak_user_created': False, 'mailbox_created': False, 'roles_assigned': False } try: # Step 1: 建立或更新租戶基本資料 if session.tenant_id: # 更新現有租戶 tenant = self.db.query(Tenant).get(session.tenant_id) if not tenant: raise ValueError(f"找不到租戶 ID: {session.tenant_id}") else: # 建立新租戶 (初始化流程) # ⚠️ 租戶代碼和 Keycloak Realm 必須為小寫 tenant_code_lower = tenant_info.tenant_code.lower() tenant = Tenant( name=tenant_info.company_name, name_eng=tenant_info.company_name_en, code=tenant_code_lower, keycloak_realm=tenant_code_lower, # Keycloak Realm = tenant_code (小寫) prefix=tenant_info.tenant_prefix, tax_id=tenant_info.tax_id, tel=tenant_info.tel, add=tenant_info.add, domain_set=tenant_info.domain_set, domain=tenant_info.domain, is_sysmana=True, # 初始化建立的第一個租戶為系統管理公司 is_active=True ) self.db.add(tenant) self.db.flush() # 取得 tenant.id # 更新 session 的 tenant_id session.tenant_id = tenant.id tenant_info.tenant_id = tenant.id # 更新租戶資料 (如果是更新模式) if session.tenant_id and tenant: if tenant_info.company_name: tenant.name = tenant_info.company_name if tenant_info.company_name_en: tenant.name_eng = tenant_info.company_name_en if tenant_info.tenant_code: tenant.code = tenant_info.tenant_code if tenant_info.tenant_prefix: tenant.prefix = tenant_info.tenant_prefix if tenant_info.tax_id: tenant.tax_id = tenant_info.tax_id if tenant_info.tel: tenant.tel = tenant_info.tel if tenant_info.add: tenant.add = tenant_info.add if tenant_info.domain_set: tenant.domain_set = tenant_info.domain_set if tenant_info.domain: tenant.domain = tenant_info.domain tenant.is_initialized = True tenant.initialized_at = datetime.now() tenant.initialized_by = session.executed_by self.db.commit() results['tenant_updated'] = True # Step 2: 建立「初始化部門」(每個租戶必備) init_dept_exists = self.db.query(Department).filter_by( tenant_id=session.tenant_id, code='INIT' ).first() init_dept = None if not init_dept_exists: # 決定初始化部門的 email_domain # domain_set=1 (組織網域): 使用 tenant.domain # domain_set=2 (部門網域): 使用 tenant_info.domain 作為初始化部門的網域 if tenant_info.domain_set == 1: # 組織網域模式: 必須設定 domain if not tenant_info.domain: raise ValueError("組織網域模式必須設定網域") init_dept_domain = tenant_info.domain else: # 部門網域模式: domain 是初始化部門的預設網域 if not tenant_info.domain: raise ValueError("請設定初始化部門的預設網域") init_dept_domain = tenant_info.domain init_dept = Department( tenant_id=session.tenant_id, seq_no=1, # 第一個部門序號為 1 code='INIT', name='初始化部門', name_en='Initialization Department', email_domain=init_dept_domain, depth=0, description='系統初始化專用部門,待組織架構建立完成後可刪除或保留', is_active=True ) self.db.add(init_dept) self.db.commit() self.db.refresh(init_dept) results['departments_created'] += 1 # Step 2.1: 建立用戶自訂的其他部門 (如果有) dept_setups = self.db.query(InstallationDepartmentSetup).filter_by( session_id=session_id, is_created=False ).all() for dept_setup in dept_setups: dept = Department( tenant_id=session.tenant_id, code=dept_setup.department_code, name=dept_setup.department_name, name_en=dept_setup.department_name_en, email_domain=dept_setup.email_domain, depth=dept_setup.depth, is_active=True ) self.db.add(dept) dept_setup.is_created = True results['departments_created'] += 1 self.db.commit() # Step 3: 建立系統管理員員工 (歸屬於初始化部門) # 取得初始化部門 ID if not init_dept: init_dept = self.db.query(Department).filter_by( tenant_id=session.tenant_id, code='INIT' ).first() if not init_dept: raise ValueError("找不到初始化部門,無法建立管理員") # 決定 SSO 帳號名稱:使用英文名稱(第一個管理員不會有衝突) sso_username = tenant_info.admin_english_name if not sso_username: raise ValueError("管理員英文名稱為必填項") # 檢查是否已存在(使用 tenant_emp_settings 複合主鍵) admin_exists = self.db.query(EmpSetting).filter_by( tenant_id=session.tenant_id, seq_no=1 # 第一個員工 ).first() if not admin_exists: # Step 3-1: 建立人員基本資料 (EmpResume) resume = EmpResume( tenant_id=session.tenant_id, seq_no=1, # 第一個員工序號為 1 legal_name=tenant_info.admin_legal_name, english_name=tenant_info.admin_english_name, id_number=f"INIT{session.tenant_id}001", # 初始化用臨時身分證號 mobile=tenant_info.admin_phone, personal_email=tenant_info.admin_email, is_active=True ) self.db.add(resume) self.db.commit() self.db.refresh(resume) results['resumes_created'] = 1 # Step 3-2: 建立員工任用設定 (EmpSetting) - 複合主鍵 emp_setting = EmpSetting( tenant_id=session.tenant_id, seq_no=1, # 第一個員工(或由觸發器自動生成) tenant_resume_id=resume.id, tenant_emp_code=f"{tenant_info.tenant_prefix}0001", # 或由觸發器自動生成 hire_at=datetime.now().date(), employment_type='full_time', employment_status='active', primary_dept_id=init_dept.id, storage_quota_gb=100, # 管理員預設 100GB email_quota_mb=10240, # 管理員預設 10GB tenant_keycloak_username=sso_username, # 優先使用英文名稱,有衝突時使用 admin_username is_active=True ) self.db.add(emp_setting) self.db.commit() self.db.refresh(emp_setting) results['emp_settings_created'] = 1 # 決定郵件網域:使用初始化部門的網域 if not init_dept.email_domain: raise ValueError("初始化部門未設定郵件網域,請檢查設定") # 管理員郵件地址:使用 SSO 帳號名稱 + 網域 admin_email_address = f"{sso_username}@{init_dept.email_domain}" # Step 4: 建立 Keycloak 用戶 temp_password = self.db.query(TemporaryPassword).filter_by( session_id=session_id, is_used=False ).first() if temp_password and temp_password.plain_password: # 建立 Keycloak 用戶(同時設定臨時密碼) user_id = self.keycloak_service.create_user( username=sso_username, # 使用英文名稱作為 SSO 帳號 email=admin_email_address, first_name=tenant_info.admin_legal_name.split()[0] if tenant_info.admin_legal_name else '', last_name=tenant_info.admin_legal_name.split()[-1] if len(tenant_info.admin_legal_name.split()) > 1 else '', enabled=True, temporary_password=temp_password.plain_password # ✅ 設定臨時密碼,強制首次登入修改 ) # 檢查 Keycloak 用戶是否建立成功 if not user_id: raise ValueError(f"Keycloak 用戶建立失敗: {sso_username}") # 更新員工任用設定的 Keycloak User ID emp_setting.tenant_keycloak_user_id = user_id # 標記臨時密碼已使用(但保留明文密碼供用戶記錄) temp_password.is_used = True temp_password.used_at = datetime.now() # ⚠️ 不立即清除明文密碼,保留給用戶記錄 # temp_password.plain_password = None # temp_password.plain_password_cleared_at = datetime.now() # temp_password.cleared_reason = 'keycloak_created' self.db.commit() results['keycloak_user_created'] = True # Step 4.5: 建立郵件帳號 (Docker Mailserver) try: from app.services.mailserver_service import MailserverService from app.utils.password_generator import generate_secure_password mailserver = MailserverService() # 為管理員建立郵件帳號 # 郵件地址已在 Step 3 決定: admin_email_address # 郵件密碼:如果臨時密碼還有明文則使用,否則自動生成新密碼 mail_password = temp_password.plain_password if (temp_password and temp_password.plain_password) else generate_secure_password() mail_result = mailserver.create_email_account( email=admin_email_address, password=mail_password, quota_mb=emp_setting.email_quota_mb ) if mail_result.get('success'): results['mailbox_created'] = True print(f"[OK] 郵件帳號建立成功: {admin_email_address}") else: # 郵件建立失敗僅記錄 warning,不中斷初始化流程 print(f"[WARNING] 郵件帳號建立失敗: {mail_result.get('error', 'Unknown error')}") results['mailbox_created'] = False results['mailbox_error'] = mail_result.get('error') except Exception as mail_error: # 郵件系統錯誤不應中斷初始化流程 print(f"[WARNING] 郵件系統整合失敗: {str(mail_error)}") results['mailbox_created'] = False results['mailbox_error'] = str(mail_error) # Step 5: 分配系統管理員角色 sys_admin_role = self.db.query(UserRole).filter_by( tenant_id=session.tenant_id, role_code='SYS_ADMIN' ).first() if sys_admin_role: from app.models import UserRoleAssignment role_assignment = UserRoleAssignment( tenant_id=session.tenant_id, keycloak_user_id=emp_setting.tenant_keycloak_user_id, role_id=sys_admin_role.id, is_active=True ) self.db.add(role_assignment) self.db.commit() results['roles_assigned'] = True # 標記初始化完成並自動鎖定 session.status = 'completed' session.completed_at = datetime.now() session.completed_steps = 5 session.is_locked = True session.locked_at = datetime.now() session.locked_by = 'system' session.lock_reason = '初始化完成自動鎖定' tenant_info.is_completed = True tenant_info.completed_at = datetime.now() tenant_info.completed_by = session.executed_by # 更新系統狀態:從 initialization → operational from app.models.installation import InstallationSystemStatus system_status = self.db.query(InstallationSystemStatus).filter_by(id=1).first() if system_status: system_status.previous_phase = system_status.current_phase system_status.current_phase = 'operational' system_status.phase_changed_at = datetime.now() system_status.phase_changed_by = session.executed_by or 'installer' system_status.phase_change_reason = '初始化完成,系統進入正式運作階段' system_status.initialization_completed = True system_status.initialized_at = datetime.now() system_status.initialized_by = session.executed_by or 'installer' system_status.operational_since = datetime.now() self.db.commit() # 記錄鎖定日誌 self._log_access( session_id=session_id, action='lock', action_by='system', action_method='auto', access_granted=True ) return results except Exception as e: self.db.rollback() session.status = 'failed' self.db.commit() raise Exception(f"初始化執行失敗: {str(e)}") # ==================== 明文密碼管理 ==================== def clear_plain_password( self, session_id: int, reason: str = 'user_confirmed' ) -> bool: """ 清除臨時密碼的明文 Args: session_id: 安裝會話 ID reason: 清除原因 Returns: 是否成功清除 """ temp_passwords = self.db.query(TemporaryPassword).filter_by( session_id=session_id, is_used=False ).filter( TemporaryPassword.plain_password.isnot(None) ).all() for temp_pwd in temp_passwords: temp_pwd.plain_password = None temp_pwd.plain_password_cleared_at = datetime.now() temp_pwd.cleared_reason = reason self.db.commit() return len(temp_passwords) > 0 # ==================== 存取控制 ==================== def check_session_access( self, session_id: int, action: str, action_by: str ) -> tuple[bool, Optional[str]]: """ 檢查是否可以存取安裝會話的敏感資訊 Args: session_id: 安裝會話 ID action: 動作 (view/download_pdf) action_by: 操作人 Returns: (是否允許, 拒絕原因) """ session = self.db.query(InstallationSession).get(session_id) if not session: return False, "安裝會話不存在" # 檢查鎖定狀態 if session.is_locked: # 檢查臨時解鎖是否有效 if session.unlock_expires_at and session.unlock_expires_at > datetime.now(): return True, None else: return False, "會話已鎖定" return True, None def _log_access( self, session_id: int, action: str, action_by: str, action_method: str, access_granted: bool, deny_reason: Optional[str] = None, sensitive_data_accessed: Optional[List[str]] = None, ip_address: Optional[str] = None, user_agent: Optional[str] = None ) -> InstallationAccessLog: """ 記錄存取日誌 Args: session_id: 安裝會話 ID action: 動作 action_by: 操作人 action_method: 操作方式 access_granted: 是否允許 deny_reason: 拒絕原因 sensitive_data_accessed: 存取的敏感資料 ip_address: IP 位址 user_agent: User Agent Returns: 存取日誌物件 """ log = InstallationAccessLog( session_id=session_id, action=action, action_by=action_by, action_method=action_method, access_granted=access_granted, deny_reason=deny_reason, sensitive_data_accessed=sensitive_data_accessed, ip_address=ip_address, user_agent=user_agent ) self.db.add(log) self.db.commit() return log # ==================== 查詢功能 ==================== def get_session_details( self, session_id: int, include_sensitive: bool = False ) -> Dict[str, Any]: """ 取得安裝會話詳細資訊 Args: session_id: 安裝會話 ID include_sensitive: 是否包含敏感資訊(需檢查存取權限) Returns: 會話詳細資訊 """ session = self.db.query(InstallationSession).get(session_id) if not session: raise ValueError(f"找不到安裝會話 ID: {session_id}") result = { "session": { "id": session.id, "session_name": session.session_name, "environment": session.environment, "status": session.status, "started_at": session.started_at, "completed_at": session.completed_at, "is_locked": session.is_locked, "locked_at": session.locked_at, "lock_reason": session.lock_reason, "unlock_expires_at": session.unlock_expires_at }, "tenant_info": None, "departments": [], "credentials": None } # 租戶資訊 tenant_info = session.tenant_info if tenant_info: result["tenant_info"] = { "company_name": tenant_info.company_name, "company_name_en": tenant_info.company_name_en, "tax_id": tenant_info.tax_id, "admin_username": tenant_info.admin_username, "admin_email": tenant_info.admin_email, "admin_legal_name": tenant_info.admin_legal_name } # 部門設定 result["departments"] = [ { "code": d.department_code, "name": d.department_name, "name_en": d.department_name_en, "email_domain": d.email_domain, "is_created": d.is_created } for d in session.department_setups ] # 敏感資訊(密碼) if include_sensitive and not session.is_locked: temp_password = self.db.query(TemporaryPassword).filter_by( session_id=session_id ).first() if temp_password: result["credentials"] = { "password_visible": temp_password.plain_password is not None, "plain_password": temp_password.plain_password, "password_hash": temp_password.password_hash, "created_at": temp_password.created_at, "expires_at": temp_password.expires_at, "view_count": temp_password.view_count, "cleared_at": temp_password.plain_password_cleared_at, "cleared_reason": temp_password.cleared_reason } return result