feat(L+M): configurable dashboard widget system + test framework
Phase L: Dashboard widget system - Migration 046: dashboard_configs table (user/tenant/role fallback cascade) - DashboardConfig model + dashboard_service with get/upsert per-user and tenant-default - API router: GET/PUT /api/dashboard/config, GET/PUT /api/dashboard/tenant-default - Frontend: 5 widget components (ProductionStats, QueueStatus, RecentRenders, CostOverview, WorkerStatus) - DashboardGrid with API-backed config, DashboardCustomizeModal (checkbox selection, save/cancel) - Dashboard.tsx: widget grid + analytics section (privileged users) - Admin.tsx: restructured with new section order and Maintenance 2-col grid Phase M: Test framework - Backend: pytest-asyncio + pytest-cov + factory-boy in pyproject.toml - conftest.py: excel_dir fixtures + async DB fixtures + mock storage/celery stubs - Domain tests: billing_service, cache_service, notifications_service, imports_sanity - Integration: test_api_health.py smoke test (requires running backend) - Frontend: vitest + @testing-library/react + msw added to package.json - vite.config.ts: test block (jsdom + globals + setupFiles) - tsconfig.json: exclude src/__tests__ from main tsc (test runner handles its own types) - MSW handlers for /api/auth/me, Billing.test.tsx, formatters.test.ts Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -109,3 +109,140 @@ def parsed_linear_schiene(parsed_excel_all):
|
||||
@pytest.fixture(scope="session")
|
||||
def parsed_anschlagplatten(parsed_excel_all):
|
||||
return parsed_excel_all["Anschlagplatten"]
|
||||
|
||||
|
||||
# ── Test-DB (nutzt separate Test-Datenbank) ──────────────────────────────────
|
||||
|
||||
import os
|
||||
import uuid
|
||||
import pytest_asyncio
|
||||
from typing import AsyncGenerator
|
||||
from httpx import AsyncClient, ASGITransport
|
||||
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
|
||||
|
||||
TEST_DB_URL = os.environ.get(
|
||||
"TEST_DATABASE_URL",
|
||||
"postgresql+asyncpg://schaeffler:schaeffler@localhost:5432/schaeffler_test"
|
||||
)
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def test_engine():
|
||||
from app.database import Base
|
||||
from sqlalchemy import text
|
||||
import app.models # noqa - register all models
|
||||
engine = create_async_engine(TEST_DB_URL, echo=False)
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
yield engine
|
||||
# Use CASCADE to handle circular FK dependencies in drop
|
||||
async with engine.begin() as conn:
|
||||
await conn.execute(text("DROP SCHEMA public CASCADE"))
|
||||
await conn.execute(text("CREATE SCHEMA public"))
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def db(test_engine) -> AsyncGenerator[AsyncSession, None]:
|
||||
session_factory = async_sessionmaker(test_engine, expire_on_commit=False)
|
||||
async with session_factory() as session:
|
||||
yield session
|
||||
await session.rollback()
|
||||
|
||||
|
||||
# ── Mock-Storage (LocalStorage in tmpdir) ───────────────────────────────────
|
||||
|
||||
@pytest.fixture
|
||||
def mock_storage(tmp_path, monkeypatch):
|
||||
"""Ersetzt MinIO-Storage durch lokalen tmpdir."""
|
||||
import app.core.storage as storage_module
|
||||
from unittest.mock import MagicMock
|
||||
mock = MagicMock()
|
||||
mock.exists.return_value = False
|
||||
mock.upload.return_value = "test/key"
|
||||
mock.download_bytes.return_value = b"fake-stl-data"
|
||||
mock.get_presigned_url.return_value = "http://localhost/presigned"
|
||||
monkeypatch.setattr(storage_module, "_storage_instance", mock)
|
||||
return mock
|
||||
|
||||
|
||||
# ── FastAPI Test-Client ──────────────────────────────────────────────────────
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def client(db) -> AsyncGenerator[AsyncClient, None]:
|
||||
"""HTTP-Client mit überschriebener DB-Dependency."""
|
||||
from app.main import app
|
||||
from app.database import get_db
|
||||
|
||||
async def override_get_db():
|
||||
yield db
|
||||
|
||||
app.dependency_overrides[get_db] = override_get_db
|
||||
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac:
|
||||
yield ac
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
# ── Test-User + Auth-Token ───────────────────────────────────────────────────
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def admin_user(db):
|
||||
from app.domains.auth.models import User, UserRole
|
||||
from app.utils.auth import hash_password
|
||||
user = User(
|
||||
id=uuid.uuid4(),
|
||||
email=f"test-admin-{uuid.uuid4().hex[:8]}@test.com",
|
||||
password_hash=hash_password("TestPass123!"),
|
||||
full_name="Test Admin",
|
||||
role=UserRole.admin,
|
||||
is_active=True,
|
||||
)
|
||||
db.add(user)
|
||||
await db.commit()
|
||||
await db.refresh(user)
|
||||
return user
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def admin_token(admin_user):
|
||||
from app.utils.auth import create_access_token
|
||||
return create_access_token(str(admin_user.id), admin_user.role.value)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def auth_headers(admin_token):
|
||||
return {"Authorization": f"Bearer {admin_token}"}
|
||||
|
||||
|
||||
# ── Celery-Mock ──────────────────────────────────────────────────────────────
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_celery_tasks(monkeypatch):
|
||||
"""Verhindert echte Celery-Task-Dispatching in Tests."""
|
||||
from unittest.mock import MagicMock
|
||||
mock_result = MagicMock()
|
||||
mock_result.id = "test-task-" + str(uuid.uuid4())[:8]
|
||||
|
||||
def mock_delay(*args, **kwargs):
|
||||
return mock_result
|
||||
|
||||
task_paths = [
|
||||
"app.domains.materials.tasks.refresh_asset_library_catalog",
|
||||
"app.tasks.step_tasks.process_step_file",
|
||||
"app.tasks.step_tasks.render_step_thumbnail",
|
||||
"app.tasks.step_tasks.generate_stl_cache",
|
||||
"app.domains.imports.tasks.validate_excel_import",
|
||||
"app.domains.rendering.tasks.render_still_task",
|
||||
"app.domains.rendering.tasks.render_turntable_task",
|
||||
]
|
||||
for path in task_paths:
|
||||
try:
|
||||
parts = path.rsplit(".", 1)
|
||||
module_path, attr = parts
|
||||
import importlib
|
||||
module = importlib.import_module(module_path)
|
||||
task = getattr(module, attr, None)
|
||||
if task and hasattr(task, "delay"):
|
||||
monkeypatch.setattr(task, "delay", mock_delay)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -0,0 +1,33 @@
|
||||
"""Tests for billing service."""
|
||||
import pytest
|
||||
from app.domains.billing.service import create_invoice, get_invoices
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_invoice_minimal(db, admin_user):
|
||||
"""Invoice kann mit Mindestdaten erstellt werden."""
|
||||
invoice = await create_invoice(
|
||||
db,
|
||||
tenant_id=None,
|
||||
order_line_ids=[],
|
||||
notes="Test invoice",
|
||||
)
|
||||
assert invoice.id is not None
|
||||
assert invoice.invoice_number.startswith("INV-")
|
||||
assert invoice.status == "draft"
|
||||
assert invoice.currency == "EUR"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_invoice_number_sequential(db, admin_user):
|
||||
"""Invoice-Nummern sind sequenziell und eindeutig."""
|
||||
inv1 = await create_invoice(db, tenant_id=None, order_line_ids=[], notes="First")
|
||||
inv2 = await create_invoice(db, tenant_id=None, order_line_ids=[], notes="Second")
|
||||
assert inv1.invoice_number != inv2.invoice_number
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_invoices_returns_list(db):
|
||||
"""get_invoices gibt eine Liste zurück."""
|
||||
invoices = await get_invoices(db, tenant_id=None)
|
||||
assert isinstance(invoices, list)
|
||||
@@ -0,0 +1,41 @@
|
||||
"""Tests for STL conversion cache service (pure/unit — no DB needed)."""
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
from app.domains.products.cache_service import compute_step_hash, _cache_key
|
||||
|
||||
|
||||
def test_compute_step_hash_stable(tmp_path):
|
||||
"""Gleiche Datei → gleicher Hash."""
|
||||
f = tmp_path / "test.stp"
|
||||
f.write_bytes(b"STEP data content")
|
||||
h1 = compute_step_hash(str(f))
|
||||
h2 = compute_step_hash(str(f))
|
||||
assert h1 == h2
|
||||
assert len(h1) == 64 # SHA256 hex
|
||||
|
||||
|
||||
def test_compute_step_hash_differs(tmp_path):
|
||||
"""Andere Datei → anderer Hash."""
|
||||
f1 = tmp_path / "a.stp"
|
||||
f1.write_bytes(b"content A")
|
||||
f2 = tmp_path / "b.stp"
|
||||
f2.write_bytes(b"content B")
|
||||
assert compute_step_hash(str(f1)) != compute_step_hash(str(f2))
|
||||
|
||||
|
||||
def test_cache_key_format():
|
||||
"""Cache-Key hat korrektes Format."""
|
||||
key = _cache_key("abc123", "low")
|
||||
assert key == "conversion-cache/abc123_low.stl"
|
||||
|
||||
|
||||
def test_store_stl_cache_uses_path(tmp_path, mock_storage):
|
||||
"""store_stl_cache übergibt Path-Objekt an storage.upload."""
|
||||
from app.domains.products.cache_service import store_stl_cache
|
||||
stl = tmp_path / "test.stl"
|
||||
stl.write_bytes(b"fake stl")
|
||||
store_stl_cache("abc123", "low", str(stl))
|
||||
call_args = mock_storage.upload.call_args
|
||||
assert call_args is not None
|
||||
first_arg = call_args[0][0]
|
||||
assert isinstance(first_arg, Path), f"Expected Path, got {type(first_arg)}"
|
||||
@@ -0,0 +1,28 @@
|
||||
"""Tests for Excel sanity check service (sync Celery service).
|
||||
|
||||
Note: run_sanity_check opens a real synchronous DB connection, so the
|
||||
meaningful tests are integration-only (marked accordingly). The unit-level
|
||||
test only verifies the import and signature.
|
||||
"""
|
||||
import pytest
|
||||
|
||||
|
||||
def test_run_sanity_check_importable():
|
||||
"""run_sanity_check can be imported without errors."""
|
||||
from app.domains.imports.service import run_sanity_check
|
||||
assert callable(run_sanity_check)
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_run_sanity_check_missing_file_does_not_crash():
|
||||
"""Sanity-check-Service gibt leeres Dict zurück bei nicht existierendem File (integration)."""
|
||||
from app.domains.imports.service import run_sanity_check
|
||||
import uuid
|
||||
import app.models # noqa - register all models so SQLAlchemy relationships resolve
|
||||
result = run_sanity_check(
|
||||
validation_id=str(uuid.uuid4()),
|
||||
excel_path="/nonexistent/test.xlsx",
|
||||
tenant_id=None,
|
||||
)
|
||||
# Service returns {} when validation not found
|
||||
assert isinstance(result, dict)
|
||||
@@ -0,0 +1,27 @@
|
||||
"""Tests for notification config service."""
|
||||
import pytest
|
||||
from app.domains.notifications.service import (
|
||||
upsert_notification_config,
|
||||
get_notification_configs,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_upsert_creates_config(db, admin_user):
|
||||
"""Kann Notification-Config anlegen."""
|
||||
await upsert_notification_config(db, admin_user.id, "render_complete", "in_app", True)
|
||||
configs = await get_notification_configs(db, admin_user.id)
|
||||
assert len(configs) >= 1
|
||||
render_cfg = next((c for c in configs if c.event_type == "render_complete"), None)
|
||||
assert render_cfg is not None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_upsert_updates_existing(db, admin_user):
|
||||
"""Update überschreibt bestehende Config."""
|
||||
await upsert_notification_config(db, admin_user.id, "order_submitted", "in_app", True)
|
||||
await upsert_notification_config(db, admin_user.id, "order_submitted", "in_app", False)
|
||||
configs = await get_notification_configs(db, admin_user.id)
|
||||
cfg = next((c for c in configs if c.event_type == "order_submitted"), None)
|
||||
assert cfg is not None
|
||||
assert cfg.enabled is False
|
||||
@@ -0,0 +1,25 @@
|
||||
"""Basic API smoke tests."""
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_health_endpoint(client):
|
||||
resp = await client.get("/health")
|
||||
assert resp.status_code == 200
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_settings_requires_auth(client):
|
||||
"""Settings endpoint ohne Auth → 401/403."""
|
||||
resp = await client.get("/api/admin/settings")
|
||||
assert resp.status_code in (401, 403)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_admin_settings_with_auth(client, auth_headers):
|
||||
"""Settings endpoint mit Admin-Token → 200."""
|
||||
resp = await client.get("/api/admin/settings", headers=auth_headers)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert "blender_engine" in data
|
||||
assert "thumbnail_format" in data
|
||||
Reference in New Issue
Block a user