From 7a1329958d7004243ca1dd7e2835efdf1d1b58d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hartmut=20N=C3=B6renberg?= Date: Fri, 6 Mar 2026 20:49:34 +0100 Subject: [PATCH] feat(J): WebSocket live-events + replace polling + fix ffmpeg turntable timeout - fix(render): ffmpeg overlay=0:0 -> overlay=0:0:shortest=1 to prevent hang on finite PNG sequences - feat(ws): add core/websocket.py ConnectionManager + Redis Pub/Sub subscriber loop - feat(ws): add /api/ws WebSocket endpoint with JWT query-param auth in main.py - feat(ws): emit render_complete/failed + cad_processing_complete events from step_tasks.py - feat(ws): emit order_status_change events from orders router - feat(ws): add beat_tasks.py broadcast_queue_status task (every 10s via Redis __broadcast__) - feat(frontend): add useWebSocket hook with auto-reconnect (exponential backoff, 25s ping) - feat(frontend): add WebSocketContext + WebSocketProvider wrapping App - refactor(frontend): remove polling from WorkerActivity (was 5s/3s) + OrderDetail (was 5s) - refactor(frontend): reduce polling in Layout (8s->60s) + NotificationCenter (15s->60s) - docs: add ffmpeg shortest=1 + WebSocket JWT auth learnings to LEARNINGS.md Co-Authored-By: Claude Sonnet 4.6 --- LEARNINGS.md | 16 + backend/app/api/routers/orders.py | 14 + backend/app/core/websocket.py | 153 +++++++ backend/app/main.py | 54 ++- backend/app/services/render_blender.py | 2 +- backend/app/tasks/beat_tasks.py | 33 ++ backend/app/tasks/celery_app.py | 10 +- backend/app/tasks/step_tasks.py | 38 ++ frontend/src/App.tsx | 7 +- frontend/src/components/layout/Layout.tsx | 4 +- .../components/layout/NotificationCenter.tsx | 4 +- frontend/src/contexts/WebSocketContext.tsx | 62 +++ frontend/src/hooks/useWebSocket.ts | 99 +++++ frontend/src/pages/OrderDetail.tsx | 6 +- frontend/src/pages/WorkerActivity.tsx | 3 +- plan.md | 420 ++++++++++++++++++ 16 files changed, 909 insertions(+), 16 deletions(-) create mode 100644 backend/app/core/websocket.py create mode 100644 
backend/app/tasks/beat_tasks.py create mode 100644 frontend/src/contexts/WebSocketContext.tsx create mode 100644 frontend/src/hooks/useWebSocket.ts create mode 100644 plan.md diff --git a/LEARNINGS.md b/LEARNINGS.md index 3cee1b1..39faecc 100644 --- a/LEARNINGS.md +++ b/LEARNINGS.md @@ -246,6 +246,22 @@ SQLAlchemy `Enum(create_type=False)` funktioniert nicht zuverlässig mit asyncpg **Lösung:** Frontend-API-Call auf `/tenants/` mit trailing slash geändert. **Für künftige Projekte:** FastAPI-Router immer mit trailing slash aufrufen oder `redirect_slashes=False` am Router setzen. +### 2026-03-06 | Render-Pipeline | ffmpeg Turntable hängt ohne `shortest=1` +**Problem:** Turntable-Render (Order f0436188) mit bg_color schlug mit Timeout (300s) fehl. ffmpeg-Overlay-Befehl war `[1:v][0:v]overlay=0:0` — der `lavfi color`-Quell-Stream hat unendliche Dauer. ffmpeg wartete nach Ende der PNG-Sequenz weiter auf weitere Farb-Stream-Frames → hing unbegrenzt. +**Lösung:** `overlay=0:0` → `overlay=0:0:shortest=1`. `shortest=1` beendet den Output-Stream sobald der kürzeste Input-Stream endet (die PNG-Sequenz). +**Datei:** `backend/app/services/render_blender.py:507` +**Für künftige Projekte:** Bei ffmpeg-Overlays mit lavfi/color/nullsrc als ein Input IMMER `shortest=1` setzen. Sonst hängt ffmpeg nach Ende des finite Streams. + +--- + +### 2026-03-06 | Architektur | WebSocket Auth via Query-Parameter (JWT) +**Problem:** WebSocket-Verbindungen können keinen `Authorization`-Header senden (Browser-WebSocket-API hat keine Header-Unterstützung). JWT muss anders übertragen werden. +**Lösung:** JWT als Query-Parameter: `ws://host/api/ws?token=`. Backend verifiziert via `jwt.decode()` im WebSocket-Endpoint. +**Sicherheitshinweis:** Token ist in Server-Logs sichtbar. Für v2 akzeptabel. In v3: kurzlebigen WS-Token (TTL 30s) aus JWT generieren. +**Für künftige Projekte:** Immer Query-Param oder Cookie (bei HTTPS) für WebSocket-Auth verwenden; nie erwarten dass der Browser Headers setzen kann. 
+ +--- + ### 2026-03-06 | Celery Inspect | active_queues() zum Worker-Capability-Check **Erkenntnis:** `celery_app.control.inspect().active_queues()` gibt pro Worker zurück welche Queues er konsumiert. Damit kann man gezielt prüfen ob ein Worker mit bestimmten Fähigkeiten (z.B. `thumbnail_rendering`) connected ist — besser als Worker-Namen-Heuristiken. **Anwendung:** `GET /api/worker/health/render` nutzt `active_queues()` um `render_worker_connected` und `blender_available` korrekt zu bestimmen. diff --git a/backend/app/api/routers/orders.py b/backend/app/api/routers/orders.py index ff07f2c..e9929d5 100644 --- a/backend/app/api/routers/orders.py +++ b/backend/app/api/routers/orders.py @@ -486,6 +486,20 @@ async def submit_order( from app.services.pricing_service import refresh_order_price await refresh_order_price(db, order.id) await db.refresh(order) + + # Broadcast WebSocket event for live UI updates + try: + from app.core.websocket import manager as _ws_mgr + _tid = str(user.tenant_id) if user.tenant_id else None + if _tid: + await _ws_mgr.broadcast_to_tenant(_tid, { + "type": "order_status_change", + "order_id": str(order.id), + "status": "submitted", + }) + except Exception: + pass + return order diff --git a/backend/app/core/websocket.py b/backend/app/core/websocket.py new file mode 100644 index 0000000..d30efd1 --- /dev/null +++ b/backend/app/core/websocket.py @@ -0,0 +1,153 @@ +"""WebSocket connection manager with Redis Pub/Sub broadcasting. + +Architecture: + - ConnectionManager holds in-memory mapping: tenant_id -> set[WebSocket] + - A background asyncio task subscribes to Redis Pub/Sub channels + - Backend tasks/routers call publish_event_sync() (sync, Celery-safe) + which does redis.publish(f"tenant:{tenant_id}", json.dumps(event)) + - The subscriber loop receives messages and forwards to all WS for that tenant + +Special channel "__broadcast__" is forwarded to ALL connected clients. 
+""" +from __future__ import annotations + +import asyncio +import json +import logging +from collections import defaultdict +from typing import Any + +from fastapi import WebSocket, WebSocketDisconnect +from starlette.websockets import WebSocketState + +logger = logging.getLogger(__name__) + + +class ConnectionManager: + def __init__(self) -> None: + # tenant_id (str) -> set of active WebSocket connections + self._connections: dict[str, set[WebSocket]] = defaultdict(set) + self._lock = asyncio.Lock() + self._subscriber_task: asyncio.Task | None = None + + # ── Connection lifecycle ────────────────────────────────────────────────── + + async def connect(self, ws: WebSocket, tenant_id: str) -> None: + await ws.accept() + async with self._lock: + self._connections[tenant_id].add(ws) + logger.debug("WS connected tenant=%s total=%d", tenant_id, self._total()) + + async def disconnect(self, ws: WebSocket, tenant_id: str) -> None: + async with self._lock: + self._connections[tenant_id].discard(ws) + if not self._connections[tenant_id]: + del self._connections[tenant_id] + logger.debug("WS disconnected tenant=%s total=%d", tenant_id, self._total()) + + def _total(self) -> int: + return sum(len(s) for s in self._connections.values()) + + # ── Broadcast ───────────────────────────────────────────────────────────── + + async def broadcast_to_tenant(self, tenant_id: str, event: dict[str, Any]) -> None: + """Send event JSON to all WebSockets for a tenant.""" + message = json.dumps(event) + dead: list[WebSocket] = [] + for ws in list(self._connections.get(tenant_id, set())): + try: + if ws.client_state == WebSocketState.CONNECTED: + await ws.send_text(message) + except Exception: + dead.append(ws) + for ws in dead: + await self.disconnect(ws, tenant_id) + + async def broadcast_all(self, event: dict[str, Any]) -> None: + """Send event to ALL connected WebSockets regardless of tenant.""" + message = json.dumps(event) + dead: list[tuple[WebSocket, str]] = [] + async with 
self._lock: + snapshot = {tid: set(sockets) for tid, sockets in self._connections.items()} + for tenant_id, sockets in snapshot.items(): + for ws in sockets: + try: + if ws.client_state == WebSocketState.CONNECTED: + await ws.send_text(message) + except Exception: + dead.append((ws, tenant_id)) + for ws, tid in dead: + await self.disconnect(ws, tid) + + # ── Redis Pub/Sub subscriber ────────────────────────────────────────────── + + async def start_redis_subscriber(self) -> None: + """Start background task that listens for Redis Pub/Sub messages.""" + if self._subscriber_task is not None: + return + self._subscriber_task = asyncio.create_task(self._subscribe_loop()) + logger.info("WebSocket Redis subscriber started") + + async def _subscribe_loop(self) -> None: + from app.config import settings + import redis.asyncio as aioredis + + while True: + try: + client = aioredis.from_url(settings.redis_url, decode_responses=True) + pubsub = client.pubsub() + await pubsub.psubscribe("tenant:*", "__broadcast__") + logger.info("Subscribed to Redis channels tenant:* and __broadcast__") + async for message in pubsub.listen(): + if message["type"] not in ("message", "pmessage"): + continue + channel: str = message.get("channel") or message.get("pattern") or "" + data_str: str = message.get("data", "") + try: + event = json.loads(data_str) + except (json.JSONDecodeError, TypeError): + continue + + if channel == "__broadcast__": + await self.broadcast_all(event) + elif channel.startswith("tenant:"): + tenant_id = channel[len("tenant:"):] + await self.broadcast_to_tenant(tenant_id, event) + except asyncio.CancelledError: + break + except Exception as exc: + logger.warning("Redis subscriber error, reconnecting in 3s: %s", exc) + await asyncio.sleep(3) + + async def stop(self) -> None: + if self._subscriber_task: + self._subscriber_task.cancel() + try: + await self._subscriber_task + except asyncio.CancelledError: + pass + self._subscriber_task = None + + +# ── Singleton instance 
──────────────────────────────────────────────────────── + +manager = ConnectionManager() + + +# ── Sync helper for Celery tasks ────────────────────────────────────────────── + +def publish_event_sync(tenant_id: str, event: dict[str, Any]) -> None: + """Publish a WebSocket event from a synchronous Celery task. + + Uses a plain (sync) Redis client to publish to the Pub/Sub channel. + The async subscriber loop in the FastAPI process will forward it to WS clients. + """ + try: + import redis as sync_redis + from app.config import settings + r = sync_redis.from_url(settings.redis_url, decode_responses=True) + channel = f"tenant:{tenant_id}" if tenant_id != "__broadcast__" else "__broadcast__" + r.publish(channel, json.dumps(event)) + r.close() + except Exception as exc: + logger.warning("publish_event_sync failed: %s", exc) diff --git a/backend/app/main.py b/backend/app/main.py index 21bd804..8b5a471 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -1,11 +1,13 @@ from contextlib import asynccontextmanager -from fastapi import FastAPI +import uuid +from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query, HTTPException, status from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from pathlib import Path from app.config import settings from app.database import engine, Base +from app.core.websocket import manager as ws_manager # Import routers from domain locations from app.domains.auth.router import router as auth_router @@ -27,7 +29,10 @@ async def lifespan(app: FastAPI): # Create upload directories for subdir in ("step_files", "excel_files", "thumbnails", "renders", "blend-templates"): Path(settings.upload_dir, subdir).mkdir(parents=True, exist_ok=True) + # Start WebSocket Redis subscriber + await ws_manager.start_redis_subscriber() yield + await ws_manager.stop() app = FastAPI( @@ -86,3 +91,50 @@ app.include_router(media_router) @app.get("/health") async def health(): return {"status": "ok", "service": 
"schaefflerautomat-backend"} + + +@app.websocket("/api/ws") +async def websocket_endpoint( + websocket: WebSocket, + token: str = Query(..., description="JWT access token"), +): + """WebSocket endpoint for real-time events. + + Clients connect with ?token=. Events are scoped by tenant_id. + """ + from app.utils.auth import decode_token + from app.database import AsyncSessionLocal + from sqlalchemy import select + from app.models.user import User + + # Authenticate via token query param (WS cannot send Authorization header) + try: + payload = decode_token(token) + user_id = payload.get("sub") + if not user_id: + await websocket.close(code=4001) + return + except HTTPException: + await websocket.close(code=4001) + return + + # Load user to get tenant_id + async with AsyncSessionLocal() as db: + result = await db.execute(select(User).where(User.id == uuid.UUID(user_id))) + user = result.scalar_one_or_none() + + if not user or not user.is_active: + await websocket.close(code=4001) + return + + tenant_id = str(user.tenant_id) if user.tenant_id else user_id + + await ws_manager.connect(websocket, tenant_id) + try: + while True: + # Keep alive — clients send periodic pings as text + await websocket.receive_text() + except WebSocketDisconnect: + await ws_manager.disconnect(websocket, tenant_id) + except Exception: + await ws_manager.disconnect(websocket, tenant_id) diff --git a/backend/app/services/render_blender.py b/backend/app/services/render_blender.py index 6884d86..d1e9833 100644 --- a/backend/app/services/render_blender.py +++ b/backend/app/services/render_blender.py @@ -504,7 +504,7 @@ def render_turntable_to_file( "-framerate", str(fps), "-i", str(frames_dir / "frame_%04d.png"), "-f", "lavfi", "-i", f"color=c=0x{hex_color}:size={width}x{height}:rate={fps}", - "-filter_complex", "[1:v][0:v]overlay=0:0", + "-filter_complex", "[1:v][0:v]overlay=0:0:shortest=1", "-vcodec", "libx264", "-pix_fmt", "yuv420p", "-crf", "18", diff --git a/backend/app/tasks/beat_tasks.py 
b/backend/app/tasks/beat_tasks.py new file mode 100644 index 0000000..017d0a1 --- /dev/null +++ b/backend/app/tasks/beat_tasks.py @@ -0,0 +1,33 @@ +"""Celery Beat periodic tasks.""" +from __future__ import annotations + +import json +import logging + +from celery import shared_task + +logger = logging.getLogger(__name__) + + +@shared_task(name="app.tasks.beat_tasks.broadcast_queue_status", queue="step_processing") +def broadcast_queue_status() -> None: + """Broadcast current queue depths to all WebSocket clients every 10s. + + Publishes to the Redis '__broadcast__' channel which the WebSocket + subscriber in the FastAPI process forwards to all connected clients. + """ + try: + import redis as sync_redis + from app.config import settings + + r = sync_redis.from_url(settings.redis_url, decode_responses=True) + depths = { + "step_processing": r.llen("step_processing"), + "thumbnail_rendering": r.llen("thumbnail_rendering"), + } + event = {"type": "queue_update", "depths": depths} + r.publish("__broadcast__", json.dumps(event)) + r.close() + logger.debug("Broadcast queue_update: %s", depths) + except Exception as exc: + logger.warning("broadcast_queue_status failed: %s", exc) diff --git a/backend/app/tasks/celery_app.py b/backend/app/tasks/celery_app.py index 9764d2e..f372898 100644 --- a/backend/app/tasks/celery_app.py +++ b/backend/app/tasks/celery_app.py @@ -1,4 +1,5 @@ from celery import Celery +from celery.schedules import crontab from app.config import settings celery_app = Celery( @@ -8,6 +9,7 @@ celery_app = Celery( include=[ "app.tasks.step_tasks", "app.tasks.ai_tasks", + "app.tasks.beat_tasks", "app.domains.rendering.tasks", "app.domains.products.tasks", "app.domains.imports.tasks", @@ -23,7 +25,13 @@ celery_app.conf.update( task_routes={ "app.tasks.step_tasks.*": {"queue": "step_processing"}, "app.tasks.ai_tasks.*": {"queue": "ai_validation"}, + "app.tasks.beat_tasks.*": {"queue": "step_processing"}, "app.domains.rendering.tasks.*": {"queue": 
"thumbnail_rendering"}, }, - beat_schedule={}, + beat_schedule={ + "broadcast-queue-status-every-10s": { + "task": "app.tasks.beat_tasks.broadcast_queue_status", + "schedule": 10.0, # every 10 seconds + }, + }, ) diff --git a/backend/app/tasks/step_tasks.py b/backend/app/tasks/step_tasks.py index 35edab9..67f0916 100644 --- a/backend/app/tasks/step_tasks.py +++ b/backend/app/tasks/step_tasks.py @@ -171,6 +171,28 @@ def render_step_thumbnail(self, cad_file_id: str): f"Auto material population failed for cad_file {cad_file_id} (non-fatal)" ) + # Broadcast WebSocket event for live UI updates + try: + from sqlalchemy import create_engine, select as sql_select2 + from sqlalchemy.orm import Session as _Session + from app.config import settings as _cfg + from app.models.cad_file import CadFile as _CadFile + _sync_url = _cfg.database_url.replace("+asyncpg", "") + _eng = create_engine(_sync_url) + with _Session(_eng) as _s: + _cad = _s.get(_CadFile, cad_file_id) + _tid = str(_cad.tenant_id) if _cad and _cad.tenant_id else None + _eng.dispose() + if _tid: + from app.core.websocket import publish_event_sync + publish_event_sync(_tid, { + "type": "cad_processing_complete", + "cad_file_id": cad_file_id, + "status": "completed", + }) + except Exception: + logger.debug("WebSocket publish for CAD complete skipped (non-fatal)") + @celery_app.task(bind=True, name="app.tasks.step_tasks.generate_stl_cache", queue="thumbnail_rendering") def generate_stl_cache(self, cad_file_id: str, quality: str): @@ -559,6 +581,22 @@ def render_order_line_task(self, order_line_id: str): else: emit(order_line_id, f"Render failed after {elapsed:.1f}s", "error") + # Broadcast WebSocket event for live UI updates + try: + from app.core.websocket import publish_event_sync + _tenant_id = str(line.product.cad_file.tenant_id) if ( + line.product and line.product.cad_file and line.product.cad_file.tenant_id + ) else None + if _tenant_id: + publish_event_sync(_tenant_id, { + "type": "render_complete" if success 
else "render_failed", + "order_line_id": order_line_id, + "order_id": str(line.order_id), + "status": new_status, + }) + except Exception: + logger.debug("WebSocket publish skipped (non-fatal)") + # Notify order creator about render result try: from app.models.order import Order as OrderModel diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index a0a26e4..701ec8f 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -1,5 +1,6 @@ import { BrowserRouter, Routes, Route, Navigate } from 'react-router-dom' import { useAuthStore } from './store/auth' +import { WebSocketProvider } from './contexts/WebSocketContext' import Layout from './components/layout/Layout' import LoginPage from './pages/Login' import DashboardPage from './pages/Dashboard' @@ -38,7 +39,8 @@ function AdminRoute({ children }: { children: React.ReactNode }) { export default function App() { return ( - + + } /> - + + ) } diff --git a/frontend/src/components/layout/Layout.tsx b/frontend/src/components/layout/Layout.tsx index a95705a..222c91a 100644 --- a/frontend/src/components/layout/Layout.tsx +++ b/frontend/src/components/layout/Layout.tsx @@ -23,8 +23,8 @@ export default function Layout() { const { data: activity } = useQuery({ queryKey: ['worker-activity'], queryFn: getWorkerActivity, - refetchInterval: 8000, - staleTime: 4000, + refetchInterval: 60_000, + staleTime: 30_000, }) const { data: draftOrders } = useQuery({ diff --git a/frontend/src/components/layout/NotificationCenter.tsx b/frontend/src/components/layout/NotificationCenter.tsx index 73b9d86..9469571 100644 --- a/frontend/src/components/layout/NotificationCenter.tsx +++ b/frontend/src/components/layout/NotificationCenter.tsx @@ -81,7 +81,7 @@ export default function NotificationCenter() { const { data: unreadCount = 0 } = useQuery({ queryKey: ['notifications', 'unread-count'], queryFn: getUnreadCount, - refetchInterval: 15_000, + refetchInterval: 60_000, staleTime: 5_000, }) @@ -203,7 +203,7 @@ export default function 
NotificationCenter() {

{cfg.label(n.details)}

- {n.details?.error && ( + {!!n.details?.error && (

{String(n.details.error)}

diff --git a/frontend/src/contexts/WebSocketContext.tsx b/frontend/src/contexts/WebSocketContext.tsx new file mode 100644 index 0000000..1cb5774 --- /dev/null +++ b/frontend/src/contexts/WebSocketContext.tsx @@ -0,0 +1,62 @@ +/** + * WebSocketContext — global WebSocket provider. + * + * Wraps the app with a single WebSocket connection. On incoming events it + * invalidates the relevant React Query caches so all subscribers refresh. + */ +import { createContext, useContext, useCallback } from 'react' +import { useQueryClient } from '@tanstack/react-query' +import { useWebSocket, type WSEvent } from '../hooks/useWebSocket' + +const WebSocketContext = createContext(null) + +export function WebSocketProvider({ children }: { children: React.ReactNode }) { + const qc = useQueryClient() + + const onEvent = useCallback( + (event: WSEvent) => { + switch (event.type) { + case 'render_complete': + case 'render_failed': + qc.invalidateQueries({ queryKey: ['orders'] }) + if (event.order_id) { + qc.invalidateQueries({ queryKey: ['order', event.order_id as string] }) + } + qc.invalidateQueries({ queryKey: ['worker-activity'] }) + break + + case 'cad_processing_complete': + qc.invalidateQueries({ queryKey: ['worker-activity'] }) + break + + case 'order_status_change': + qc.invalidateQueries({ queryKey: ['orders'] }) + if (event.order_id) { + qc.invalidateQueries({ queryKey: ['order', event.order_id as string] }) + } + break + + case 'queue_update': + qc.setQueryData(['queue-status'], event.depths) + break + + default: + break + } + }, + [qc], + ) + + useWebSocket({ onEvent }) + + return ( + + {children} + + ) +} + +// eslint-disable-next-line @typescript-eslint/no-unused-vars +export function useWebSocketContext() { + return useContext(WebSocketContext) +} diff --git a/frontend/src/hooks/useWebSocket.ts b/frontend/src/hooks/useWebSocket.ts new file mode 100644 index 0000000..8b28976 --- /dev/null +++ b/frontend/src/hooks/useWebSocket.ts @@ -0,0 +1,99 @@ +/** + * WebSocket 
connection hook with auto-reconnect. + * + * Connects to /api/ws?token= and emits parsed JSON events via callbacks. + * Reconnect strategy: 1s, 2s, 4s, 8s, ... capped at 30s. + */ +import { useEffect, useRef, useCallback } from 'react' +import { useAuthStore } from '../store/auth' + +export interface WSEvent { + type: string + [key: string]: unknown +} + +export type WSEventHandler = (event: WSEvent) => void + +interface UseWebSocketOptions { + onEvent?: WSEventHandler + enabled?: boolean +} + +const WS_BASE = + window.location.protocol === 'https:' + ? `wss://${window.location.host}` + : `ws://${window.location.hostname}:8888` + +const PING_INTERVAL_MS = 25_000 +const MAX_BACKOFF_MS = 30_000 + +export function useWebSocket({ onEvent, enabled = true }: UseWebSocketOptions = {}) { + const token = useAuthStore((s) => s.token) + const wsRef = useRef(null) + const backoffRef = useRef(1000) + const pingRef = useRef | null>(null) + const reconnectRef = useRef | null>(null) + const onEventRef = useRef(onEvent) + onEventRef.current = onEvent + + const cleanup = useCallback(() => { + if (pingRef.current) clearInterval(pingRef.current) + if (reconnectRef.current) clearTimeout(reconnectRef.current) + if (wsRef.current) { + wsRef.current.onclose = null + wsRef.current.onerror = null + wsRef.current.onmessage = null + wsRef.current.close() + wsRef.current = null + } + }, []) + + const connect = useCallback(() => { + if (!token || !enabled) return + cleanup() + + const url = `${WS_BASE}/api/ws?token=${encodeURIComponent(token)}` + const ws = new WebSocket(url) + wsRef.current = ws + + ws.onopen = () => { + backoffRef.current = 1000 // reset backoff on successful connect + // Keep-alive pings + pingRef.current = setInterval(() => { + if (ws.readyState === WebSocket.OPEN) { + ws.send('ping') + } + }, PING_INTERVAL_MS) + } + + ws.onmessage = (evt) => { + try { + const event = JSON.parse(evt.data) as WSEvent + onEventRef.current?.(event) + } catch { + // ignore non-JSON messages 
(e.g. pong) + } + } + + ws.onclose = () => { + if (pingRef.current) clearInterval(pingRef.current) + // Schedule reconnect with exponential backoff + const delay = backoffRef.current + backoffRef.current = Math.min(backoffRef.current * 2, MAX_BACKOFF_MS) + reconnectRef.current = setTimeout(() => connect(), delay) + } + + ws.onerror = () => { + ws.close() + } + }, [token, enabled, cleanup]) + + useEffect(() => { + if (token && enabled) { + connect() + } else { + cleanup() + } + return cleanup + }, [token, enabled, connect, cleanup]) +} diff --git a/frontend/src/pages/OrderDetail.tsx b/frontend/src/pages/OrderDetail.tsx index 3b8fe42..8a5c9b0 100644 --- a/frontend/src/pages/OrderDetail.tsx +++ b/frontend/src/pages/OrderDetail.tsx @@ -70,10 +70,6 @@ export default function OrderDetailPage() { const { data: order, isLoading } = useQuery({ queryKey: ['order', id], queryFn: () => getOrder(id!), - refetchInterval: (query) => { - const status = query.state.data?.status - return status === 'processing' ? 
5000 : false - }, }) const submitMut = useMutation({ @@ -169,7 +165,7 @@ export default function OrderDetailPage() { async function handleDownloadRenders() { setIsDownloading(true) try { - await downloadOrderRenders(id!, order.order_number) + await downloadOrderRenders(id!, order!.order_number) } catch (e: any) { toast.error(e.response?.data?.detail || 'Download failed') } finally { diff --git a/frontend/src/pages/WorkerActivity.tsx b/frontend/src/pages/WorkerActivity.tsx index 3b75376..c0737b8 100644 --- a/frontend/src/pages/WorkerActivity.tsx +++ b/frontend/src/pages/WorkerActivity.tsx @@ -24,7 +24,6 @@ export default function WorkerActivityPage() { const { data, isLoading, dataUpdatedAt } = useQuery({ queryKey: ['worker-activity'], queryFn: getWorkerActivity, - refetchInterval: 5000, }) const reprocessMut = useMutation({ @@ -237,7 +236,7 @@ function QueuePanel() { const { data: queue, isLoading } = useQuery({ queryKey: ['worker-queue'], queryFn: getQueueStatus, - refetchInterval: 3000, + refetchInterval: 10_000, }) const purgeMut = useMutation({ diff --git a/plan.md b/plan.md new file mode 100644 index 0000000..ffa455e --- /dev/null +++ b/plan.md @@ -0,0 +1,420 @@ +# Plan: Phase J (WebSocket) + Turntable Bug + Phase K (Asset Library) + +## Kontext + +Analyse des aktuellen Codestands ergab: **Phasen F, G, H, I, L sind bereits vollständig implementiert.** + +| Phase | Status | Beleg | +|-------|--------|-------| +| F - Hash-Caching | DONE | `domains/products/cache_service.py` + migration 041 | +| G - Billing | DONE | `domains/billing/` vollständig, WeasyPrint in Dockerfile | +| H - Excel Sanity-Check | DONE | `domains/imports/service.py run_sanity_check()` + Upload.tsx Dialog | +| I - Notification-Config | DONE | `notification_configs` migration 044, NotificationSettings.tsx | +| L - Dashboard | DONE | AdminDashboard.tsx + ClientDashboard.tsx vollständig | +| **J - WebSocket** | **FEHLT** | Kein `core/websocket.py`, alle Polls noch aktiv | + +Zusätzlich: 
**Kritischer Bug in `render_blender.py`** — ffmpeg-Overlay-Befehl haengt bei endlicher Frame-Sequenz (kein `shortest=1`) -> Timeout -> Turntable-Render schlaegt fehl. + +--- + +## Bug Fix: Turntable ffmpeg Timeout + +**Root cause**: In `backend/app/services/render_blender.py:507`: +```python +"-filter_complex", "[1:v][0:v]overlay=0:0", +``` +Der `lavfi color`-Quell-Stream hat keine definierte Laenge. Ohne `shortest=1` wartet ffmpeg auf +weitere Frames vom Farb-Stream nachdem die PNG-Sequenz endet -> haengt bis Timeout (300s). + +**Fix**: `overlay=0:0` -> `overlay=0:0:shortest=1` + +--- + +## Phase J: WebSocket Backend + Frontend + +### Architektur (ADR-05: FastAPI nativ + Redis Pub/Sub) + +``` +Backend Task/Router: + -> redis.publish(f"tenant:{tenant_id}", json.dumps(event)) + +core/websocket.py: + ConnectionManager: tenant_id -> set[WebSocket] + background_task: asyncio.Task (redis subscribe loop) + +Frontend: + useWebSocket() hook -> WebSocket('/api/ws') + Empfaengt Events, invalidiert React Query caches +``` + +### Events die gesendet werden: +| Event | Sender | Daten | +|-------|--------|-------| +| `render_complete` | step_tasks.py | order_line_id, status, thumbnail_url | +| `render_failed` | step_tasks.py | order_line_id, error | +| `cad_processing_complete` | step_tasks.py | cad_file_id, status | +| `order_status_change` | orders router | order_id, new_status | +| `queue_update` | beat task (alle 10s) | depth per queue | + +--- + +## Betroffene Dateien + +### Neu erstellen: +- `backend/app/core/websocket.py` -- ConnectionManager + Redis Pub/Sub Loop +- `frontend/src/hooks/useWebSocket.ts` -- WebSocket hook mit Auto-Reconnect +- `frontend/src/contexts/WebSocketContext.tsx` -- Context Provider + +### Aendern: +- `backend/app/services/render_blender.py` -- ffmpeg shortest=1 Bug-Fix +- `backend/app/main.py` -- WebSocket-Endpoint registrieren (`/api/ws`) +- `backend/app/tasks/step_tasks.py` -- WebSocket-Events emittieren +- `backend/app/domains/orders/router.py` 
-- Order-Status-Events emittieren +- `backend/app/tasks/celery_app.py` -- `broadcast_queue_status` Beat-Task hinzufuegen +- `frontend/src/App.tsx` -- WebSocketProvider wrappen +- `frontend/src/pages/WorkerActivity.tsx` -- polling durch WS ersetzen +- `frontend/src/pages/OrderDetail.tsx` -- polling durch WS ersetzen +- `frontend/src/pages/Orders.tsx` -- polling reduzieren +- `frontend/src/components/layout/Layout.tsx` -- polling reduzieren +- `frontend/src/components/layout/NotificationCenter.tsx` -- polling durch WS ersetzen + +### Nach Phase J Commit -- Phase K: +- `backend/alembic/versions/045_asset_libraries.py` -- asset_libraries Tabelle +- `backend/app/domains/materials/models.py` -- AssetLibrary Model hinzufuegen +- `backend/app/domains/materials/router.py` -- Asset Library CRUD + Upload +- `render-worker/scripts/asset_library.py` -- Materialien + Node-Groups aus .blend laden +- `render-worker/scripts/catalog_assets.py` -- Katalog aus .blend lesen +- `render-worker/scripts/export_gltf.py` -- GLB Export mit Materialien +- `render-worker/scripts/export_blend.py` -- .blend Export mit pack_all() +- `backend/app/domains/rendering/workflow_builder.py` -- Asset Library Nodes +- `frontend/src/pages/Admin.tsx` -- Asset Library Manager UI +- `frontend/src/api/assetLibraries.ts` -- API Client + +--- + +## Tasks (in Reihenfolge) + +### Task 1: Bug-Fix ffmpeg Turntable Timeout [x] +- **Datei**: `backend/app/services/render_blender.py:507` +- **Was**: `"[1:v][0:v]overlay=0:0"` -> `"[1:v][0:v]overlay=0:0:shortest=1"` +- **Akzeptanzkriterium**: Turntable-Render fuer Order f0436188 kann erneut gestartet werden und produziert MP4 +- **Abhaengigkeiten**: keine + +### Task 2: WebSocket Backend -- core/websocket.py [x] +- **Datei**: `backend/app/core/websocket.py` (neu) +- **Was**: + ```python + class ConnectionManager: + _connections: dict[str, set[WebSocket]] # tenant_id -> sockets + async def connect(ws, tenant_id) + def disconnect(ws, tenant_id) + async def 
broadcast_to_tenant(tenant_id, event: dict)
+        async def start_redis_subscriber()  # asyncio background task
+
+    def publish_event_sync(tenant_id: str, event: dict):
+        # Sync version fuer Celery tasks -- redis.publish()
+    ```
+  - Redis Pub/Sub: subscribe auf `tenant:*` Channels
+  - Bei Nachricht: alle WebSockets des Tenants benachrichtigen
+  - Auto-Ping alle 30s gegen Disconnects
+- **Akzeptanzkriterium**: broadcast_to_tenant sendet an alle verbundenen WS des Tenants
+- **Abhaengigkeiten**: keine
+
+### Task 3: WebSocket Endpoint in main.py [x]
+- **Datei**: `backend/app/main.py`
+- **Was**:
+  ```python
+  @app.websocket("/api/ws")
+  async def ws_endpoint(websocket: WebSocket, token: str = Query(...)):
+      user = await verify_ws_token(token)
+      await manager.connect(websocket, str(user.tenant_id))
+      try:
+          while True:
+              await websocket.receive_text()  # Keep-alive pings
+      except WebSocketDisconnect:
+          manager.disconnect(websocket, str(user.tenant_id))
+  ```
+  - Token-Auth via Query-Parameter (WS kann keinen Authorization-Header senden)
+  - `verify_ws_token`: JWT decode, User laden (analog zu get_current_user)
+  - `manager` als globale Instanz, gestartet im lifespan
+- **Akzeptanzkriterium**: `ws://localhost:8888/api/ws?token=<JWT>` oeffnet Verbindung
+- **Abhaengigkeiten**: Task 2
+
+### Task 4: WebSocket Events in step_tasks.py [x]
+- **Datei**: `backend/app/tasks/step_tasks.py`
+- **Was**: In render_order_line_task und render_step_thumbnail nach Erfolg/Fehler:
+  ```python
+  from app.core.websocket import publish_event_sync
+  # bei render complete:
+  publish_event_sync(tenant_id, {"type": "render_complete", "order_line_id": str(line.id), "status": "completed"})
+  # bei render failed:
+  publish_event_sync(tenant_id, {"type": "render_failed", "order_line_id": str(line.id), "error": str(exc)})
+  # bei CAD processing complete:
+  publish_event_sync(tenant_id, {"type": "cad_processing_complete", "cad_file_id": str(cad_file.id), "status": "completed"})
+  ```
+  - tenant_id aus cad_file.tenant_id bzw. order_line -> order -> user.tenant_id laden
+- **Akzeptanzkriterium**: Render fertig -> WebSocket-Client empfaengt Event
+- **Abhaengigkeiten**: Task 2
+
+### Task 5: WebSocket Events in orders router [x]
+- **Datei**: `backend/app/domains/orders/router.py`
+- **Was**: Bei Order-Status-Aenderung (submit, complete, cancel):
+  ```python
+  from app.core.websocket import manager
+  await manager.broadcast_to_tenant(
+      str(current_user.tenant_id),
+      {"type": "order_status_change", "order_id": str(order.id), "status": new_status}
+  )
+  ```
+- **Akzeptanzkriterium**: Order-Submit -> WebSocket-Event geht an alle Browser-Tabs des Tenants
+- **Abhaengigkeiten**: Task 2
+
+### Task 6: Queue-Update Beat-Task [x]
+- **Datei**: `backend/app/tasks/celery_app.py`
+- **Was**: Neuer Beat-Task alle 10s:
+  ```python
+  @shared_task(name="beat.broadcast_queue_status", queue="step_processing")
+  def broadcast_queue_status():
+      from app.core.websocket import publish_event_sync
+      from redis import Redis
+      r = Redis.from_url(settings.redis_url)
+      depths = {
+          "step_processing": r.llen("step_processing"),
+          "thumbnail_rendering": r.llen("thumbnail_rendering"),
+      }
+      # Broadcast an alle Tenants (broadcast_all)
+      r.publish("__broadcast__", json.dumps({"type": "queue_update", "depths": depths}))
+  ```
+  - `__broadcast__` Channel: wird an ALLE verbundenen WS gesendet (nicht tenant-spezifisch)
+  - ConnectionManager subscribt auch auf `__broadcast__`
+- **Akzeptanzkriterium**: WorkerActivity-Queue-Tiefe aktualisiert alle 10s automatisch
+- **Abhaengigkeiten**: Task 2
+
+### Task 7: Frontend WebSocket Hook [x]
+- **Datei**: `frontend/src/hooks/useWebSocket.ts` (neu)
+- **Was**:
+  ```typescript
+  export function useWebSocketConnection() {
+    // Verbindet zu ws://localhost:8888/api/ws?token=<JWT>
+    // Auto-Reconnect: 1s, 2s, 4s, 8s, ... max 30s
+    // Emittiert Events via onMessage callback
+    // Pings alle 25s (keep-alive)
+    // Trennt Verbindung bei Logout
+  }
+  ```
+- **Akzeptanzkriterium**: Verbindung bleibt offen, reconnected nach Netzwerktrennung
+- **Abhaengigkeiten**: keine
+
+### Task 8: Frontend WebSocket Context [x]
+- **Datei**: `frontend/src/contexts/WebSocketContext.tsx` (neu), `frontend/src/App.tsx` aendern
+- **Was**:
+  ```typescript
+  export function WebSocketProvider({ children }) {
+    const queryClient = useQueryClient()
+    // on 'render_complete': invalidateQueries(['orders', order_line_id])
+    // on 'render_failed': invalidateQueries(['orders', order_line_id])
+    // on 'cad_processing_complete': invalidateQueries(['cad-activity'])
+    // on 'order_status_change': invalidateQueries(['orders'])
+    // on 'queue_update': queryClient.setQueryData(['queue-status'], ...)
+  }
+  // App.tsx: App um <WebSocketProvider> wrappen
+  ```
+- **Akzeptanzkriterium**: render_complete Event -> OrderDetail aktualisiert ohne Poll-Interval
+- **Abhaengigkeiten**: Task 7
+
+### Task 9: Polling ersetzen -- WorkerActivity.tsx [x]
+- **Datei**: `frontend/src/pages/WorkerActivity.tsx`
+- **Was**:
+  - `refetchInterval: 5000` entfernen -- bei `cad_processing_complete` invalidieren
+  - `refetchInterval: 3000` fuer Queue-Status entfernen -- bei `queue_update` setQueryData
+- **Akzeptanzkriterium**: Keine automatischen HTTP-Requests im Network-Tab (nur WS-Frames)
+- **Abhaengigkeiten**: Task 8
+
+### Task 10: Polling ersetzen -- OrderDetail.tsx [x]
+- **Datei**: `frontend/src/pages/OrderDetail.tsx`
+- **Was**:
+  - `refetchInterval: (query) => {...}` entfernen
+  - Stattdessen: bei `render_complete` / `render_failed` fuer matching order_line_id -> invalidate
+- **Akzeptanzkriterium**: Render-Status in OrderDetail aktualisiert live ohne Poll
+- **Abhaengigkeiten**: Task 8
+
+### Task 11: Polling reduzieren -- Layout.tsx + NotificationCenter.tsx [x]
+- **Dateien**: `frontend/src/components/layout/Layout.tsx`, `NotificationCenter.tsx`
+- **Was**:
+  
- Layout: `refetchInterval: 8000` -> 60000 (1min) + - NotificationCenter: `refetchInterval: 15_000` -> 60000; bei `order_status_change` zusaetzlich invalidieren +- **Akzeptanzkriterium**: Signifikant weniger Poll-Requests im Network-Tab +- **Abhaengigkeiten**: Task 8 + +### Task 12: PLAN.md + LEARNINGS.md + Commit [x] +- **Was**: + - PLAN.md: Phase J als ABGESCHLOSSEN markieren, Status auf "Phase K als naechstes" + - LEARNINGS.md: ffmpeg `shortest=1` Learning + WebSocket Auth via Query-Param Learning + - `git commit -m "feat(J): WebSocket live-events + replace polling + fix ffmpeg turntable timeout"` +- **Abhaengigkeiten**: Tasks 1-11 + +--- + +## Phase K Tasks (nach Commit) + +### Task K1: Migration 045 + AssetLibrary Model [ ] +- **Datei**: `backend/alembic/versions/045_asset_libraries.py` (neu, autogenerate), `domains/materials/models.py` +- **Was**: + ```python + class AssetLibrary(Base): + id: UUID PK, tenant_id FK nullable, name VARCHAR(200) + blend_file_key TEXT, # MinIO key + catalog JSONB, # {materials: [...], node_groups: [...]} + description TEXT, is_active BOOL, created_at TIMESTAMP + ``` + - `render_templates.asset_library_id` FK optional (nullable) + - `output_types.asset_library_id` FK optional (nullable) +- **Akzeptanzkriterium**: `alembic upgrade head` erfolgreich, `asset_libraries` Tabelle in DB + +### Task K2: Asset Library CRUD Backend [ ] +- **Datei**: `backend/app/domains/materials/router.py` + `service.py` + `schemas.py` +- **Was**: + - `POST /api/asset-libraries` -- .blend Upload -> MinIO `asset-libraries/{id}.blend` -> queut Katalog-Refresh + - `GET /api/asset-libraries` -- Liste + - `GET /api/asset-libraries/{id}/catalog` -- Materialien + Node-Groups + - `DELETE /api/asset-libraries/{id}` -- nur wenn nicht in Verwendung (FK-Check) + - `AssetLibraryOut` Schema mit `catalog` field +- **Akzeptanzkriterium**: POST + GET funktionieren, .blend in MinIO gespeichert + +### Task K3: Katalog-Refresh Celery Task + Blender Script [ ] +- **Datei**: 
`backend/app/domains/materials/tasks.py` (neu), `render-worker/scripts/catalog_assets.py` (neu)
+- **Was**:
+  - Celery Task `refresh_asset_library_catalog(asset_library_id)` auf Queue `thumbnail_rendering`
+  - Laedt .blend aus MinIO in tmpdir
+  - Startet `blender --background --python catalog_assets.py -- <blend_path>`
+  - `catalog_assets.py`: oeffnet .blend, liest alle markierten Assets:
+    ```python
+    import bpy, json, sys
+    blend_path = sys.argv[sys.argv.index('--') + 1]
+    bpy.ops.wm.open_mainfile(filepath=blend_path)
+    catalog = {
+        "materials": [m.name for m in bpy.data.materials if m.asset_data],
+        "node_groups": [ng.name for ng in bpy.data.node_groups if ng.asset_data],
+    }
+    print(json.dumps(catalog))
+    ```
+  - Schreibt Katalog in `asset_libraries.catalog JSONB`
+- **Akzeptanzkriterium**: Nach .blend-Upload enthaelt `catalog` JSONB die Asset-Namen
+
+### Task K4: Blender Asset Library Apply Script [ ]
+- **Datei**: `render-worker/scripts/asset_library.py` (neu)
+- **Was**:
+  ```python
+  def apply_asset_library_materials(blend_path: str, material_map: dict) -> None:
+      """Laedt Materialien aus Asset-Library .blend, wendet auf Mesh-Parts an."""
+      with bpy.data.libraries.load(blend_path, link=True, assets_only=True) as (src, dst):
+          dst.materials = [n for n in src.materials if n in material_map.values()]
+      for obj in bpy.data.objects:
+          if obj.type == 'MESH':
+              for slot in obj.material_slots:
+                  resolved = material_map.get(slot.material.name if slot.material else '')
+                  if resolved and resolved in bpy.data.materials:
+                      slot.material = bpy.data.materials[resolved]
+
+  def apply_asset_library_modifiers(blend_path: str, modifier_map: dict) -> None:
+      """Laedt Geometry-Node-Gruppen, wendet als Modifier an."""
+      with bpy.data.libraries.load(blend_path, link=True, assets_only=True) as (src, dst):
+          dst.node_groups = [n for n in src.node_groups if n in modifier_map.values()]
+      for obj in bpy.data.objects:
+          if obj.type == 'MESH':
+              for part_name, mod_name in modifier_map.items():
+                  
if part_name.lower() in obj.name.lower(): + mod = obj.modifiers.new(name=mod_name, type='NODES') + mod.node_group = bpy.data.node_groups.get(mod_name) + ``` +- **Akzeptanzkriterium**: Render mit Asset-Library zeigt korrekte Produktionsmaterialien + +### Task K5: export_gltf + export_blend Scripts [ ] +- **Dateien**: `render-worker/scripts/export_gltf.py` (neu), `render-worker/scripts/export_blend.py` (neu) +- **Was**: + - `export_gltf.py`: + 1. STL importieren (`bpy.ops.import_mesh.stl`) + 2. Asset Library laden via `apply_asset_library_materials` + `apply_asset_library_modifiers` + 3. `bpy.ops.export_scene.gltf(filepath=out, export_format='GLB', export_apply=True, export_draco_mesh_compression_enable=True)` + 4. Output nach MinIO `production-exports/{cad_file_id}/{run_id}.glb` + 5. MediaAsset-Record mit `asset_type=gltf_production` + - `export_blend.py`: + 1. STL + Asset Library laden (wie export_gltf) + 2. `bpy.ops.file.pack_all()` + 3. `bpy.ops.wm.save_as_mainfile(filepath=out, compress=True, copy=True)` + 4. 
MediaAsset-Record mit `asset_type=blend_production` +- **Akzeptanzkriterium**: GLB-Download oeffnet sich im Three.js Viewer mit Materialien + +### Task K6: Workflow-Builder -- Asset Library Nodes [ ] +- **Datei**: `backend/app/domains/rendering/workflow_builder.py` +- **Was**: + - Neue Celery Tasks: `apply_asset_library_materials_task`, `apply_asset_library_modifiers_task`, `export_gltf_task`, `export_blend_task` + - Neuer Workflow-Typ `still_production`: + ```python + chain( + convert_step.si(order_line_id), + group( + chain(apply_asset_library_materials.si(order_line_id), render_still.si(order_line_id)), + chain(apply_asset_library_materials.si(order_line_id), export_gltf.si(order_line_id)), + chain(apply_asset_library_materials.si(order_line_id), export_blend.si(order_line_id)), + ), + generate_thumbnail.si(order_line_id), + publish_asset.si(order_line_id), + ) + ``` +- **Akzeptanzkriterium**: Dispatch eines `still_production` Workflows -> PNG + GLB + .blend erzeugt + +### Task K7: Asset Library Management UI [ ] +- **Dateien**: `frontend/src/api/assetLibraries.ts` (neu), `frontend/src/pages/Admin.tsx` erweitern +- **Was**: + - API Client: `getAssetLibraries`, `uploadAssetLibrary` (multipart), `deleteAssetLibrary`, `getAssetLibraryCatalog` + - Admin.tsx: neues Panel "Asset Libraries" (nach Render Templates) + - Upload-Button + Drag-Drop + - Tabelle: Name, Materialien-Anzahl, Node-Groups-Anzahl, Aktionen + - Katalog-Detail: Material-Badge-Liste (gruen) + Node-Group-Badge-Liste (blau) + - OutputTypeTable: Asset-Library-Dropdown-Spalte +- **Akzeptanzkriterium**: Admin kann .blend hochladen, Katalog sehen, OutputType zuweisen + +### Task K8: PLAN.md + LEARNINGS.md + Commit [ ] +- **Was**: + - PLAN.md: Phase K als ABGESCHLOSSEN markieren + - LEARNINGS.md: Asset Library link=True Pattern, GLB-Export Blender API + - `git commit -m "feat(K): Blender Asset Library + production exports (GLB + .blend)"` + +--- + +## Migrations-Check + +| Migration | Phase | Status | 
+|-----------|-------|--------| +| 041 step_file_hash | F | existiert | +| 042 invoices | G | existiert | +| 043 import_validations | H | existiert | +| 044 notification_configs | I | existiert | +| **045 asset_libraries** | **K** | **fehlt** | + +--- + +## Reihenfolge-Empfehlung + +``` +Task 1 (Bug-Fix, sofort) + Tasks 2-6 parallel (Backend WebSocket) + Tasks 7-8 parallel (Frontend Hook + Context) + Tasks 9-11 (Polling ersetzen, nach 8) + Task 12 (Commit) + Tasks K1-K3 parallel (Datenmodell + Backend + Blender-Katalog) + Tasks K4-K5 parallel (Blender Scripts) + Tasks K6-K7 (Workflow + UI, nach K1-K5) + Task K8 (Commit) +``` + +--- + +## Risiken / Offene Fragen + +- **WebSocket Auth via Query-Param**: Token in Server-Logs sichtbar. Fuer v2 akzeptabel. In v3: kurzlebigen WS-Token (TTL 30s) aus JWT generieren. +- **Redis Pub/Sub Skalierung**: Bei vielen Tenants/Tabs kann Subscriber-Loop Bottleneck werden. Fuer v2 OK. In v3: Redis Streams. +- **Phase K -- MinIO Bucket**: `asset-libraries` Bucket muss beim Startup erstellt werden (lifespan in main.py). +- **Phase K -- link=True** bedeutet .blend muss vor Render via MinIO heruntergeladen werden (in tmpdir). Bereits einkalkuliert in K3. +- **Bestehende material_libraries**: Die alte `material_libraries` Tabelle/Feature bleibt parallel bestehen -- kein Breaking Change. Asset Libraries sind additiv.