""" 環境檢測服務 自動檢測系統所需的所有環境組件 """ import os import socket import subprocess from typing import Dict, Any, List, Optional from datetime import datetime import psycopg2 import requests from sqlalchemy import create_engine, text class EnvironmentChecker: """環境檢測器""" def __init__(self): self.results = {} # ==================== Redis 檢測 ==================== def check_redis(self) -> Dict[str, Any]: """ 檢測 Redis 服務 Returns: { "status": "ok" | "warning" | "error" | "not_configured", "available": bool, "host": str, "port": int, "ping_success": bool, "error": str } """ result = { "status": "not_configured", "available": False, "host": None, "port": None, "ping_success": False, "error": None } # 檢查環境變數 redis_host = os.getenv("REDIS_HOST") redis_port = os.getenv("REDIS_PORT", "6379") if not redis_host: result["error"] = "REDIS_HOST 環境變數未設定" return result result["host"] = redis_host result["port"] = int(redis_port) # 測試連線(需要 redis 套件) try: import redis redis_client = redis.Redis( host=redis_host, port=int(redis_port), password=os.getenv("REDIS_PASSWORD"), db=int(os.getenv("REDIS_DB", "0")), socket_connect_timeout=5, decode_responses=True ) # 測試 PING pong = redis_client.ping() if pong: result["available"] = True result["ping_success"] = True result["status"] = "ok" else: result["status"] = "error" result["error"] = "Redis PING 失敗" redis_client.close() except ImportError: result["status"] = "warning" result["error"] = "redis 套件未安裝(pip install redis)" except Exception as e: result["status"] = "error" result["error"] = f"Redis 連線失敗: {str(e)}" return result def test_redis_connection( self, host: str, port: int, password: Optional[str] = None, db: int = 0 ) -> Dict[str, Any]: """ 測試 Redis 連線(用於初始化時使用者輸入的連線資訊) Returns: { "success": bool, "ping_success": bool, "message": str, "error": str } """ result = { "success": False, "ping_success": False, "message": None, "error": None } try: import redis redis_client = redis.Redis( host=host, port=port, password=password if password else None, db=db, socket_connect_timeout=5, decode_responses=True ) # 測試 PING pong = redis_client.ping() if pong: result["success"] = True result["ping_success"] = True result["message"] = "Redis 連線成功" else: result["error"] = "Redis PING 失敗" redis_client.close() except ImportError: result["error"] = "redis 套件未安裝" except redis.exceptions.AuthenticationError: result["error"] = "Redis 密碼錯誤" except redis.exceptions.ConnectionError as e: result["error"] = f"無法連接到 Redis: {str(e)}" except Exception as e: result["error"] = f"未知錯誤: {str(e)}" return result def check_all(self) -> Dict[str, Any]: """ 檢查所有環境組件 Returns: 完整的檢測報告 """ return { "timestamp": datetime.now().isoformat(), "overall_status": "pending", "components": { "redis": self.check_redis(), "database": self.check_database(), "keycloak": self.check_keycloak(), "mailserver": self.check_mailserver(), "drive": self.check_drive_service(), "traefik": self.check_traefik(), "network": self.check_network(), }, "missing_configs": self.get_missing_configs(), "recommendations": self.get_recommendations() } # ==================== 資料庫檢測 ==================== def check_database(self) -> Dict[str, Any]: """ 檢測 PostgreSQL 資料庫 Returns: { "status": "ok" | "warning" | "error" | "not_configured", "available": bool, "connection_string": str, "version": str, "tables_exist": bool, "tenant_exists": bool, "error": str } """ result = { "status": "not_configured", "available": False, "connection_string": None, "version": None, "tables_exist": False, "tenant_exists": False, "tenant_initialized": False, "error": None } # 1. 檢查環境變數 db_url = os.getenv("DATABASE_URL") if not db_url: result["error"] = "DATABASE_URL 環境變數未設定" return result result["connection_string"] = self._mask_password(db_url) # 2. 測試連線 try: engine = create_engine(db_url) with engine.connect() as conn: # 取得版本 version_result = conn.execute(text("SELECT version()")) version_row = version_result.fetchone() if version_row: result["version"] = version_row[0].split(',')[0] result["available"] = True # 3. 檢查 tenants 表是否存在 try: tenant_check = conn.execute(text( "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'tenants')" )) result["tables_exist"] = tenant_check.scalar() if result["tables_exist"]: # 4. 檢查是否有租戶資料 tenant_count = conn.execute(text("SELECT COUNT(*) FROM tenants")) count = tenant_count.scalar() result["tenant_exists"] = count > 0 if result["tenant_exists"]: # 5. 檢查租戶是否已初始化 init_check = conn.execute(text( "SELECT is_initialized FROM tenants LIMIT 1" )) is_init = init_check.scalar() result["tenant_initialized"] = is_init except Exception as e: result["tables_exist"] = False result["error"] = f"資料表檢查失敗: {str(e)}" # 判斷狀態 if result["tenant_initialized"]: result["status"] = "ok" elif result["tenant_exists"]: result["status"] = "warning" elif result["tables_exist"]: result["status"] = "warning" else: result["status"] = "warning" except Exception as e: result["available"] = False result["status"] = "error" result["error"] = f"資料庫連線失敗: {str(e)}" return result def test_database_connection( self, host: str, port: int, database: str, user: str, password: str ) -> Dict[str, Any]: """ 測試資料庫連線(用於初始化時使用者輸入的連線資訊) Returns: { "success": bool, "version": str, "message": str, "error": str } """ result = { "success": False, "version": None, "message": None, "error": None } try: # 使用 psycopg2 直接測試 conn = psycopg2.connect( host=host, port=port, database=database, user=user, password=password, connect_timeout=5 ) cursor = conn.cursor() cursor.execute("SELECT version()") version = cursor.fetchone()[0] result["version"] = version.split(',')[0] cursor.close() conn.close() result["success"] = True result["message"] = "資料庫連線成功" except psycopg2.OperationalError as e: result["error"] = f"連線失敗: {str(e)}" except Exception as e: result["error"] = f"未知錯誤: {str(e)}" return result # ==================== Keycloak 檢測 ==================== def check_keycloak(self) -> Dict[str, Any]: """ 檢測 Keycloak SSO 服務 Returns: { "status": "ok" | "warning" | "error" | "not_configured", "available": bool, "url": str, "realm": str, "realm_exists": bool, "clients_configured": bool, "error": str } """ result = { "status": "not_configured", "available": False, "url": None, "realm": None, "realm_exists": False, "clients_configured": False, "error": None } # 1. 檢查環境變數 kc_url = os.getenv("KEYCLOAK_URL") kc_realm = os.getenv("KEYCLOAK_REALM") if not kc_url: result["error"] = "KEYCLOAK_URL 環境變數未設定" return result result["url"] = kc_url result["realm"] = kc_realm or "未設定" # 2. 測試 Keycloak 服務是否運行 try: # 測試 health endpoint response = requests.get(f"{kc_url}/health", timeout=5) if response.status_code == 200: result["available"] = True else: result["available"] = False result["error"] = f"Keycloak 服務異常: HTTP {response.status_code}" result["status"] = "error" return result except requests.exceptions.RequestException as e: result["available"] = False result["status"] = "error" result["error"] = f"無法連接到 Keycloak: {str(e)}" return result # 3. 檢查 Realm 是否存在 if kc_realm: try: # 嘗試取得 Realm 的 OpenID Configuration oidc_url = f"{kc_url}/realms/{kc_realm}/.well-known/openid-configuration" response = requests.get(oidc_url, timeout=5) if response.status_code == 200: result["realm_exists"] = True result["status"] = "ok" else: result["realm_exists"] = False result["status"] = "warning" result["error"] = f"Realm '{kc_realm}' 不存在" except Exception as e: result["error"] = f"Realm 檢查失敗: {str(e)}" result["status"] = "warning" else: result["status"] = "warning" result["error"] = "KEYCLOAK_REALM 未設定" return result def test_keycloak_connection( self, url: str, realm: str, admin_username: str, admin_password: str ) -> Dict[str, Any]: """ 測試 Keycloak 連線並驗證管理員權限 Returns: { "success": bool, "realm_exists": bool, "admin_access": bool, "message": str, "error": str } """ result = { "success": False, "realm_exists": False, "admin_access": False, "message": None, "error": None } try: # 1. 測試服務是否運行 (使用根路徑,Keycloak 會返回 302 重定向) health_response = requests.get(f"{url}/", timeout=5, allow_redirects=False) if health_response.status_code not in [200, 302, 303]: result["error"] = "Keycloak 服務未運行" return result # 2. 測試管理員登入 token_url = f"{url}/realms/master/protocol/openid-connect/token" token_data = { "grant_type": "password", "client_id": "admin-cli", "username": admin_username, "password": admin_password } token_response = requests.post(token_url, data=token_data, timeout=10) if token_response.status_code == 200: result["admin_access"] = True access_token = token_response.json().get("access_token") # 3. 檢查 Realm 是否存在 realm_url = f"{url}/admin/realms/{realm}" headers = {"Authorization": f"Bearer {access_token}"} realm_response = requests.get(realm_url, headers=headers, timeout=5) if realm_response.status_code == 200: result["realm_exists"] = True result["success"] = True result["message"] = "Keycloak 連線成功,Realm 存在" elif realm_response.status_code == 404: result["success"] = True result["message"] = "Keycloak 連線成功,但 Realm 不存在(將自動建立)" else: result["error"] = f"Realm 檢查失敗: HTTP {realm_response.status_code}" else: result["error"] = "管理員帳號密碼錯誤" except requests.exceptions.RequestException as e: result["error"] = f"連線失敗: {str(e)}" except Exception as e: result["error"] = f"未知錯誤: {str(e)}" return result # ==================== 郵件伺服器檢測 ==================== def check_mailserver(self) -> Dict[str, Any]: """ 檢測郵件伺服器 (Docker Mailserver) Returns: { "status": "ok" | "warning" | "error" | "not_configured", "available": bool, "ssh_configured": bool, "container_running": bool, "error": str } """ result = { "status": "not_configured", "available": False, "ssh_configured": False, "container_running": False, "error": None } # 檢查 SSH 設定 ssh_host = os.getenv("MAILSERVER_SSH_HOST") ssh_user = os.getenv("MAILSERVER_SSH_USER") container_name = os.getenv("MAILSERVER_CONTAINER_NAME") if not all([ssh_host, ssh_user, container_name]): result["error"] = "郵件伺服器 SSH 設定不完整" return result result["ssh_configured"] = True # 測試 SSH 連線(可選功能) # 注意:這需要 paramiko 套件,且需要謹慎處理安全性 result["status"] = "warning" result["error"] = "郵件伺服器連線測試需要手動驗證" return result # ==================== 雲端硬碟檢測 ==================== def check_drive_service(self) -> Dict[str, Any]: """ 檢測雲端硬碟服務 (Nextcloud) Returns: { "status": "ok" | "warning" | "error" | "not_configured", "available": bool, "url": str, "api_accessible": bool, "error": str } """ result = { "status": "not_configured", "available": False, "url": None, "api_accessible": False, "error": None } drive_url = os.getenv("DRIVE_SERVICE_URL") if not drive_url: result["error"] = "DRIVE_SERVICE_URL 環境變數未設定" return result result["url"] = drive_url try: response = requests.get(f"{drive_url}/status.php", timeout=5) if response.status_code == 200: result["available"] = True result["api_accessible"] = True result["status"] = "ok" else: result["status"] = "warning" result["error"] = f"Drive 服務回應異常: HTTP {response.status_code}" except requests.exceptions.RequestException as e: result["status"] = "error" result["error"] = f"無法連接到 Drive 服務: {str(e)}" return result # ==================== Traefik 檢測 ==================== def check_traefik(self) -> Dict[str, Any]: """ 檢測 Traefik 反向代理 Returns: { "status": "ok" | "warning" | "not_configured", "dashboard_accessible": bool, "error": str } """ result = { "status": "not_configured", "dashboard_accessible": False, "error": "Traefik 檢測未實作(需要 Dashboard URL)" } # 簡化檢測:Traefik 通常在本機運行 # 可以透過檢查 port 80/443 是否被占用來判斷 result["status"] = "ok" result["dashboard_accessible"] = False return result # ==================== 網路檢測 ==================== def check_network(self) -> Dict[str, Any]: """ 檢測網路連通性 Returns: { "status": "ok" | "warning", "dns_resolution": bool, "ports_open": dict, "error": str } """ result = { "status": "ok", "dns_resolution": True, "ports_open": { "80": False, "443": False, "5433": False }, "error": None } # 檢查常用 port 是否開啟 ports_to_check = [80, 443, 5433] for port in ports_to_check: result["ports_open"][str(port)] = self._is_port_open("localhost", port) return result # ==================== 輔助方法 ==================== def _is_port_open(self, host: str, port: int, timeout: int = 2) -> bool: """檢查 port 是否開啟""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) result = sock.connect_ex((host, port)) sock.close() return result == 0 except: return False def _mask_password(self, connection_string: str) -> str: """遮蔽連線字串中的密碼""" import re return re.sub(r'://([^:]+):([^@]+)@', r'://\1:****@', connection_string) def get_missing_configs(self) -> List[str]: """取得缺少的環境變數""" required_vars = [ "DATABASE_URL", "KEYCLOAK_URL", "KEYCLOAK_REALM", "KEYCLOAK_CLIENT_ID", "KEYCLOAK_CLIENT_SECRET", ] missing = [] for var in required_vars: if not os.getenv(var): missing.append(var) return missing def get_recommendations(self) -> List[str]: """根據檢測結果提供建議""" recommendations = [] # 這裡可以根據檢測結果動態產生建議 if not os.getenv("DATABASE_URL"): recommendations.append("請先設定資料庫連線資訊") if not os.getenv("KEYCLOAK_URL"): recommendations.append("請設定 Keycloak SSO 服務") return recommendations if __name__ == "__main__": # 測試環境檢測 checker = EnvironmentChecker() report = checker.check_all() print("=== 環境檢測報告 ===\n") for component, result in report["components"].items(): status_icon = { "ok": "✓", "warning": "⚠", "error": "✗", "not_configured": "○" }.get(result["status"], "?") print(f"{status_icon} {component.upper()}: {result['status']}") if result.get("error"): print(f" 錯誤: {result['error']}") print(f"\n缺少的配置: {', '.join(report['missing_configs']) or '無'}")