Files
vmis/backend/app/services/scheduler/schedule_account.py
VMIS Developer cf1e26c2e9 revert: all tenants (including manager) need NC+OO containers
Manager tenant is a real company tenant with employees who need
NC Drive + OO. is_manager only controls admin portal access and
Traefik route inclusion, not whether NC/OO infrastructure is needed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-15 22:33:25 +08:00

188 lines
7.6 KiB
Python

"""
Schedule 2 — account check (runs every 3 minutes).

For every active account, checks: SSO user / Mailbox / NC user / Quota.
"""
import logging
from datetime import datetime
from app.core.utils import now_tw
from sqlalchemy.orm import Session
from app.models.account import Account
from app.models.result import AccountScheduleResult
logger = logging.getLogger(__name__)
# Public ICS feed subscribed into each Nextcloud user's calendar.
_TW_HOLIDAYS_ICS_URL = "https://www.officeholidays.com/ics-clean/taiwan"
# Mail server endpoint handed to `occ mail:account:create` (IMAP, then SMTP).
_MAIL_HOST = "10.1.0.254"
_IMAP_PORT = 143
_SMTP_PORT = 587


def _occ_command(container, occ_args, redirect):
    """Build a `docker exec ... occ` shell command line with quoted arguments.

    Every argument is passed through ``shlex.quote`` so values containing
    quotes, spaces or shell metacharacters (display names, passwords) reach
    ``occ`` as single, unmodified arguments.

    Args:
        container: Name of the Nextcloud docker container.
        occ_args: Iterable of arguments for ``occ`` (sub-command first).
        redirect: Trailing shell redirection, e.g. ``"2>&1"``.

    Returns:
        The complete command line as a string.
    """
    import shlex

    quoted_args = " ".join(shlex.quote(str(arg)) for arg in occ_args)
    return (
        f"docker exec -u www-data {shlex.quote(str(container))} "
        f"php /var/www/html/occ {quoted_args} {redirect}"
    )


def _ssh_run(ssh, cmd, timeout=60):
    """Run *cmd* on an open SSH connection and return its stripped stdout.

    A failure while reading or decoding the output is logged and reported
    as an empty string instead of being raised.
    """
    _, stdout, _ = ssh.exec_command(cmd)
    stdout.channel.settimeout(timeout)
    try:
        return stdout.read().decode().strip()
    except Exception as exc:
        logger.warning("SSH command output could not be read: %s", exc)
        return ""


def _ensure_nc_mail_account(account, tenant, cfg):
    """Make sure the account has a mail account configured in Nextcloud Mail.

    Connects to the docker host over SSH, exports the user's existing mail
    accounts with ``occ mail:account:export`` and, if none is found, creates
    one with ``occ mail:account:create``.

    Args:
        account: The account being checked.
        tenant: The tenant owning the account; its code and status select
            the container name.
        cfg: Settings object providing ``DOCKER_SSH_HOST`` and
            ``DOCKER_SSH_USER``.

    Returns:
        True if a mail account already exists or the create command's output
        contains neither "error" nor "exception"; False otherwise.

    Raises:
        Exception: Whatever the SSH connection or command execution raises;
            the caller records it as a failure.
    """
    import paramiko

    if tenant.status == "active":
        container = f"nc-{tenant.code}"
    else:
        container = f"nc-{tenant.code}-test"
    email = account.email or f"{account.sso_account}@{tenant.domain}"

    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy trusts unknown host keys without
    # verification; consider pinning the docker host key instead.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(cfg.DOCKER_SSH_HOST, username=cfg.DOCKER_SSH_USER, timeout=15)
    try:
        export_out = _ssh_run(
            ssh,
            _occ_command(
                container,
                ["mail:account:export", account.sso_account],
                "2>/dev/null",
            ),
        )
        # Any non-trivial export output mentioning IMAP means an account exists.
        if len(export_out) > 10 and "imap" in export_out.lower():
            return True

        display = account.legal_name or account.sso_account
        create_cmd = _occ_command(
            container,
            [
                "mail:account:create",
                account.sso_account, display, email,
                _MAIL_HOST, _IMAP_PORT, "none", email, account.default_password,
                _MAIL_HOST, _SMTP_PORT, "none", email, account.default_password,
            ],
            "2>&1",
        )
        out_text = _ssh_run(ssh, create_cmd)
        logger.info("NC mail:account:create [%s]: %s", account.sso_account, out_text)
        lowered = out_text.lower()
        return "error" not in lowered and "exception" not in lowered
    finally:
        # Always release the SSH connection, even if a command raised.
        ssh.close()


def run_account_check(schedule_log_id: int, db: Session):
    """Check and repair the external resources of every active account.

    For each active account the following steps run independently; a failure
    in one step is recorded and does not stop the others:

    1. SSO user in Keycloak (created if missing; a welcome email is sent to
       newly created users that have a notification email).
    2. Mailbox (created if missing).
    3. Nextcloud user (created if missing) and its quota, followed by a
       best-effort subscription to the Taiwan public-holiday calendar.
    4. Nextcloud Mail account (only when steps 2 and 3 succeeded).
    5. Current Nextcloud quota usage.

    One ``AccountScheduleResult`` row is added per account and all rows are
    committed together after the loop.

    Args:
        schedule_log_id: Id of the schedule log the results belong to.
        db: Database session used to load accounts and store results.
    """
    from app.core.config import settings as _cfg
    from app.services.keycloak_client import get_keycloak_client
    from app.services.mail_client import MailClient
    from app.services.nextcloud_client import NextcloudClient

    accounts = (
        db.query(Account)
        .filter(Account.is_active == True)  # noqa: E712 - SQLAlchemy column expression
        .all()
    )
    kc = get_keycloak_client()
    mail = MailClient()

    for account in accounts:
        tenant = account.tenant
        realm = tenant.keycloak_realm or tenant.code
        result = AccountScheduleResult(
            schedule_log_id=schedule_log_id,
            account_id=account.id,
            sso_account=account.sso_account,
            recorded_at=now_tw(),
        )
        fail_reasons = []

        # [1] SSO user check
        try:
            sso_uuid = kc.get_user_uuid(realm, account.sso_account)
            if sso_uuid:
                result.sso_result = True
                result.sso_uuid = sso_uuid
                if not account.sso_uuid:
                    account.sso_uuid = sso_uuid
            else:
                kc_email = account.notification_email or account.email
                sso_uuid = kc.create_user(
                    realm, account.sso_account, kc_email, account.default_password
                )
                result.sso_result = sso_uuid is not None
                result.sso_uuid = sso_uuid
                if sso_uuid:
                    if not account.sso_uuid:
                        account.sso_uuid = sso_uuid
                    # New user: send the welcome email (with set-password link).
                    # Only attempted when the user was actually created.
                    if account.notification_email:
                        kc.send_welcome_email(realm, sso_uuid)
            result.sso_done_at = now_tw()
        except Exception as e:
            result.sso_result = False
            result.sso_done_at = now_tw()
            fail_reasons.append(f"sso: {e}")

        # [2] Mailbox check
        try:
            email = account.email or f"{account.sso_account}@{tenant.domain}"
            if mail.mailbox_exists(email):
                result.mailbox_result = True
            else:
                result.mailbox_result = mail.create_mailbox(
                    email, account.default_password, account.quota_limit
                )
            result.mailbox_done_at = now_tw()
        except Exception as e:
            result.mailbox_result = False
            result.mailbox_done_at = now_tw()
            fail_reasons.append(f"mailbox: {e}")

        # [3] NC user check
        try:
            nc = NextcloudClient(tenant.domain, _cfg.NC_ADMIN_USER, _cfg.NC_ADMIN_PASSWORD)
            if nc.user_exists(account.sso_account):
                result.nc_result = True
            else:
                result.nc_result = nc.create_user(
                    account.sso_account, account.default_password, account.quota_limit
                )
            if result.nc_result and account.quota_limit:
                nc.set_user_quota(account.sso_account, account.quota_limit)
            result.nc_done_at = now_tw()
        except Exception as e:
            result.nc_result = False
            result.nc_done_at = now_tw()
            fail_reasons.append(f"nc: {e}")

        # [3.5] Subscribe the NC user to the Taiwan public-holiday calendar.
        # Best effort: problems are logged but never recorded as a failure.
        if result.nc_result:
            try:
                cal_ok = nc.subscribe_calendar(
                    account.sso_account,
                    "taiwan-public-holidays",
                    "台灣國定假日",
                    _TW_HOLIDAYS_ICS_URL,
                    "#e9322d",
                )
                if cal_ok:
                    logger.info(
                        "NC calendar subscribed [%s]: taiwan-public-holidays",
                        account.sso_account,
                    )
                else:
                    logger.warning("NC calendar subscribe failed [%s]", account.sso_account)
            except Exception as e:
                logger.warning(
                    "NC calendar subscribe error [%s]: %s", account.sso_account, e
                )

        # [4] NC Mail account setup (needs both the NC user and the mailbox)
        try:
            if result.nc_result and result.mailbox_result:
                result.nc_mail_result = _ensure_nc_mail_account(account, tenant, _cfg)
            else:
                result.nc_mail_result = False
                fail_reasons.append(
                    f"nc_mail: skipped (nc={result.nc_result}, mailbox={result.mailbox_result})"
                )
            result.nc_mail_done_at = now_tw()
        except Exception as e:
            result.nc_mail_result = False
            result.nc_mail_done_at = now_tw()
            fail_reasons.append(f"nc_mail: {e}")

        # [5] Quota usage (a failure here is only logged)
        try:
            nc = NextcloudClient(tenant.domain, _cfg.NC_ADMIN_USER, _cfg.NC_ADMIN_PASSWORD)
            result.quota_usage = nc.get_user_quota_used_gb(account.sso_account)
        except Exception as e:
            logger.warning("Quota check failed for %s: %s", account.account_code, e)

        if fail_reasons:
            result.fail_reason = "; ".join(fail_reasons)
        db.add(result)

    # commit() flushes pending changes itself, so no separate flush is needed.
    db.commit()
    logger.info("Account check done: %s accounts processed", len(accounts))