Files
hr-portal/backend/app/services/keycloak_service.py
Porsche Chen 360533393f feat: HR Portal - Complete Multi-Tenant System with Redis Session Storage
Major Features:
-  Multi-tenant architecture (tenant isolation)
-  Employee CRUD with lifecycle management (onboarding/offboarding)
-  Department tree structure with email domain management
-  Company info management (single-record editing)
-  System functions CRUD (permission management)
-  Email account management (multi-account per employee)
-  Keycloak SSO integration (auth.lab.taipei)
-  Redis session storage (10.1.0.254:6379)
  - Solves Cookie 4KB limitation
  - Cross-system session sharing
  - Sliding expiration (8 hours)
  - Automatic token refresh

Technical Stack:
Backend:
- FastAPI + SQLAlchemy
- PostgreSQL 16 (10.1.0.20:5433)
- Keycloak Admin API integration
- Docker Mailserver integration (SSH)
- Alembic migrations

Frontend:
- Next.js 14 (App Router)
- NextAuth 4 with Keycloak Provider
- Redis session storage (ioredis)
- Tailwind CSS

Infrastructure:
- Redis 7 (10.1.0.254:6379) - Session + Cache
- Keycloak 26.1.0 (auth.lab.taipei)
- Docker Mailserver (10.1.0.254)

Architecture Highlights:
- Session管理由 Keycloak + Redis 統一控制
- 支援多系統 (HR/WebMail/Calendar/Drive/Office) 共享 session
- Token 自動刷新,異質服務整合
- 未來可無縫遷移到雲端

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-23 20:12:43 +08:00

333 lines
9.5 KiB
Python

"""
Keycloak SSO 整合服務
"""
from typing import Optional, Dict, Any
from keycloak import KeycloakAdmin, KeycloakOpenID
from keycloak.exceptions import KeycloakError
from app.core.config import settings
class KeycloakService:
"""Keycloak 服務類別"""
def __init__(self):
"""初始化 Keycloak 連線"""
self.server_url = settings.KEYCLOAK_URL
self.realm_name = settings.KEYCLOAK_REALM
self.client_id = settings.KEYCLOAK_CLIENT_ID
self.client_secret = settings.KEYCLOAK_CLIENT_SECRET
self._admin = None
self._openid = None
@property
def admin(self) -> Optional[KeycloakAdmin]:
"""延遲初始化 Keycloak Admin 客戶端"""
if self._admin is None and settings.KEYCLOAK_ADMIN_USERNAME and settings.KEYCLOAK_ADMIN_PASSWORD:
try:
# Keycloak 26.x 需要完整的 server_url (不含 /auth)
self._admin = KeycloakAdmin(
server_url=self.server_url,
username=settings.KEYCLOAK_ADMIN_USERNAME,
password=settings.KEYCLOAK_ADMIN_PASSWORD,
realm_name=self.realm_name,
user_realm_name="master", # Admin 登入的 realm (通常是 master)
verify=True,
timeout=10 # 設定 10 秒超時
)
except Exception as e:
print(f"Warning: Failed to initialize Keycloak Admin: {e}")
import traceback
traceback.print_exc()
return self._admin
@property
def openid(self) -> KeycloakOpenID:
"""延遲初始化 Keycloak OpenID 客戶端"""
if self._openid is None:
self._openid = KeycloakOpenID(
server_url=self.server_url,
client_id=self.client_id,
realm_name=self.realm_name,
client_secret_key=self.client_secret
)
return self._openid
def create_user(
self,
username: str,
email: str,
first_name: str,
last_name: str,
enabled: bool = True,
email_verified: bool = False,
temporary_password: Optional[str] = None,
) -> Optional[str]:
"""
創建 Keycloak 用戶
Args:
username: 用戶名稱 (username_base@email_domain)
email: 郵件地址 (同 username)
first_name: 名字
last_name: 姓氏
enabled: 是否啟用
email_verified: 郵件是否已驗證
temporary_password: 臨時密碼 (用戶首次登入需修改)
Returns:
str: Keycloak User ID (UUID), 失敗返回 None
"""
if not self.admin:
print("Error: Keycloak Admin not initialized")
return None
try:
# 創建用戶
user_data = {
"username": username,
"email": email,
"firstName": first_name,
"lastName": last_name,
"enabled": enabled,
"emailVerified": email_verified,
}
# 如果提供臨時密碼
if temporary_password:
user_data["credentials"] = [{
"type": "password",
"value": temporary_password,
"temporary": True # 用戶首次登入需修改
}]
user_id = self.admin.create_user(user_data)
print(f"[OK] Keycloak user created: {username} (ID: {user_id})")
return user_id
except KeycloakError as e:
print(f"[ERROR] Failed to create Keycloak user {username}: {e}")
return None
def get_user_by_username(self, username: str) -> Optional[Dict[str, Any]]:
"""
根據用戶名獲取用戶資訊
Args:
username: 用戶名稱
Returns:
dict: 用戶資訊, 不存在返回 None
"""
if not self.admin:
return None
try:
users = self.admin.get_users({"username": username})
if users:
return users[0]
return None
except KeycloakError as e:
print(f"[ERROR] Failed to get user {username}: {e}")
return None
def update_user(self, user_id: str, user_data: Dict[str, Any]) -> bool:
"""
更新用戶資訊
Args:
user_id: Keycloak User ID
user_data: 要更新的用戶資料
Returns:
bool: 成功返回 True
"""
if not self.admin:
return False
try:
self.admin.update_user(user_id, user_data)
print(f"[OK] Keycloak user updated: {user_id}")
return True
except KeycloakError as e:
print(f"[ERROR] Failed to update user {user_id}: {e}")
return False
def disable_user(self, user_id: str) -> bool:
"""
停用用戶
Args:
user_id: Keycloak User ID
Returns:
bool: 成功返回 True
"""
return self.update_user(user_id, {"enabled": False})
def enable_user(self, user_id: str) -> bool:
"""
啟用用戶
Args:
user_id: Keycloak User ID
Returns:
bool: 成功返回 True
"""
return self.update_user(user_id, {"enabled": True})
def delete_user(self, user_id: str) -> bool:
"""
刪除用戶
注意: 這是實際刪除,建議使用 disable_user 進行軟刪除
Args:
user_id: Keycloak User ID
Returns:
bool: 成功返回 True
"""
if not self.admin:
return False
try:
self.admin.delete_user(user_id)
print(f"[OK] Keycloak user deleted: {user_id}")
return True
except KeycloakError as e:
print(f"[ERROR] Failed to delete user {user_id}: {e}")
return False
def reset_password(
self,
user_id: str,
new_password: str,
temporary: bool = True
) -> bool:
"""
重設用戶密碼
Args:
user_id: Keycloak User ID
new_password: 新密碼
temporary: 是否為臨時密碼 (用戶首次登入需修改)
Returns:
bool: 成功返回 True
"""
if not self.admin:
return False
try:
self.admin.set_user_password(
user_id,
new_password,
temporary=temporary
)
print(f"[OK] Password reset for user: {user_id}")
return True
except KeycloakError as e:
print(f"[ERROR] Failed to reset password for {user_id}: {e}")
return False
def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
"""
驗證 JWT Token
Args:
token: JWT Token
Returns:
dict: Token payload (包含用戶資訊), 無效返回 None
"""
try:
# python-keycloak 會自動從 Keycloak 獲取公鑰並驗證
token_info = self.openid.decode_token(
token,
validate=True # 驗證簽名和過期時間
)
return token_info
except Exception as e:
print(f"[ERROR] Token verification failed: {e}")
return None
def get_user_info_from_token(self, token: str) -> Optional[Dict[str, Any]]:
"""
從 Token 獲取用戶資訊
Args:
token: JWT Token
Returns:
dict: 用戶資訊
"""
token_info = self.verify_token(token)
if not token_info:
return None
return {
"username": token_info.get("preferred_username"),
"email": token_info.get("email"),
"first_name": token_info.get("given_name"),
"last_name": token_info.get("family_name"),
"sub": token_info.get("sub"), # Keycloak User ID
"iss": token_info.get("iss"), # Issuer (用於多租戶)
"realm_access": token_info.get("realm_access"), # 角色資訊
}
def introspect_token(self, token: str) -> Optional[Dict[str, Any]]:
"""
檢查 Token 狀態
Args:
token: JWT Token
Returns:
dict: Token 資訊 (包含 active 狀態)
"""
try:
return self.openid.introspect(token)
except Exception as e:
print(f"[ERROR] Token introspection failed: {e}")
return None
def is_token_active(self, token: str) -> bool:
"""
檢查 Token 是否有效
Args:
token: JWT Token
Returns:
bool: 有效返回 True
"""
introspection = self.introspect_token(token)
if not introspection:
return False
return introspection.get("active", False)
# 全域 Keycloak 服務實例
# keycloak_service = KeycloakService()
# 延遲初始化服務實例
_keycloak_service_instance: Optional[KeycloakService] = None
def get_keycloak_service() -> KeycloakService:
"""獲取 Keycloak 服務實例 (單例)"""
global _keycloak_service_instance
if _keycloak_service_instance is None:
_keycloak_service_instance = KeycloakService()
return _keycloak_service_instance
# 模擬屬性訪問
class _KeycloakServiceProxy:
def __getattr__(self, name):
return getattr(get_keycloak_service(), name)
keycloak_service = _KeycloakServiceProxy()