Files
hr-portal/backend/app/services/environment_checker.py
Porsche Chen 360533393f feat: HR Portal - Complete Multi-Tenant System with Redis Session Storage
Major Features:
-  Multi-tenant architecture (tenant isolation)
-  Employee CRUD with lifecycle management (onboarding/offboarding)
-  Department tree structure with email domain management
-  Company info management (single-record editing)
-  System functions CRUD (permission management)
-  Email account management (multi-account per employee)
-  Keycloak SSO integration (auth.lab.taipei)
-  Redis session storage (10.1.0.254:6379)
  - Solves Cookie 4KB limitation
  - Cross-system session sharing
  - Sliding expiration (8 hours)
  - Automatic token refresh

Technical Stack:
Backend:
- FastAPI + SQLAlchemy
- PostgreSQL 16 (10.1.0.20:5433)
- Keycloak Admin API integration
- Docker Mailserver integration (SSH)
- Alembic migrations

Frontend:
- Next.js 14 (App Router)
- NextAuth 4 with Keycloak Provider
- Redis session storage (ioredis)
- Tailwind CSS

Infrastructure:
- Redis 7 (10.1.0.254:6379) - Session + Cache
- Keycloak 26.1.0 (auth.lab.taipei)
- Docker Mailserver (10.1.0.254)

Architecture Highlights:
- Session管理由 Keycloak + Redis 統一控制
- 支援多系統 (HR/WebMail/Calendar/Drive/Office) 共享 session
- Token 自動刷新,異質服務整合
- 未來可無縫遷移到雲端

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-23 20:12:43 +08:00

686 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
環境檢測服務
自動檢測系統所需的所有環境組件
"""
import os
import socket
import subprocess
from typing import Dict, Any, List, Optional
from datetime import datetime
import psycopg2
import requests
from sqlalchemy import create_engine, text
class EnvironmentChecker:
    """Probe the external services this backend is configured to use.

    The ``check_*`` methods read their settings from environment variables
    and return a plain ``dict`` report whose ``"status"`` is one of
    ``"ok"``, ``"warning"``, ``"error"`` or ``"not_configured"``.  The
    ``test_*`` methods probe connection details passed in by the caller
    and return a ``dict`` with ``"success"`` plus ``"message"``/``"error"``.

    The probe methods are written to report failures inside the returned
    dict rather than raise.
    """

    # HTTP statuses accepted as "Keycloak is answering" on its root path
    # (the root path may answer with a redirect instead of 200).
    _KEYCLOAK_ALIVE_STATUSES = (200, 302, 303)

    def __init__(self):
        # Initialised empty; no method of this class writes to it.
        self.results = {}

    # ==================== Redis ====================

    def _ping_redis(
        self,
        host: str,
        port: int,
        password: Optional[str],
        db: int
    ) -> bool:
        """Open a Redis client, send PING and always close the client.

        Raises:
            ImportError: the ``redis`` package is not installed.
            Exception: whatever the client raises while connecting/pinging;
                callers translate these into report entries.
        """
        import redis

        client = redis.Redis(
            host=host,
            port=port,
            password=password,
            db=db,
            socket_connect_timeout=5,
            decode_responses=True
        )
        try:
            return bool(client.ping())
        finally:
            # Release the connection even when PING raised.
            client.close()

    def check_redis(self) -> Dict[str, Any]:
        """Check the Redis service described by the ``REDIS_*`` variables.

        Reads ``REDIS_HOST`` (required), ``REDIS_PORT`` (default ``6379``),
        ``REDIS_PASSWORD`` and ``REDIS_DB`` (default ``0``).

        Returns:
            {
                "status": "ok" | "warning" | "error" | "not_configured",
                "available": bool,
                "host": str,
                "port": int,
                "ping_success": bool,
                "error": str
            }
        """
        result = {
            "status": "not_configured",
            "available": False,
            "host": None,
            "port": None,
            "ping_success": False,
            "error": None
        }

        redis_host = os.getenv("REDIS_HOST")
        if not redis_host:
            result["error"] = "REDIS_HOST 環境變數未設定"
            return result
        result["host"] = redis_host

        # A non-numeric REDIS_PORT / REDIS_DB is a configuration error and
        # must be reported in the result, not raised out of the checker.
        try:
            redis_port = int(os.getenv("REDIS_PORT", "6379"))
            result["port"] = redis_port
            redis_db = int(os.getenv("REDIS_DB", "0"))
        except ValueError as exc:
            result["status"] = "error"
            result["error"] = f"REDIS_PORT 或 REDIS_DB 設定無效: {exc}"
            return result

        try:
            ping_ok = self._ping_redis(
                redis_host,
                redis_port,
                os.getenv("REDIS_PASSWORD"),
                redis_db
            )
        except ImportError:
            # The redis package is optional, so this is only a warning.
            result["status"] = "warning"
            result["error"] = "redis 套件未安裝pip install redis"
        except Exception as exc:
            result["status"] = "error"
            result["error"] = f"Redis 連線失敗: {exc}"
        else:
            if ping_ok:
                result["available"] = True
                result["ping_success"] = True
                result["status"] = "ok"
            else:
                result["status"] = "error"
                result["error"] = "Redis PING 失敗"
        return result

    def test_redis_connection(
        self,
        host: str,
        port: int,
        password: Optional[str] = None,
        db: int = 0
    ) -> Dict[str, Any]:
        """Test a Redis connection using caller-supplied connection details.

        An empty ``password`` is treated the same as ``None``.

        Returns:
            {
                "success": bool,
                "ping_success": bool,
                "message": str,
                "error": str
            }
        """
        result = {
            "success": False,
            "ping_success": False,
            "message": None,
            "error": None
        }

        try:
            import redis
        except ImportError:
            result["error"] = "redis 套件未安裝"
            return result

        try:
            ping_ok = self._ping_redis(host, port, password or None, db)
        except redis.exceptions.AuthenticationError:
            # Listed before ConnectionError so a wrong password gets its
            # own message.
            result["error"] = "Redis 密碼錯誤"
        except redis.exceptions.ConnectionError as exc:
            result["error"] = f"無法連接到 Redis: {exc}"
        except Exception as exc:
            result["error"] = f"未知錯誤: {exc}"
        else:
            if ping_ok:
                result["success"] = True
                result["ping_success"] = True
                result["message"] = "Redis 連線成功"
            else:
                result["error"] = "Redis PING 失敗"
        return result

    def check_all(self) -> Dict[str, Any]:
        """Run every component check and assemble the full report.

        ``"overall_status"`` is always the literal ``"pending"``; this
        method does not aggregate the component statuses.

        Returns:
            dict with ``"timestamp"`` (local time, ISO format),
            ``"overall_status"``, ``"components"`` (one report per
            component), ``"missing_configs"`` and ``"recommendations"``.
        """
        return {
            "timestamp": datetime.now().isoformat(),
            "overall_status": "pending",
            "components": {
                "redis": self.check_redis(),
                "database": self.check_database(),
                "keycloak": self.check_keycloak(),
                "mailserver": self.check_mailserver(),
                "drive": self.check_drive_service(),
                "traefik": self.check_traefik(),
                "network": self.check_network(),
            },
            "missing_configs": self.get_missing_configs(),
            "recommendations": self.get_recommendations()
        }

    # ==================== Database ====================

    def check_database(self) -> Dict[str, Any]:
        """Check the PostgreSQL database named by ``DATABASE_URL``.

        Connects, reads the server version, then looks for a ``tenants``
        table, whether it has any row, and the ``is_initialized`` value of
        one row.  ``"status"`` is ``"ok"`` only when that value is truthy;
        any other reachable state is ``"warning"``.

        Returns:
            {
                "status": "ok" | "warning" | "error" | "not_configured",
                "available": bool,
                "connection_string": str,   # password masked
                "version": str,
                "tables_exist": bool,
                "tenant_exists": bool,
                "tenant_initialized": bool,
                "error": str
            }
        """
        result = {
            "status": "not_configured",
            "available": False,
            "connection_string": None,
            "version": None,
            "tables_exist": False,
            "tenant_exists": False,
            "tenant_initialized": False,
            "error": None
        }

        db_url = os.getenv("DATABASE_URL")
        if not db_url:
            result["error"] = "DATABASE_URL 環境變數未設定"
            return result
        result["connection_string"] = self._mask_password(db_url)

        try:
            engine = create_engine(db_url)
            try:
                with engine.connect() as conn:
                    version_row = conn.execute(text("SELECT version()")).fetchone()
                    if version_row:
                        result["version"] = version_row[0].split(',')[0]
                    result["available"] = True

                    # Schema problems are reported but do not make the
                    # database itself count as unavailable.
                    try:
                        tenant_check = conn.execute(text(
                            "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'tenants')"
                        ))
                        result["tables_exist"] = bool(tenant_check.scalar())
                        if result["tables_exist"]:
                            count = conn.execute(text("SELECT COUNT(*) FROM tenants")).scalar()
                            result["tenant_exists"] = count > 0
                            if result["tenant_exists"]:
                                is_init = conn.execute(text(
                                    "SELECT is_initialized FROM tenants LIMIT 1"
                                )).scalar()
                                # NULL is reported as False, never None.
                                result["tenant_initialized"] = bool(is_init)
                    except Exception as exc:
                        result["tables_exist"] = False
                        result["error"] = f"資料表檢查失敗: {exc}"

                    result["status"] = "ok" if result["tenant_initialized"] else "warning"
            finally:
                # Each call builds its own engine; release its pool so
                # repeated checks do not accumulate connections.
                engine.dispose()
        except Exception as exc:
            result["available"] = False
            result["status"] = "error"
            result["error"] = f"資料庫連線失敗: {exc}"
        return result

    def test_database_connection(
        self,
        host: str,
        port: int,
        database: str,
        user: str,
        password: str
    ) -> Dict[str, Any]:
        """Test a PostgreSQL connection using caller-supplied details.

        Connects directly with psycopg2 (5 second connect timeout) and
        reads the server version.

        Returns:
            {
                "success": bool,
                "version": str,
                "message": str,
                "error": str
            }
        """
        result = {
            "success": False,
            "version": None,
            "message": None,
            "error": None
        }
        try:
            connection = psycopg2.connect(
                host=host,
                port=port,
                database=database,
                user=user,
                password=password,
                connect_timeout=5
            )
            try:
                cursor = connection.cursor()
                try:
                    cursor.execute("SELECT version()")
                    version = cursor.fetchone()[0]
                finally:
                    cursor.close()
            finally:
                # Close the connection even when the query failed.
                connection.close()
            result["version"] = version.split(',')[0]
            result["success"] = True
            result["message"] = "資料庫連線成功"
        except psycopg2.OperationalError as exc:
            result["error"] = f"連線失敗: {exc}"
        except Exception as exc:
            result["error"] = f"未知錯誤: {exc}"
        return result

    # ==================== Keycloak ====================

    def check_keycloak(self) -> Dict[str, Any]:
        """Check the Keycloak server named by ``KEYCLOAK_URL``.

        The server counts as reachable when ``/health`` answers 200 or,
        failing that, when the root path answers with one of
        ``_KEYCLOAK_ALIVE_STATUSES`` (the same probe that
        ``test_keycloak_connection`` uses).  When ``KEYCLOAK_REALM`` is
        set, the realm's OpenID configuration document is fetched to
        confirm the realm exists.  ``"clients_configured"`` is never
        updated by this method.

        Returns:
            {
                "status": "ok" | "warning" | "error" | "not_configured",
                "available": bool,
                "url": str,
                "realm": str,
                "realm_exists": bool,
                "clients_configured": bool,
                "error": str
            }
        """
        from urllib.parse import quote

        result = {
            "status": "not_configured",
            "available": False,
            "url": None,
            "realm": None,
            "realm_exists": False,
            "clients_configured": False,
            "error": None
        }

        kc_url = os.getenv("KEYCLOAK_URL")
        kc_realm = os.getenv("KEYCLOAK_REALM")
        if not kc_url:
            result["error"] = "KEYCLOAK_URL 環境變數未設定"
            return result
        result["url"] = kc_url
        result["realm"] = kc_realm or "未設定"

        # Avoid "//" in request paths when the URL ends with a slash.
        base_url = kc_url.rstrip("/")

        try:
            response = requests.get(f"{base_url}/health", timeout=5)
            reachable = response.status_code == 200
            if not reachable:
                # /health may not be served on this port; fall back to the
                # root-path probe before declaring the server down.
                response = requests.get(
                    f"{base_url}/", timeout=5, allow_redirects=False
                )
                reachable = response.status_code in self._KEYCLOAK_ALIVE_STATUSES
        except requests.exceptions.RequestException as exc:
            result["available"] = False
            result["status"] = "error"
            result["error"] = f"無法連接到 Keycloak: {exc}"
            return result

        if not reachable:
            result["available"] = False
            result["error"] = f"Keycloak 服務異常: HTTP {response.status_code}"
            result["status"] = "error"
            return result
        result["available"] = True

        if not kc_realm:
            result["status"] = "warning"
            result["error"] = "KEYCLOAK_REALM 未設定"
            return result

        try:
            # Quote the realm so it stays a single path segment.
            oidc_url = (
                f"{base_url}/realms/{quote(kc_realm, safe='')}"
                "/.well-known/openid-configuration"
            )
            response = requests.get(oidc_url, timeout=5)
            if response.status_code == 200:
                result["realm_exists"] = True
                result["status"] = "ok"
            else:
                result["realm_exists"] = False
                result["status"] = "warning"
                result["error"] = f"Realm '{kc_realm}' 不存在"
        except Exception as exc:
            result["error"] = f"Realm 檢查失敗: {exc}"
            result["status"] = "warning"
        return result

    def test_keycloak_connection(
        self,
        url: str,
        realm: str,
        admin_username: str,
        admin_password: str
    ) -> Dict[str, Any]:
        """Test a Keycloak server and verify administrator credentials.

        Probes the root path, requests a token from the ``master`` realm
        with the ``admin-cli`` client, then looks the realm up through the
        admin API.  A 404 for the realm still counts as success (the
        message says the realm will be created automatically).

        Returns:
            {
                "success": bool,
                "realm_exists": bool,
                "admin_access": bool,
                "message": str,
                "error": str
            }
        """
        from urllib.parse import quote

        result = {
            "success": False,
            "realm_exists": False,
            "admin_access": False,
            "message": None,
            "error": None
        }
        # Avoid "//" in request paths when the URL ends with a slash.
        base_url = url.rstrip("/")
        try:
            # 1. Is the server answering?  The root path may redirect, so
            #    redirects are not followed and 302/303 are accepted.
            health_response = requests.get(
                f"{base_url}/", timeout=5, allow_redirects=False
            )
            if health_response.status_code not in self._KEYCLOAK_ALIVE_STATUSES:
                result["error"] = "Keycloak 服務未運行"
                return result

            # 2. Administrator login.
            token_response = requests.post(
                f"{base_url}/realms/master/protocol/openid-connect/token",
                data={
                    "grant_type": "password",
                    "client_id": "admin-cli",
                    "username": admin_username,
                    "password": admin_password
                },
                timeout=10
            )
            if token_response.status_code != 200:
                result["error"] = "管理員帳號密碼錯誤"
                return result
            result["admin_access"] = True
            access_token = token_response.json().get("access_token")

            # 3. Does the realm exist?  The realm name is caller input, so
            #    quote it to keep it a single path segment.
            realm_response = requests.get(
                f"{base_url}/admin/realms/{quote(realm, safe='')}",
                headers={"Authorization": f"Bearer {access_token}"},
                timeout=5
            )
            if realm_response.status_code == 200:
                result["realm_exists"] = True
                result["success"] = True
                result["message"] = "Keycloak 連線成功Realm 存在"
            elif realm_response.status_code == 404:
                result["success"] = True
                result["message"] = "Keycloak 連線成功,但 Realm 不存在(將自動建立)"
            else:
                result["error"] = f"Realm 檢查失敗: HTTP {realm_response.status_code}"
        except requests.exceptions.RequestException as exc:
            result["error"] = f"連線失敗: {exc}"
        except Exception as exc:
            result["error"] = f"未知錯誤: {exc}"
        return result

    # ==================== Mail server ====================

    def check_mailserver(self) -> Dict[str, Any]:
        """Check that the mail server SSH settings are present.

        Only verifies that ``MAILSERVER_SSH_HOST``, ``MAILSERVER_SSH_USER``
        and ``MAILSERVER_CONTAINER_NAME`` are all set.  No SSH connection
        is attempted, so a fully configured server is reported as
        ``"warning"`` and ``"available"`` / ``"container_running"`` stay
        ``False``.

        Returns:
            {
                "status": "warning" | "not_configured",
                "available": bool,
                "ssh_configured": bool,
                "container_running": bool,
                "error": str
            }
        """
        result = {
            "status": "not_configured",
            "available": False,
            "ssh_configured": False,
            "container_running": False,
            "error": None
        }
        ssh_settings = (
            os.getenv("MAILSERVER_SSH_HOST"),
            os.getenv("MAILSERVER_SSH_USER"),
            os.getenv("MAILSERVER_CONTAINER_NAME"),
        )
        if not all(ssh_settings):
            result["error"] = "郵件伺服器 SSH 設定不完整"
            return result

        result["ssh_configured"] = True
        # No live SSH test is performed here; the operator has to verify
        # the connection manually.
        result["status"] = "warning"
        result["error"] = "郵件伺服器連線測試需要手動驗證"
        return result

    # ==================== Drive service ====================

    def check_drive_service(self) -> Dict[str, Any]:
        """Check the drive service named by ``DRIVE_SERVICE_URL``.

        Requests ``<url>/status.php``; HTTP 200 means ``"ok"``, any other
        status is a ``"warning"`` and a request failure is an ``"error"``.

        Returns:
            {
                "status": "ok" | "warning" | "error" | "not_configured",
                "available": bool,
                "url": str,
                "api_accessible": bool,
                "error": str
            }
        """
        result = {
            "status": "not_configured",
            "available": False,
            "url": None,
            "api_accessible": False,
            "error": None
        }
        drive_url = os.getenv("DRIVE_SERVICE_URL")
        if not drive_url:
            result["error"] = "DRIVE_SERVICE_URL 環境變數未設定"
            return result
        result["url"] = drive_url

        try:
            # rstrip avoids "//status.php" when the URL ends with a slash.
            response = requests.get(
                f"{drive_url.rstrip('/')}/status.php", timeout=5
            )
        except requests.exceptions.RequestException as exc:
            result["status"] = "error"
            result["error"] = f"無法連接到 Drive 服務: {exc}"
            return result

        if response.status_code == 200:
            result["available"] = True
            result["api_accessible"] = True
            result["status"] = "ok"
        else:
            result["status"] = "warning"
            result["error"] = f"Drive 服務回應異常: HTTP {response.status_code}"
        return result

    # ==================== Traefik ====================

    def check_traefik(self) -> Dict[str, Any]:
        """Return a placeholder report for the Traefik reverse proxy.

        No probe is performed: the report always has ``"status": "ok"``
        and ``"dashboard_accessible": False``, together with a fixed
        ``"error"`` text saying the check is not implemented.

        NOTE(review): reporting "ok" alongside an error text is kept as-is
        because callers may depend on it; confirm the intended status
        before changing it.

        Returns:
            {
                "status": "ok",
                "dashboard_accessible": bool,
                "error": str
            }
        """
        return {
            "status": "ok",
            "dashboard_accessible": False,
            "error": "Traefik 檢測未實作(需要 Dashboard URL"
        }

    # ==================== Network ====================

    def check_network(self) -> Dict[str, Any]:
        """Report which well-known local ports accept TCP connections.

        Probes ports 80, 443 and 5433 on ``localhost``.  ``"status"`` is
        always ``"ok"`` and ``"dns_resolution"`` is always ``True``; no
        DNS lookup is actually performed.

        Returns:
            {
                "status": "ok",
                "dns_resolution": bool,
                "ports_open": {"80": bool, "443": bool, "5433": bool},
                "error": str
            }
        """
        ports_to_check = (80, 443, 5433)
        return {
            "status": "ok",
            "dns_resolution": True,
            "ports_open": {
                str(port): self._is_port_open("localhost", port)
                for port in ports_to_check
            },
            "error": None
        }

    # ==================== Helpers ====================

    def _is_port_open(self, host: str, port: int, timeout: int = 2) -> bool:
        """Return True when a TCP connection to ``host:port`` succeeds.

        Socket errors and invalid host/port values yield ``False``.
        """
        try:
            # The context manager closes the socket on every path.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(timeout)
                return sock.connect_ex((host, port)) == 0
        except (OSError, OverflowError, TypeError, ValueError):
            # OSError covers resolution/socket failures; the others cover
            # out-of-range or wrongly typed host/port arguments.
            return False

    def _mask_password(self, connection_string: str) -> str:
        """Replace the password in a ``scheme://user:password@host`` URL with ``****``.

        The password is taken to run up to the last ``@`` before the first
        ``/`` that follows the user name, so a password that itself
        contains ``@`` is masked completely.  Strings without a
        ``user:password@`` part are returned unchanged.
        """
        import re
        return re.sub(r'://([^:/@]+):([^/]*)@', r'://\1:****@', connection_string)

    def get_missing_configs(self) -> List[str]:
        """Return the required environment variables that are unset or empty."""
        required_vars = [
            "DATABASE_URL",
            "KEYCLOAK_URL",
            "KEYCLOAK_REALM",
            "KEYCLOAK_CLIENT_ID",
            "KEYCLOAK_CLIENT_SECRET",
        ]
        return [var for var in required_vars if not os.getenv(var)]

    def get_recommendations(self) -> List[str]:
        """Return setup hints for missing core settings.

        Only looks at ``DATABASE_URL`` and ``KEYCLOAK_URL``; the results
        of the component checks are not consulted.
        """
        recommendations = []
        if not os.getenv("DATABASE_URL"):
            recommendations.append("請先設定資料庫連線資訊")
        if not os.getenv("KEYCLOAK_URL"):
            recommendations.append("請設定 Keycloak SSO 服務")
        return recommendations
if __name__ == "__main__":
    # Manual smoke run: execute every check and print a short report.
    icon_by_status = {
        "ok": "",
        "warning": "",
        "error": "",
        "not_configured": ""
    }
    environment_checker = EnvironmentChecker()
    full_report = environment_checker.check_all()

    print("=== 環境檢測報告 ===\n")
    for name, outcome in full_report["components"].items():
        # Unknown statuses fall back to "?".
        icon = icon_by_status.get(outcome["status"], "?")
        print(f"{icon} {name.upper()}: {outcome['status']}")
        if outcome.get("error"):
            print(f" 錯誤: {outcome['error']}")

    missing_text = ', '.join(full_report['missing_configs'])
    print(f"\n缺少的配置: {missing_text or ''}")