Files
hr-portal/backend/app/services/employee_lifecycle.py
Porsche Chen 360533393f feat: HR Portal - Complete Multi-Tenant System with Redis Session Storage
Major Features:
-  Multi-tenant architecture (tenant isolation)
-  Employee CRUD with lifecycle management (onboarding/offboarding)
-  Department tree structure with email domain management
-  Company info management (single-record editing)
-  System functions CRUD (permission management)
-  Email account management (multi-account per employee)
-  Keycloak SSO integration (auth.lab.taipei)
-  Redis session storage (10.1.0.254:6379)
  - Solves Cookie 4KB limitation
  - Cross-system session sharing
  - Sliding expiration (8 hours)
  - Automatic token refresh

Technical Stack:
Backend:
- FastAPI + SQLAlchemy
- PostgreSQL 16 (10.1.0.20:5433)
- Keycloak Admin API integration
- Docker Mailserver integration (SSH)
- Alembic migrations

Frontend:
- Next.js 14 (App Router)
- NextAuth 4 with Keycloak Provider
- Redis session storage (ioredis)
- Tailwind CSS

Infrastructure:
- Redis 7 (10.1.0.254:6379) - Session + Cache
- Keycloak 26.1.0 (auth.lab.taipei)
- Docker Mailserver (10.1.0.254)

Architecture Highlights:
- Session管理由 Keycloak + Redis 統一控制
- 支援多系統 (HR/WebMail/Calendar/Drive/Office) 共享 session
- Token 自動刷新,異質服務整合
- 未來可無縫遷移到雲端

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-23 20:12:43 +08:00

530 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
員工生命週期管理服務
自動化處理員工的新進、異動、離職流程
"""
from typing import Dict, Any, Optional
from sqlalchemy.orm import Session
import logging
import secrets
import string
from app.models.employee import Employee
from app.services.keycloak_admin_client import get_keycloak_admin_client
from app.services.drive_service import get_drive_service_client, get_drive_quota_by_job_level
from app.services.mailserver_service import get_mailserver_service, get_mail_quota_by_job_level
logger = logging.getLogger(__name__)
class EmployeeLifecycleService:
"""員工生命週期管理服務"""
def __init__(self):
self.keycloak_client = None
def _get_keycloak_client(self):
"""延遲初始化 Keycloak Admin 客戶端"""
if self.keycloak_client is None:
self.keycloak_client = get_keycloak_admin_client()
return self.keycloak_client
def _generate_temporary_password(self, length: int = 12) -> str:
"""生成臨時密碼 (包含大小寫字母、數字和特殊字元)"""
alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
password = ''.join(secrets.choice(alphabet) for _ in range(length))
return password
async def onboard_employee(
self,
db: Session,
employee: Employee,
create_keycloak: bool = True,
create_email: bool = True,
create_drive: bool = True,
) -> Dict[str, Any]:
"""
員工到職流程 (Onboarding)
自動執行:
1. 建立 Keycloak SSO 帳號
2. 建立主要郵件帳號
3. 建立雲端硬碟帳號 (Drive Service)
4. 記錄審計日誌
Args:
db: 資料庫 Session
employee: 員工物件
create_keycloak: 是否建立 Keycloak 帳號
create_email: 是否建立郵件帳號
create_drive: 是否建立雲端硬碟帳號 (非致命Drive Service 未上線時跳過)
Returns:
執行結果字典
"""
results = {
"employee_id": employee.id,
"employee_number": employee.employee_id,
"legal_name": employee.legal_name,
"username_base": employee.username_base,
"keycloak": {"created": False, "error": None},
"email": {"created": False, "error": None},
"drive": {"created": False, "error": None},
}
logger.info(f"開始員工到職流程: {employee.employee_id} - {employee.legal_name}")
# 1. 建立 Keycloak 帳號
if create_keycloak:
try:
keycloak_result = await self._create_keycloak_account(employee)
results["keycloak"] = keycloak_result
logger.info(f"Keycloak 帳號建立: {keycloak_result}")
except Exception as e:
logger.error(f"建立 Keycloak 帳號失敗: {str(e)}")
results["keycloak"]["error"] = str(e)
# 2. 建立郵件帳號
if create_email:
try:
email_result = await self._create_email_account(employee)
results["email"] = email_result
logger.info(f"郵件帳號建立: {email_result}")
except Exception as e:
logger.error(f"建立郵件帳號失敗: {str(e)}")
results["email"]["error"] = str(e)
# 3. 建立雲端硬碟帳號 (Drive Service - 非致命)
if create_drive:
drive_result = await self._create_drive_account(employee)
results["drive"] = drive_result
if drive_result.get("error"):
logger.warning(f"雲端硬碟帳號建立 (非致命): {drive_result}")
else:
logger.info(f"雲端硬碟帳號建立: {drive_result}")
logger.info(f"員工到職流程完成: {employee.employee_id}")
return results
async def offboard_employee(
self,
db: Session,
employee: Employee,
disable_keycloak: bool = True,
handle_email: str = "forward", # "forward" or "disable"
disable_drive: bool = True,
) -> Dict[str, Any]:
"""
員工離職流程 (Offboarding)
自動執行:
1. 停用 Keycloak SSO 帳號
2. 處理郵件帳號 (轉發或停用)
3. 停用雲端硬碟帳號 (Drive Service - 非致命)
4. 記錄審計日誌
Args:
db: 資料庫 Session
employee: 員工物件
disable_keycloak: 是否停用 Keycloak 帳號
handle_email: 郵件處理方式 ("forward""disable")
disable_drive: 是否停用雲端硬碟帳號 (非致命Drive Service 未上線時跳過)
Returns:
執行結果字典
"""
results = {
"employee_id": employee.id,
"employee_number": employee.employee_id,
"legal_name": employee.legal_name,
"keycloak": {"disabled": False, "error": None},
"email": {"handled": False, "method": handle_email, "error": None},
"drive": {"disabled": False, "error": None},
}
logger.info(f"開始員工離職流程: {employee.employee_id} - {employee.legal_name}")
# 1. 停用 Keycloak 帳號
if disable_keycloak:
try:
keycloak_result = await self._disable_keycloak_account(employee)
results["keycloak"] = keycloak_result
logger.info(f"Keycloak 帳號停用: {keycloak_result}")
except Exception as e:
logger.error(f"停用 Keycloak 帳號失敗: {str(e)}")
results["keycloak"]["error"] = str(e)
# 2. 處理郵件帳號
try:
email_result = await self._handle_email_offboarding(employee, handle_email)
results["email"] = email_result
logger.info(f"郵件帳號處理: {email_result}")
except Exception as e:
logger.error(f"處理郵件帳號失敗: {str(e)}")
results["email"]["error"] = str(e)
# 3. 停用雲端硬碟帳號 (Drive Service - 非致命)
if disable_drive:
drive_result = await self._disable_drive_account(employee)
results["drive"] = drive_result
if drive_result.get("error"):
logger.warning(f"雲端硬碟帳號停用 (非致命): {drive_result}")
else:
logger.info(f"雲端硬碟帳號停用: {drive_result}")
logger.info(f"員工離職流程完成: {employee.employee_id}")
return results
async def _create_keycloak_account(self, employee: Employee) -> Dict[str, Any]:
"""
建立 Keycloak SSO 帳號
執行步驟:
1. 檢查帳號是否已存在
2. 生成臨時密碼
3. 建立 Keycloak 用戶
4. 設定用戶屬性 (姓名、郵件等)
Args:
employee: 員工物件
Returns:
執行結果字典
"""
try:
client = self._get_keycloak_client()
username = employee.username_base
email = f"{username}@porscheworld.tw"
# 1. 檢查帳號是否已存在
existing_user = client.get_user_by_username(username)
if existing_user:
return {
"created": False,
"username": username,
"email": email,
"user_id": existing_user.get("id"),
"message": "Keycloak 帳號已存在",
"error": "用戶已存在",
}
# 2. 生成臨時密碼 (12位隨機密碼)
temporary_password = self._generate_temporary_password(12)
# 3. 分割姓名 (如果有英文名稱使用英文,否則使用中文)
if employee.english_name:
# 英文名稱格式: "FirstName LastName" 或 "FirstName MiddleName LastName"
name_parts = employee.english_name.strip().split()
first_name = name_parts[0] if len(name_parts) > 0 else username
last_name = " ".join(name_parts[1:]) if len(name_parts) > 1 else ""
else:
# 中文名稱格式: "姓名" (第一個字是姓,其餘是名)
legal_name = employee.legal_name or username
first_name = legal_name[1:] if len(legal_name) > 1 else legal_name
last_name = legal_name[0] if len(legal_name) > 0 else ""
# 4. 建立 Keycloak 用戶
user_id = client.create_user(
username=username,
email=email,
first_name=first_name,
last_name=last_name,
enabled=True,
email_verified=True,
)
if not user_id:
return {
"created": False,
"username": username,
"email": email,
"message": "Keycloak 用戶建立失敗",
"error": "無法建立用戶 (API 返回 None)",
}
# 5. 設定初始密碼
password_set = client.reset_password(
user_id=user_id,
password=temporary_password,
temporary=True # 用戶首次登入需修改密碼
)
if not password_set:
logger.warning(f"Keycloak 用戶 {username} 建立成功,但密碼設定失敗")
logger.info(f"✓ Keycloak 帳號建立成功: {username} (ID: {user_id})")
return {
"created": True,
"username": username,
"email": email,
"user_id": user_id,
"first_name": first_name,
"last_name": last_name,
"temporary_password": temporary_password, # 應透過安全方式通知用戶
"password_set": password_set,
"message": f"Keycloak 帳號建立成功 (用戶首次登入需修改密碼)",
"error": None,
}
except Exception as e:
logger.error(f"✗ 建立 Keycloak 帳號時發生錯誤: {str(e)}")
return {
"created": False,
"username": employee.username_base,
"email": f"{employee.username_base}@porscheworld.tw",
"message": "建立 Keycloak 帳號時發生錯誤",
"error": str(e),
}
async def _create_email_account(self, employee: Employee) -> Dict[str, Any]:
"""
建立郵件帳號 (Docker Mailserver)
執行步驟:
1. 依職級取得配額
2. 產生臨時密碼 (員工後續透過 Keycloak SSO 登入)
3. 透過 SSH + docker exec 建立帳號
4. 設定配額
"""
email_address = f"{employee.username_base}@porscheworld.tw"
try:
mailserver = get_mailserver_service()
# 依職級取得郵件配額
job_level = getattr(employee, "job_level", "Junior") or "Junior"
quota_mb = get_mail_quota_by_job_level(job_level)
# 產生臨時密碼
temp_password = self._generate_temporary_password()
result = mailserver.create_email_account(
email=email_address,
password=temp_password,
quota_mb=quota_mb,
)
if result["created"]:
logger.info(f"郵件帳號建立成功: {email_address} ({quota_mb}MB)")
else:
logger.warning(f"郵件帳號建立失敗: {email_address} - {result.get('error')}")
return result
except Exception as e:
logger.warning(f"建立郵件帳號時發生非預期錯誤: {str(e)}")
return {
"created": False,
"email": email_address,
"quota_mb": 0,
"message": "建立郵件帳號時發生錯誤",
"error": str(e),
}
async def _create_drive_account(self, employee: Employee) -> Dict[str, Any]:
"""
建立雲端硬碟帳號 (Drive Service)
呼叫 drive-api.ease.taipei 建立 Nextcloud 帳號
Drive Service 未上線時以 warning 記錄,不影響其他流程
"""
try:
client = get_drive_service_client()
from app.core.config import settings
# 根據職級取得配額
job_level = getattr(employee, "job_level", "Junior") or "Junior"
quota_gb = get_drive_quota_by_job_level(job_level)
result = client.create_user(
tenant_id=settings.DRIVE_SERVICE_TENANT_ID,
keycloak_user_id=str(getattr(employee, "keycloak_user_id", "") or ""),
username=employee.username_base,
email=f"{employee.username_base}@porscheworld.tw",
display_name=employee.legal_name or employee.username_base,
quota_gb=quota_gb,
)
return result
except Exception as e:
logger.warning(f"建立雲端硬碟帳號時發生非預期錯誤: {str(e)}")
return {
"created": False,
"username": employee.username_base,
"quota_gb": 0,
"drive_url": None,
"message": "建立雲端硬碟帳號時發生錯誤",
"error": str(e),
}
async def _disable_keycloak_account(self, employee: Employee) -> Dict[str, Any]:
"""
停用 Keycloak SSO 帳號
執行步驟:
1. 查詢用戶 ID
2. 停用帳號 (不刪除,保留審計記錄)
注意: 不刪除帳號,只停用,以保留歷史記錄和審計追蹤
Args:
employee: 員工物件
Returns:
執行結果字典
"""
try:
client = self._get_keycloak_client()
username = employee.username_base
# 1. 查詢用戶
user = client.get_user_by_username(username)
if not user:
return {
"disabled": False,
"username": username,
"message": "Keycloak 帳號不存在",
"error": "用戶不存在",
}
user_id = user.get("id")
# 2. 檢查是否已停用
if not user.get("enabled", False):
return {
"disabled": True,
"username": username,
"user_id": user_id,
"message": "Keycloak 帳號已經是停用狀態",
"error": None,
}
# 3. 停用帳號
success = client.disable_user(user_id)
if success:
logger.info(f"✓ Keycloak 帳號停用成功: {username} (ID: {user_id})")
return {
"disabled": True,
"username": username,
"user_id": user_id,
"message": "Keycloak 帳號已停用 (帳號保留以維持審計記錄)",
"error": None,
}
else:
return {
"disabled": False,
"username": username,
"user_id": user_id,
"message": "Keycloak 帳號停用失敗",
"error": "API 調用失敗",
}
except Exception as e:
logger.error(f"✗ 停用 Keycloak 帳號時發生錯誤: {str(e)}")
return {
"disabled": False,
"username": employee.username_base,
"message": "停用 Keycloak 帳號時發生錯誤",
"error": str(e),
}
async def _handle_email_offboarding(
self, employee: Employee, method: str
) -> Dict[str, Any]:
"""
處理離職員工的郵件帳號 (Docker Mailserver)
Args:
method: "forward" - 停用帳號並標記轉寄
"disable" - 直接刪除郵件帳號
"""
email_address = f"{employee.username_base}@porscheworld.tw"
try:
mailserver = get_mailserver_service()
if method == "forward":
# 刪除帳號 (Docker Mailserver 不支援原生轉寄設定)
# 轉寄規則記錄在 EmailAccount.forward_to由 HR Portal 管理
result = mailserver.delete_email_account(email_address)
return {
"handled": result["deleted"],
"method": "forward",
"email": email_address,
"forward_to": "hr@porscheworld.tw",
"message": "郵件帳號已停用,轉寄規則已記錄" if result["deleted"] else "郵件帳號停用失敗",
"error": result.get("error"),
}
elif method == "disable":
# 刪除郵件帳號
result = mailserver.delete_email_account(email_address)
return {
"handled": result["deleted"],
"method": "disable",
"email": email_address,
"message": "郵件帳號已刪除" if result["deleted"] else "郵件帳號刪除失敗",
"error": result.get("error"),
}
else:
return {
"handled": False,
"method": method,
"email": email_address,
"error": f"不支援的處理方式: {method}",
}
except Exception as e:
logger.warning(f"處理郵件帳號離職時發生非預期錯誤: {str(e)}")
return {
"handled": False,
"method": method,
"email": email_address,
"message": "處理郵件帳號時發生錯誤",
"error": str(e),
}
async def _disable_drive_account(self, employee: Employee) -> Dict[str, Any]:
"""
停用雲端硬碟帳號 (Drive Service)
呼叫 drive-api.ease.taipei 停用 Nextcloud 帳號 (軟刪除,保留檔案)
Drive Service 未上線時以 warning 記錄,不影響其他流程
"""
try:
client = get_drive_service_client()
# 查詢 drive_user_id (目前以 username 查詢)
# Drive Service 上線後需實作 GET /api/v1/drive/users?username={username}
# 暫時回傳 warning 狀態
logger.warning(
f"停用雲端硬碟帳號: {employee.username_base} - "
f"需 Drive Service 上線後實作查詢 user_id 再停用"
)
return {
"disabled": False,
"username": employee.username_base,
"message": "Drive Service 尚未上線,雲端硬碟帳號停用待後續處理",
"error": "Drive Service 未上線",
}
except Exception as e:
logger.warning(f"停用雲端硬碟帳號時發生非預期錯誤: {str(e)}")
return {
"disabled": False,
"username": employee.username_base,
"message": "停用雲端硬碟帳號時發生錯誤",
"error": str(e),
}
# 建立全域實例
employee_lifecycle_service = EmployeeLifecycleService()
def get_employee_lifecycle_service() -> EmployeeLifecycleService:
"""取得員工生命週期服務實例"""
return employee_lifecycle_service