Files
webmail-gateway/app.py
porsche5130 b3c8c28672 Initial commit: WebMail Gateway with PKCE support
- Multi-tenant routing support
- Keycloak SSO integration with PKCE
- Basic inbox functionality
- Redis session management
2026-03-04 01:17:25 +08:00

1277 lines
46 KiB
Python

from fastapi import FastAPI, Request, HTTPException, Form, File, UploadFile
from fastapi.responses import RedirectResponse, HTMLResponse, JSONResponse
from starlette.middleware.sessions import SessionMiddleware
from authlib.integrations.starlette_client import OAuth
from sqlalchemy import create_engine, text
from typing import Optional
import redis
import json
import os
import secrets
import logging
import imaplib
import smtplib
import email
from email.header import decode_header
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
import html
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ============================================================================
# 輔助函數:從 Virtual MIS Backend 取得租戶資訊
# ============================================================================
import requests
VIRTUAL_MIS_API = "https://vmis.lab.taipei/api/v1"
def get_tenant_by_code(tenant_code: str):
"""從 Virtual MIS Backend 取得租戶資訊"""
try:
response = requests.get(
f"{VIRTUAL_MIS_API}/tenants/by-code/{tenant_code}",
timeout=5
)
if response.status_code == 200:
tenant_data = response.json()
return {
"id": tenant_data.get("id"),
"code": tenant_data.get("code"),
"name": tenant_data.get("name"),
"realm": tenant_data.get("code"),
"is_manager": tenant_data.get("is_manager", False)
}
else:
logger.error(f"取得租戶資訊失敗: {response.status_code}")
return None
except Exception as e:
logger.error(f"取得租戶資訊錯誤: {e}")
return None
# ============================================================================
# PKCE 支援函數
# ============================================================================
import hashlib
import base64
def generate_pkce_pair():
code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).decode("utf-8").rstrip("=")
code_challenge = base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode("utf-8")).digest()).decode("utf-8").rstrip("=")
return code_verifier, code_challenge
def get_account_by_sso_uuid_and_realm(sso_uuid: str, realm: str):
try:
with db_engine.connect() as conn:
result = conn.execute(text("""
SELECT a.id, a.email, a.default_password, a.sso_account, a.account_code
FROM accounts a JOIN tenants t ON a.tenant_id = t.id
WHERE a.sso_uuid = :sso_uuid AND t.keycloak_realm = :realm LIMIT 1
"""), {"sso_uuid": sso_uuid, "realm": realm}).fetchone()
if result:
return {"id": result[0], "email": result[1], "password": result[2], "sso_account": result[3], "account_code": result[4]}
return None
except Exception as e:
logger.error(f"DB error: {e}")
return None
def render_inbox(request, folder, mail_email, mail_password, username, name, session_id, tenant_code, is_management_tenant=False):
folders = get_imap_folders(mail_email, mail_password)
messages = get_imap_messages(mail_email, mail_password, folder)
msg_rows = "".join([f'<tr><td>{m.get("from","")}</td><td>{m.get("subject","")}</td><td>{m.get("date","")}</td></tr>' for m in messages])
folder_links = "".join([f'<li><a href="/{tenant_code}/inbox?folder={f}">{f}</a></li>' for f in folders])
html = f'''<!DOCTYPE html><html><head><meta charset="UTF-8"><title>WebMail</title><style>
body{{font-family:Arial;margin:20px}}.header{{background:#667eea;color:white;padding:20px;border-radius:8px}}
table{{width:100%;border-collapse:collapse;margin-top:20px}}th,td{{padding:10px;text-align:left;border-bottom:1px solid #ddd}}
</style></head><body><div class="header"><h1>WebMail - {username}</h1><p>{mail_email} | Tenant: {tenant_code}</p>
<a href="/{tenant_code}/logout" style="color:white">Logout</a></div>
<h2>Folder: {folder}</h2><ul>{folder_links}</ul>
<p><a href="/{tenant_code}/compose">Compose New Email</a></p>
<table><tr><th>From</th><th>Subject</th><th>Date</th></tr>{msg_rows}</table></body></html>'''
return HTMLResponse(content=html)
def render_compose_form(name, back_url, send_url, tenant_code):
html = f'''<!DOCTYPE html><html><head><meta charset="UTF-8"><title>Compose</title><style>
body{{font-family:Arial;margin:20px}}.header{{background:#667eea;color:white;padding:20px;border-radius:8px}}
label{{display:block;margin-top:15px}}input,textarea{{width:100%;padding:8px;margin-top:5px}}
textarea{{height:200px}}button{{background:#667eea;color:white;padding:10px 20px;border:none;margin-top:15px}}
</style></head><body><div class="header"><h1>Compose Email</h1><p>From: {name}</p></div>
<form method="POST" action="{send_url}">
<label>To:</label><input type="email" name="to" required>
<label>Subject:</label><input type="text" name="subject" required>
<label>Message:</label><textarea name="body" required></textarea>
<input type="hidden" name="content_type" value="plain">
<button type="submit">Send</button></form>
<p><a href="{back_url}">Back to Inbox</a></p></body></html>'''
return HTMLResponse(content=html)
def get_mail_detail(mail_email, mail_password, mail_id, folder):
return get_mail_by_id(mail_email, mail_password, mail_id, folder)
def delete_emails(mail_email, mail_password, mail_ids, folder):
try:
mail = imaplib.IMAP4("mailserver", 143)
mail.login(mail_email, mail_password)
mail.select(folder)
for mid in mail_ids:
mail.store(mid, "+FLAGS", "\\Deleted")
mail.expunge()
mail.close()
mail.logout()
return True
except Exception as e:
logger.error(f"Delete error: {e}")
return False
app = FastAPI()
SECRET_KEY = os.getenv("SECRET_KEY", secrets.token_urlsafe(32))
app.add_middleware(SessionMiddleware, secret_key=SECRET_KEY)
# Redis 配置
REDIS_HOST = os.getenv("REDIS_HOST", "10.1.0.20")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", "DC1qaz2wsx")
REDIS_DB = int(os.getenv("REDIS_DB", "2"))
redis_client = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
password=REDIS_PASSWORD,
db=REDIS_DB,
decode_responses=True
)
# Virtual MIS 資料庫配置
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://admin:DC1qaz2wsx@10.1.0.20:5433/virtual_mis")
db_engine = create_engine(DATABASE_URL, pool_pre_ping=True)
# Keycloak OAuth 配置
KEYCLOAK_SERVER_URL = os.getenv("KEYCLOAK_SERVER_URL", "https://auth.lab.taipei")
KEYCLOAK_REALM = os.getenv("KEYCLOAK_REALM", "vmis-admin")
KEYCLOAK_CLIENT_ID = os.getenv("KEYCLOAK_CLIENT_ID", "vmis-services")
KEYCLOAK_CLIENT_SECRET = os.getenv("KEYCLOAK_CLIENT_SECRET", "VirtualMIS2026ServiceSecret12345")
REDIRECT_URI = os.getenv("REDIRECT_URI", "https://webmail.lab.taipei/callback")
oauth = OAuth()
oauth.register(
name="keycloak",
client_id=KEYCLOAK_CLIENT_ID,
client_secret=KEYCLOAK_CLIENT_SECRET,
server_metadata_url=f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/.well-known/openid-configuration",
client_kwargs={"scope": "openid email profile"}
)
def get_account_by_sso_uuid(sso_uuid: str):
"""根據 Keycloak UUID 查詢帳號"""
try:
with db_engine.connect() as conn:
result = conn.execute(
text("""
SELECT a.email, a.default_password, a.sso_account, a.account_code
FROM accounts a
JOIN tenants t ON a.tenant_id = t.id
WHERE a.sso_uuid = :sso_uuid
AND t.keycloak_realm = :realm
LIMIT 1
"""),
{"sso_uuid": sso_uuid, "realm": KEYCLOAK_REALM}
).fetchone()
if result:
logger.info(f"Found account: {result[2]} ({result[0]})")
return {
"email": result[0],
"password": result[1],
"sso_account": result[2],
"account_code": result[3]
}
logger.warning(f"No account found for sso_uuid: {sso_uuid}")
return None
except Exception as e:
logger.error(f"Database error: {e}")
return None
def decode_header_value(header_value):
"""解碼郵件標頭"""
if not header_value:
return ''
decoded = decode_header(header_value)
parts = []
for content, charset in decoded:
if isinstance(content, bytes):
parts.append(content.decode(charset or 'utf-8', errors='replace'))
else:
parts.append(content)
return ''.join(parts)
def get_imap_folders(email_addr, password):
"""取得 IMAP 資料夾列表"""
try:
mail = imaplib.IMAP4('mailserver', 143)
mail.login(email_addr, password)
# 列出所有資料夾(包括子資料夾)
status, folders = mail.list()
folder_list = []
if status == 'OK':
import re
for folder in folders:
# 解析資料夾名稱
folder_str = folder.decode('utf-8', errors='replace')
logger.info(f"Raw folder: {folder_str}")
# 格式可能是:
# 1. (\HasNoChildren) "." "INBOX" (有引號)
# 2. (\HasNoChildren) "." INBOX (無引號)
# 3. (\HasNoChildren \Sent) "." Sent (無引號 + 多個標誌)
# 改進的正則表達式:匹配有引號或無引號的資料夾名稱
# 格式: (flags) "delimiter" folder_name 或 "folder_name"
match = re.search(r'\([^)]*\)\s+"[^"]*"\s+(.+)$', folder_str)
if match:
folder_name = match.group(1).strip()
# 移除可能的引號
folder_name = folder_name.strip('"')
if folder_name:
folder_list.append(folder_name)
logger.info(f"Parsed folder: {folder_name}")
else:
logger.warning(f"Failed to parse folder: {folder_str}")
mail.logout()
# 記錄所有找到的資料夾
logger.info(f"Found folders: {folder_list}")
# 確保至少有 INBOX
if not folder_list:
folder_list = ['INBOX']
return folder_list
except Exception as e:
logger.error(f"IMAP folder list error: {e}")
import traceback
traceback.print_exc()
return ['INBOX'] # 至少返回收件匣
def get_imap_messages(email_addr, password, folder='INBOX', limit=50):
"""從 IMAP 取得郵件列表(含附件、標記等資訊)"""
try:
# 連接 IMAP
mail = imaplib.IMAP4('mailserver', 143)
mail.login(email_addr, password)
# 選擇資料夾
status, _ = mail.select(folder)
if status != 'OK':
logger.warning(f"Failed to select folder: {folder}, using INBOX")
mail.select('INBOX')
# 搜尋所有郵件
status, messages = mail.search(None, 'ALL')
mail_ids = messages[0].split()
# 取得最新的 N 封
mail_ids = mail_ids[-limit:]
mail_ids.reverse()
messages_list = []
for mail_id in mail_ids:
# 取得郵件 FLAGS 和 RFC822
status, msg_data = mail.fetch(mail_id, '(FLAGS RFC822)')
flags = []
msg_bytes = None
for response_part in msg_data:
if isinstance(response_part, bytes):
# 解析 FLAGS
flags_match = response_part.decode('utf-8', errors='replace')
if '\\Seen' in flags_match:
flags.append('seen')
if '\\Flagged' in flags_match:
flags.append('flagged')
if '\\Answered' in flags_match:
flags.append('answered')
elif isinstance(response_part, tuple):
msg_bytes = response_part[1]
if msg_bytes:
msg = email.message_from_bytes(msg_bytes)
# 解析主旨
subject = decode_header_value(msg.get('Subject', ''))
# 解析寄件者
from_ = msg.get('From', '')
# 解析日期
date_str = msg.get('Date', '')
# 檢查是否有附件
has_attachment = False
if msg.is_multipart():
for part in msg.walk():
content_disposition = part.get('Content-Disposition', '')
if 'attachment' in content_disposition:
has_attachment = True
break
# 是否未讀
is_unread = 'seen' not in flags
messages_list.append({
'id': mail_id.decode(),
'subject': subject or '(無主旨)',
'from': from_,
'date': date_str,
'has_attachment': has_attachment,
'is_unread': is_unread,
'is_flagged': 'flagged' in flags,
'is_answered': 'answered' in flags
})
mail.close()
mail.logout()
return messages_list
except Exception as e:
logger.error(f"IMAP error: {e}")
import traceback
traceback.print_exc()
return []
def get_mail_by_id(email_addr, password, mail_id, folder='INBOX'):
"""讀取單封郵件內容"""
try:
mail = imaplib.IMAP4('mailserver', 143)
mail.login(email_addr, password)
mail.select(folder)
status, msg_data = mail.fetch(mail_id, '(RFC822)')
for response_part in msg_data:
if isinstance(response_part, tuple):
msg = email.message_from_bytes(response_part[1])
# 解析標頭
subject = decode_header_value(msg.get('Subject', ''))
from_ = msg.get('From', '')
to_ = msg.get('To', '')
date_str = msg.get('Date', '')
# 解析郵件內容和附件
body = ""
attachments = []
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = part.get('Content-Disposition', '')
# 檢查是否為附件
if 'attachment' in content_disposition:
filename = part.get_filename()
if filename:
# 解碼檔名
decoded_filename = decode_header_value(filename)
# 取得檔案大小
file_size = len(part.get_payload(decode=True))
attachments.append({
'filename': decoded_filename,
'size': file_size,
'content_type': content_type
})
# 取得郵件本文
elif content_type == "text/plain" and not body:
body = part.get_payload(decode=True).decode('utf-8', errors='replace')
elif content_type == "text/html" and not body:
body = part.get_payload(decode=True).decode('utf-8', errors='replace')
else:
body = msg.get_payload(decode=True).decode('utf-8', errors='replace')
mail.close()
mail.logout()
return {
'id': mail_id,
'subject': subject or '(無主旨)',
'from': from_,
'to': to_,
'date': date_str,
'body': body,
'attachments': attachments
}
mail.close()
mail.logout()
return None
except Exception as e:
logger.error(f"IMAP read error: {e}")
import traceback
traceback.print_exc()
return None
def send_email_smtp(from_addr, password, to_addr, subject, body, content_type='plain', attachments=None):
"""使用 SMTP 發送郵件(支援附件)"""
try:
# 建立郵件
msg = MIMEMultipart('mixed') if attachments else MIMEMultipart('alternative')
msg['From'] = from_addr
msg['To'] = to_addr
msg['Subject'] = subject
# 建立郵件內容部分
if attachments:
# 有附件時,先建立 alternative 部分
msg_alternative = MIMEMultipart('alternative')
if content_type == 'html':
text_content = body.replace('<br>', '\n').replace('<[^>]*>', '')
msg_alternative.attach(MIMEText(text_content, 'plain', 'utf-8'))
msg_alternative.attach(MIMEText(body, 'html', 'utf-8'))
else:
msg_alternative.attach(MIMEText(body, 'plain', 'utf-8'))
msg.attach(msg_alternative)
else:
# 無附件時,直接附加內容
if content_type == 'html':
text_content = body.replace('<br>', '\n').replace('<[^>]*>', '')
msg.attach(MIMEText(text_content, 'plain', 'utf-8'))
msg.attach(MIMEText(body, 'html', 'utf-8'))
else:
msg.attach(MIMEText(body, 'plain', 'utf-8'))
# 附加檔案
if attachments:
from email.mime.base import MIMEBase
from email import encoders
import mimetypes
for attachment in attachments:
# 猜測 MIME 類型
mime_type, _ = mimetypes.guess_type(attachment['filename'])
if mime_type is None:
mime_type = 'application/octet-stream'
main_type, sub_type = mime_type.split('/', 1)
part = MIMEBase(main_type, sub_type)
part.set_payload(attachment['content'])
encoders.encode_base64(part)
part.add_header('Content-Disposition', f'attachment; filename="{attachment["filename"]}"')
msg.attach(part)
# 連接 SMTP (內部網路,不使用 TLS)
smtp = smtplib.SMTP('mailserver', 587)
smtp.login(from_addr, password)
smtp.send_message(msg)
smtp.quit()
attachment_count = len(attachments) if attachments else 0
logger.info(f"Email sent from {from_addr} to {to_addr} (type: {content_type}, attachments: {attachment_count})")
return True
except Exception as e:
logger.error(f"SMTP error: {e}")
import traceback
traceback.print_exc()
return False
# ===== 根路徑重定向 =====
@app.get("/")
async def root():
"""WebMail 服務說明頁面"""
return HTMLResponse("""<!DOCTYPE html>
<html lang="zh-TW">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebMail Service - 匠耘虛擬辦公室</title>
<style>
body {
font-family: system-ui, -apple-system, sans-serif;
display: flex;
justify-content: center;
align-items: center;
min-height: 100vh;
margin: 0;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
}
.info-box {
background: white;
padding: 3rem;
border-radius: 1rem;
box-shadow: 0 20px 60px rgba(0,0,0,0.3);
text-align: center;
max-width: 500px;
}
.logo {
margin-bottom: 1.5rem;
}
.logo img {
max-width: 200px;
height: auto;
}
.service-icon {
font-size: 3rem;
margin-bottom: 1rem;
}
h1 {
color: #1a73e8;
margin: 0 0 0.5rem 0;
font-size: 1.8rem;
}
.subtitle {
color: #888;
font-size: 0.9rem;
margin-bottom: 1rem;
}
p {
color: #666;
margin: 0.5rem 0;
line-height: 1.6;
}
.usage {
background: #f8f9fa;
padding: 1rem;
border-radius: 0.5rem;
margin-top: 1.5rem;
text-align: left;
}
.usage code {
color: #1a73e8;
font-weight: 600;
}
.footer {
margin-top: 1.5rem;
padding-top: 1rem;
border-top: 1px solid #e0e0e0;
color: #999;
font-size: 0.85rem;
}
</style>
</head>
<body>
<div class="info-box">
<div class="logo">
<img src="https://porscheworld.tw/wp-content/uploads/2025/01/匠耘logo.png" alt="匠耘 Logo">
</div>
<div class="service-icon">📧</div>
<h1>電子郵件服務</h1>
<div class="subtitle">Virtual Office - Email Service</div>
<p>企業租戶專屬的郵件服務</p>
<div class="usage">
<p><strong>服務位址:</strong></p>
<p><code>https://webmail.lab.taipei/{realm}</code></p>
<p style="margin-top: 1rem; font-size: 0.9rem; color: #888;">
* realm 為貴公司的租戶代碼
</p>
</div>
<div class="usage" style="margin-top: 1rem;">
<p><strong>虛擬辦公室完整服務:</strong></p>
<p style="font-size: 0.9rem; line-height: 2;">
🏢 <a href="https://mana.lab.taipei" style="color: #1a73e8; text-decoration: none;">管理入口</a><br>
📧 <a href="https://webmail.lab.taipei" style="color: #1a73e8; text-decoration: none;">電子郵件</a><br>
📅 <a href="https://calendar.lab.taipei" style="color: #1a73e8; text-decoration: none;">日曆服務</a><br>
💾 <a href="https://drive.lab.taipei" style="color: #1a73e8; text-decoration: none;">硬碟服務</a><br>
📄 <a href="https://office.lab.taipei" style="color: #1a73e8; text-decoration: none;">Office 服務</a>
</p>
</div>
<div class="footer">
© 2025 匠耘科技股份有限公司<br>
Porsche World Corporation
</div>
</div>
</body>
</html>""")
@app.get("/{tenant_code}")
async def tenant_login_page(tenant_code: str, request: Request):
"""
租戶登入頁面 - 自動導向 Keycloak
URL: https://webmail.lab.taipei/porsche1
"""
# 查詢租戶資訊
tenant = get_tenant_by_code(tenant_code)
if not tenant:
return HTMLResponse(
content=f"""
<!DOCTYPE html>
<html lang="zh-TW">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>租戶不存在</title>
<style>
body {{
font-family: system-ui, -apple-system, sans-serif;
display: flex;
justify-content: center;
align-items: center;
min-height: 100vh;
margin: 0;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
}}
.error-box {{
background: white;
padding: 3rem;
border-radius: 1rem;
box-shadow: 0 20px 60px rgba(0,0,0,0.3);
text-align: center;
max-width: 400px;
}}
.error-icon {{
font-size: 4rem;
margin-bottom: 1rem;
}}
h1 {{
color: #e74c3c;
margin: 0 0 1rem 0;
font-size: 1.5rem;
}}
p {{
color: #666;
margin: 0;
line-height: 1.6;
}}
</style>
</head>
<body>
<div class="error-box">
<div class="error-icon">❌</div>
<h1>租戶不存在</h1>
<p>找不到租戶「{tenant_code}」<br>請確認網址是否正確</p>
</div>
</body>
</html>
""",
status_code=404
)
# 生成 PKCE
code_verifier, code_challenge = generate_pkce_pair()
# 儲存 code_verifier 到 Redis
pkce_key = f"pkce:{tenant_code}:{secrets.token_urlsafe(16)}"
redis_client.setex(pkce_key, 600, code_verifier)
logger.info(f"Generated PKCE for {tenant_code}: key={pkce_key}")
# 建立 Keycloak 認證 URL (含 PKCE)
realm = tenant.get('realm', tenant.get('keycloak_realm', tenant['code']))
client_id = "webmail"
redirect_uri = f"https://webmail.lab.taipei/{tenant_code}/callback"
auth_url = (
f"{KEYCLOAK_SERVER_URL}/realms/{realm}/protocol/openid-connect/auth"
f"?client_id={client_id}"
f"&redirect_uri={redirect_uri}"
f"&response_type=code"
f"&scope=openid+email+profile"
f"&state={pkce_key}"
f"&code_challenge={code_challenge}"
f"&code_challenge_method=S256"
)
logger.info(f"Redirecting {tenant_code} to Keycloak: {auth_url}")
# 重定向到 Keycloak
return RedirectResponse(url=auth_url)
# ===== 租戶 Callback =====
@app.get("/{tenant_code}/callback")
async def tenant_callback(
tenant_code: str,
code: Optional[str] = None,
state: Optional[str] = None,
error: Optional[str] = None,
error_description: Optional[str] = None,
request: Request = None
):
"""
Keycloak 回調處理 - Token 交換 (含 PKCE 驗證)
URL: https://webmail.lab.taipei/porsche1/callback?code=xxx&state=pkce:xxx
"""
# 錯誤處理
if error:
logger.error(f"Keycloak error: {error} - {error_description}")
return JSONResponse(content={"detail": f"認證錯誤: {error_description}"}, status_code=400)
if not code:
return JSONResponse(content={"detail": "Missing authorization code"}, status_code=400)
# 從 Redis 取回 code_verifier
pkce_key = state
code_verifier = redis_client.get(pkce_key)
if not code_verifier:
logger.error(f"PKCE key not found: {pkce_key}")
return JSONResponse(content={"detail": "PKCE 驗證失敗或已過期,請重新登入"}, status_code=400)
logger.info(f"Retrieved code_verifier from Redis: key={pkce_key}")
try:
# 1. 查詢租戶資訊
tenant = get_tenant_by_code(tenant_code)
if not tenant:
raise HTTPException(status_code=404, detail=f"Tenant not found: {tenant_code}")
realm = tenant.get('realm', tenant.get('keycloak_realm', tenant['code']))
logger.info(f"Processing callback for tenant: {tenant_code}, realm: {realm}")
# 2. 交換 Token
logger.info(f"Exchanging authorization code for access token")
token_url = f"{KEYCLOAK_SERVER_URL}/realms/{realm}/protocol/openid-connect/token"
redirect_uri = f"https://webmail.lab.taipei/{tenant_code}/callback"
# Public Client + PKCE
token_data = {
"grant_type": "authorization_code",
"client_id": "webmail",
"code": code,
"redirect_uri": redirect_uri,
"code_verifier": code_verifier
}
import requests
from urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
token_response = requests.post(token_url, data=token_data, verify=False, timeout=10)
if token_response.status_code != 200:
logger.error(f"Token exchange failed: {token_response.status_code} - {token_response.text}")
raise HTTPException(status_code=400, detail=f"Token exchange failed: {token_response.text}")
token_result = token_response.json()
access_token = token_result.get("access_token")
refresh_token = token_result.get("refresh_token")
if not access_token:
raise HTTPException(status_code=400, detail="No access token received")
# 刪除已使用的 PKCE key
redis_client.delete(pkce_key)
logger.info(f"✓ Access token received, PKCE key deleted: {pkce_key}")
# 3. 取得使用者資訊
logger.info("Fetching user info from Keycloak")
userinfo_url = f"{KEYCLOAK_SERVER_URL}/realms/{realm}/protocol/openid-connect/userinfo"
headers = {"Authorization": f"Bearer {access_token}"}
userinfo_response = requests.get(userinfo_url, headers=headers, verify=False, timeout=10)
if userinfo_response.status_code != 200:
logger.error(f"Userinfo fetch failed: {userinfo_response.status_code} - {userinfo_response.text}")
raise HTTPException(status_code=400, detail="Failed to fetch user info")
user_info = userinfo_response.json()
sso_uuid = user_info.get("sub")
username = user_info.get("preferred_username")
email = user_info.get("email")
logger.info(f"✓ User authenticated: {username} (email: {email}, uuid: {sso_uuid})")
# 4. 查詢郵件帳號
logger.info(f"Looking up mail account for SSO UUID: {sso_uuid}")
account = get_account_by_sso_uuid_and_realm(sso_uuid, realm)
if not account:
logger.error(f"No mail account found for SSO UUID: {sso_uuid} in realm: {realm}")
return HTMLResponse(
content=f"""
<!DOCTYPE html>
<html lang="zh-TW">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>郵件帳號不存在</title>
<style>
body {{
font-family: system-ui, -apple-system, sans-serif;
display: flex;
justify-content: center;
align-items: center;
min-height: 100vh;
margin: 0;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
}}
.error-box {{
background: white;
padding: 3rem;
border-radius: 1rem;
box-shadow: 0 20px 60px rgba(0,0,0,0.3);
text-align: center;
max-width: 400px;
}}
h1 {{
color: #e74c3c;
margin: 0 0 1rem 0;
}}
p {{
color: #666;
line-height: 1.6;
margin: 0.5rem 0;
}}
.back-link {{
display: inline-block;
margin-top: 1.5rem;
color: #667eea;
text-decoration: none;
font-weight: 500;
}}
.back-link:hover {{
text-decoration: underline;
}}
</style>
</head>
<body>
<div class="error-box">
<h1>❌ 郵件帳號不存在</h1>
<p><strong>使用者:</strong> {username}</p>
<p><strong>租戶:</strong> {tenant_code}</p>
<p>您的帳號尚未開通郵件服務</p>
<p>請聯絡系統管理員</p>
<a href="/{tenant_code}" class="back-link">← 返回登入頁面</a>
</div>
</body>
</html>
""",
status_code=400
)
logger.info(f"✓ Mail account found: {account['email']}")
# 5. 建立 Session
logger.info(f"Creating session for user: {username}")
# 生成 session_id
import secrets
session_id = secrets.token_urlsafe(32)
# 將使用者資料存入 Redis
session_data = {
"session_id": session_id,
"email": account['email'],
"password": account['password'],
"username": username,
"sso_uuid": sso_uuid,
"realm": realm,
"tenant_code": tenant_code,
"access_token": access_token,
"refresh_token": refresh_token,
"account_code": account['account_code']
}
redis_client.setex(
f"webmail:tenant_session:{session_id}",
3600 * 24, # 24 小時
json.dumps(session_data)
)
# 將 session_id 存入 Cookie
request.session['tenant_session_id'] = session_id
request.session['tenant_code'] = tenant_code
logger.info(f"✓ Session created: {session_id}")
# 6. 重定向到收件匣
logger.info(f"Redirecting to inbox: /{tenant_code}/inbox")
return RedirectResponse(url=f"/{tenant_code}/inbox", status_code=302)
except HTTPException:
raise
except Exception as e:
logger.error(f"Callback error: {e}")
import traceback
traceback.print_exc()
return HTMLResponse(
content=f"""
<!DOCTYPE html>
<html lang="zh-TW">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>登入失敗</title>
<style>
body {{
font-family: system-ui, -apple-system, sans-serif;
display: flex;
justify-content: center;
align-items: center;
min-height: 100vh;
margin: 0;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
}}
.error-box {{
background: white;
padding: 3rem;
border-radius: 1rem;
box-shadow: 0 20px 60px rgba(0,0,0,0.3);
text-align: center;
max-width: 500px;
}}
h1 {{
color: #e74c3c;
margin: 0 0 1rem 0;
}}
p {{
color: #666;
line-height: 1.6;
margin: 0.5rem 0;
}}
.error-detail {{
background: #f8f9fa;
padding: 1rem;
border-radius: 0.5rem;
margin-top: 1rem;
font-family: monospace;
font-size: 0.875rem;
text-align: left;
color: #333;
max-height: 200px;
overflow-y: auto;
}}
.back-link {{
display: inline-block;
margin-top: 1.5rem;
color: #667eea;
text-decoration: none;
font-weight: 500;
}}
.back-link:hover {{
text-decoration: underline;
}}
</style>
</head>
<body>
<div class="error-box">
<h1>❌ 登入處理失敗</h1>
<p><strong>租戶:</strong> {tenant_code}</p>
<p>無法完成登入流程</p>
<div class="error-detail">
{str(e)}
</div>
<a href="/{tenant_code}" class="back-link">← 返回登入頁面</a>
</div>
</body>
</html>
""",
status_code=500
)
@app.get("/{tenant_code}/compose")
async def tenant_compose_form(tenant_code: str, request: Request):
"""一般租戶撰寫郵件"""
tenant_session_id = request.session.get('tenant_session_id')
if not tenant_session_id:
return RedirectResponse(url=f"/{tenant_code}")
session_data_json = redis_client.get(f"webmail:tenant_session:{tenant_session_id}")
if not session_data_json:
return RedirectResponse(url=f"/{tenant_code}")
session_data = json.loads(session_data_json)
if session_data.get('tenant_code') != tenant_code:
return RedirectResponse(url=f"/{tenant_code}")
username = session_data.get('username')
return render_compose_form(
name=username,
back_url=f"/{tenant_code}/inbox",
send_url=f"/{tenant_code}/send",
tenant_code=tenant_code
)
@app.get("/{tenant_code}/inbox")
async def tenant_inbox(tenant_code: str, request: Request, folder: str = "INBOX"):
"""
租戶收件匣頁面 - 使用統一介面
URL: https://webmail.lab.taipei/porsche1/inbox
"""
# 檢查租戶 Session
tenant_session_id = request.session.get('tenant_session_id')
if not tenant_session_id:
logger.info(f"No tenant session found, redirecting to login")
return RedirectResponse(url=f"/{tenant_code}", status_code=302)
# 從 Redis 取得租戶 Session 資料
session_data_json = redis_client.get(f"webmail:tenant_session:{tenant_session_id}")
if not session_data_json:
logger.info(f"Tenant session expired, redirecting to login")
request.session.clear()
return RedirectResponse(url=f"/{tenant_code}", status_code=302)
session_data = json.loads(session_data_json)
# 驗證 tenant_code
if session_data.get('tenant_code') != tenant_code:
logger.warning(f"Tenant code mismatch")
return RedirectResponse(url=f"/{tenant_code}", status_code=302)
# 從 session 取得使用者和郵件帳號資訊
mail_email = session_data.get('email')
mail_password = session_data.get('password')
username = session_data.get('username')
logger.info(f"Loading inbox for tenant user: {username} ({mail_email}), folder: {folder}")
# 呼叫統一的 render_inbox 函數
return render_inbox(
request=request,
folder=folder,
mail_email=mail_email,
mail_password=mail_password,
username=username,
name=None,
session_id=tenant_session_id,
tenant_code=tenant_code,
is_management_tenant=False
)
# ===== 3. 發送郵件 API (一般租戶) =====
@app.post("/{tenant_code}/send")
async def tenant_send_mail(
tenant_code: str,
request: Request,
to: str = Form(...),
subject: str = Form(...),
body: str = Form(...),
content_type: str = Form('plain'),
attachments: list[UploadFile] = File(default=[])
):
"""一般租戶發送郵件"""
tenant_session_id = request.session.get('tenant_session_id')
if not tenant_session_id:
return JSONResponse({"error": "未登入"}, status_code=401)
session_data_json = redis_client.get(f"webmail:tenant_session:{tenant_session_id}")
if not session_data_json:
return JSONResponse({"error": "Session 已過期"}, status_code=401)
session_data = json.loads(session_data_json)
if session_data.get('tenant_code') != tenant_code:
return JSONResponse({"error": "租戶不符"}, status_code=403)
# 從 session 取得郵件帳號資訊
mail_email = session_data.get('email')
mail_password = session_data.get('password')
if not mail_email or not mail_password:
return JSONResponse({"error": "找不到郵件帳號"}, status_code=404)
# 處理附件
attachment_list = []
if attachments:
for upload_file in attachments:
if upload_file.filename:
content = await upload_file.read()
attachment_list.append({
'filename': upload_file.filename,
'content': content
})
# 發送郵件
success = send_email_smtp(
mail_email,
mail_password,
to,
subject,
body,
content_type,
attachment_list if attachment_list else None
)
if success:
return JSONResponse({"success": True, "message": "郵件發送成功"})
else:
return JSONResponse({"success": False, "error": "郵件發送失敗"}, status_code=500)
# ===== 4. 主題設定 API (管理租戶和一般租戶) =====
@app.post("/theme")
async def set_theme(request: Request, theme: str = Form(...)):
"""管理租戶設定布景主題"""
session_id = request.session.get("session_id")
if session_id:
redis_client.hset(f"webmail:settings:{session_id}", "theme", theme)
return RedirectResponse(url="/", status_code=303)
@app.post("/{tenant_code}/theme")
async def tenant_set_theme(tenant_code: str, request: Request, theme: str = Form(...)):
"""一般租戶設定布景主題"""
tenant_session_id = request.session.get('tenant_session_id')
if tenant_session_id:
redis_client.hset(f"webmail:settings:{tenant_session_id}", "theme", theme)
return RedirectResponse(url=f"/{tenant_code}/inbox", status_code=303)
# ===== 5. 登出功能 (一般租戶) =====
@app.get("/{tenant_code}/logout")
async def tenant_logout(tenant_code: str, request: Request):
"""一般租戶登出"""
tenant_session_id = request.session.get('tenant_session_id')
if tenant_session_id:
redis_client.delete(f"webmail:tenant_session:{tenant_session_id}")
redis_client.delete(f"webmail:settings:{tenant_session_id}")
request.session.clear()
return HTMLResponse(f"""<!DOCTYPE html>
<html lang="zh-TW">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>已登出</title>
<style>
body {{
font-family: system-ui, -apple-system, sans-serif;
display: flex;
justify-content: center;
align-items: center;
min-height: 100vh;
margin: 0;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
}}
.logout-box {{
background: white;
padding: 3rem;
border-radius: 1rem;
box-shadow: 0 20px 60px rgba(0,0,0,0.3);
text-align: center;
max-width: 400px;
}}
h1 {{
color: #333;
margin: 0 0 1rem 0;
}}
p {{
color: #666;
line-height: 1.6;
margin: 0.5rem 0;
}}
.login-link {{
display: inline-block;
margin-top: 1.5rem;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 0.75rem 2rem;
border-radius: 0.5rem;
text-decoration: none;
font-weight: 500;
}}
.login-link:hover {{
box-shadow: 0 4px 12px rgba(102, 126, 234, 0.4);
}}
</style>
</head>
<body>
<div class="logout-box">
<h1>✅ 已成功登出</h1>
<p><strong>租戶:</strong> {tenant_code}</p>
<p>您已安全登出系統</p>
<a href="/{tenant_code}" class="login-link">← 重新登入</a>
</div>
</body>
</html>
""")
# ===== 6. 郵件查看和刪除 API (一般租戶) =====
@app.get("/{tenant_code}/api/mail/{{mail_id}}")
async def tenant_get_mail(tenant_code: str, mail_id: str, folder: str, request: Request):
"""一般租戶取得單封郵件內容"""
tenant_session_id = request.session.get('tenant_session_id')
if not tenant_session_id:
return JSONResponse({"error": "未登入"}, status_code=401)
session_data_json = redis_client.get(f"webmail:tenant_session:{tenant_session_id}")
if not session_data_json:
return JSONResponse({"error": "Session 已過期"}, status_code=401)
session_data = json.loads(session_data_json)
if session_data.get('tenant_code') != tenant_code:
return JSONResponse({"error": "租戶不符"}, status_code=403)
mail_email = session_data.get('email')
mail_password = session_data.get('password')
try:
mail_detail = get_mail_detail(mail_email, mail_password, mail_id, folder)
return JSONResponse(mail_detail)
except Exception as e:
logger.error(f"Error getting mail: {e}")
return JSONResponse({"error": str(e)}, status_code=500)
@app.post("/{tenant_code}/api/delete-mails")
async def tenant_delete_mails(tenant_code: str, request: Request):
"""一般租戶刪除郵件"""
tenant_session_id = request.session.get('tenant_session_id')
if not tenant_session_id:
return JSONResponse({"error": "未登入"}, status_code=401)
session_data_json = redis_client.get(f"webmail:tenant_session:{tenant_session_id}")
if not session_data_json:
return JSONResponse({"error": "Session 已過期"}, status_code=401)
session_data = json.loads(session_data_json)
if session_data.get('tenant_code') != tenant_code:
return JSONResponse({"error": "租戶不符"}, status_code=403)
mail_email = session_data.get('email')
mail_password = session_data.get('password')
try:
data = await request.json()
mail_ids = data.get('mail_ids', [])
folder = data.get('folder', 'INBOX')
# 刪除郵件
success = delete_emails(mail_email, mail_password, mail_ids, folder)
if success:
return JSONResponse({"success": True, "message": f"已刪除 {len(mail_ids)} 封郵件"})
else:
return JSONResponse({"success": False, "error": "刪除失敗"}, status_code=500)
except Exception as e:
logger.error(f"Error deleting mails: {e}")
return JSONResponse({"error": str(e)}, status_code=500)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=10180)