feat(N): workflow pipeline, 3D viewer, worker management, QC tests

- workflow_builder.py: fix broken stubs, add render_order_line_still_task
  (resolves step_path from DB instead of passing order_line_id as step_path)
- domains/rendering/tasks.py: add render_order_line_still_task,
  export_gltf_for_order_line_task, export_blend_for_order_line_task,
  generate_gltf_geometry_task (trimesh STL→GLB, no Blender needed)
- tasks/step_tasks.py: add generate_gltf_geometry_task for CadFile GLB export
- cad router: POST /{id}/generate-gltf-geometry endpoint (admin/PM)
- worker router: GET /celery-workers + POST /scale (docker compose subprocess)
- Dockerfile: pip install -e "[dev]" to enable pytest
- docker-compose.yml: docker socket + compose file mount on backend
- ThreeDViewer.tsx: mode toggle (geometry/production), wireframe, env presets,
  download buttons (GLB + .blend)
- CadPreview.tsx: load gltf_geometry/gltf_production/blend_production assets
  from MediaAsset table and pass URLs to ThreeDViewer
- ProductDetail.tsx: "View 3D" button → /cad/:id, "Generate GLB" button
- media router/service: cad_file_id filter on GET /api/media
- WorkerManagement.tsx: new page with worker status, queue depth, scale controls
- App.tsx + Layout.tsx: /workers route + sidebar link (admin/PM)
- tests: test_rendering_service.py, test_orders_service.py (backend)
- tests: WorkerActivity.test.tsx, WorkerManagement.test.tsx (frontend)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-06 22:56:53 +01:00
parent 208eb21988
commit a70cb55d01
24 changed files with 1828 additions and 448 deletions
+9 -2
View File
@@ -1,3 +1,6 @@
# Stage 0: grab docker CLI + compose plugin
FROM docker:cli AS docker-cli
FROM python:3.11-slim
WORKDIR /app
@@ -13,9 +16,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
libffi-dev \
&& rm -rf /var/lib/apt/lists/*
# Install Python dependencies
# Copy docker CLI for worker scaling
COPY --from=docker-cli /usr/local/bin/docker /usr/local/bin/docker
COPY --from=docker-cli /usr/local/lib/docker/cli-plugins /usr/local/lib/docker/cli-plugins
# Install Python dependencies (including dev extras for pytest)
COPY pyproject.toml .
RUN pip install --no-cache-dir -e .
RUN pip install --no-cache-dir -e ".[dev]"
# Copy app code
COPY . .
+32
View File
@@ -320,6 +320,38 @@ async def generate_stl(
return {"status": "queued", "task_id": task.id, "quality": quality}
@router.post("/{id}/generate-gltf-geometry", status_code=status.HTTP_202_ACCEPTED)
async def generate_gltf_geometry(
id: uuid.UUID,
user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Queue GLB geometry export from the existing STL cache (trimesh, no Blender).
Stores the result as a MediaAsset with asset_type='gltf_geometry'.
The STL low-quality cache must already exist (run a thumbnail render first).
"""
if user.role.value not in ("admin", "project_manager"):
raise HTTPException(status_code=403, detail="Insufficient permissions")
cad = await _get_cad_file(id, db)
if not cad.stored_path:
raise HTTPException(status_code=404, detail="STEP file not uploaded for this CAD file")
step_path = Path(cad.stored_path)
stl_path = step_path.parent / f"{step_path.stem}_low.stl"
if not stl_path.exists():
raise HTTPException(
status_code=404,
detail="STL low-quality cache not found. Trigger a render first to generate it.",
)
# Queue as a thumbnail_rendering task (trimesh available in render-worker)
from app.tasks.step_tasks import generate_gltf_geometry_task
task = generate_gltf_geometry_task.delay(str(id))
return {"status": "queued", "task_id": task.id, "cad_file_id": str(id)}
@router.post(
"/{id}/regenerate-thumbnail",
status_code=status.HTTP_202_ACCEPTED,
+98
View File
@@ -356,6 +356,104 @@ async def cancel_task(task_id: str, user: User = Depends(require_admin_or_pm)):
return {"revoked": task_id}
# ---------------------------------------------------------------------------
# Worker management — list workers + scale
# ---------------------------------------------------------------------------
class ScaleRequest(BaseModel):
service: str # "render-worker" | "worker" | "worker-thumbnail"
count: int # 020
@router.get("/celery-workers")
async def get_celery_workers(user: User = Depends(require_admin_or_pm)):
"""List active Celery workers with their queues and active task counts."""
import asyncio
from app.tasks.celery_app import celery_app
def _inspect() -> dict:
try:
insp = celery_app.control.inspect(timeout=2.0)
return {
"active_queues": insp.active_queues() or {},
"active": insp.active() or {},
"stats": insp.stats() or {},
}
except Exception as exc:
return {"error": str(exc)}
data = await asyncio.to_thread(_inspect)
if "error" in data:
return {"workers": [], "error": data["error"]}
workers = []
for worker_name, queues in data.get("active_queues", {}).items():
queue_names = [q.get("name") for q in (queues or [])]
active_tasks = data.get("active", {}).get(worker_name, [])
stats = data.get("stats", {}).get(worker_name, {})
workers.append({
"name": worker_name,
"queues": queue_names,
"active_task_count": len(active_tasks),
"active_tasks": [
{"name": t.get("name"), "id": t.get("id")} for t in active_tasks
],
"total_tasks_processed": stats.get("total", {}),
})
return {"workers": workers}
@router.post("/scale", status_code=http_status.HTTP_202_ACCEPTED)
async def scale_workers(
body: ScaleRequest,
user: User = Depends(require_admin_or_pm),
):
"""Scale a Compose service (render-worker, worker, worker-thumbnail) up or down.
Requires the docker socket and compose file to be accessible inside the container
(see docker-compose.yml COMPOSE_PROJECT_DIR env var).
"""
import asyncio
import os
import subprocess
from fastapi import HTTPException
ALLOWED_SERVICES = {"render-worker", "worker", "worker-thumbnail"}
if body.service not in ALLOWED_SERVICES:
raise HTTPException(400, detail=f"service must be one of {ALLOWED_SERVICES}")
if not (0 <= body.count <= 20):
raise HTTPException(400, detail="count must be between 0 and 20")
compose_dir = os.environ.get("COMPOSE_PROJECT_DIR", "/compose")
compose_file = os.path.join(compose_dir, "docker-compose.yml")
def _scale() -> subprocess.CompletedProcess:
return subprocess.run(
[
"docker", "compose",
"-f", compose_file,
"up",
"--scale", f"{body.service}={body.count}",
"--no-recreate",
"-d",
],
capture_output=True, text=True, timeout=120,
)
try:
result = await asyncio.to_thread(_scale)
except subprocess.TimeoutExpired:
raise HTTPException(504, detail="Scale operation timed out")
if result.returncode != 0:
raise HTTPException(
500,
detail=f"docker compose scale failed: {result.stderr[-500:]}",
)
return {"service": body.service, "count": body.count, "status": "scaling"}
# ---------------------------------------------------------------------------
# Render health check
# ---------------------------------------------------------------------------
+2
View File
@@ -19,6 +19,7 @@ router = APIRouter(prefix="/api/media", tags=["media"])
async def list_assets(
product_id: uuid.UUID | None = None,
order_line_id: uuid.UUID | None = None,
cad_file_id: uuid.UUID | None = None,
asset_type: MediaAssetType | None = None,
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=500),
@@ -28,6 +29,7 @@ async def list_assets(
db,
product_id=product_id,
order_line_id=order_line_id,
cad_file_id=cad_file_id,
asset_type=asset_type,
skip=skip,
limit=limit,
+3
View File
@@ -9,6 +9,7 @@ async def list_media_assets(
db: AsyncSession,
product_id: uuid.UUID | None = None,
order_line_id: uuid.UUID | None = None,
cad_file_id: uuid.UUID | None = None,
asset_type: MediaAssetType | None = None,
is_archived: bool | None = False,
skip: int = 0,
@@ -19,6 +20,8 @@ async def list_media_assets(
q = q.where(MediaAsset.product_id == product_id)
if order_line_id:
q = q.where(MediaAsset.order_line_id == order_line_id)
if cad_file_id:
q = q.where(MediaAsset.cad_file_id == cad_file_id)
if asset_type:
q = q.where(MediaAsset.asset_type == asset_type)
if is_archived is not None:
+170
View File
@@ -269,6 +269,176 @@ def publish_asset(
return asyncio.get_event_loop().run_until_complete(_run())
def _resolve_step_path_for_order_line(order_line_id: str) -> tuple[str | None, str | None]:
"""Sync helper: resolves (step_path, cad_file_id) from an OrderLine via DB."""
import asyncio
async def _inner() -> tuple[str | None, str | None]:
from app.database import AsyncSessionLocal
from app.domains.orders.models import OrderLine
from app.domains.products.models import Product
from app.models.cad_file import CadFile
from sqlalchemy import select
from sqlalchemy.orm import selectinload
async with AsyncSessionLocal() as db:
res = await db.execute(
select(OrderLine)
.options(selectinload(OrderLine.product))
.where(OrderLine.id == order_line_id)
)
line = res.scalar_one_or_none()
if not line or not line.product or not line.product.cad_file_id:
return None, None
cad_res = await db.execute(
select(CadFile).where(CadFile.id == line.product.cad_file_id)
)
cad = cad_res.scalar_one_or_none()
if not cad or not cad.stored_path:
return None, None
return cad.stored_path, str(line.product.cad_file_id)
return asyncio.get_event_loop().run_until_complete(_inner())
@celery_app.task(
bind=True,
name="app.domains.rendering.tasks.render_order_line_still_task",
queue="thumbnail_rendering",
max_retries=2,
)
def render_order_line_still_task(self, order_line_id: str, **params) -> dict:
"""Render a still image for an order line, resolving STEP path from DB.
Wraps render_still_task logic but accepts order_line_id instead of step_path.
On success, creates a MediaAsset record via publish_asset.
"""
step_path_str, cad_file_id = _resolve_step_path_for_order_line(order_line_id)
if not step_path_str:
raise RuntimeError(
f"Cannot resolve STEP path for order_line {order_line_id}: "
"product missing or has no linked CAD file"
)
step = Path(step_path_str)
output_dir = step.parent / "renders"
output_dir.mkdir(parents=True, exist_ok=True)
output_path = output_dir / f"line_{order_line_id}.png"
try:
from app.services.render_blender import render_still
result = render_still(
step_path=step,
output_path=output_path,
**params,
)
publish_asset.delay(
order_line_id,
"still",
str(output_path),
render_config=result,
)
logger.info(
"render_order_line_still_task completed for line %s in %.1fs",
order_line_id, result.get("total_duration_s", 0),
)
return result
except Exception as exc:
logger.error("render_order_line_still_task failed for %s: %s", order_line_id, exc)
raise self.retry(exc=exc, countdown=30)
@celery_app.task(
bind=True,
name="app.domains.rendering.tasks.export_gltf_for_order_line_task",
queue="thumbnail_rendering",
max_retries=1,
)
def export_gltf_for_order_line_task(self, order_line_id: str) -> dict:
"""Export a geometry-only GLB from the STL cache using trimesh (no Blender).
Publishes a MediaAsset with asset_type='gltf_geometry'.
Requires the STL low-quality cache to exist.
"""
step_path_str, cad_file_id = _resolve_step_path_for_order_line(order_line_id)
if not step_path_str:
raise RuntimeError(f"Cannot resolve STEP path for order_line {order_line_id}")
step = Path(step_path_str)
stl_path = step.parent / f"{step.stem}_low.stl"
if not stl_path.exists():
raise RuntimeError(
f"STL cache not found: {stl_path}. Run thumbnail generation first."
)
output_path = step.parent / f"{step.stem}_geometry.glb"
try:
import trimesh
mesh = trimesh.load(str(stl_path))
mesh.export(str(output_path))
publish_asset.delay(order_line_id, "gltf_geometry", str(output_path))
logger.info("export_gltf_for_order_line_task completed: %s", output_path.name)
return {"glb_path": str(output_path)}
except Exception as exc:
logger.error("export_gltf_for_order_line_task failed for %s: %s", order_line_id, exc)
raise self.retry(exc=exc, countdown=15)
@celery_app.task(
bind=True,
name="app.domains.rendering.tasks.export_blend_for_order_line_task",
queue="thumbnail_rendering",
max_retries=1,
)
def export_blend_for_order_line_task(self, order_line_id: str) -> dict:
"""Export a production-quality GLB via Blender + asset library (export_gltf.py).
Publishes a MediaAsset with asset_type='blend_production'.
Requires Blender + the render-scripts directory.
"""
import os
import subprocess
step_path_str, cad_file_id = _resolve_step_path_for_order_line(order_line_id)
if not step_path_str:
raise RuntimeError(f"Cannot resolve STEP path for order_line {order_line_id}")
step = Path(step_path_str)
stl_path = step.parent / f"{step.stem}_low.stl"
if not stl_path.exists():
raise RuntimeError(f"STL cache not found: {stl_path}")
output_path = step.parent / f"{step.stem}_production.glb"
scripts_dir = Path(os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts"))
export_script = scripts_dir / "export_gltf.py"
from app.services.render_blender import find_blender
blender_bin = find_blender()
if not blender_bin:
raise RuntimeError("Blender binary not found — cannot run export_blend task")
try:
cmd = [
blender_bin, "--background",
"--python", str(export_script),
"--",
"--stl_path", str(stl_path),
"--output_path", str(output_path),
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
if result.returncode != 0:
raise RuntimeError(
f"export_gltf.py exited {result.returncode}:\n{result.stderr[-500:]}"
)
publish_asset.delay(order_line_id, "blend_production", str(output_path))
logger.info("export_blend_for_order_line_task completed: %s", output_path.name)
return {"glb_path": str(output_path)}
except Exception as exc:
logger.error("export_blend_for_order_line_task failed for %s: %s", order_line_id, exc)
raise self.retry(exc=exc, countdown=30)
def _build_ffmpeg_cmd(
frames_dir: Path, output_mp4: Path, fps: int = 30, bg_color: str = ""
) -> list:
@@ -20,6 +20,7 @@ def dispatch_workflow(
"still": _build_still,
"turntable": _build_turntable,
"multi_angle": _build_multi_angle,
"still_with_exports": _build_still_with_exports,
}
builder = builders.get(workflow_type)
if not builder:
@@ -30,24 +31,56 @@ def dispatch_workflow(
def _build_still(order_line_id: str, params: dict):
from app.domains.rendering.tasks import render_still_task
"""Still render: resolves STEP path from order_line DB record."""
from app.domains.rendering.tasks import render_order_line_still_task
return chain(
render_still_task.si(order_line_id, **params)
render_order_line_still_task.si(order_line_id, **params)
)
def _build_turntable(order_line_id: str, params: dict):
"""Turntable animation: requires step_path + output_dir in params."""
from app.domains.rendering.tasks import render_turntable_task
step_path = params.get("step_path")
output_dir = params.get("output_dir")
if not step_path or not output_dir:
raise ValueError(
"turntable workflow requires 'step_path' and 'output_dir' in params"
)
remaining = {k: v for k, v in params.items() if k not in ("step_path", "output_dir")}
return chain(
render_turntable_task.si(order_line_id, **params)
render_turntable_task.si(step_path, output_dir, **remaining)
)
def _build_multi_angle(order_line_id: str, params: dict):
from app.domains.rendering.tasks import render_still_task
angles = params.get("angles", [0, 45, 90])
p = {k: v for k, v in params.items() if k != "angles"}
"""Multi-angle stills: renders the same order_line from multiple rotation_z angles."""
from app.domains.rendering.tasks import render_order_line_still_task
angles = params.pop("angles", [0, 45, 90])
return group(
render_still_task.si(order_line_id, camera_angle=angle, **p)
render_order_line_still_task.si(order_line_id, rotation_z=float(angle), **params)
for angle in angles
)
def _build_still_with_exports(order_line_id: str, params: dict):
"""Still render + parallel GLB exports (geometry + production quality).
Pipeline:
render_order_line_still_task → group(
export_gltf_for_order_line_task,
export_blend_for_order_line_task,
)
"""
from app.domains.rendering.tasks import (
render_order_line_still_task,
export_gltf_for_order_line_task,
export_blend_for_order_line_task,
)
return chain(
render_order_line_still_task.si(order_line_id, **params),
group(
export_gltf_for_order_line_task.si(order_line_id),
export_blend_for_order_line_task.si(order_line_id),
),
)
+63
View File
@@ -245,6 +245,69 @@ def generate_stl_cache(self, cad_file_id: str, quality: str):
raise self.retry(exc=exc, countdown=30, max_retries=2)
@celery_app.task(bind=True, name="app.tasks.step_tasks.generate_gltf_geometry_task", queue="thumbnail_rendering", max_retries=1)
def generate_gltf_geometry_task(self, cad_file_id: str):
"""Export a geometry-only GLB from the STL low-quality cache using trimesh.
Creates a MediaAsset with asset_type='gltf_geometry' and cad_file_id set.
No Blender required — trimesh handles the STL→GLB conversion.
"""
from pathlib import Path as _Path
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from app.config import settings as app_settings
from app.models.cad_file import CadFile
sync_url = app_settings.database_url.replace("+asyncpg", "")
eng = create_engine(sync_url)
with Session(eng) as session:
cad_file = session.get(CadFile, cad_file_id)
if not cad_file or not cad_file.stored_path:
logger.error("generate_gltf_geometry_task: no stored_path for %s", cad_file_id)
return
step_path_str = cad_file.stored_path
eng.dispose()
step = _Path(step_path_str)
stl_path = step.parent / f"{step.stem}_low.stl"
if not stl_path.exists():
logger.error("generate_gltf_geometry_task: STL not found %s", stl_path)
raise RuntimeError(f"STL cache not found: {stl_path}")
output_path = step.parent / f"{step.stem}_geometry.glb"
try:
import trimesh
mesh = trimesh.load(str(stl_path))
mesh.export(str(output_path))
logger.info("generate_gltf_geometry_task: exported %s", output_path.name)
except Exception as exc:
logger.error("generate_gltf_geometry_task failed for %s: %s", cad_file_id, exc)
raise self.retry(exc=exc, countdown=15)
# Create MediaAsset record
import asyncio
async def _store():
from app.database import AsyncSessionLocal
from app.domains.media.models import MediaAsset, MediaAssetType
async with AsyncSessionLocal() as db:
import uuid
asset = MediaAsset(
cad_file_id=uuid.UUID(cad_file_id),
asset_type=MediaAssetType.gltf_geometry,
storage_key=str(output_path),
mime_type="model/gltf-binary",
file_size_bytes=output_path.stat().st_size if output_path.exists() else None,
)
db.add(asset)
await db.commit()
return str(asset.id)
asset_id = asyncio.get_event_loop().run_until_complete(_store())
logger.info("generate_gltf_geometry_task: MediaAsset %s created for cad %s", asset_id, cad_file_id)
return {"glb_path": str(output_path), "asset_id": asset_id}
@celery_app.task(bind=True, name="app.tasks.step_tasks.regenerate_thumbnail", queue="thumbnail_rendering")
def regenerate_thumbnail(self, cad_file_id: str, part_colors: dict):
"""Regenerate thumbnail with per-part colours."""
@@ -0,0 +1,191 @@
"""Tests for orders domain — order creation, status transitions, and pricing."""
import uuid
import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _create_test_product(db):
from app.domains.products.models import Product
product = Product(
id=uuid.uuid4(),
name=f"Test Product {uuid.uuid4().hex[:6]}",
category_key="TRB",
components=[],
cad_part_materials=[],
)
db.add(product)
await db.commit()
await db.refresh(product)
return product
async def _create_test_order(db, user):
from app.domains.orders.models import Order, OrderStatus
order = Order(
id=uuid.uuid4(),
order_number=f"TEST-{uuid.uuid4().hex[:6].upper()}",
status=OrderStatus.draft,
created_by=user.id,
tenant_id=None,
)
db.add(order)
await db.commit()
await db.refresh(order)
return order
# ---------------------------------------------------------------------------
# Order creation
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_create_order_draft_status(db, admin_user):
"""New order starts in draft status."""
order = await _create_test_order(db, admin_user)
assert order.id is not None
assert order.status.value == "draft"
assert order.order_number.startswith("TEST-")
@pytest.mark.asyncio
async def test_order_has_no_lines_initially(db, admin_user):
"""New order starts with zero order lines."""
from sqlalchemy import select
from app.domains.orders.models import Order, OrderLine
order = await _create_test_order(db, admin_user)
result = await db.execute(
select(OrderLine).where(OrderLine.order_id == order.id)
)
lines = result.scalars().all()
assert len(lines) == 0
# ---------------------------------------------------------------------------
# Order line creation
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_add_order_line(db, admin_user):
"""Order line can be added to a draft order."""
from app.domains.orders.models import OrderLine
product = await _create_test_product(db)
order = await _create_test_order(db, admin_user)
line = OrderLine(
id=uuid.uuid4(),
order_id=order.id,
product_id=product.id,
render_status="pending",
item_status="pending",
tenant_id=None,
)
db.add(line)
await db.commit()
await db.refresh(line)
assert line.id is not None
assert line.order_id == order.id
assert line.render_status == "pending"
# ---------------------------------------------------------------------------
# Status transitions
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_order_status_transition_to_submitted(db, admin_user):
"""Order status can be changed from draft to submitted."""
from app.domains.orders.models import Order, OrderStatus
order = await _create_test_order(db, admin_user)
order.status = OrderStatus.submitted
await db.commit()
await db.refresh(order)
assert order.status == OrderStatus.submitted
@pytest.mark.asyncio
async def test_order_multiple_lines(db, admin_user):
"""Multiple lines can be added to the same order."""
from app.domains.orders.models import OrderLine
product = await _create_test_product(db)
order = await _create_test_order(db, admin_user)
for _ in range(3):
line = OrderLine(
id=uuid.uuid4(),
order_id=order.id,
product_id=product.id,
render_status="pending",
item_status="pending",
tenant_id=None,
)
db.add(line)
await db.commit()
from sqlalchemy import select
result = await db.execute(
select(OrderLine).where(OrderLine.order_id == order.id)
)
assert len(result.scalars().all()) == 3
# ---------------------------------------------------------------------------
# Render status tracking
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_order_line_render_status_update(db, admin_user):
"""Order line render_status can be updated to processing/completed."""
from app.domains.orders.models import OrderLine
product = await _create_test_product(db)
order = await _create_test_order(db, admin_user)
line = OrderLine(
id=uuid.uuid4(),
order_id=order.id,
product_id=product.id,
render_status="pending",
item_status="pending",
tenant_id=None,
)
db.add(line)
await db.commit()
line.render_status = "processing"
await db.commit()
await db.refresh(line)
assert line.render_status == "processing"
line.render_status = "completed"
await db.commit()
await db.refresh(line)
assert line.render_status == "completed"
# ---------------------------------------------------------------------------
# Unit price
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_order_line_unit_price_nullable(db, admin_user):
"""unit_price defaults to None."""
from app.domains.orders.models import OrderLine
product = await _create_test_product(db)
order = await _create_test_order(db, admin_user)
line = OrderLine(
id=uuid.uuid4(),
order_id=order.id,
product_id=product.id,
render_status="pending",
item_status="pending",
tenant_id=None,
)
db.add(line)
await db.commit()
await db.refresh(line)
assert line.unit_price is None
@@ -0,0 +1,112 @@
"""Tests for rendering domain — workflow builder + task helpers."""
import uuid
from unittest.mock import MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# workflow_builder unit tests (no DB required)
# ---------------------------------------------------------------------------
def test_dispatch_workflow_unknown_type_raises():
from app.domains.rendering.workflow_builder import dispatch_workflow
with pytest.raises(ValueError, match="Unknown workflow type"):
dispatch_workflow("nonexistent_type", str(uuid.uuid4()))
def test_build_still_returns_chain():
"""_build_still returns a Celery chain wrapping render_order_line_still_task."""
from celery import chain
from app.domains.rendering.workflow_builder import _build_still
canvas = _build_still(str(uuid.uuid4()), {})
# A single-task chain is still a Celery Signature, not a plain chain, but
# it should be callable / have apply_async
assert hasattr(canvas, "apply_async")
def test_build_multi_angle_creates_group():
"""_build_multi_angle returns a Celery group with one sig per angle."""
from celery import group
from app.domains.rendering.workflow_builder import _build_multi_angle
order_line_id = str(uuid.uuid4())
canvas = _build_multi_angle(order_line_id, {"angles": [0, 90, 180]})
# group has tasks attribute
assert hasattr(canvas, "tasks")
assert len(canvas.tasks) == 3
def test_build_still_with_exports_is_chain():
"""_build_still_with_exports returns a chain."""
from app.domains.rendering.workflow_builder import _build_still_with_exports
canvas = _build_still_with_exports(str(uuid.uuid4()), {})
assert hasattr(canvas, "apply_async")
def test_build_turntable_raises_without_step_path():
"""_build_turntable raises ValueError if step_path missing in params."""
from app.domains.rendering.workflow_builder import _build_turntable
with pytest.raises(ValueError, match="step_path"):
_build_turntable(str(uuid.uuid4()), {})
def test_build_turntable_raises_without_output_dir():
from app.domains.rendering.workflow_builder import _build_turntable
with pytest.raises(ValueError, match="output_dir"):
_build_turntable(str(uuid.uuid4()), {"step_path": "/tmp/test.stp"})
# ---------------------------------------------------------------------------
# _resolve_step_path_for_order_line — unit-tests with DB (integration)
# ---------------------------------------------------------------------------
@pytest.mark.integration
@pytest.mark.asyncio
async def test_resolve_step_path_returns_none_for_missing_line(db):
"""Returns (None, None) for a line_id that doesn't exist."""
from app.domains.rendering.tasks import _resolve_step_path_for_order_line
import asyncio
result = _resolve_step_path_for_order_line(str(uuid.uuid4()))
assert result == (None, None)
# ---------------------------------------------------------------------------
# publish_asset (unit test with mocked DB)
# ---------------------------------------------------------------------------
def test_publish_asset_signature():
"""publish_asset is importable and is a bound Celery task."""
from app.domains.rendering.tasks import publish_asset
assert callable(publish_asset)
assert hasattr(publish_asset, "delay")
# ---------------------------------------------------------------------------
# generate_gltf_geometry_task — smoke test (unit)
# ---------------------------------------------------------------------------
def test_generate_gltf_geometry_task_importable():
from app.tasks.step_tasks import generate_gltf_geometry_task
assert callable(generate_gltf_geometry_task)
assert hasattr(generate_gltf_geometry_task, "delay")
# ---------------------------------------------------------------------------
# New order-line tasks are importable and correctly registered
# ---------------------------------------------------------------------------
def test_render_order_line_still_task_importable():
from app.domains.rendering.tasks import render_order_line_still_task
assert render_order_line_still_task.name == "app.domains.rendering.tasks.render_order_line_still_task"
assert render_order_line_still_task.queue == "thumbnail_rendering"
def test_export_gltf_for_order_line_task_importable():
from app.domains.rendering.tasks import export_gltf_for_order_line_task
assert export_gltf_for_order_line_task.queue == "thumbnail_rendering"
def test_export_blend_for_order_line_task_importable():
from app.domains.rendering.tasks import export_blend_for_order_line_task
assert export_blend_for_order_line_task.queue == "thumbnail_rendering"