from fastapi import FastAPI, Request, HTTPException, Form, File, UploadFile from fastapi.responses import RedirectResponse, HTMLResponse, JSONResponse from starlette.middleware.sessions import SessionMiddleware from authlib.integrations.starlette_client import OAuth from sqlalchemy import create_engine, text from typing import Optional import redis import json import os import secrets import logging import imaplib import smtplib import email from email.header import decode_header from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from datetime import datetime import html logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # ============================================================================ # 輔助函數:從 Virtual MIS Backend 取得租戶資訊 # ============================================================================ import requests VIRTUAL_MIS_API = "https://vmis.lab.taipei/api/v1" def get_tenant_by_code(tenant_code: str): """從 Virtual MIS Backend 取得租戶資訊""" try: response = requests.get( f"{VIRTUAL_MIS_API}/tenants/by-code/{tenant_code}", timeout=5 ) if response.status_code == 200: tenant_data = response.json() return { "id": tenant_data.get("id"), "code": tenant_data.get("code"), "name": tenant_data.get("name"), "realm": tenant_data.get("code"), "is_manager": tenant_data.get("is_manager", False) } else: logger.error(f"取得租戶資訊失敗: {response.status_code}") return None except Exception as e: logger.error(f"取得租戶資訊錯誤: {e}") return None # ============================================================================ # PKCE 支援函數 # ============================================================================ import hashlib import base64 def generate_pkce_pair(): code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).decode("utf-8").rstrip("=") code_challenge = base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode("utf-8")).digest()).decode("utf-8").rstrip("=") return code_verifier, code_challenge def get_account_by_sso_uuid_and_realm(sso_uuid: str, realm: str): try: with db_engine.connect() as conn: result = conn.execute(text(""" SELECT a.id, a.email, a.default_password, a.sso_account, a.account_code FROM accounts a JOIN tenants t ON a.tenant_id = t.id WHERE a.sso_uuid = :sso_uuid AND t.keycloak_realm = :realm LIMIT 1 """), {"sso_uuid": sso_uuid, "realm": realm}).fetchone() if result: return {"id": result[0], "email": result[1], "password": result[2], "sso_account": result[3], "account_code": result[4]} return None except Exception as e: logger.error(f"DB error: {e}") return None def render_inbox(request, folder, mail_email, mail_password, username, name, session_id, tenant_code, is_management_tenant=False): folders = get_imap_folders(mail_email, mail_password) messages = get_imap_messages(mail_email, mail_password, folder) msg_rows = "".join([f'{m.get("from","")}{m.get("subject","")}{m.get("date","")}' for m in messages]) folder_links = "".join([f'
  • {f}
  • ' for f in folders]) html = f'''WebMail

    WebMail - {username}

    {mail_email} | Tenant: {tenant_code}

    Logout

    Folder: {folder}

    Compose New Email

    {msg_rows}
    FromSubjectDate
    ''' return HTMLResponse(content=html) def render_compose_form(name, back_url, send_url, tenant_code): html = f'''Compose

    Compose Email

    From: {name}

    Back to Inbox

    ''' return HTMLResponse(content=html) def get_mail_detail(mail_email, mail_password, mail_id, folder): return get_mail_by_id(mail_email, mail_password, mail_id, folder) def delete_emails(mail_email, mail_password, mail_ids, folder): try: mail = imaplib.IMAP4("mailserver", 143) mail.login(mail_email, mail_password) mail.select(folder) for mid in mail_ids: mail.store(mid, "+FLAGS", "\\Deleted") mail.expunge() mail.close() mail.logout() return True except Exception as e: logger.error(f"Delete error: {e}") return False app = FastAPI() SECRET_KEY = os.getenv("SECRET_KEY", secrets.token_urlsafe(32)) app.add_middleware(SessionMiddleware, secret_key=SECRET_KEY) # Redis 配置 REDIS_HOST = os.getenv("REDIS_HOST", "10.1.0.20") REDIS_PORT = int(os.getenv("REDIS_PORT", "6379")) REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", "DC1qaz2wsx") REDIS_DB = int(os.getenv("REDIS_DB", "2")) redis_client = redis.Redis( host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, db=REDIS_DB, decode_responses=True ) # Virtual MIS 資料庫配置 DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://admin:DC1qaz2wsx@10.1.0.20:5433/virtual_mis") db_engine = create_engine(DATABASE_URL, pool_pre_ping=True) # Keycloak OAuth 配置 KEYCLOAK_SERVER_URL = os.getenv("KEYCLOAK_SERVER_URL", "https://auth.lab.taipei") KEYCLOAK_REALM = os.getenv("KEYCLOAK_REALM", "vmis-admin") KEYCLOAK_CLIENT_ID = os.getenv("KEYCLOAK_CLIENT_ID", "vmis-services") KEYCLOAK_CLIENT_SECRET = os.getenv("KEYCLOAK_CLIENT_SECRET", "VirtualMIS2026ServiceSecret12345") REDIRECT_URI = os.getenv("REDIRECT_URI", "https://webmail.lab.taipei/callback") oauth = OAuth() oauth.register( name="keycloak", client_id=KEYCLOAK_CLIENT_ID, client_secret=KEYCLOAK_CLIENT_SECRET, server_metadata_url=f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/.well-known/openid-configuration", client_kwargs={"scope": "openid email profile"} ) def get_account_by_sso_uuid(sso_uuid: str): """根據 Keycloak UUID 查詢帳號""" try: with db_engine.connect() as conn: result = conn.execute( text(""" SELECT a.email, a.default_password, a.sso_account, a.account_code FROM accounts a JOIN tenants t ON a.tenant_id = t.id WHERE a.sso_uuid = :sso_uuid AND t.keycloak_realm = :realm LIMIT 1 """), {"sso_uuid": sso_uuid, "realm": KEYCLOAK_REALM} ).fetchone() if result: logger.info(f"Found account: {result[2]} ({result[0]})") return { "email": result[0], "password": result[1], "sso_account": result[2], "account_code": result[3] } logger.warning(f"No account found for sso_uuid: {sso_uuid}") return None except Exception as e: logger.error(f"Database error: {e}") return None def decode_header_value(header_value): """解碼郵件標頭""" if not header_value: return '' decoded = decode_header(header_value) parts = [] for content, charset in decoded: if isinstance(content, bytes): parts.append(content.decode(charset or 'utf-8', errors='replace')) else: parts.append(content) return ''.join(parts) def get_imap_folders(email_addr, password): """取得 IMAP 資料夾列表""" try: mail = imaplib.IMAP4('mailserver', 143) mail.login(email_addr, password) # 列出所有資料夾(包括子資料夾) status, folders = mail.list() folder_list = [] if status == 'OK': import re for folder in folders: # 解析資料夾名稱 folder_str = folder.decode('utf-8', errors='replace') logger.info(f"Raw folder: {folder_str}") # 格式可能是: # 1. (\HasNoChildren) "." "INBOX" (有引號) # 2. (\HasNoChildren) "." INBOX (無引號) # 3. (\HasNoChildren \Sent) "." Sent (無引號 + 多個標誌) # 改進的正則表達式:匹配有引號或無引號的資料夾名稱 # 格式: (flags) "delimiter" folder_name 或 "folder_name" match = re.search(r'\([^)]*\)\s+"[^"]*"\s+(.+)$', folder_str) if match: folder_name = match.group(1).strip() # 移除可能的引號 folder_name = folder_name.strip('"') if folder_name: folder_list.append(folder_name) logger.info(f"Parsed folder: {folder_name}") else: logger.warning(f"Failed to parse folder: {folder_str}") mail.logout() # 記錄所有找到的資料夾 logger.info(f"Found folders: {folder_list}") # 確保至少有 INBOX if not folder_list: folder_list = ['INBOX'] return folder_list except Exception as e: logger.error(f"IMAP folder list error: {e}") import traceback traceback.print_exc() return ['INBOX'] # 至少返回收件匣 def get_imap_messages(email_addr, password, folder='INBOX', limit=50): """從 IMAP 取得郵件列表(含附件、標記等資訊)""" try: # 連接 IMAP mail = imaplib.IMAP4('mailserver', 143) mail.login(email_addr, password) # 選擇資料夾 status, _ = mail.select(folder) if status != 'OK': logger.warning(f"Failed to select folder: {folder}, using INBOX") mail.select('INBOX') # 搜尋所有郵件 status, messages = mail.search(None, 'ALL') mail_ids = messages[0].split() # 取得最新的 N 封 mail_ids = mail_ids[-limit:] mail_ids.reverse() messages_list = [] for mail_id in mail_ids: # 取得郵件 FLAGS 和 RFC822 status, msg_data = mail.fetch(mail_id, '(FLAGS RFC822)') flags = [] msg_bytes = None for response_part in msg_data: if isinstance(response_part, bytes): # 解析 FLAGS flags_match = response_part.decode('utf-8', errors='replace') if '\\Seen' in flags_match: flags.append('seen') if '\\Flagged' in flags_match: flags.append('flagged') if '\\Answered' in flags_match: flags.append('answered') elif isinstance(response_part, tuple): msg_bytes = response_part[1] if msg_bytes: msg = email.message_from_bytes(msg_bytes) # 解析主旨 subject = decode_header_value(msg.get('Subject', '')) # 解析寄件者 from_ = msg.get('From', '') # 解析日期 date_str = msg.get('Date', '') # 檢查是否有附件 has_attachment = False if msg.is_multipart(): for part in msg.walk(): content_disposition = part.get('Content-Disposition', '') if 'attachment' in content_disposition: has_attachment = True break # 是否未讀 is_unread = 'seen' not in flags messages_list.append({ 'id': mail_id.decode(), 'subject': subject or '(無主旨)', 'from': from_, 'date': date_str, 'has_attachment': has_attachment, 'is_unread': is_unread, 'is_flagged': 'flagged' in flags, 'is_answered': 'answered' in flags }) mail.close() mail.logout() return messages_list except Exception as e: logger.error(f"IMAP error: {e}") import traceback traceback.print_exc() return [] def get_mail_by_id(email_addr, password, mail_id, folder='INBOX'): """讀取單封郵件內容""" try: mail = imaplib.IMAP4('mailserver', 143) mail.login(email_addr, password) mail.select(folder) status, msg_data = mail.fetch(mail_id, '(RFC822)') for response_part in msg_data: if isinstance(response_part, tuple): msg = email.message_from_bytes(response_part[1]) # 解析標頭 subject = decode_header_value(msg.get('Subject', '')) from_ = msg.get('From', '') to_ = msg.get('To', '') date_str = msg.get('Date', '') # 解析郵件內容和附件 body = "" attachments = [] if msg.is_multipart(): for part in msg.walk(): content_type = part.get_content_type() content_disposition = part.get('Content-Disposition', '') # 檢查是否為附件 if 'attachment' in content_disposition: filename = part.get_filename() if filename: # 解碼檔名 decoded_filename = decode_header_value(filename) # 取得檔案大小 file_size = len(part.get_payload(decode=True)) attachments.append({ 'filename': decoded_filename, 'size': file_size, 'content_type': content_type }) # 取得郵件本文 elif content_type == "text/plain" and not body: body = part.get_payload(decode=True).decode('utf-8', errors='replace') elif content_type == "text/html" and not body: body = part.get_payload(decode=True).decode('utf-8', errors='replace') else: body = msg.get_payload(decode=True).decode('utf-8', errors='replace') mail.close() mail.logout() return { 'id': mail_id, 'subject': subject or '(無主旨)', 'from': from_, 'to': to_, 'date': date_str, 'body': body, 'attachments': attachments } mail.close() mail.logout() return None except Exception as e: logger.error(f"IMAP read error: {e}") import traceback traceback.print_exc() return None def send_email_smtp(from_addr, password, to_addr, subject, body, content_type='plain', attachments=None): """使用 SMTP 發送郵件(支援附件)""" try: # 建立郵件 msg = MIMEMultipart('mixed') if attachments else MIMEMultipart('alternative') msg['From'] = from_addr msg['To'] = to_addr msg['Subject'] = subject # 建立郵件內容部分 if attachments: # 有附件時,先建立 alternative 部分 msg_alternative = MIMEMultipart('alternative') if content_type == 'html': text_content = body.replace('
    ', '\n').replace('<[^>]*>', '') msg_alternative.attach(MIMEText(text_content, 'plain', 'utf-8')) msg_alternative.attach(MIMEText(body, 'html', 'utf-8')) else: msg_alternative.attach(MIMEText(body, 'plain', 'utf-8')) msg.attach(msg_alternative) else: # 無附件時,直接附加內容 if content_type == 'html': text_content = body.replace('
    ', '\n').replace('<[^>]*>', '') msg.attach(MIMEText(text_content, 'plain', 'utf-8')) msg.attach(MIMEText(body, 'html', 'utf-8')) else: msg.attach(MIMEText(body, 'plain', 'utf-8')) # 附加檔案 if attachments: from email.mime.base import MIMEBase from email import encoders import mimetypes for attachment in attachments: # 猜測 MIME 類型 mime_type, _ = mimetypes.guess_type(attachment['filename']) if mime_type is None: mime_type = 'application/octet-stream' main_type, sub_type = mime_type.split('/', 1) part = MIMEBase(main_type, sub_type) part.set_payload(attachment['content']) encoders.encode_base64(part) part.add_header('Content-Disposition', f'attachment; filename="{attachment["filename"]}"') msg.attach(part) # 連接 SMTP (內部網路,不使用 TLS) smtp = smtplib.SMTP('mailserver', 587) smtp.login(from_addr, password) smtp.send_message(msg) smtp.quit() attachment_count = len(attachments) if attachments else 0 logger.info(f"Email sent from {from_addr} to {to_addr} (type: {content_type}, attachments: {attachment_count})") return True except Exception as e: logger.error(f"SMTP error: {e}") import traceback traceback.print_exc() return False # ===== 根路徑重定向 ===== @app.get("/") async def root(): """WebMail 服務說明頁面""" return HTMLResponse(""" WebMail Service - 匠耘虛擬辦公室
    📧

    電子郵件服務

    Virtual Office - Email Service

    企業租戶專屬的郵件服務

    服務位址:

    https://webmail.lab.taipei/{realm}

    * realm 為貴公司的租戶代碼

    虛擬辦公室完整服務:

    🏢 管理入口
    📧 電子郵件
    📅 日曆服務
    💾 硬碟服務
    📄 Office 服務

    """) @app.get("/{tenant_code}") async def tenant_login_page(tenant_code: str, request: Request): """ 租戶登入頁面 - 自動導向 Keycloak URL: https://webmail.lab.taipei/porsche1 """ # 查詢租戶資訊 tenant = get_tenant_by_code(tenant_code) if not tenant: return HTMLResponse( content=f""" 租戶不存在

    租戶不存在

    找不到租戶「{tenant_code}」
    請確認網址是否正確

    """, status_code=404 ) # 生成 PKCE code_verifier, code_challenge = generate_pkce_pair() # 儲存 code_verifier 到 Redis pkce_key = f"pkce:{tenant_code}:{secrets.token_urlsafe(16)}" redis_client.setex(pkce_key, 600, code_verifier) logger.info(f"Generated PKCE for {tenant_code}: key={pkce_key}") # 建立 Keycloak 認證 URL (含 PKCE) realm = tenant.get('realm', tenant.get('keycloak_realm', tenant['code'])) client_id = "webmail" redirect_uri = f"https://webmail.lab.taipei/{tenant_code}/callback" auth_url = ( f"{KEYCLOAK_SERVER_URL}/realms/{realm}/protocol/openid-connect/auth" f"?client_id={client_id}" f"&redirect_uri={redirect_uri}" f"&response_type=code" f"&scope=openid+email+profile" f"&state={pkce_key}" f"&code_challenge={code_challenge}" f"&code_challenge_method=S256" ) logger.info(f"Redirecting {tenant_code} to Keycloak: {auth_url}") # 重定向到 Keycloak return RedirectResponse(url=auth_url) # ===== 租戶 Callback ===== @app.get("/{tenant_code}/callback") async def tenant_callback( tenant_code: str, code: Optional[str] = None, state: Optional[str] = None, error: Optional[str] = None, error_description: Optional[str] = None, request: Request = None ): """ Keycloak 回調處理 - Token 交換 (含 PKCE 驗證) URL: https://webmail.lab.taipei/porsche1/callback?code=xxx&state=pkce:xxx """ # 錯誤處理 if error: logger.error(f"Keycloak error: {error} - {error_description}") return JSONResponse(content={"detail": f"認證錯誤: {error_description}"}, status_code=400) if not code: return JSONResponse(content={"detail": "Missing authorization code"}, status_code=400) # 從 Redis 取回 code_verifier pkce_key = state code_verifier = redis_client.get(pkce_key) if not code_verifier: logger.error(f"PKCE key not found: {pkce_key}") return JSONResponse(content={"detail": "PKCE 驗證失敗或已過期,請重新登入"}, status_code=400) logger.info(f"Retrieved code_verifier from Redis: key={pkce_key}") try: # 1. 查詢租戶資訊 tenant = get_tenant_by_code(tenant_code) if not tenant: raise HTTPException(status_code=404, detail=f"Tenant not found: {tenant_code}") realm = tenant.get('realm', tenant.get('keycloak_realm', tenant['code'])) logger.info(f"Processing callback for tenant: {tenant_code}, realm: {realm}") # 2. 交換 Token logger.info(f"Exchanging authorization code for access token") token_url = f"{KEYCLOAK_SERVER_URL}/realms/{realm}/protocol/openid-connect/token" redirect_uri = f"https://webmail.lab.taipei/{tenant_code}/callback" # Public Client + PKCE token_data = { "grant_type": "authorization_code", "client_id": "webmail", "code": code, "redirect_uri": redirect_uri, "code_verifier": code_verifier } import requests from urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) token_response = requests.post(token_url, data=token_data, verify=False, timeout=10) if token_response.status_code != 200: logger.error(f"Token exchange failed: {token_response.status_code} - {token_response.text}") raise HTTPException(status_code=400, detail=f"Token exchange failed: {token_response.text}") token_result = token_response.json() access_token = token_result.get("access_token") refresh_token = token_result.get("refresh_token") if not access_token: raise HTTPException(status_code=400, detail="No access token received") # 刪除已使用的 PKCE key redis_client.delete(pkce_key) logger.info(f"✓ Access token received, PKCE key deleted: {pkce_key}") # 3. 取得使用者資訊 logger.info("Fetching user info from Keycloak") userinfo_url = f"{KEYCLOAK_SERVER_URL}/realms/{realm}/protocol/openid-connect/userinfo" headers = {"Authorization": f"Bearer {access_token}"} userinfo_response = requests.get(userinfo_url, headers=headers, verify=False, timeout=10) if userinfo_response.status_code != 200: logger.error(f"Userinfo fetch failed: {userinfo_response.status_code} - {userinfo_response.text}") raise HTTPException(status_code=400, detail="Failed to fetch user info") user_info = userinfo_response.json() sso_uuid = user_info.get("sub") username = user_info.get("preferred_username") email = user_info.get("email") logger.info(f"✓ User authenticated: {username} (email: {email}, uuid: {sso_uuid})") # 4. 查詢郵件帳號 logger.info(f"Looking up mail account for SSO UUID: {sso_uuid}") account = get_account_by_sso_uuid_and_realm(sso_uuid, realm) if not account: logger.error(f"No mail account found for SSO UUID: {sso_uuid} in realm: {realm}") return HTMLResponse( content=f""" 郵件帳號不存在

    ❌ 郵件帳號不存在

    使用者: {username}

    租戶: {tenant_code}

    您的帳號尚未開通郵件服務

    請聯絡系統管理員

    ← 返回登入頁面
    """, status_code=400 ) logger.info(f"✓ Mail account found: {account['email']}") # 5. 建立 Session logger.info(f"Creating session for user: {username}") # 生成 session_id import secrets session_id = secrets.token_urlsafe(32) # 將使用者資料存入 Redis session_data = { "session_id": session_id, "email": account['email'], "password": account['password'], "username": username, "sso_uuid": sso_uuid, "realm": realm, "tenant_code": tenant_code, "access_token": access_token, "refresh_token": refresh_token, "account_code": account['account_code'] } redis_client.setex( f"webmail:tenant_session:{session_id}", 3600 * 24, # 24 小時 json.dumps(session_data) ) # 將 session_id 存入 Cookie request.session['tenant_session_id'] = session_id request.session['tenant_code'] = tenant_code logger.info(f"✓ Session created: {session_id}") # 6. 重定向到收件匣 logger.info(f"Redirecting to inbox: /{tenant_code}/inbox") return RedirectResponse(url=f"/{tenant_code}/inbox", status_code=302) except HTTPException: raise except Exception as e: logger.error(f"Callback error: {e}") import traceback traceback.print_exc() return HTMLResponse( content=f""" 登入失敗

    ❌ 登入處理失敗

    租戶: {tenant_code}

    無法完成登入流程

    {str(e)}
    ← 返回登入頁面
    """, status_code=500 ) @app.get("/{tenant_code}/compose") async def tenant_compose_form(tenant_code: str, request: Request): """一般租戶撰寫郵件""" tenant_session_id = request.session.get('tenant_session_id') if not tenant_session_id: return RedirectResponse(url=f"/{tenant_code}") session_data_json = redis_client.get(f"webmail:tenant_session:{tenant_session_id}") if not session_data_json: return RedirectResponse(url=f"/{tenant_code}") session_data = json.loads(session_data_json) if session_data.get('tenant_code') != tenant_code: return RedirectResponse(url=f"/{tenant_code}") username = session_data.get('username') return render_compose_form( name=username, back_url=f"/{tenant_code}/inbox", send_url=f"/{tenant_code}/send", tenant_code=tenant_code ) @app.get("/{tenant_code}/inbox") async def tenant_inbox(tenant_code: str, request: Request, folder: str = "INBOX"): """ 租戶收件匣頁面 - 使用統一介面 URL: https://webmail.lab.taipei/porsche1/inbox """ # 檢查租戶 Session tenant_session_id = request.session.get('tenant_session_id') if not tenant_session_id: logger.info(f"No tenant session found, redirecting to login") return RedirectResponse(url=f"/{tenant_code}", status_code=302) # 從 Redis 取得租戶 Session 資料 session_data_json = redis_client.get(f"webmail:tenant_session:{tenant_session_id}") if not session_data_json: logger.info(f"Tenant session expired, redirecting to login") request.session.clear() return RedirectResponse(url=f"/{tenant_code}", status_code=302) session_data = json.loads(session_data_json) # 驗證 tenant_code if session_data.get('tenant_code') != tenant_code: logger.warning(f"Tenant code mismatch") return RedirectResponse(url=f"/{tenant_code}", status_code=302) # 從 session 取得使用者和郵件帳號資訊 mail_email = session_data.get('email') mail_password = session_data.get('password') username = session_data.get('username') logger.info(f"Loading inbox for tenant user: {username} ({mail_email}), folder: {folder}") # 呼叫統一的 render_inbox 函數 return render_inbox( request=request, folder=folder, mail_email=mail_email, mail_password=mail_password, username=username, name=None, session_id=tenant_session_id, tenant_code=tenant_code, is_management_tenant=False ) # ===== 3. 發送郵件 API (一般租戶) ===== @app.post("/{tenant_code}/send") async def tenant_send_mail( tenant_code: str, request: Request, to: str = Form(...), subject: str = Form(...), body: str = Form(...), content_type: str = Form('plain'), attachments: list[UploadFile] = File(default=[]) ): """一般租戶發送郵件""" tenant_session_id = request.session.get('tenant_session_id') if not tenant_session_id: return JSONResponse({"error": "未登入"}, status_code=401) session_data_json = redis_client.get(f"webmail:tenant_session:{tenant_session_id}") if not session_data_json: return JSONResponse({"error": "Session 已過期"}, status_code=401) session_data = json.loads(session_data_json) if session_data.get('tenant_code') != tenant_code: return JSONResponse({"error": "租戶不符"}, status_code=403) # 從 session 取得郵件帳號資訊 mail_email = session_data.get('email') mail_password = session_data.get('password') if not mail_email or not mail_password: return JSONResponse({"error": "找不到郵件帳號"}, status_code=404) # 處理附件 attachment_list = [] if attachments: for upload_file in attachments: if upload_file.filename: content = await upload_file.read() attachment_list.append({ 'filename': upload_file.filename, 'content': content }) # 發送郵件 success = send_email_smtp( mail_email, mail_password, to, subject, body, content_type, attachment_list if attachment_list else None ) if success: return JSONResponse({"success": True, "message": "郵件發送成功"}) else: return JSONResponse({"success": False, "error": "郵件發送失敗"}, status_code=500) # ===== 4. 主題設定 API (管理租戶和一般租戶) ===== @app.post("/theme") async def set_theme(request: Request, theme: str = Form(...)): """管理租戶設定布景主題""" session_id = request.session.get("session_id") if session_id: redis_client.hset(f"webmail:settings:{session_id}", "theme", theme) return RedirectResponse(url="/", status_code=303) @app.post("/{tenant_code}/theme") async def tenant_set_theme(tenant_code: str, request: Request, theme: str = Form(...)): """一般租戶設定布景主題""" tenant_session_id = request.session.get('tenant_session_id') if tenant_session_id: redis_client.hset(f"webmail:settings:{tenant_session_id}", "theme", theme) return RedirectResponse(url=f"/{tenant_code}/inbox", status_code=303) # ===== 5. 登出功能 (一般租戶) ===== @app.get("/{tenant_code}/logout") async def tenant_logout(tenant_code: str, request: Request): """一般租戶登出""" tenant_session_id = request.session.get('tenant_session_id') if tenant_session_id: redis_client.delete(f"webmail:tenant_session:{tenant_session_id}") redis_client.delete(f"webmail:settings:{tenant_session_id}") request.session.clear() return HTMLResponse(f""" 已登出

    ✅ 已成功登出

    租戶: {tenant_code}

    您已安全登出系統

    ← 重新登入
    """) # ===== 6. 郵件查看和刪除 API (一般租戶) ===== @app.get("/{tenant_code}/api/mail/{{mail_id}}") async def tenant_get_mail(tenant_code: str, mail_id: str, folder: str, request: Request): """一般租戶取得單封郵件內容""" tenant_session_id = request.session.get('tenant_session_id') if not tenant_session_id: return JSONResponse({"error": "未登入"}, status_code=401) session_data_json = redis_client.get(f"webmail:tenant_session:{tenant_session_id}") if not session_data_json: return JSONResponse({"error": "Session 已過期"}, status_code=401) session_data = json.loads(session_data_json) if session_data.get('tenant_code') != tenant_code: return JSONResponse({"error": "租戶不符"}, status_code=403) mail_email = session_data.get('email') mail_password = session_data.get('password') try: mail_detail = get_mail_detail(mail_email, mail_password, mail_id, folder) return JSONResponse(mail_detail) except Exception as e: logger.error(f"Error getting mail: {e}") return JSONResponse({"error": str(e)}, status_code=500) @app.post("/{tenant_code}/api/delete-mails") async def tenant_delete_mails(tenant_code: str, request: Request): """一般租戶刪除郵件""" tenant_session_id = request.session.get('tenant_session_id') if not tenant_session_id: return JSONResponse({"error": "未登入"}, status_code=401) session_data_json = redis_client.get(f"webmail:tenant_session:{tenant_session_id}") if not session_data_json: return JSONResponse({"error": "Session 已過期"}, status_code=401) session_data = json.loads(session_data_json) if session_data.get('tenant_code') != tenant_code: return JSONResponse({"error": "租戶不符"}, status_code=403) mail_email = session_data.get('email') mail_password = session_data.get('password') try: data = await request.json() mail_ids = data.get('mail_ids', []) folder = data.get('folder', 'INBOX') # 刪除郵件 success = delete_emails(mail_email, mail_password, mail_ids, folder) if success: return JSONResponse({"success": True, "message": f"已刪除 {len(mail_ids)} 封郵件"}) else: return JSONResponse({"success": False, "error": "刪除失敗"}, status_code=500) except Exception as e: logger.error(f"Error deleting mails: {e}") return JSONResponse({"error": str(e)}, status_code=500) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=10180)