from fastapi import FastAPI, Request, HTTPException, Form, File, UploadFile from fastapi.responses import RedirectResponse, HTMLResponse, JSONResponse from starlette.middleware.sessions import SessionMiddleware from authlib.integrations.starlette_client import OAuth from sqlalchemy import create_engine, text from typing import Optional import redis import json import os import secrets import logging import imaplib import smtplib import email from email.header import decode_header from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from datetime import datetime import html import hashlib import base64 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI() SECRET_KEY = os.getenv("SECRET_KEY", secrets.token_urlsafe(32)) app.add_middleware(SessionMiddleware, secret_key=SECRET_KEY) # Redis 配置 REDIS_HOST = os.getenv("REDIS_HOST", "10.1.0.20") REDIS_PORT = int(os.getenv("REDIS_PORT", "6379")) REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", "DC1qaz2wsx") REDIS_DB = int(os.getenv("REDIS_DB", "2")) redis_client = redis.Redis( host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, db=REDIS_DB, decode_responses=True ) # Virtual MIS 資料庫配置 DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://admin:DC1qaz2wsx@10.1.0.20:5433/virtual_mis") db_engine = create_engine(DATABASE_URL, pool_pre_ping=True) # Keycloak OAuth 配置 KEYCLOAK_SERVER_URL = os.getenv("KEYCLOAK_SERVER_URL", "https://auth.lab.taipei") KEYCLOAK_REALM = os.getenv("KEYCLOAK_REALM", "vmis-admin") KEYCLOAK_CLIENT_ID = os.getenv("KEYCLOAK_CLIENT_ID", "vmis-services") KEYCLOAK_CLIENT_SECRET = os.getenv("KEYCLOAK_CLIENT_SECRET", "VirtualMIS2026ServiceSecret12345") REDIRECT_URI = os.getenv("REDIRECT_URI", "https://webmail.lab.taipei/callback") # 郵件伺服器配置 MAIL_SERVER = os.getenv("MAIL_SERVER", "10.1.0.254") SMTP_SERVER = os.getenv("SMTP_SERVER", "10.1.0.254") oauth = OAuth() oauth.register( name="keycloak", client_id=KEYCLOAK_CLIENT_ID, client_secret=KEYCLOAK_CLIENT_SECRET, server_metadata_url=f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/.well-known/openid-configuration", client_kwargs={"scope": "openid email profile"} ) def get_account_by_sso_uuid(sso_uuid: str): """根據 Keycloak UUID 查詢帳號""" try: with db_engine.connect() as conn: result = conn.execute( text(""" SELECT a.email, a.default_password, a.sso_account, a.account_code FROM accounts a JOIN tenants t ON a.tenant_id = t.id WHERE a.sso_uuid = :sso_uuid AND t.keycloak_realm = :realm LIMIT 1 """), {"sso_uuid": sso_uuid, "realm": KEYCLOAK_REALM} ).fetchone() if result: logger.info(f"Found account: {result[2]} ({result[0]})") return { "email": result[0], "password": result[1], "sso_account": result[2], "account_code": result[3] } logger.warning(f"No account found for sso_uuid: {sso_uuid}") return None except Exception as e: logger.error(f"Database error: {e}") return None def decode_header_value(header_value): """解碼郵件標頭""" if not header_value: return '' decoded = decode_header(header_value) parts = [] for content, charset in decoded: if isinstance(content, bytes): parts.append(content.decode(charset or 'utf-8', errors='replace')) else: parts.append(content) return ''.join(parts) def get_imap_folders(email_addr, password): """取得 IMAP 資料夾列表""" try: mail = imaplib.IMAP4(MAIL_SERVER, 143) mail.login(email_addr, password) # 列出所有資料夾(包括子資料夾) status, folders = mail.list() folder_list = [] if status == 'OK': import re for folder in folders: # 解析資料夾名稱 folder_str = folder.decode('utf-8', errors='replace') logger.info(f"Raw folder: {folder_str}") # 格式可能是: # 1. (\HasNoChildren) "." "INBOX" (有引號) # 2. (\HasNoChildren) "." INBOX (無引號) # 3. (\HasNoChildren \Sent) "." Sent (無引號 + 多個標誌) # 改進的正則表達式:匹配有引號或無引號的資料夾名稱 # 格式: (flags) "delimiter" folder_name 或 "folder_name" match = re.search(r'\([^)]*\)\s+"[^"]*"\s+(.+)$', folder_str) if match: folder_name = match.group(1).strip() # 移除可能的引號 folder_name = folder_name.strip('"') if folder_name: folder_list.append(folder_name) logger.info(f"Parsed folder: {folder_name}") else: logger.warning(f"Failed to parse folder: {folder_str}") mail.logout() # 記錄所有找到的資料夾 logger.info(f"Found folders: {folder_list}") # 確保至少有 INBOX if not folder_list: folder_list = ['INBOX'] return folder_list except Exception as e: logger.error(f"IMAP folder list error: {e}") import traceback traceback.print_exc() return ['INBOX'] # 至少返回收件匣 def get_imap_messages(email_addr, password, folder='INBOX', limit=50): """從 IMAP 取得郵件列表(含附件、標記等資訊)""" try: # 連接 IMAP mail = imaplib.IMAP4(MAIL_SERVER, 143) mail.login(email_addr, password) # 選擇資料夾 status, _ = mail.select(folder) if status != 'OK': logger.warning(f"Failed to select folder: {folder}, using INBOX") mail.select('INBOX') # 搜尋所有郵件 status, messages = mail.search(None, 'ALL') mail_ids = messages[0].split() # 取得最新的 N 封 mail_ids = mail_ids[-limit:] mail_ids.reverse() messages_list = [] for mail_id in mail_ids: # 取得郵件 FLAGS 和 RFC822 status, msg_data = mail.fetch(mail_id, '(FLAGS RFC822)') flags = [] msg_bytes = None for response_part in msg_data: if isinstance(response_part, bytes): # 解析 FLAGS flags_match = response_part.decode('utf-8', errors='replace') if '\\Seen' in flags_match: flags.append('seen') if '\\Flagged' in flags_match: flags.append('flagged') if '\\Answered' in flags_match: flags.append('answered') elif isinstance(response_part, tuple): msg_bytes = response_part[1] if msg_bytes: msg = email.message_from_bytes(msg_bytes) # 解析主旨 subject = decode_header_value(msg.get('Subject', '')) # 解析寄件者 from_ = msg.get('From', '') # 解析日期 date_str = msg.get('Date', '') # 檢查是否有附件 has_attachment = False if msg.is_multipart(): for part in msg.walk(): content_disposition = part.get('Content-Disposition', '') if 'attachment' in content_disposition: has_attachment = True break # 是否未讀 is_unread = 'seen' not in flags messages_list.append({ 'id': mail_id.decode(), 'subject': subject or '(無主旨)', 'from': from_, 'date': date_str, 'has_attachment': has_attachment, 'is_unread': is_unread, 'is_flagged': 'flagged' in flags, 'is_answered': 'answered' in flags }) mail.close() mail.logout() return messages_list except Exception as e: logger.error(f"IMAP error: {e}") import traceback traceback.print_exc() return [] def get_mail_by_id(email_addr, password, mail_id, folder='INBOX'): """讀取單封郵件內容""" try: mail = imaplib.IMAP4(MAIL_SERVER, 143) mail.login(email_addr, password) mail.select(folder) status, msg_data = mail.fetch(mail_id, '(RFC822)') for response_part in msg_data: if isinstance(response_part, tuple): msg = email.message_from_bytes(response_part[1]) # 解析標頭 subject = decode_header_value(msg.get('Subject', '')) from_ = msg.get('From', '') to_ = msg.get('To', '') date_str = msg.get('Date', '') # 解析郵件內容和附件 body = "" attachments = [] if msg.is_multipart(): for part in msg.walk(): content_type = part.get_content_type() content_disposition = part.get('Content-Disposition', '') # 檢查是否為附件 if 'attachment' in content_disposition: filename = part.get_filename() if filename: # 解碼檔名 decoded_filename = decode_header_value(filename) # 取得檔案大小 file_size = len(part.get_payload(decode=True)) attachments.append({ 'filename': decoded_filename, 'size': file_size, 'content_type': content_type }) # 取得郵件本文 elif content_type == "text/plain" and not body: body = part.get_payload(decode=True).decode('utf-8', errors='replace') elif content_type == "text/html" and not body: body = part.get_payload(decode=True).decode('utf-8', errors='replace') else: body = msg.get_payload(decode=True).decode('utf-8', errors='replace') mail.close() mail.logout() return { 'id': mail_id, 'subject': subject or '(無主旨)', 'from': from_, 'to': to_, 'date': date_str, 'body': body, 'attachments': attachments } mail.close() mail.logout() return None except Exception as e: logger.error(f"IMAP read error: {e}") import traceback traceback.print_exc() return None def send_email_smtp(from_addr, password, to_addr, subject, body, content_type='plain', attachments=None): """使用 SMTP 發送郵件(支援附件)""" try: # 建立郵件 msg = MIMEMultipart('mixed') if attachments else MIMEMultipart('alternative') msg['From'] = from_addr msg['To'] = to_addr msg['Subject'] = subject # 建立郵件內容部分 if attachments: # 有附件時,先建立 alternative 部分 msg_alternative = MIMEMultipart('alternative') if content_type == 'html': text_content = body.replace('
', '\n').replace('<[^>]*>', '') msg_alternative.attach(MIMEText(text_content, 'plain', 'utf-8')) msg_alternative.attach(MIMEText(body, 'html', 'utf-8')) else: msg_alternative.attach(MIMEText(body, 'plain', 'utf-8')) msg.attach(msg_alternative) else: # 無附件時,直接附加內容 if content_type == 'html': text_content = body.replace('
', '\n').replace('<[^>]*>', '') msg.attach(MIMEText(text_content, 'plain', 'utf-8')) msg.attach(MIMEText(body, 'html', 'utf-8')) else: msg.attach(MIMEText(body, 'plain', 'utf-8')) # 附加檔案 if attachments: from email.mime.base import MIMEBase from email import encoders import mimetypes for attachment in attachments: # 猜測 MIME 類型 mime_type, _ = mimetypes.guess_type(attachment['filename']) if mime_type is None: mime_type = 'application/octet-stream' main_type, sub_type = mime_type.split('/', 1) part = MIMEBase(main_type, sub_type) part.set_payload(attachment['content']) encoders.encode_base64(part) part.add_header('Content-Disposition', f'attachment; filename="{attachment["filename"]}"') msg.attach(part) # 連接 SMTP (內部網路,不使用 TLS 和認證) smtp = smtplib.SMTP(SMTP_SERVER, 25, timeout=10) # 內部郵件伺服器不需要認證 smtp.send_message(msg) smtp.quit() attachment_count = len(attachments) if attachments else 0 logger.info(f"Email sent from {from_addr} to {to_addr} (type: {content_type}, attachments: {attachment_count})") return True except Exception as e: logger.error(f"SMTP error: {e}") import traceback traceback.print_exc() return False @app.get("/") async def root(request: Request, folder: str = "INBOX"): session_id = request.session.get("session_id") if not session_id: logger.info("No session, redirecting to Keycloak") return await oauth.keycloak.authorize_redirect(request, REDIRECT_URI) user_data_json = redis_client.get(f"webmail:session:{session_id}") if not user_data_json: logger.info("Session expired, redirecting to login") request.session.clear() return await oauth.keycloak.authorize_redirect(request, REDIRECT_URI) user_data = json.loads(user_data_json) sso_uuid = user_data.get("sub") name = user_data.get("name") preferred_username = user_data.get("preferred_username") # 從資料庫取得郵件帳號密碼 credentials = get_account_by_sso_uuid(sso_uuid) if not credentials: return HTMLResponse(f""" 錯誤

找不到郵件帳號

SSO: {preferred_username}

登出 """) mail_email = credentials['email'] mail_password = credentials['password'] # 取得資料夾列表 folders = get_imap_folders(mail_email, mail_password) # 取得郵件列表 messages = get_imap_messages(mail_email, mail_password, folder) # 取得使用者主題設定 theme = redis_client.hget(f"webmail:settings:{session_id}", "theme") or "default" # 定義主題樣式 themes = { "default": { "bg": "#f5f5f5", "header_bg": "#1a73e8", "header_text": "white", "container_bg": "white", "item_hover": "#f8f9fa", "text_secondary": "#5f6368", "btn_bg": "#1a73e8", "btn_hover": "#1557b0" }, "dark": { "bg": "#1f1f1f", "header_bg": "#2d2d2d", "header_text": "#e8eaed", "container_bg": "#2d2d2d", "item_hover": "#3c4043", "text_secondary": "#9aa0a6", "btn_bg": "#8ab4f8", "btn_hover": "#aecbfa" }, "green": { "bg": "#e8f5e9", "header_bg": "#2e7d32", "header_text": "white", "container_bg": "white", "item_hover": "#c8e6c9", "text_secondary": "#558b2f", "btn_bg": "#43a047", "btn_hover": "#66bb6a" }, "purple": { "bg": "#f3e5f5", "header_bg": "#6a1b9a", "header_text": "white", "container_bg": "white", "item_hover": "#e1bee7", "text_secondary": "#7b1fa2", "btn_bg": "#8e24aa", "btn_hover": "#ab47bc" } } current_theme = themes.get(theme, themes["default"]) # 提取主題顏色變數(避免 f-string 中使用字典語法) theme_bg = current_theme['bg'] theme_header_bg = current_theme['header_bg'] theme_header_text = current_theme['header_text'] theme_btn_bg = current_theme['btn_bg'] theme_btn_hover = current_theme['btn_hover'] theme_item_hover = current_theme['item_hover'] theme_container_bg = current_theme['container_bg'] theme_text_secondary = current_theme['text_secondary'] # 計算條件表達式的值 theme_text_color = theme_header_text if theme == 'dark' else '#202124' # 產生資料夾列表 HTML def get_folder_icon(folder_name): """根據資料夾名稱判斷圖示""" folder_lower = folder_name.lower() if 'inbox' in folder_lower: return '📥' elif 'sent' in folder_lower or 'sent items' in folder_lower or 'sent mail' in folder_lower: return '📤' elif 'draft' in folder_lower: return '📝' elif 'trash' in folder_lower or 'deleted' in folder_lower: return '🗑️' elif 'junk' in folder_lower or 'spam' in folder_lower: return '🚫' elif 'archive' in folder_lower: return '📦' elif 'star' in folder_lower or 'flagged' in folder_lower: return '⭐' elif 'important' in folder_lower: return '❗' else: return '📁' def get_folder_display_name(folder_name): """根據資料夾名稱轉換為中文顯示名稱""" folder_lower = folder_name.lower() # 完全匹配 exact_match = { 'INBOX': '收件匣', 'Sent': '寄件備份', 'Sent Items': '寄件備份', 'Sent Mail': '寄件備份', 'Drafts': '草稿', 'Trash': '垃圾桶', 'Deleted Items': '垃圾桶', 'Junk': '垃圾郵件', 'Spam': '垃圾郵件', 'Archive': '封存', 'Starred': '已加星號', 'Flagged': '已標幟', 'Important': '重要郵件' } if folder_name in exact_match: return exact_match[folder_name] # 部分匹配(處理層級資料夾,例如 INBOX.Sent) if 'sent' in folder_lower: return '寄件備份' elif 'draft' in folder_lower: return '草稿' elif 'trash' in folder_lower or 'deleted' in folder_lower: return '垃圾桶' elif 'junk' in folder_lower or 'spam' in folder_lower: return '垃圾郵件' elif 'archive' in folder_lower: return '封存' elif 'star' in folder_lower or 'flagged' in folder_lower: return '已加星號' # 顯示原始名稱(處理層級,例如將 . 替換為 /) return folder_name.replace('.', ' / ') def get_folder_indent_level(folder_name): """計算資料夾的層級(用於縮排)""" return folder_name.count('.') # 資料夾排序:收件匣和寄件備份在最上方 def get_folder_priority(folder_name): """定義資料夾顯示優先順序(數字越小越前面)""" folder_lower = folder_name.lower() if folder_name == 'INBOX' or 'inbox' in folder_lower: return 1 # 收件匣最優先 elif 'sent' in folder_lower: return 2 # 寄件備份第二 elif 'draft' in folder_lower: return 3 # 草稿第三 elif 'trash' in folder_lower or 'deleted' in folder_lower: return 98 # 垃圾桶倒數第二 elif 'junk' in folder_lower or 'spam' in folder_lower: return 99 # 垃圾郵件最後 else: return 50 # 其他資料夾中間 # 對資料夾進行排序 sorted_folders = sorted(folders, key=get_folder_priority) folders_html = "" for f in sorted_folders: icon = get_folder_icon(f) display_name = get_folder_display_name(f) indent_level = get_folder_indent_level(f) active_class = 'active' if f == folder else '' # 計算縮排 indent_style = f"padding-left: {16 + indent_level * 20}px;" if indent_level > 0 else "" folders_html += f"""
  • {icon} {display_name}
  • """ # 產生郵件列表 HTML mail_items_html = "" if messages: for m in messages: # 智慧型標記 markers = [] if m['is_flagged']: markers.append('') else: markers.append('') # 重要標記(這裡先用 flagged 模擬,之後可擴展) markers.append('') if m['is_answered']: markers.append('↩️') markers_html = ''.join(markers) # 附件圖示 attachment_icon = '📎' if m['has_attachment'] else '' # 未讀樣式 unread_class = 'unread' if m['is_unread'] else '' mail_items_html += f"""
  • {markers_html}
    {m['subject']} {attachment_icon}
    {m['date']}
  • """ else: mail_items_html = '
    目前沒有郵件
    ' # 顯示郵件列表(Gmail 風格兩欄式布局) return HTMLResponse(f""" WebMail

    📧 WebMail

    預設藍色
    深色模式
    清新綠色
    優雅紫色

    {get_folder_display_name(folder)}

      {mail_items_html}
    """) @app.get("/api/mail/{mail_id}") async def get_mail_api(request: Request, mail_id: str, folder: str = "INBOX"): """API 端點:取得單封郵件內容(JSON 格式供 Modal 使用)""" session_id = request.session.get("session_id") if not session_id: return {"error": "未登入"} user_data_json = redis_client.get(f"webmail:session:{session_id}") if not user_data_json: return {"error": "Session 過期"} user_data = json.loads(user_data_json) sso_uuid = user_data.get("sub") credentials = get_account_by_sso_uuid(sso_uuid) if not credentials: return {"error": "找不到郵件帳號"} mail_email = credentials['email'] mail_password = credentials['password'] # 讀取郵件 mail_data = get_mail_by_id(mail_email, mail_password, mail_id, folder) if not mail_data: return {"error": "找不到郵件"} return { "subject": mail_data['subject'], "from": mail_data['from'], "to": mail_data['to'], "date": mail_data['date'], "body": mail_data['body'], "attachments": mail_data.get('attachments', []) } @app.post("/api/delete-mails") async def delete_mails_api(request: Request): """API 端點:批次刪除郵件""" session_id = request.session.get("session_id") if not session_id: return {"success": False, "error": "未登入"} user_data_json = redis_client.get(f"webmail:session:{session_id}") if not user_data_json: return {"success": False, "error": "Session 過期"} user_data = json.loads(user_data_json) sso_uuid = user_data.get("sub") credentials = get_account_by_sso_uuid(sso_uuid) if not credentials: return {"success": False, "error": "找不到郵件帳號"} mail_email = credentials['email'] mail_password = credentials['password'] # 取得請求資料 try: data = await request.json() mail_ids = data.get('mail_ids', []) folder = data.get('folder', 'INBOX') if not mail_ids: return {"success": False, "error": "沒有選擇郵件"} # 連接 IMAP 刪除郵件 mail = imaplib.IMAP4(MAIL_SERVER, 143) mail.login(mail_email, mail_password) mail.select(folder) # 標記郵件為刪除 for mail_id in mail_ids: mail.store(mail_id, '+FLAGS', '\\Deleted') # 永久刪除 mail.expunge() mail.close() mail.logout() return {"success": True, "deleted_count": len(mail_ids)} except Exception as e: logger.error(f"Delete mail error: {e}") import traceback traceback.print_exc() return {"success": False, "error": str(e)} @app.get("/callback") async def callback(request: Request): try: token = await oauth.keycloak.authorize_access_token(request) userinfo = token.get("userinfo") if not userinfo: raise HTTPException(status_code=401, detail="No userinfo") sso_uuid = userinfo.get("sub") preferred_username = userinfo.get("preferred_username") email_addr = userinfo.get("email") name = userinfo.get("name", preferred_username) if not sso_uuid: raise HTTPException(status_code=400, detail="No user ID found") logger.info(f"Keycloak callback: {preferred_username} (UUID: {sso_uuid})") session_id = secrets.token_urlsafe(32) user_data = { "sub": sso_uuid, "preferred_username": preferred_username, "email": email_addr, "name": name } redis_client.setex(f"webmail:session:{session_id}", 8*60*60, json.dumps(user_data)) request.session["session_id"] = session_id logger.info(f"Session created for {preferred_username}") return RedirectResponse(url="/") except Exception as e: logger.error(f"Callback error: {e}") import traceback traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) @app.get("/compose") async def compose_form(request: Request): session_id = request.session.get("session_id") if not session_id: return RedirectResponse(url="/") user_data_json = redis_client.get(f"webmail:session:{session_id}") if not user_data_json: return RedirectResponse(url="/") user_data = json.loads(user_data_json) name = user_data.get("name") return HTMLResponse(f""" 撰寫郵件

    📧 撰寫新郵件

    {name}
    ← 返回
    純文字
    富文本
    """) @app.post("/send") async def send_mail( request: Request, to: str = Form(...), subject: str = Form(...), body: str = Form(...), content_type: str = Form('plain'), attachments: list[UploadFile] = File(default=[]) ): session_id = request.session.get("session_id") if not session_id: return JSONResponse({"error": "未登入"}, status_code=401) user_data_json = redis_client.get(f"webmail:session:{session_id}") if not user_data_json: return JSONResponse({"error": "Session 已過期"}, status_code=401) user_data = json.loads(user_data_json) sso_uuid = user_data.get("sub") credentials = get_account_by_sso_uuid(sso_uuid) if not credentials: return JSONResponse({"error": "找不到郵件帳號"}, status_code=404) mail_email = credentials['email'] mail_password = credentials['password'] # 處理附件 attachment_list = [] if attachments: for upload_file in attachments: if upload_file.filename: # 確保有檔案名稱 content = await upload_file.read() attachment_list.append({ 'filename': upload_file.filename, 'content': content }) # 發送郵件(支援 HTML 格式和附件) success = send_email_smtp(mail_email, mail_password, to, subject, body, content_type, attachment_list if attachment_list else None) if success: return JSONResponse({"success": True, "message": "郵件發送成功"}) else: return JSONResponse({"success": False, "error": "郵件發送失敗"}, status_code=500) @app.post("/theme") async def set_theme(request: Request, theme: str = Form(...)): """設定使用者布景主題""" session_id = request.session.get("session_id") if session_id: # 儲存主題設定到 Redis redis_client.hset(f"webmail:settings:{session_id}", "theme", theme) return RedirectResponse(url="/", status_code=303) @app.get("/logout") async def logout(request: Request): session_id = request.session.get("session_id") if session_id: redis_client.delete(f"webmail:session:{session_id}") redis_client.delete(f"webmail:settings:{session_id}") request.session.clear() return HTMLResponse("""

    已登出

    重新登入""") @app.get("/health") async def health(): return {"status": "ok"} # ====================================================================== # Unified inbox rendering function # ====================================================================== def get_tenant_by_code(tenant_code: str): """根據租戶代碼查詢租戶資訊""" try: with db_engine.connect() as conn: result = conn.execute( text(""" SELECT id, code, name, domain, keycloak_realm FROM tenants WHERE code = :tenant_code AND is_active = true LIMIT 1 """), {"tenant_code": tenant_code} ).fetchone() if result: return { "id": result[0], "code": result[1], "name": result[2], "domain": result[3], "keycloak_realm": result[4] } return None except Exception as e: logger.error(f"Database error: {e}") return None def generate_pkce_pair(): """生成 PKCE code_verifier 和 code_challenge""" code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).decode("utf-8").rstrip("=") code_challenge = base64.urlsafe_b64encode( hashlib.sha256(code_verifier.encode("utf-8")).digest() ).decode("utf-8").rstrip("=") return code_verifier, code_challenge def get_account_by_sso_uuid_and_realm(sso_uuid: str, realm: str): """根據 Keycloak UUID 和 Realm 查詢帳號""" try: with db_engine.connect() as conn: result = conn.execute( text(""" SELECT a.email, a.default_password, a.sso_account, a.account_code FROM accounts a JOIN tenants t ON a.tenant_id = t.id WHERE a.sso_uuid = :sso_uuid AND t.keycloak_realm = :realm LIMIT 1 """), {"sso_uuid": sso_uuid, "realm": realm} ).fetchone() if result: logger.info(f"Found account: {result[2]} ({result[0]})") return { "email": result[0], "password": result[1], "sso_account": result[2], "account_code": result[3] } logger.warning(f"No account found for sso_uuid: {sso_uuid} in realm: {realm}") return None except Exception as e: logger.error(f"Database error: {e}") return None # ===== 租戶登入頁面 ===== def render_inbox( request: Request, folder: str, mail_email: str, mail_password: str, username: str, name: str = None, session_id: str = None, tenant_code: str = None, is_management_tenant: bool = False ): """ 統一的收件匣渲染函數 參數: - is_management_tenant: True = 管理租戶 (根路徑), False = 一般租戶 - tenant_code: 租戶代碼 (一般租戶必須提供) - session_id: Session ID (用於主題設定) """ try: # 取得資料夾列表 folders = get_imap_folders(mail_email, mail_password) # 取得郵件列表 messages = get_imap_messages(mail_email, mail_password, folder) # 取得使用者主題設定 theme = redis_client.hget(f"webmail:settings:{session_id}", "theme") or "default" # 定義主題樣式 themes = { "default": { "bg": "#f5f5f5", "header_bg": "#1a73e8", "header_text": "white", "container_bg": "white", "item_hover": "#f8f9fa", "text_secondary": "#5f6368", "btn_bg": "#1a73e8", "btn_hover": "#1557b0" }, "dark": { "bg": "#1f1f1f", "header_bg": "#2d2d2d", "header_text": "#e8eaed", "container_bg": "#2d2d2d", "item_hover": "#3c4043", "text_secondary": "#9aa0a6", "btn_bg": "#8ab4f8", "btn_hover": "#aecbfa" }, "green": { "bg": "#e8f5e9", "header_bg": "#2e7d32", "header_text": "white", "container_bg": "white", "item_hover": "#c8e6c9", "text_secondary": "#558b2f", "btn_bg": "#43a047", "btn_hover": "#66bb6a" }, "purple": { "bg": "#f3e5f5", "header_bg": "#6a1b9a", "header_text": "white", "container_bg": "white", "item_hover": "#e1bee7", "text_secondary": "#7b1fa2", "btn_bg": "#8e24aa", "btn_hover": "#ab47bc" } } current_theme = themes.get(theme, themes["default"]) # 提取主題顏色變數(避免 f-string 中使用字典語法) theme_bg = current_theme['bg'] theme_header_bg = current_theme['header_bg'] theme_header_text = current_theme['header_text'] theme_btn_bg = current_theme['btn_bg'] theme_btn_hover = current_theme['btn_hover'] theme_item_hover = current_theme['item_hover'] theme_container_bg = current_theme['container_bg'] theme_text_secondary = current_theme['text_secondary'] # 計算條件表達式的值 theme_text_color = theme_header_text if theme == 'dark' else '#202124' # 資料夾處理函數 def get_folder_icon(folder_name): folder_lower = folder_name.lower() if 'inbox' in folder_lower: return '📥' elif 'sent' in folder_lower or 'sent items' in folder_lower or 'sent mail' in folder_lower: return '📤' elif 'draft' in folder_lower: return '📝' elif 'trash' in folder_lower or 'deleted' in folder_lower: return '🗑️' elif 'junk' in folder_lower or 'spam' in folder_lower: return '🚫' elif 'archive' in folder_lower: return '📦' elif 'star' in folder_lower or 'flagged' in folder_lower: return '⭐' elif 'important' in folder_lower: return '❗' else: return '📁' def get_folder_display_name(folder_name): folder_lower = folder_name.lower() exact_match = { 'INBOX': '收件匣', 'Sent': '寄件備份', 'Sent Items': '寄件備份', 'Sent Mail': '寄件備份', 'Drafts': '草稿', 'Trash': '垃圾桶', 'Deleted Items': '垃圾桶', 'Junk': '垃圾郵件', 'Spam': '垃圾郵件', 'Archive': '封存', 'Starred': '已加星號', 'Flagged': '已標幟', 'Important': '重要郵件' } if folder_name in exact_match: return exact_match[folder_name] if 'sent' in folder_lower: return '寄件備份' elif 'draft' in folder_lower: return '草稿' elif 'trash' in folder_lower or 'deleted' in folder_lower: return '垃圾桶' elif 'junk' in folder_lower or 'spam' in folder_lower: return '垃圾郵件' elif 'archive' in folder_lower: return '封存' elif 'star' in folder_lower or 'flagged' in folder_lower: return '已加星號' return folder_name.replace('.', ' / ') def get_folder_priority(folder_name): folder_lower = folder_name.lower() if folder_name == 'INBOX' or 'inbox' in folder_lower: return 1 elif 'sent' in folder_lower: return 2 elif 'draft' in folder_lower: return 3 elif 'trash' in folder_lower or 'deleted' in folder_lower: return 98 elif 'junk' in folder_lower or 'spam' in folder_lower: return 99 else: return 50 # 排序資料夾 sorted_folders = sorted(folders, key=get_folder_priority) # 產生資料夾列表 HTML - 根據租戶類型使用不同的 URL folders_html = "" for f in sorted_folders: icon = get_folder_icon(f) display_name = get_folder_display_name(f) indent_level = f.count('.') active_class = 'active' if f == folder else '' indent_style = f"padding-left: {16 + indent_level * 20}px;" if indent_level > 0 else "" # 根據租戶類型生成 URL if is_management_tenant: folder_url = f"/?folder={f}" else: folder_url = f"/{tenant_code}/inbox?folder={f}" folders_html += f"""
  • {icon} {display_name}
  • """ # 產生郵件列表 HTML mail_items_html = "" if messages: for m in messages: markers = [] if m['is_flagged']: markers.append('') else: markers.append(f'') markers.append(f'') if m['is_answered']: markers.append('↩️') markers_html = ''.join(markers) attachment_icon = '📎' if m['has_attachment'] else '' unread_class = 'unread' if m['is_unread'] else '' mail_items_html += f"""
  • {markers_html}
    {html.escape(m['subject'])} {attachment_icon}
    {html.escape(m['date'])}
  • """ else: mail_items_html = '
    目前沒有郵件
    ' # 根據租戶類型設定頁面標題和 URL if is_management_tenant: page_title = "WebMail" compose_url = "/compose" logout_url = "/logout" theme_form_action = "/theme" api_mail_url = "/api/mail" api_delete_url = "/api/delete-mails" sidebar_storage_key = "sidebar_width" else: page_title = f"WebMail - {tenant_code}" compose_url = f"/{tenant_code}/compose" logout_url = f"/{tenant_code}/logout" theme_form_action = f"/{tenant_code}/theme" api_mail_url = f"/{tenant_code}/api/mail" api_delete_url = f"/{tenant_code}/api/delete-mails" sidebar_storage_key = f"sidebar_width_{tenant_code}" # 撰寫按鈕 HTML (暫時使用換頁方式,待後續整合 Modal) compose_btn_html = f'✍️ 撰寫' # 使用者顯示名稱 display_name = name if name else username # 渲染完整的 HTML 頁面 return HTMLResponse(f""" {page_title}

    📧 {page_title}

    {compose_btn_html}
    預設藍色
    深色模式
    清新綠色
    優雅紫色

    {get_folder_display_name(folder)}

      {mail_items_html}
    """) except Exception as e: logger.error(f"Error loading inbox: {e}") import traceback traceback.print_exc() error_page_title = f"載入失敗 - {tenant_code}" if tenant_code else "載入失敗" back_url = f"/{tenant_code}" if tenant_code else "/" return HTMLResponse( content=f""" {error_page_title}

    ❌ 載入收件匣失敗

    使用者: {username}

    {html.escape(str(e))}
    ← 返回登入頁面
    """, status_code=500 ) # ====================================================================== # Unified compose form function # ====================================================================== def render_compose_form(name: str, back_url: str, send_url: str, tenant_code: str = None): """統一的撰寫郵件頁面渲染函數""" page_title = f"撰寫郵件 - {tenant_code}" if tenant_code else "撰寫郵件" return HTMLResponse(f""" {page_title}

    📧 撰寫新郵件

    {name}
    ← 返回
    純文字
    富文本編輯
    """) # ====================================================================== # Tenant routes # ====================================================================== @app.get("/{tenant_code}") async def tenant_login_page(tenant_code: str, request: Request): """ 租戶登入頁面 - 自動導向 Keycloak URL: https://webmail.lab.taipei/porsche1 """ # 查詢租戶資訊 tenant = get_tenant_by_code(tenant_code) if not tenant: return HTMLResponse( content=f""" 租戶不存在

    租戶不存在

    找不到租戶「{tenant_code}」
    請確認網址是否正確

    """, status_code=404 ) # 生成 PKCE code_verifier, code_challenge = generate_pkce_pair() # 儲存 code_verifier 到 Redis pkce_key = f"pkce:{tenant_code}:{secrets.token_urlsafe(16)}" redis_client.setex(pkce_key, 600, code_verifier) logger.info(f"Generated PKCE for {tenant_code}: key={pkce_key}") # 建立 Keycloak 認證 URL (含 PKCE) realm = tenant['keycloak_realm'] client_id = "webmail" redirect_uri = f"https://webmail.lab.taipei/{tenant_code}/callback" auth_url = ( f"{KEYCLOAK_SERVER_URL}/realms/{realm}/protocol/openid-connect/auth" f"?client_id={client_id}" f"&redirect_uri={redirect_uri}" f"&response_type=code" f"&scope=openid+email+profile" f"&state={pkce_key}" f"&code_challenge={code_challenge}" f"&code_challenge_method=S256" ) logger.info(f"Redirecting {tenant_code} to Keycloak: {auth_url}") # 重定向到 Keycloak return RedirectResponse(url=auth_url) # ===== 租戶 Callback ===== @app.get("/{tenant_code}/callback") async def tenant_callback( tenant_code: str, code: Optional[str] = None, state: Optional[str] = None, error: Optional[str] = None, error_description: Optional[str] = None, request: Request = None ): """ Keycloak 回調處理 - Token 交換 (含 PKCE 驗證) URL: https://webmail.lab.taipei/porsche1/callback?code=xxx&state=pkce:xxx """ # 錯誤處理 if error: logger.error(f"Keycloak error: {error} - {error_description}") return JSONResponse(content={"detail": f"認證錯誤: {error_description}"}, status_code=400) if not code: raise HTTPException(status_code=400, detail="Missing authorization code") # PKCE 驗證:從 state 取得 pkce_key 並從 Redis 取得 code_verifier pkce_key = state # state 格式: pkce:{tenant_code}:{random} logger.info(f"Attempting to retrieve PKCE key: {pkce_key}") code_verifier = redis_client.get(pkce_key) if not code_verifier: logger.error(f"PKCE code_verifier not found in Redis: {pkce_key}") logger.error(f"Possible reasons: 1) PKCE expired (>10min), 2) Used old login link, 3) Redis connection issue") raise HTTPException(status_code=400, detail="PKCE verification failed: code_verifier not found") # Redis client 已設定 decode_responses=True,所以 code_verifier 已經是字串 logger.info(f"Retrieved code_verifier from Redis: key={pkce_key}, verifier={code_verifier[:10]}...") try: # 1. 查詢租戶資訊 tenant = get_tenant_by_code(tenant_code) if not tenant: raise HTTPException(status_code=404, detail=f"Tenant not found: {tenant_code}") realm = tenant['keycloak_realm'] logger.info(f"Processing callback for tenant: {tenant_code}, realm: {realm}") # 2. 交換 Token (使用 PKCE) logger.info(f"Exchanging authorization code for access token") token_url = f"{KEYCLOAK_SERVER_URL}/realms/{realm}/protocol/openid-connect/token" redirect_uri = f"https://webmail.lab.taipei/{tenant_code}/callback" token_data = { "grant_type": "authorization_code", "client_id": "webmail", "code": code, "redirect_uri": redirect_uri, "code_verifier": code_verifier } import requests from urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) token_response = requests.post(token_url, data=token_data, verify=False, timeout=10) if token_response.status_code != 200: logger.error(f"Token exchange failed: {token_response.status_code} - {token_response.text}") raise HTTPException(status_code=400, detail=f"Token exchange failed: {token_response.text}") token_result = token_response.json() access_token = token_result.get("access_token") # 刪除已使用的 PKCE key redis_client.delete(pkce_key) logger.info(f"✓ Access token received, PKCE key deleted: {pkce_key}") # 刪除已使用的 PKCE key redis_client.delete(pkce_key) logger.info(f"✓ Access token received, PKCE key deleted: {pkce_key}") # 刪除已使用的 PKCE key redis_client.delete(pkce_key) logger.info(f"✓ Access token received, PKCE key deleted: {pkce_key}") refresh_token = token_result.get("refresh_token") if not access_token: raise HTTPException(status_code=400, detail="No access token received") logger.info(f"✓ Access token received") # 3. 取得使用者資訊 logger.info("Fetching user info from Keycloak") userinfo_url = f"{KEYCLOAK_SERVER_URL}/realms/{realm}/protocol/openid-connect/userinfo" headers = {"Authorization": f"Bearer {access_token}"} userinfo_response = requests.get(userinfo_url, headers=headers, verify=False, timeout=10) if userinfo_response.status_code != 200: logger.error(f"Userinfo fetch failed: {userinfo_response.status_code} - {userinfo_response.text}") raise HTTPException(status_code=400, detail="Failed to fetch user info") user_info = userinfo_response.json() sso_uuid = user_info.get("sub") username = user_info.get("preferred_username") email = user_info.get("email") logger.info(f"✓ User authenticated: {username} (email: {email}, uuid: {sso_uuid})") # 4. 查詢郵件帳號 logger.info(f"Looking up mail account for SSO UUID: {sso_uuid}") account = get_account_by_sso_uuid_and_realm(sso_uuid, realm) if not account: logger.error(f"No mail account found for SSO UUID: {sso_uuid} in realm: {realm}") return HTMLResponse( content=f""" 郵件帳號不存在

    ❌ 郵件帳號不存在

    使用者: {username}

    租戶: {tenant_code}

    您的帳號尚未開通郵件服務

    請聯絡系統管理員

    ← 返回登入頁面
    """, status_code=400 ) logger.info(f"✓ Mail account found: {account['email']}") # 5. 建立 Session logger.info(f"Creating session for user: {username}") # 生成 session_id import secrets session_id = secrets.token_urlsafe(32) # 將使用者資料存入 Redis session_data = { "session_id": session_id, "email": account['email'], "password": account['password'], "username": username, "sso_uuid": sso_uuid, "realm": realm, "tenant_code": tenant_code, "access_token": access_token, "refresh_token": refresh_token, "account_code": account['account_code'] } redis_client.setex( f"webmail:tenant_session:{session_id}", 3600 * 24, # 24 小時 json.dumps(session_data) ) logger.info(f"✓ Session created: {session_id}") # 6. 重定向到收件匣,並設定租戶專屬 Cookie logger.info(f"Redirecting to inbox: /{tenant_code}/inbox") response = RedirectResponse(url=f"/{tenant_code}/inbox", status_code=302) # 設定租戶專屬的 Cookie (HttpOnly, Secure, SameSite) cookie_name = f"webmail_session_{tenant_code}" response.set_cookie( key=cookie_name, value=session_id, max_age=86400, # 24 小時 httponly=True, samesite='lax', path=f"/{tenant_code}" # Cookie 只在該租戶路徑下有效 ) return response except HTTPException: raise except Exception as e: logger.error(f"Callback error: {e}") import traceback traceback.print_exc() return HTMLResponse( content=f""" 登入失敗

    ❌ 登入處理失敗

    租戶: {tenant_code}

    無法完成登入流程

    {str(e)}
    ← 返回登入頁面
    """, status_code=500 ) @app.get("/{tenant_code}/compose") async def tenant_compose_form(tenant_code: str, request: Request): """一般租戶撰寫郵件""" # 從租戶專屬 Cookie 讀取 session_id cookie_name = f"webmail_session_{tenant_code}" tenant_session_id = request.cookies.get(cookie_name) if not tenant_session_id: return RedirectResponse(url=f"/{tenant_code}") session_data_json = redis_client.get(f"webmail:tenant_session:{tenant_session_id}") if not session_data_json: return RedirectResponse(url=f"/{tenant_code}") session_data = json.loads(session_data_json) if session_data.get('tenant_code') != tenant_code: return RedirectResponse(url=f"/{tenant_code}") username = session_data.get('username') return render_compose_form( name=username, back_url=f"/{tenant_code}/inbox", send_url=f"/{tenant_code}/send", tenant_code=tenant_code ) @app.get("/{tenant_code}/inbox") async def tenant_inbox(tenant_code: str, request: Request, folder: str = "INBOX"): """ 租戶收件匣頁面 - 使用統一介面 URL: https://webmail.lab.taipei/porsche1/inbox """ # 從租戶專屬 Cookie 讀取 session_id cookie_name = f"webmail_session_{tenant_code}" tenant_session_id = request.cookies.get(cookie_name) if not tenant_session_id: logger.info(f"No tenant session cookie found, redirecting to login") return RedirectResponse(url=f"/{tenant_code}", status_code=302) # 從 Redis 取得租戶 Session 資料 session_data_json = redis_client.get(f"webmail:tenant_session:{tenant_session_id}") if not session_data_json: logger.info(f"Tenant session expired, redirecting to login") return RedirectResponse(url=f"/{tenant_code}", status_code=302) session_data = json.loads(session_data_json) # 驗證 tenant_code if session_data.get('tenant_code') != tenant_code: logger.warning(f"Tenant code mismatch") return RedirectResponse(url=f"/{tenant_code}", status_code=302) # 從 session 取得使用者和郵件帳號資訊 mail_email = session_data.get('email') mail_password = session_data.get('password') username = session_data.get('username') logger.info(f"Loading inbox for tenant user: {username} ({mail_email}), folder: {folder}") # 呼叫統一的 render_inbox 函數 return render_inbox( request=request, folder=folder, mail_email=mail_email, mail_password=mail_password, username=username, name=None, session_id=tenant_session_id, tenant_code=tenant_code, is_management_tenant=False ) # ===== 3. 發送郵件 API (一般租戶) ===== @app.post("/{tenant_code}/send") async def tenant_send_mail( tenant_code: str, request: Request, to: str = Form(...), subject: str = Form(...), body: str = Form(...), content_type: str = Form('plain'), attachments: list[UploadFile] = File(default=[]) ): """一般租戶發送郵件""" # 從租戶專屬 Cookie 讀取 session_id cookie_name = f"webmail_session_{tenant_code}" tenant_session_id = request.cookies.get(cookie_name) if not tenant_session_id: return JSONResponse({"error": "未登入"}, status_code=401) session_data_json = redis_client.get(f"webmail:tenant_session:{tenant_session_id}") if not session_data_json: return JSONResponse({"error": "Session 已過期"}, status_code=401) session_data = json.loads(session_data_json) if session_data.get('tenant_code') != tenant_code: return JSONResponse({"error": "租戶不符"}, status_code=403) # 從 session 取得郵件帳號資訊 mail_email = session_data.get('email') mail_password = session_data.get('password') if not mail_email or not mail_password: return JSONResponse({"error": "找不到郵件帳號"}, status_code=404) # 處理附件 attachment_list = [] if attachments: for upload_file in attachments: if upload_file.filename: content = await upload_file.read() attachment_list.append({ 'filename': upload_file.filename, 'content': content }) # 發送郵件 success = send_email_smtp( mail_email, mail_password, to, subject, body, content_type, attachment_list if attachment_list else None ) if success: return JSONResponse({"success": True, "message": "郵件發送成功"}) else: return JSONResponse({"success": False, "error": "郵件發送失敗"}, status_code=500) # ===== 4. 主題設定 API (管理租戶和一般租戶) ===== @app.post("/theme") async def set_theme(request: Request, theme: str = Form(...)): """管理租戶設定布景主題""" session_id = request.session.get("session_id") if session_id: redis_client.hset(f"webmail:settings:{session_id}", "theme", theme) return RedirectResponse(url="/", status_code=303) @app.post("/{tenant_code}/theme") async def tenant_set_theme(tenant_code: str, request: Request, theme: str = Form(...)): """一般租戶設定布景主題""" tenant_session_id = request.session.get('tenant_session_id') if tenant_session_id: redis_client.hset(f"webmail:settings:{tenant_session_id}", "theme", theme) return RedirectResponse(url=f"/{tenant_code}/inbox", status_code=303) # ===== 5. 登出功能 (一般租戶) ===== @app.get("/{tenant_code}/logout") async def tenant_logout(tenant_code: str, request: Request): """一般租戶登出""" tenant_session_id = request.session.get('tenant_session_id') if tenant_session_id: redis_client.delete(f"webmail:tenant_session:{tenant_session_id}") redis_client.delete(f"webmail:settings:{tenant_session_id}") request.session.clear() return HTMLResponse(f""" 已登出

    ✅ 已成功登出

    租戶: {tenant_code}

    您已安全登出系統

    ← 重新登入
    """) # ===== 6. 郵件查看和刪除 API (一般租戶) ===== @app.get("/{tenant_code}/api/mail/{mail_id}") async def tenant_get_mail(tenant_code: str, mail_id: str, folder: str, request: Request): """一般租戶取得單封郵件內容""" # 從租戶專屬 Cookie 讀取 session_id cookie_name = f"webmail_session_{tenant_code}" tenant_session_id = request.cookies.get(cookie_name) if not tenant_session_id: return JSONResponse({"error": "未登入"}, status_code=401) session_data_json = redis_client.get(f"webmail:tenant_session:{tenant_session_id}") if not session_data_json: return JSONResponse({"error": "Session 已過期"}, status_code=401) session_data = json.loads(session_data_json) if session_data.get('tenant_code') != tenant_code: return JSONResponse({"error": "租戶不符"}, status_code=403) mail_email = session_data.get('email') mail_password = session_data.get('password') try: mail_detail = get_mail_by_id(mail_email, mail_password, mail_id, folder) return JSONResponse(mail_detail) except Exception as e: logger.error(f"Error getting mail: {e}") return JSONResponse({"error": str(e)}, status_code=500) @app.post("/{tenant_code}/api/delete-mails") async def tenant_delete_mails(tenant_code: str, request: Request): """一般租戶刪除郵件""" # 從租戶專屬 Cookie 讀取 session_id cookie_name = f"webmail_session_{tenant_code}" tenant_session_id = request.cookies.get(cookie_name) if not tenant_session_id: return JSONResponse({"error": "未登入"}, status_code=401) session_data_json = redis_client.get(f"webmail:tenant_session:{tenant_session_id}") if not session_data_json: return JSONResponse({"error": "Session 已過期"}, status_code=401) session_data = json.loads(session_data_json) if session_data.get('tenant_code') != tenant_code: return JSONResponse({"error": "租戶不符"}, status_code=403) mail_email = session_data.get('email') mail_password = session_data.get('password') try: data = await request.json() mail_ids = data.get('mail_ids', []) folder = data.get('folder', 'INBOX') # 刪除郵件 success = delete_emails(mail_email, mail_password, mail_ids, folder) if success: return JSONResponse({"success": True, "message": f"已刪除 {len(mail_ids)} 封郵件"}) else: return JSONResponse({"success": False, "error": "刪除失敗"}, status_code=500) except Exception as e: logger.error(f"Error deleting mails: {e}") return JSONResponse({"error": str(e)}, status_code=500) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=10180)