[Phase 0 Reset]
- 清除舊版 app/、alembic/versions/、雜亂測試腳本
- 新 requirements.txt (移除 caldav/redis/keycloak-lib,加入 apscheduler/croniter/docker/paramiko/ping3/dnspython)
[Phase 1 資料庫]
- 9 張資料表 SQLAlchemy Models:tenants / accounts / schedules / schedule_logs /
tenant_schedule_results / account_schedule_results / servers / server_status_logs / system_status_logs
- Alembic migration 001_create_all_tables (已套用到 10.1.0.20:5433/virtual_mis)
- seed.py:schedules 初始 3 筆 / servers 初始 4 筆
[Phase 2 CRUD API]
- GET/POST/PUT/DELETE: /api/v1/tenants / accounts / servers / schedules
- /api/v1/system-status
- 帳號編碼自動產生 (prefix + seq_no 4碼左補0)
- 燈號 (lights) 從最新排程結果取得
[Phase 3 Watchdog]
- APScheduler interval 3分鐘,原子 UPDATE status=Going 防重複執行
- 手動觸發 API: POST /api/v1/schedules/{id}/run
[Phase 4 Service Clients]
- KeycloakClient:vmis-admin realm,REST API (不用 python-keycloak)
- MailClient:Docker Mailserver @ 10.1.0.254:8080,含 MX DNS 驗證
- DockerClient:docker-py 本機 + paramiko SSH 遠端 compose
- NextcloudClient:OCS API user/quota
- SystemChecker:功能驗證 (traefik routers>0 / keycloak token / SMTP EHLO / DB SELECT 1 / ping)
[TDD]
- 37 tests / 37 passed (2.11s)
- SQLite in-memory + StaticPool,無需外部 DB
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
111 lines
4.0 KiB
Python
111 lines
4.0 KiB
Python
"""TDD: Server CRUD + availability API tests"""
|
|
|
|
|
|
def test_create_server(client):
    """POST /api/v1/servers returns 201 and echoes the new server's state."""
    new_server = dict(
        name="home",
        ip_address="10.1.0.254",
        description="核心服務主機",
        sort_order=1,
    )

    response = client.post("/api/v1/servers", json=new_server)
    assert response.status_code == 201

    body = response.json()
    # Submitted fields must come back unchanged.
    for field in ("name", "ip_address", "sort_order"):
        assert body[field] == new_server[field]

    # Fields that were not part of the request payload.
    assert body["is_active"] is True
    assert body["last_result"] is None
    # A brand-new server has no status logs, so availability is not computed yet.
    assert body["availability"] is None
|
|
|
|
|
|
def test_create_server_duplicate_ip(client):
    """Creating a second server with an already-used ip_address yields 409."""
    first = client.post("/api/v1/servers", json={"name": "A", "ip_address": "10.1.0.10"})
    # Guard the precondition: without this check the test would also pass if
    # the endpoint answered 409 to every request, including the first one.
    assert first.status_code == 201

    resp = client.post("/api/v1/servers", json={"name": "B", "ip_address": "10.1.0.10"})
    assert resp.status_code == 409
|
|
|
|
|
|
def test_list_servers_ordered_by_sort_order(client):
    """GET /api/v1/servers is expected to list A, B, C (sort_order 1, 2, 3)."""
    # Deliberately created out of order (C, A, B) so creation order cannot
    # produce the expected listing by accident.
    creation_order = (
        ("C", "10.1.0.30", 3),
        ("A", "10.1.0.10", 1),
        ("B", "10.1.0.20", 2),
    )
    for name, ip_address, sort_order in creation_order:
        client.post(
            "/api/v1/servers",
            json={"name": name, "ip_address": ip_address, "sort_order": sort_order},
        )

    listing = client.get("/api/v1/servers")
    assert listing.status_code == 200

    listed_names = [entry["name"] for entry in listing.json()]
    assert listed_names == ["A", "B", "C"]
|
|
|
|
|
|
def test_get_server(client):
    """A server created via POST can be fetched by its id."""
    created = client.post(
        "/api/v1/servers",
        json={"name": "nas", "ip_address": "10.1.0.20", "sort_order": 2},
    )
    server_id = created.json()["id"]

    fetched = client.get(f"/api/v1/servers/{server_id}")
    assert fetched.status_code == 200
    assert fetched.json()["ip_address"] == "10.1.0.20"
|
|
|
|
|
|
def test_get_server_not_found(client):
    """Requesting a server id that was never created yields 404."""
    missing_id = 99999
    lookup = client.get(f"/api/v1/servers/{missing_id}")
    assert lookup.status_code == 404
|
|
|
|
|
|
def test_update_server(client):
    """PUT /api/v1/servers/{id} returns 200 with the changed name and sort_order."""
    created = client.post(
        "/api/v1/servers",
        json={"name": "old", "ip_address": "10.1.0.50"},
    )
    server_id = created.json()["id"]

    # Only name and sort_order are sent; ip_address is omitted from the update.
    updated = client.put(
        f"/api/v1/servers/{server_id}",
        json={"name": "new", "sort_order": 5},
    )
    assert updated.status_code == 200

    body = updated.json()
    assert body["name"] == "new"
    assert body["sort_order"] == 5
|
|
|
|
|
|
def test_delete_server(client):
    """DELETE returns 204, after which the same id can no longer be fetched."""
    created = client.post(
        "/api/v1/servers",
        json={"name": "temp", "ip_address": "10.1.0.99"},
    )
    server_id = created.json()["id"]

    deletion = client.delete(f"/api/v1/servers/{server_id}")
    assert deletion.status_code == 204

    # A follow-up GET on the deleted id must report 404.
    lookup = client.get(f"/api/v1/servers/{server_id}")
    assert lookup.status_code == 404
|
|
|
|
|
|
def test_server_availability_with_logs(client, db):
    """Verify the availability calculation when status-log history exists.

    Seeds one schedule with one schedule log, creates a server through the
    API, then inserts four status logs for it (three successes followed by one
    failure) directly through ``db``. GET /api/v1/servers/{id} is expected to
    report ``last_result`` as False and ``availability_30d`` as 75.0.
    """
    from datetime import datetime, timedelta, timezone

    from app.models.schedule import Schedule, ScheduleLog
    from app.models.server import ServerStatusLog

    def utc_now():
        # Naive UTC timestamp with the same value datetime.utcnow() produced,
        # without calling the API that is deprecated since Python 3.12.
        return datetime.now(timezone.utc).replace(tzinfo=None)

    schedule_id = 10

    # Each status log points at a schedule log, so create a schedule and one
    # log entry for it first.
    db.add(
        Schedule(
            id=schedule_id,
            name="test",
            cron_timer="0 * * * * *",
            status="Waiting",
            recorded_at=utc_now(),
        )
    )
    slog = ScheduleLog(
        schedule_id=schedule_id,
        schedule_name="test",
        started_at=utc_now(),
        status="ok",
    )
    db.add(slog)
    db.commit()
    db.refresh(slog)  # reload the row; slog.id is read below

    # Create the server through the API.
    created = client.post(
        "/api/v1/servers",
        json={"name": "pingable", "ip_address": "10.1.0.100"},
    )
    server_id = created.json()["id"]

    # Insert 3 ok + 1 fail status logs, one second apart, so ordering by
    # timestamp is unambiguous and the failure is the latest entry.
    base_time = utc_now()
    for offset, ok in enumerate([True, True, True, False]):
        db.add(
            ServerStatusLog(
                schedule_log_id=slog.id,
                server_id=server_id,
                result=ok,
                response_time=12.5 if ok else None,
                recorded_at=base_time + timedelta(seconds=offset),
            )
        )
    db.commit()

    resp = client.get(f"/api/v1/servers/{server_id}")
    assert resp.status_code == 200

    data = resp.json()
    # The last inserted log (highest id, latest timestamp) was the failure.
    assert data["last_result"] is False
    # 3 successes out of 4 logs.
    assert data["availability"]["availability_30d"] == 75.0
|
|
|
|
|
|
def test_system_status_empty(client):
    """With no data seeded, GET /api/v1/system-status returns 200 and []."""
    status_response = client.get("/api/v1/system-status")
    assert status_response.status_code == 200

    entries = status_response.json()
    assert entries == []
|