From fe46dabfc5fbc1d04ad46b3a8159ae69c5856e0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hartmut=20N=C3=B6renberg?= Date: Wed, 8 Apr 2026 21:44:02 +0200 Subject: [PATCH] feat: add workflow rollout gate signals --- .../app/domains/rendering/dispatch_service.py | 232 +++- .../rendering/workflow_comparison_service.py | 101 +- .../domains/test_workflow_dispatch_service.py | 1122 ++++++++++++++++- scripts/test_render_pipeline.py | 270 +++- 4 files changed, 1624 insertions(+), 101 deletions(-) diff --git a/backend/app/domains/rendering/dispatch_service.py b/backend/app/domains/rendering/dispatch_service.py index 65103ed..9b79e70 100644 --- a/backend/app/domains/rendering/dispatch_service.py +++ b/backend/app/domains/rendering/dispatch_service.py @@ -17,6 +17,28 @@ import logging logger = logging.getLogger(__name__) +def _build_rollout_signal( + *, + gate_status: str, + ready: bool, + reasons: list[str], + workflow_def_id=None, + output_type_id=None, + verdict: str | None = None, +) -> dict: + return { + "rollout_gate_status": gate_status, + "rollout_gate_verdict": verdict, + "rollout_gate_reasons": reasons, + "workflow_rollout_ready": ready, + "workflow_rollout_status": "ready_for_rollout" if ready else "hold_legacy_authoritative", + "output_type_rollout_ready": ready, + "output_type_rollout_status": "ready_for_rollout" if ready else "hold_legacy_authoritative", + "rollout_workflow_definition_id": str(workflow_def_id) if workflow_def_id is not None else None, + "rollout_output_type_id": str(output_type_id) if output_type_id is not None else None, + } + + def dispatch_render_with_workflow(order_line_id: str) -> dict: """Dispatch a render for the given order line. @@ -33,6 +55,7 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict: from app.domains.orders.models import OrderLine from app.domains.rendering.models import OutputType, WorkflowDefinition from app.domains.rendering.workflow_config_utils import ( + canonicalize_workflow_config, extract_runtime_workflow, get_workflow_execution_mode, ) @@ -67,7 +90,16 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict: "order_line %s: no workflow_definition_id, using legacy dispatch", order_line_id, ) - return _legacy_dispatch(order_line_id) + legacy_result = _legacy_dispatch(order_line_id) + legacy_result.update( + _build_rollout_signal( + gate_status="legacy_only", + ready=False, + reasons=["No workflow definition is linked; legacy dispatch remains authoritative."], + output_type_id=getattr(output_type, "id", None), + ) + ) + return legacy_result # Load the linked WorkflowDefinition wf_def: WorkflowDefinition | None = session.execute( @@ -84,13 +116,45 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict: order_line_id, output_type.workflow_definition_id, ) - return _legacy_dispatch(order_line_id) + legacy_result = _legacy_dispatch(order_line_id) + legacy_result.update( + _build_rollout_signal( + gate_status="workflow_unavailable", + ready=False, + reasons=["Linked workflow definition is missing or inactive; legacy dispatch remains authoritative."], + workflow_def_id=output_type.workflow_definition_id, + output_type_id=output_type.id, + ) + ) + return legacy_result - execution_mode = get_workflow_execution_mode(wf_def.config, default="legacy") + try: + canonical_config = canonicalize_workflow_config(wf_def.config) + except Exception as exc: + logger.warning( + "order_line %s: workflow_definition_id %s has invalid config (%s), " + "falling back to legacy dispatch", + order_line_id, + wf_def.id, + exc, + ) + legacy_result = _legacy_dispatch(order_line_id) + legacy_result.update( + _build_rollout_signal( + gate_status="workflow_invalid", + ready=False, + reasons=[f"Workflow definition config is invalid: {exc}."], + workflow_def_id=wf_def.id, + output_type_id=output_type.id, + ) + ) + return legacy_result + + execution_mode = get_workflow_execution_mode(canonical_config, default="legacy") def _prepare_graph_context(target_mode: str): workflow_context = prepare_workflow_context( - wf_def.config, + canonical_config, context_id=order_line_id, execution_mode=target_mode, ) @@ -122,7 +186,18 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict: wf_def.id, exc, ) - return _legacy_dispatch(order_line_id) + legacy_result = _legacy_dispatch(order_line_id) + legacy_result["fallback_from"] = "workflow_graph" + legacy_result.update( + _build_rollout_signal( + gate_status="graph_preparation_failed", + ready=False, + reasons=[f"Graph runtime preparation failed: {exc}."], + workflow_def_id=wf_def.id, + output_type_id=output_type.id, + ) + ) + return legacy_result run = None try: @@ -136,7 +211,18 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict: wf_def.id, exc, ) - return _legacy_dispatch(order_line_id) + legacy_result = _legacy_dispatch(order_line_id) + legacy_result["fallback_from"] = "workflow_graph" + legacy_result.update( + _build_rollout_signal( + gate_status="graph_run_creation_failed", + ready=False, + reasons=[f"Graph workflow run creation failed: {exc}."], + workflow_def_id=wf_def.id, + output_type_id=output_type.id, + ) + ) + return legacy_result try: dispatch_result = execute_graph_workflow(session, workflow_context) @@ -154,15 +240,35 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict: fallback_result = _legacy_dispatch(order_line_id) fallback_result["fallback_from"] = "workflow_graph" fallback_result["workflow_run_id"] = str(run.id) + fallback_result.update( + _build_rollout_signal( + gate_status="graph_execution_failed", + ready=False, + reasons=[f"Graph workflow execution failed: {exc}."], + workflow_def_id=wf_def.id, + output_type_id=output_type.id, + ) + ) return fallback_result - return { + result = { "backend": "workflow_graph", "execution_mode": "graph", "workflow_run_id": str(run.id), "celery_task_id": dispatch_result.task_ids[0] if dispatch_result.task_ids else None, "task_ids": dispatch_result.task_ids, } + result.update( + _build_rollout_signal( + gate_status="graph_authoritative", + ready=True, + verdict="pass", + reasons=["Workflow graph dispatch is authoritative for this output type."], + workflow_def_id=wf_def.id, + output_type_id=output_type.id, + ) + ) + return result if execution_mode == "shadow": legacy_result = _legacy_dispatch(order_line_id) @@ -180,6 +286,18 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict: legacy_result["execution_mode"] = "shadow" legacy_result["shadow_status"] = "skipped" legacy_result["shadow_error"] = str(exc) + legacy_result.update( + _build_rollout_signal( + gate_status="shadow_skipped", + ready=False, + reasons=[ + "Shadow workflow preparation failed; legacy dispatch remains authoritative.", + f"Preparation error: {exc}.", + ], + workflow_def_id=wf_def.id, + output_type_id=output_type.id, + ) + ) return legacy_result run = None @@ -197,6 +315,18 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict: legacy_result["execution_mode"] = "shadow" legacy_result["shadow_status"] = "failed" legacy_result["shadow_error"] = str(exc) + legacy_result.update( + _build_rollout_signal( + gate_status="shadow_run_creation_failed", + ready=False, + reasons=[ + "Shadow workflow run could not be created; legacy dispatch remains authoritative.", + f"Run creation error: {exc}.", + ], + workflow_def_id=wf_def.id, + output_type_id=output_type.id, + ) + ) return legacy_result try: @@ -216,15 +346,39 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict: legacy_result["shadow_status"] = "failed" legacy_result["shadow_error"] = str(exc) legacy_result["shadow_workflow_run_id"] = str(run.id) + legacy_result.update( + _build_rollout_signal( + gate_status="shadow_execution_failed", + ready=False, + reasons=[ + "Shadow workflow execution failed; legacy dispatch remains authoritative.", + f"Execution error: {exc}.", + ], + workflow_def_id=wf_def.id, + output_type_id=output_type.id, + ) + ) return legacy_result legacy_result["execution_mode"] = "shadow" legacy_result["shadow_status"] = "dispatched" legacy_result["shadow_workflow_run_id"] = str(run.id) legacy_result["shadow_task_ids"] = dispatch_result.task_ids + legacy_result.update( + _build_rollout_signal( + gate_status="pending_shadow_verdict", + ready=False, + reasons=[ + "Legacy dispatch remains authoritative until the shadow workflow comparison returns pass.", + "A pass verdict is required before workflow-first rollout is ready.", + ], + workflow_def_id=wf_def.id, + output_type_id=output_type.id, + ) + ) return legacy_result - workflow_type, params = extract_runtime_workflow(wf_def.config) + workflow_type, params = extract_runtime_workflow(canonical_config) if workflow_type is None or workflow_type == "custom": logger.warning( "order_line %s: workflow_definition_id %s has no supported preset runtime, " @@ -232,7 +386,17 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict: order_line_id, wf_def.id, ) - return _legacy_dispatch(order_line_id) + legacy_result = _legacy_dispatch(order_line_id) + legacy_result.update( + _build_rollout_signal( + gate_status="workflow_runtime_unsupported", + ready=False, + reasons=["Workflow definition has no supported preset runtime; legacy dispatch remains authoritative."], + workflow_def_id=wf_def.id, + output_type_id=output_type.id, + ) + ) + return legacy_result logger.info( "order_line %s: dispatching via WorkflowDefinition %s (type=%s)", @@ -243,7 +407,7 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict: try: workflow_context = prepare_workflow_context( - wf_def.config, + canonical_config, context_id=order_line_id, execution_mode="legacy", ) @@ -255,7 +419,17 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict: wf_def.id, exc, ) - return _legacy_dispatch(order_line_id) + legacy_result = _legacy_dispatch(order_line_id) + legacy_result.update( + _build_rollout_signal( + gate_status="workflow_preparation_failed", + ready=False, + reasons=[f"Workflow runtime preparation failed: {exc}."], + workflow_def_id=wf_def.id, + output_type_id=output_type.id, + ) + ) + return legacy_result # For turntable workflows: resolve step_path + output_dir from the order line at runtime if workflow_type == "turntable" and ("step_path" not in params or "output_dir" not in params): @@ -299,7 +473,17 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict: wf_def.id, exc, ) - return _legacy_dispatch(order_line_id) + legacy_result = _legacy_dispatch(order_line_id) + legacy_result.update( + _build_rollout_signal( + gate_status="workflow_run_creation_failed", + ready=False, + reasons=[f"Workflow run creation failed: {exc}."], + workflow_def_id=wf_def.id, + output_type_id=output_type.id, + ) + ) + return legacy_result from app.domains.rendering.workflow_builder import dispatch_workflow @@ -317,15 +501,35 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict: order_line_id, wf_def.id, ) - return _legacy_dispatch(order_line_id) + legacy_result = _legacy_dispatch(order_line_id) + legacy_result.update( + _build_rollout_signal( + gate_status="workflow_dispatch_failed", + ready=False, + reasons=[f"Workflow dispatch failed: {exc}."], + workflow_def_id=wf_def.id, + output_type_id=output_type.id, + ) + ) + return legacy_result - return { + result = { "backend": "workflow", "workflow_type": workflow_type, "execution_mode": "legacy", "workflow_run_id": str(run.id), "celery_task_id": celery_task_id, } + result.update( + _build_rollout_signal( + gate_status="workflow_legacy_runtime", + ready=False, + reasons=["Workflow definition is active, but execution still uses the legacy runtime path."], + workflow_def_id=wf_def.id, + output_type_id=output_type.id, + ) + ) + return result def _legacy_dispatch(order_line_id: str) -> dict: diff --git a/backend/app/domains/rendering/workflow_comparison_service.py b/backend/app/domains/rendering/workflow_comparison_service.py index 8b46995..4256941 100644 --- a/backend/app/domains/rendering/workflow_comparison_service.py +++ b/backend/app/domains/rendering/workflow_comparison_service.py @@ -10,11 +10,16 @@ from PIL import Image, ImageChops, ImageStat from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession +from app.config import settings +from app.core.render_paths import resolve_result_path, result_path_to_storage_key from app.domains.media.models import MediaAsset from app.domains.orders.models import OrderLine from app.domains.rendering.models import WorkflowRun from app.domains.rendering.schemas import WorkflowComparisonArtifactOut, WorkflowRunComparisonOut +ROLLOUT_PASS_MAX_MEAN_PIXEL_DELTA = 0.0 +ROLLOUT_WARN_MAX_MEAN_PIXEL_DELTA = 0.02 + @dataclass(slots=True) class _ArtifactComparison: @@ -36,18 +41,78 @@ class _ArtifactComparison: sha256=self.sha256, mime_type=self.mime_type, image_width=self.image_width, - image_height=self.image_height, + image_height=self.image_height, ) +def evaluate_rollout_gate( + *, + authoritative_output: _ArtifactComparison, + observer_output: _ArtifactComparison, + exact_match: bool | None, + dimensions_match: bool | None, + mean_pixel_delta: float | None, +) -> dict[str, object]: + thresholds = { + "pass_max_mean_pixel_delta": ROLLOUT_PASS_MAX_MEAN_PIXEL_DELTA, + "warn_max_mean_pixel_delta": ROLLOUT_WARN_MAX_MEAN_PIXEL_DELTA, + } + reasons: list[str] = [] + + if not authoritative_output.exists: + verdict = "fail" + reasons.append("Authoritative legacy output is missing; keep legacy fallback active.") + elif not observer_output.exists: + verdict = "fail" + reasons.append("Observer workflow output is missing; rollout cannot be approved.") + elif exact_match: + verdict = "pass" + reasons.append("Observer output matches the authoritative legacy output byte-for-byte.") + elif dimensions_match is False: + verdict = "fail" + reasons.append("Observer output dimensions differ from the authoritative legacy output.") + elif mean_pixel_delta is None: + verdict = "fail" + reasons.append("Observer output could not be pixel-compared against the authoritative output.") + elif mean_pixel_delta <= ROLLOUT_PASS_MAX_MEAN_PIXEL_DELTA: + verdict = "pass" + reasons.append("Observer output is visually identical within the pass threshold.") + elif mean_pixel_delta <= ROLLOUT_WARN_MAX_MEAN_PIXEL_DELTA: + verdict = "warn" + reasons.append( + "Observer output differs slightly from the authoritative output but remains within the warn threshold." + ) + else: + verdict = "fail" + reasons.append( + "Observer output exceeds the allowed parity threshold; keep legacy fallback active." + ) + + if mean_pixel_delta is not None and not exact_match: + reasons.append( + f"Mean pixel delta {mean_pixel_delta:.6f}; " + f"pass<={ROLLOUT_PASS_MAX_MEAN_PIXEL_DELTA:.6f}, " + f"warn<={ROLLOUT_WARN_MAX_MEAN_PIXEL_DELTA:.6f}." + ) + + rollout_ready = verdict == "pass" + rollout_status = "ready_for_rollout" if rollout_ready else "hold_legacy_authoritative" + + return { + "verdict": verdict, + "ready": rollout_ready, + "status": rollout_status, + "reasons": reasons, + "thresholds": thresholds, + "workflow_rollout_ready": rollout_ready, + "workflow_rollout_status": rollout_status, + "output_type_rollout_ready": rollout_ready, + "output_type_rollout_status": rollout_status, + } + + def _normalize_storage_key(path: str | None) -> str | None: - if not path: - return None - normalized = path.replace("\\", "/") - marker = "/uploads/" - if marker in normalized: - return normalized.split(marker, 1)[1] - return normalized.lstrip("/") + return result_path_to_storage_key(path) def _build_artifact(path: str | None) -> _ArtifactComparison: @@ -63,7 +128,8 @@ def _build_artifact(path: str | None) -> _ArtifactComparison: image_height=None, ) - file_path = Path(path) + resolved_path = resolve_result_path(path) + file_path = resolved_path or Path(path) exists = file_path.exists() mime_type, _ = mimetypes.guess_type(str(file_path)) @@ -136,10 +202,8 @@ async def _load_shadow_asset_by_workflow_run( if asset is None: return None - storage_key = asset.storage_key.lstrip("/") - if storage_key.startswith("app/uploads/"): - return f"/{storage_key}" - return f"/app/uploads/{storage_key}" + resolved = resolve_result_path(asset.storage_key) + return str(resolved) if resolved is not None else None def _find_shadow_file(order_line: OrderLine, workflow_run: WorkflowRun) -> str | None: @@ -147,9 +211,13 @@ def _find_shadow_file(order_line: OrderLine, workflow_run: WorkflowRun) -> str | candidate_roots: list[Path] = [] if order_line.result_path: - candidate_roots.append(Path(order_line.result_path).parent) + resolved_result = resolve_result_path(order_line.result_path) + if resolved_result is not None: + candidate_roots.append(resolved_result.parent) - candidate_roots.append(Path("/app/uploads/renders") / str(order_line.id)) + upload_root = Path(settings.upload_dir) + candidate_roots.append(upload_root / "renders" / str(order_line.id)) + candidate_roots.append(upload_root / "step_files" / "renders") seen_roots: set[Path] = set() candidates: list[Path] = [] @@ -215,6 +283,9 @@ async def build_workflow_run_comparison( if exact_match: status = "matched" summary = "Observer output matches the authoritative legacy output byte-for-byte." + elif mean_pixel_delta == 0.0 and dimensions_match: + status = "matched" + summary = "Observer output matches the authoritative legacy output visually, but file metadata differs." else: status = "different" if dimensions_match is False: diff --git a/backend/tests/domains/test_workflow_dispatch_service.py b/backend/tests/domains/test_workflow_dispatch_service.py index 06e56cb..9fa8b9f 100644 --- a/backend/tests/domains/test_workflow_dispatch_service.py +++ b/backend/tests/domains/test_workflow_dispatch_service.py @@ -2,17 +2,22 @@ from __future__ import annotations import uuid from pathlib import Path +from types import SimpleNamespace import pytest -from PIL import Image +from PIL import Image, PngImagePlugin from sqlalchemy import select from sqlalchemy.orm import selectinload from app.config import settings -from app.domains.orders.models import Order, OrderLine +from app.domains.orders.models import Order, OrderLine, OrderStatus from app.domains.products.models import CadFile, Product from app.domains.rendering.dispatch_service import dispatch_render_with_workflow from app.domains.rendering.models import OutputType, WorkflowDefinition, WorkflowRun +from app.domains.rendering.workflow_comparison_service import ( + _build_artifact, + evaluate_rollout_gate, +) from app.domains.rendering.workflow_config_utils import build_preset_workflow_config @@ -131,10 +136,14 @@ async def test_dispatch_render_with_workflow_falls_back_to_legacy_without_workfl await db.rollback() - assert result == { - "backend": "legacy", - "order_line_id": str(seeded["order_line"].id), - } + assert result["backend"] == "legacy" + assert result["order_line_id"] == str(seeded["order_line"].id) + assert result["rollout_gate_status"] == "legacy_only" + assert result["rollout_gate_verdict"] is None + assert result["workflow_rollout_ready"] is False + assert result["output_type_rollout_ready"] is False + assert result["rollout_workflow_definition_id"] is None + assert result["rollout_output_type_id"] == str(seeded["output_type"].id) runs = (await db.execute(select(WorkflowRun))).scalars().all() assert runs == [] @@ -214,10 +223,15 @@ async def test_dispatch_render_with_workflow_falls_back_when_workflow_runtime_pr await db.rollback() - assert result == { - "backend": "legacy", - "order_line_id": str(seeded["order_line"].id), - } + assert result["backend"] == "legacy" + assert result["order_line_id"] == str(seeded["order_line"].id) + assert result["rollout_gate_status"] == "workflow_preparation_failed" + assert result["rollout_gate_verdict"] is None + assert result["workflow_rollout_ready"] is False + assert result["output_type_rollout_ready"] is False + assert result["rollout_workflow_definition_id"] == str(seeded["workflow_definition"].id) + assert result["rollout_output_type_id"] == str(seeded["output_type"].id) + assert any("Workflow runtime preparation failed:" in reason for reason in result["rollout_gate_reasons"]) runs = (await db.execute(select(WorkflowRun))).scalars().all() assert runs == [] @@ -276,11 +290,138 @@ async def test_dispatch_render_with_workflow_graph_mode_dispatches_supported_cus assert result["backend"] == "workflow_graph" assert result["execution_mode"] == "graph" assert result["task_ids"] == ["graph-task-1"] + assert result["rollout_gate_status"] == "graph_authoritative" + assert result["rollout_gate_verdict"] == "pass" + assert result["workflow_rollout_ready"] is True + assert result["output_type_rollout_ready"] is True assert run.execution_mode == "graph" assert run.status == "pending" assert node_results["setup"].status == "completed" assert node_results["template"].status == "completed" assert node_results["render"].status == "queued" + assert node_results["render"].output["publish_asset_enabled"] is True + assert node_results["render"].output["graph_authoritative_output_enabled"] is False + + +@pytest.mark.asyncio +async def test_dispatch_render_with_workflow_graph_mode_uses_output_save_as_authoritative_boundary( + db, + admin_user, + monkeypatch, + tmp_path, +): + _use_test_database(monkeypatch) + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + workflow_definition = WorkflowDefinition( + name=f"Graph Output Save {uuid.uuid4().hex[:8]}", + output_type_id=order_line.output_type_id, + config={ + "version": 1, + "ui": {"preset": "custom", "execution_mode": "graph"}, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}}, + {"id": "template", "step": "resolve_template", "params": {}}, + {"id": "render", "step": "blender_still", "params": {"width": 1024, "height": 768}}, + {"id": "output", "step": "output_save", "params": {}}, + ], + "edges": [ + {"from": "setup", "to": "template"}, + {"from": "template", "to": "render"}, + {"from": "render", "to": "output"}, + ], + }, + is_active=True, + ) + db.add(workflow_definition) + await db.flush() + output_type = await db.get(OutputType, order_line.output_type_id) + assert output_type is not None + output_type.workflow_definition_id = workflow_definition.id + await db.commit() + + calls: list[tuple[str, list[str], dict]] = [] + + def _fake_send_task(task_name: str, args: list[str], kwargs: dict): + calls.append((task_name, args, kwargs)) + return type("Result", (), {"id": "graph-output-save-task-1"})() + + monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _fake_send_task) + + result = dispatch_render_with_workflow(str(order_line.id)) + + await db.rollback() + + run_result = await db.execute( + select(WorkflowRun) + .where(WorkflowRun.id == uuid.UUID(result["workflow_run_id"])) + .options(selectinload(WorkflowRun.node_results)) + ) + run = run_result.scalar_one() + node_results = {node_result.node_name: node_result for node_result in run.node_results} + + assert result["backend"] == "workflow_graph" + assert result["task_ids"] == ["graph-output-save-task-1"] + assert len(calls) == 1 + assert calls[0][0] == "app.domains.rendering.tasks.render_order_line_still_task" + assert calls[0][1] == [str(order_line.id)] + assert calls[0][2]["publish_asset_enabled"] is False + assert calls[0][2]["graph_authoritative_output_enabled"] is True + assert calls[0][2]["graph_output_node_ids"] == ["output"] + assert node_results["output"].status == "completed" + assert node_results["output"].output["publication_mode"] == "awaiting_graph_authoritative_save" + + +@pytest.mark.asyncio +async def test_dispatch_render_with_workflow_graph_mode_canonicalizes_legacy_preset_config( + db, + admin_user, + monkeypatch, + tmp_path, +): + _use_test_database(monkeypatch) + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + workflow_definition = WorkflowDefinition( + name=f"Legacy Preset Graph {uuid.uuid4().hex[:8]}", + output_type_id=order_line.output_type_id, + config={ + "type": "still", + "params": {"width": 1024, "height": 768}, + "ui": {"execution_mode": "graph"}, + }, + is_active=True, + ) + db.add(workflow_definition) + await db.flush() + output_type = await db.get(OutputType, order_line.output_type_id) + assert output_type is not None + output_type.workflow_definition_id = workflow_definition.id + await db.commit() + + monkeypatch.setattr( + "app.tasks.celery_app.celery_app.send_task", + lambda task_name, args, kwargs: type("Result", (), {"id": "legacy-graph-task-1"})(), + ) + + result = dispatch_render_with_workflow(str(order_line.id)) + + await db.rollback() + + run_result = await db.execute( + select(WorkflowRun) + .where(WorkflowRun.id == uuid.UUID(result["workflow_run_id"])) + .options(selectinload(WorkflowRun.node_results)) + ) + run = run_result.scalar_one() + node_results = {node_result.node_name: node_result for node_result in run.node_results} + + assert result["backend"] == "workflow_graph" + assert result["execution_mode"] == "graph" + assert result["task_ids"] == ["legacy-graph-task-1"] + assert run.execution_mode == "graph" + assert node_results["setup"].status == "completed" + assert node_results["template"].status == "completed" + assert node_results["render"].status == "queued" + assert node_results["output"].status == "completed" @pytest.mark.asyncio @@ -342,6 +483,8 @@ async def test_dispatch_render_with_workflow_graph_mode_falls_back_to_legacy_on_ assert result["backend"] == "legacy" assert result["fallback_from"] == "workflow_graph" assert result["workflow_run_id"] == str(run.id) + assert result["rollout_gate_status"] == "graph_execution_failed" + assert result["workflow_rollout_ready"] is False assert run.execution_mode == "graph" assert run.status == "failed" assert run.error_message == "graph dispatch exploded" @@ -409,6 +552,10 @@ async def test_dispatch_render_with_workflow_shadow_mode_keeps_legacy_authoritat assert result["execution_mode"] == "shadow" assert result["shadow_status"] == "dispatched" assert result["shadow_task_ids"] == ["shadow-task-1"] + assert result["rollout_gate_status"] == "pending_shadow_verdict" + assert result["rollout_gate_verdict"] is None + assert result["workflow_rollout_ready"] is False + assert result["output_type_rollout_ready"] is False assert run.execution_mode == "shadow" assert run.status == "pending" assert render_call[0] == "app.domains.rendering.tasks.render_order_line_still_task" @@ -421,6 +568,65 @@ async def test_dispatch_render_with_workflow_shadow_mode_keeps_legacy_authoritat assert render_call[2]["workflow_node_id"] == "render" +@pytest.mark.asyncio +async def test_dispatch_render_with_workflow_shadow_mode_canonicalizes_legacy_preset_config( + db, + admin_user, + monkeypatch, + tmp_path, +): + _use_test_database(monkeypatch) + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + workflow_definition = WorkflowDefinition( + name=f"Legacy Preset Shadow {uuid.uuid4().hex[:8]}", + output_type_id=order_line.output_type_id, + config={ + "type": "still", + "params": {"width": 1024, "height": 768}, + "ui": {"execution_mode": "shadow"}, + }, + is_active=True, + ) + db.add(workflow_definition) + await db.flush() + output_type = await db.get(OutputType, order_line.output_type_id) + assert output_type is not None + output_type.workflow_definition_id = workflow_definition.id + await db.commit() + + calls: list[tuple[str, list[str], dict]] = [] + + def _fake_send_task(task_name: str, args: list[str], kwargs: dict): + calls.append((task_name, args, kwargs)) + return type("Result", (), {"id": "legacy-shadow-task-1"})() + + monkeypatch.setattr( + "app.domains.rendering.dispatch_service._legacy_dispatch", + lambda order_line_id: {"backend": "legacy", "order_line_id": order_line_id}, + ) + monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _fake_send_task) + + result = dispatch_render_with_workflow(str(order_line.id)) + + await db.rollback() + + run_result = await db.execute( + select(WorkflowRun) + .where(WorkflowRun.id == uuid.UUID(result["shadow_workflow_run_id"])) + .options(selectinload(WorkflowRun.node_results)) + ) + run = run_result.scalar_one() + node_results = {node_result.node_name: node_result for node_result in run.node_results} + + assert result["backend"] == "legacy" + assert result["execution_mode"] == "shadow" + assert result["shadow_status"] == "dispatched" + assert result["shadow_task_ids"] == ["legacy-shadow-task-1"] + assert run.execution_mode == "shadow" + assert node_results["output"].status == "completed" + assert calls[0][2]["publish_asset_enabled"] is False + + @pytest.mark.asyncio async def test_dispatch_render_with_workflow_shadow_mode_ignores_graph_failures_after_legacy_dispatch( db, @@ -475,11 +681,198 @@ async def test_dispatch_render_with_workflow_shadow_mode_ignores_graph_failures_ assert result["shadow_status"] == "failed" assert result["shadow_error"] == "shadow graph exploded" assert result["shadow_workflow_run_id"] == str(run.id) + assert result["rollout_gate_status"] == "shadow_execution_failed" + assert result["workflow_rollout_ready"] is False assert run.execution_mode == "shadow" assert run.status == "failed" assert run.error_message == "shadow graph exploded" +def test_evaluate_rollout_gate_passes_exact_match(tmp_path: Path): + authoritative = tmp_path / "authoritative.png" + observer = tmp_path / "observer.png" + + Image.new("RGBA", (16, 16), color=(0, 128, 255, 255)).save(authoritative) + Image.new("RGBA", (16, 16), color=(0, 128, 255, 255)).save(observer) + + gate = evaluate_rollout_gate( + authoritative_output=_build_artifact(str(authoritative)), + observer_output=_build_artifact(str(observer)), + exact_match=True, + dimensions_match=True, + mean_pixel_delta=0.0, + ) + + assert gate["verdict"] == "pass" + assert gate["ready"] is True + assert gate["workflow_rollout_ready"] is True + assert gate["output_type_rollout_ready"] is True + + +def test_evaluate_rollout_gate_warns_on_small_visual_delta(tmp_path: Path): + authoritative = tmp_path / "authoritative.png" + observer = tmp_path / "observer.png" + + Image.new("RGBA", (16, 16), color=(0, 128, 255, 255)).save(authoritative) + Image.new("RGBA", (16, 16), color=(0, 129, 255, 255)).save(observer) + + gate = evaluate_rollout_gate( + authoritative_output=_build_artifact(str(authoritative)), + observer_output=_build_artifact(str(observer)), + exact_match=False, + dimensions_match=True, + mean_pixel_delta=1 / (4 * 255), + ) + + assert gate["verdict"] == "warn" + assert gate["ready"] is False + assert gate["status"] == "hold_legacy_authoritative" + assert any("warn threshold" in reason for reason in gate["reasons"]) + + +def test_evaluate_rollout_gate_fails_on_missing_observer(tmp_path: Path): + authoritative = tmp_path / "authoritative.png" + Image.new("RGBA", (16, 16), color=(0, 128, 255, 255)).save(authoritative) + + gate = evaluate_rollout_gate( + authoritative_output=_build_artifact(str(authoritative)), + observer_output=_build_artifact(str(tmp_path / "missing.png")), + exact_match=None, + dimensions_match=None, + mean_pixel_delta=None, + ) + + assert gate["verdict"] == "fail" + assert gate["ready"] is False + assert any("Observer workflow output is missing" in reason for reason in gate["reasons"]) + + +def test_dispatch_render_with_workflow_unit_adds_legacy_only_rollout_signal(monkeypatch): + order_line_id = str(uuid.uuid4()) + output_type_id = uuid.uuid4() + fake_line = SimpleNamespace( + id=uuid.UUID(order_line_id), + output_type=SimpleNamespace(id=output_type_id, workflow_definition_id=None), + ) + + class _FakeExecuteResult: + def __init__(self, value): + self._value = value + + def scalar_one_or_none(self): + return self._value + + class _FakeSession: + def __init__(self, _engine): + self._engine = _engine + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def execute(self, _query): + return _FakeExecuteResult(fake_line) + + monkeypatch.setattr("sqlalchemy.create_engine", lambda *args, **kwargs: object()) + monkeypatch.setattr("sqlalchemy.orm.Session", _FakeSession) + monkeypatch.setattr( + "app.domains.rendering.dispatch_service._legacy_dispatch", + lambda value: {"backend": "legacy", "order_line_id": value}, + ) + + result = dispatch_render_with_workflow(order_line_id) + + assert result["backend"] == "legacy" + assert result["rollout_gate_status"] == "legacy_only" + assert result["workflow_rollout_ready"] is False + assert result["rollout_output_type_id"] == str(output_type_id) + + +def test_dispatch_render_with_workflow_unit_marks_shadow_dispatch_as_pending_rollout(monkeypatch): + order_line_id = str(uuid.uuid4()) + output_type_id = uuid.uuid4() + workflow_def_id = uuid.uuid4() + fake_line = SimpleNamespace( + id=uuid.UUID(order_line_id), + output_type=SimpleNamespace(id=output_type_id, workflow_definition_id=workflow_def_id), + ) + fake_workflow_def = SimpleNamespace(id=workflow_def_id, config={"version": 1}, is_active=True) + fake_run = SimpleNamespace(id=uuid.uuid4()) + execute_values = [fake_line, fake_workflow_def] + + class _FakeExecuteResult: + def __init__(self, value): + self._value = value + + def scalar_one_or_none(self): + return self._value + + class _FakeSession: + def __init__(self, _engine): + self._engine = _engine + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def execute(self, _query): + return _FakeExecuteResult(execute_values.pop(0)) + + def commit(self): + return None + + def rollback(self): + return None + + def add(self, _value): + return None + + monkeypatch.setattr("sqlalchemy.create_engine", lambda *args, **kwargs: object()) + monkeypatch.setattr("sqlalchemy.orm.Session", _FakeSession) + monkeypatch.setattr( + "app.domains.rendering.dispatch_service._legacy_dispatch", + lambda value: {"backend": "legacy", "order_line_id": value}, + ) + monkeypatch.setattr( + "app.domains.rendering.workflow_config_utils.canonicalize_workflow_config", + lambda config: config, + ) + monkeypatch.setattr( + "app.domains.rendering.workflow_config_utils.get_workflow_execution_mode", + lambda config, default="legacy": "shadow", + ) + monkeypatch.setattr( + "app.domains.rendering.workflow_executor.prepare_workflow_context", + lambda *args, **kwargs: {"nodes": [{"id": "render"}]}, + ) + monkeypatch.setattr( + "app.domains.rendering.workflow_graph_runtime.find_unsupported_graph_nodes", + lambda *_args, **_kwargs: [], + ) + monkeypatch.setattr( + "app.domains.rendering.workflow_run_service.create_workflow_run", + lambda *args, **kwargs: fake_run, + ) + monkeypatch.setattr( + "app.domains.rendering.workflow_graph_runtime.execute_graph_workflow", + lambda *args, **kwargs: SimpleNamespace(task_ids=["shadow-task-1"]), + ) + + result = dispatch_render_with_workflow(order_line_id) + + assert result["backend"] == "legacy" + assert result["execution_mode"] == "shadow" + assert result["shadow_status"] == "dispatched" + assert result["rollout_gate_status"] == "pending_shadow_verdict" + assert result["workflow_rollout_ready"] is False + assert result["output_type_rollout_ready"] is False + assert result["shadow_workflow_run_id"] == str(fake_run.id) + + @pytest.mark.asyncio async def test_workflow_dispatch_endpoint_returns_workflow_run_with_node_results( client, @@ -491,6 +884,14 @@ async def test_workflow_dispatch_endpoint_returns_workflow_run_with_node_results ): monkeypatch.setattr(settings, "upload_dir", str(tmp_path / "uploads")) order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + output_type = await db.get(OutputType, order_line.output_type_id) + assert output_type is not None + output_type.render_settings = { + "width": 2048, + "height": 2048, + "engine": "cycles", + "samples": 128, + } workflow_definition = WorkflowDefinition( name=f"Dispatch Workflow {uuid.uuid4().hex[:8]}", config=build_preset_workflow_config("still_with_exports", {"width": 640, "height": 640}), @@ -526,8 +927,12 @@ async def test_workflow_dispatch_endpoint_returns_workflow_run_with_node_results "app.domains.rendering.tasks.export_blend_for_order_line_task", ] assert [call[1] for call in calls] == [[context_id], [context_id]] - assert calls[0][2]["width"] == 640 - assert calls[0][2]["height"] == 640 + assert calls[0][2]["publish_asset_enabled"] is False + assert calls[0][2]["graph_authoritative_output_enabled"] is True + assert calls[0][2]["graph_output_node_ids"] == ["output"] + assert calls[0][2]["width"] == 2048 + assert calls[0][2]["height"] == 2048 + assert calls[0][2]["samples"] == 128 assert calls[0][2]["workflow_node_id"] == "render" assert calls[1][2]["workflow_node_id"] == "blend" assert "workflow_run_id" in calls[0][2] @@ -537,6 +942,7 @@ async def test_workflow_dispatch_endpoint_returns_workflow_run_with_node_results assert body["workflow_run"]["status"] == "pending" assert body["workflow_run"]["execution_mode"] == "graph" assert body["workflow_run"]["celery_task_id"] == "task-1" + assert body["workflow_run"]["order_line_id"] == str(order_line.id) assert node_results["render"]["status"] == "queued" assert node_results["render"]["output"]["task_id"] == "task-1" assert node_results["blend"]["status"] == "queued" @@ -545,7 +951,606 @@ async def test_workflow_dispatch_endpoint_returns_workflow_run_with_node_results assert node_results["setup"]["output"]["order_line_id"] == str(order_line.id) assert node_results["template"]["status"] == "completed" assert node_results["template"]["output"]["use_materials"] is False - assert node_results["output"]["status"] == "skipped" + assert node_results["output"]["status"] == "completed" + assert node_results["output"]["output"]["publication_mode"] == "awaiting_graph_authoritative_save" + + +@pytest.mark.asyncio +async def test_workflow_dispatch_endpoint_arms_output_save_for_export_blend( + client, + db, + admin_user, + auth_headers, + tmp_path, + monkeypatch, +): + monkeypatch.setattr(settings, "upload_dir", str(tmp_path / "uploads")) + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + workflow_definition = WorkflowDefinition( + name=f"Blend Output Workflow {uuid.uuid4().hex[:8]}", + config={ + "version": 1, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}}, + {"id": "blend", "step": "export_blend", "params": {}}, + {"id": "output", "step": "output_save", "params": {}}, + ], + "edges": [ + {"from": "setup", "to": "blend"}, + {"from": "blend", "to": "output"}, + ], + }, + is_active=True, + ) + db.add(workflow_definition) + await db.commit() + await db.refresh(workflow_definition) + + calls: list[tuple[str, list[str], dict]] = [] + + def _fake_send_task(task_name: str, args: list[str], kwargs: dict): + calls.append((task_name, args, kwargs)) + return type("Result", (), {"id": f"task-{len(calls)}"})() + + context_id = str(order_line.id) + monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _fake_send_task) + response = await client.post( + f"/api/workflows/{workflow_definition.id}/dispatch", + params={"context_id": context_id}, + headers=auth_headers, + ) + + assert response.status_code == 200 + body = response.json() + + assert body["context_id"] == context_id + assert body["execution_mode"] == "graph" + assert body["dispatched"] == 1 + assert body["task_ids"] == ["task-1"] + assert calls == [ + ( + "app.domains.rendering.tasks.export_blend_for_order_line_task", + [context_id], + { + "workflow_run_id": body["workflow_run"]["id"], + "workflow_node_id": "blend", + "publish_asset_enabled": False, + "graph_authoritative_output_enabled": True, + "graph_output_node_ids": ["output"], + }, + ) + ] + + node_results = {node["node_name"]: node for node in body["workflow_run"]["node_results"]} + assert node_results["blend"]["status"] == "queued" + assert node_results["blend"]["output"]["predicted_asset_type"] == "blend_production" + assert node_results["blend"]["output"]["publish_asset_enabled"] is False + assert node_results["blend"]["output"]["graph_authoritative_output_enabled"] is True + assert node_results["blend"]["output"]["graph_output_node_ids"] == ["output"] + assert node_results["output"]["status"] == "completed" + assert node_results["output"]["output"]["publication_mode"] == "awaiting_graph_authoritative_save" + + +@pytest.mark.asyncio +async def test_workflow_dispatch_endpoint_arms_output_save_for_turntable( + client, + db, + admin_user, + auth_headers, + tmp_path, + monkeypatch, +): + monkeypatch.setattr(settings, "upload_dir", str(tmp_path / "uploads")) + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + workflow_definition = WorkflowDefinition( + name=f"Turntable Output Workflow {uuid.uuid4().hex[:8]}", + config={ + "version": 1, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}}, + {"id": "turntable", "step": "blender_turntable", "params": {"fps": 24, "frame_count": 96}}, + {"id": "output", "step": "output_save", "params": {}}, + ], + "edges": [ + {"from": "setup", "to": "turntable"}, + {"from": "turntable", "to": "output"}, + ], + }, + is_active=True, + ) + db.add(workflow_definition) + await db.commit() + await db.refresh(workflow_definition) + + calls: list[tuple[str, list[str], dict]] = [] + + def _fake_send_task(task_name: str, args: list[str], kwargs: dict): + calls.append((task_name, args, kwargs)) + return type("Result", (), {"id": f"task-{len(calls)}"})() + + context_id = str(order_line.id) + monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _fake_send_task) + response = await client.post( + f"/api/workflows/{workflow_definition.id}/dispatch", + params={"context_id": context_id}, + headers=auth_headers, + ) + + assert response.status_code == 200 + body = response.json() + + assert body["context_id"] == context_id + assert body["execution_mode"] == "graph" + assert body["dispatched"] == 1 + assert body["task_ids"] == ["task-1"] + assert calls[0][0] == "app.domains.rendering.tasks.render_turntable_task" + assert calls[0][1] == [context_id] + assert calls[0][2]["workflow_run_id"] == body["workflow_run"]["id"] + assert calls[0][2]["workflow_node_id"] == "turntable" + assert calls[0][2]["publish_asset_enabled"] is False + assert calls[0][2]["graph_authoritative_output_enabled"] is True + assert calls[0][2]["graph_output_node_ids"] == ["output"] + assert calls[0][2]["fps"] == 24 + assert calls[0][2]["frame_count"] == 96 + + node_results = {node["node_name"]: node for node in body["workflow_run"]["node_results"]} + assert node_results["turntable"]["status"] == "queued" + assert node_results["turntable"]["output"]["predicted_asset_type"] == "turntable" + assert node_results["turntable"]["output"]["publish_asset_enabled"] is False + assert node_results["turntable"]["output"]["graph_authoritative_output_enabled"] is True + assert node_results["turntable"]["output"]["graph_output_node_ids"] == ["output"] + assert node_results["output"]["status"] == "completed" + assert node_results["output"]["output"]["publication_mode"] == "awaiting_graph_authoritative_save" + + +@pytest.mark.asyncio +async def test_workflow_dispatch_endpoint_arms_notify_handoff_for_render_node( + client, + db, + admin_user, + auth_headers, + tmp_path, + monkeypatch, +): + monkeypatch.setattr(settings, "upload_dir", str(tmp_path / "uploads")) + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + workflow_definition = WorkflowDefinition( + name=f"Notify Workflow {uuid.uuid4().hex[:8]}", + config={ + "version": 1, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}}, + {"id": "render", "step": "blender_still", "params": {}}, + {"id": "notify", "step": "notify", "params": {}}, + ], + "edges": [ + {"from": "setup", "to": "render"}, + {"from": "render", "to": "notify"}, + ], + }, + is_active=True, + ) + db.add(workflow_definition) + await db.commit() + await db.refresh(workflow_definition) + + calls: list[tuple[str, list[str], dict]] = [] + + def _fake_send_task(task_name: str, args: list[str], kwargs: dict): + calls.append((task_name, args, kwargs)) + return type("Result", (), {"id": "task-1"})() + + context_id = str(order_line.id) + monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _fake_send_task) + response = await client.post( + f"/api/workflows/{workflow_definition.id}/dispatch", + params={"context_id": context_id}, + headers=auth_headers, + ) + + assert response.status_code == 200 + body = response.json() + + assert body["context_id"] == context_id + assert body["execution_mode"] == "graph" + assert body["dispatched"] == 1 + assert body["task_ids"] == ["task-1"] + assert len(calls) == 1 + assert calls[0][0] == "app.domains.rendering.tasks.render_order_line_still_task" + assert calls[0][1] == [context_id] + assert calls[0][2]["workflow_run_id"] == body["workflow_run"]["id"] + assert calls[0][2]["workflow_node_id"] == "render" + assert calls[0][2]["emit_legacy_notifications"] is True + assert calls[0][2]["graph_notify_node_ids"] == ["notify"] + + node_results = {node["node_name"]: node for node in body["workflow_run"]["node_results"]} + assert node_results["render"]["status"] == "queued" + assert node_results["render"]["output"]["graph_notify_node_ids"] == ["notify"] + assert node_results["notify"]["status"] == "completed" + assert node_results["notify"]["output"]["notification_mode"] == "deferred_to_render_task" + assert node_results["notify"]["output"]["armed_node_ids"] == ["render"] + + +@pytest.mark.asyncio +async def test_workflow_preflight_endpoint_reports_render_graph_readiness( + client, + db, + admin_user, + auth_headers, + tmp_path, +): + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + workflow_definition = WorkflowDefinition( + name=f"Preflight Workflow {uuid.uuid4().hex[:8]}", + config=build_preset_workflow_config("still_with_exports", {"width": 640, "height": 640}), + is_active=True, + ) + db.add(workflow_definition) + await db.commit() + await db.refresh(workflow_definition) + + response = await client.get( + f"/api/workflows/{workflow_definition.id}/preflight", + params={"context_id": str(order_line.id)}, + headers=auth_headers, + ) + + assert response.status_code == 200 + body = response.json() + node_checks = {node["node_id"]: node for node in body["nodes"]} + + assert body["workflow_id"] == str(workflow_definition.id) + assert body["context_kind"] == "order_line" + assert body["expected_context_kind"] == "order_line" + assert body["execution_mode"] == "legacy" + assert body["graph_dispatch_allowed"] is True + assert body["resolved_order_line_id"] == str(order_line.id) + assert body["resolved_cad_file_id"] == str(order_line.product.cad_file_id) + assert body["unsupported_node_ids"] == [] + assert node_checks["setup"]["status"] == "ready" + assert node_checks["template"]["status"] == "warning" + assert node_checks["template"]["issues"][0]["code"] == "template_missing" + assert node_checks["render"]["status"] == "ready" + assert node_checks["blend"]["status"] == "ready" + + +@pytest.mark.asyncio +async def test_workflow_draft_dispatch_endpoint_dispatches_unsaved_render_graph( + client, + db, + admin_user, + auth_headers, + tmp_path, + monkeypatch, +): + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + workflow_definition = WorkflowDefinition( + name=f"Draft Dispatch Workflow {uuid.uuid4().hex[:8]}", + config=build_preset_workflow_config("still", {"width": 640, "height": 640}), + is_active=True, + ) + db.add(workflow_definition) + await db.commit() + await db.refresh(workflow_definition) + + calls: list[tuple[str, list[str], dict]] = [] + + def _fake_send_task(task_name: str, args: list[str], kwargs: dict): + calls.append((task_name, args, kwargs)) + return type("Result", (), {"id": f"draft-task-{len(calls)}"})() + + monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _fake_send_task) + response = await client.post( + "/api/workflows/dispatch", + headers=auth_headers, + json={ + "workflow_id": str(workflow_definition.id), + "context_id": str(order_line.id), + "config": { + "version": 1, + "ui": {"preset": "custom", "execution_mode": "graph"}, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}, "ui": {"label": "Setup"}}, + {"id": "template", "step": "resolve_template", "params": {}, "ui": {"label": "Template"}}, + {"id": "render", "step": "blender_still", "params": {"width": 800, "height": 600}, "ui": {"label": "Render"}}, + ], + "edges": [ + {"from": "setup", "to": "template"}, + {"from": "template", "to": "render"}, + ], + }, + }, + ) + + assert response.status_code == 200 + body = response.json() + node_results = {node["node_name"]: node for node in body["workflow_run"]["node_results"]} + + assert body["context_id"] == str(order_line.id) + assert body["execution_mode"] == "graph" + assert body["dispatched"] == 1 + assert body["task_ids"] == ["draft-task-1"] + assert body["workflow_run"]["workflow_def_id"] == str(workflow_definition.id) + assert body["workflow_run"]["execution_mode"] == "graph" + assert body["workflow_run"]["order_line_id"] == str(order_line.id) + assert [call[0] for call in calls] == ["app.domains.rendering.tasks.render_order_line_still_task"] + assert calls[0][1] == [str(order_line.id)] + assert calls[0][2]["workflow_node_id"] == "render" + assert "workflow_run_id" in calls[0][2] + assert node_results["setup"]["status"] == "completed" + assert node_results["template"]["status"] == "completed" + assert node_results["render"]["status"] == "queued" + + +@pytest.mark.asyncio +async def test_workflow_draft_dispatch_endpoint_marks_submitted_order_processing( + client, + db, + admin_user, + auth_headers, + tmp_path, + monkeypatch, +): + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + order = await db.get(Order, order_line.order_id) + assert order is not None + order.status = OrderStatus.submitted + await db.commit() + + monkeypatch.setattr( + "app.tasks.celery_app.celery_app.send_task", + lambda task_name, args, kwargs: type("Result", (), {"id": "draft-task-1"})(), + ) + response = await client.post( + "/api/workflows/dispatch", + headers=auth_headers, + json={ + "context_id": str(order_line.id), + "config": { + "version": 1, + "ui": {"preset": "custom", "execution_mode": "graph"}, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}, "ui": {"label": "Setup"}}, + {"id": "render", "step": "blender_still", "params": {}, "ui": {"label": "Render"}}, + ], + "edges": [ + {"from": "setup", "to": "render"}, + ], + }, + }, + ) + + assert response.status_code == 200 + await db.refresh(order) + assert order.status == OrderStatus.processing + assert order.processing_started_at is not None + assert order_line.order.completed_at is None + + +@pytest.mark.asyncio +async def test_workflow_draft_dispatch_endpoint_rejects_invalid_graph_config( + client, + db, + admin_user, + auth_headers, + tmp_path, +): + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + + response = await client.post( + "/api/workflows/dispatch", + headers=auth_headers, + json={ + "context_id": str(order_line.id), + "config": { + "version": 1, + "ui": {"preset": "custom", "execution_mode": "graph"}, + "nodes": [ + {"id": "render", "step": "blender_still", "params": {}}, + ], + "edges": [ + {"from": "missing", "to": "render"}, + ], + }, + }, + ) + + assert response.status_code == 422 + assert "Invalid workflow config" in response.json()["detail"] + + +@pytest.mark.asyncio +async def test_workflow_preflight_endpoint_rejects_context_kind_mismatch( + client, + db, + admin_user, + auth_headers, + tmp_path, +): + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + workflow_definition = WorkflowDefinition( + name=f"Mismatch Workflow {uuid.uuid4().hex[:8]}", + config=build_preset_workflow_config("still", {"width": 640, "height": 640}), + is_active=True, + ) + db.add(workflow_definition) + await db.commit() + await db.refresh(workflow_definition) + + response = await client.get( + f"/api/workflows/{workflow_definition.id}/preflight", + params={"context_id": str(order_line.product.cad_file_id)}, + headers=auth_headers, + ) + + assert response.status_code == 200 + body = response.json() + + assert body["context_kind"] == "cad_file" + assert body["expected_context_kind"] == "order_line" + assert body["graph_dispatch_allowed"] is False + assert any(issue["code"] == "context_kind_mismatch" for issue in body["issues"]) + assert any(node["status"] == "error" for node in body["nodes"]) + + +@pytest.mark.asyncio +async def test_workflow_preflight_endpoint_supports_direct_cad_file_graphs( + client, + db, + admin_user, + auth_headers, + tmp_path, +): + step_path = tmp_path / "cad-preflight" / "thumb.step" + step_path.parent.mkdir(parents=True, exist_ok=True) + step_path.write_text("STEP", encoding="utf-8") + + cad_file = CadFile( + original_name="thumb.step", + stored_path=str(step_path), + file_hash=f"hash-{uuid.uuid4().hex}", + parsed_objects={"objects": ["Body"]}, + ) + workflow_definition = WorkflowDefinition( + name=f"CAD Workflow {uuid.uuid4().hex[:8]}", + config={ + "version": 1, + "ui": {"preset": "custom", "execution_mode": "graph"}, + "nodes": [ + {"id": "input", "step": "resolve_step_path", "params": {}, "ui": {"label": "Resolve STEP"}}, + {"id": "render", "step": "blender_render", "params": {"width": 512, "height": 512}, "ui": {"label": "Thumbnail"}}, + {"id": "save", "step": "thumbnail_save", "params": {}, "ui": {"label": "Save Thumbnail"}}, + ], + "edges": [ + {"from": "input", "to": "render"}, + {"from": "render", "to": "save"}, + ], + }, + is_active=True, + ) + db.add_all([cad_file, workflow_definition]) + await db.commit() + await db.refresh(workflow_definition) + + response = await client.get( + f"/api/workflows/{workflow_definition.id}/preflight", + params={"context_id": str(cad_file.id)}, + headers=auth_headers, + ) + + assert response.status_code == 200 + body = response.json() + + assert body["context_kind"] == "cad_file" + assert body["expected_context_kind"] == "cad_file" + assert body["execution_mode"] == "graph" + assert body["graph_dispatch_allowed"] is True + assert body["resolved_cad_file_id"] == str(cad_file.id) + assert all(node["status"] == "ready" for node in body["nodes"]) + + +@pytest.mark.asyncio +async def test_workflow_draft_preflight_endpoint_validates_unsaved_render_graph( + client, + db, + admin_user, + auth_headers, + tmp_path, +): + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + + response = await client.post( + "/api/workflows/preflight", + headers=auth_headers, + json={ + "context_id": str(order_line.id), + "config": { + "version": 1, + "ui": {"preset": "custom", "execution_mode": "graph"}, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}, "ui": {"label": "Setup"}}, + {"id": "template", "step": "resolve_template", "params": {}, "ui": {"label": "Template"}}, + {"id": "render", "step": "blender_still", "params": {"width": 640, "height": 640}, "ui": {"label": "Render"}}, + ], + "edges": [ + {"from": "setup", "to": "template"}, + {"from": "template", "to": "render"}, + ], + }, + }, + ) + + assert response.status_code == 200 + body = response.json() + + assert body["workflow_id"] is None + assert body["context_kind"] == "order_line" + assert body["expected_context_kind"] == "order_line" + assert body["execution_mode"] == "graph" + assert body["graph_dispatch_allowed"] is True + assert body["resolved_order_line_id"] == str(order_line.id) + assert [node["node_id"] for node in body["nodes"]] == ["setup", "template", "render"] + + +@pytest.mark.asyncio +async def test_workflow_draft_preflight_endpoint_reports_context_kind_mismatch( + client, + db, + admin_user, + auth_headers, + tmp_path, +): + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + + response = await client.post( + "/api/workflows/preflight", + headers=auth_headers, + json={ + "context_id": str(order_line.product.cad_file_id), + "config": build_preset_workflow_config("still", {"width": 640, "height": 640}), + }, + ) + + assert response.status_code == 200 + body = response.json() + + assert body["workflow_id"] is None + assert body["context_kind"] == "cad_file" + assert body["expected_context_kind"] == "order_line" + assert body["graph_dispatch_allowed"] is False + assert any(issue["code"] == "context_kind_mismatch" for issue in body["issues"]) + + +@pytest.mark.asyncio +async def test_workflow_draft_preflight_endpoint_rejects_invalid_graph_config( + client, + db, + admin_user, + auth_headers, + tmp_path, +): + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + + response = await client.post( + "/api/workflows/preflight", + headers=auth_headers, + json={ + "context_id": str(order_line.id), + "config": { + "version": 1, + "ui": {"preset": "custom", "execution_mode": "graph"}, + "nodes": [ + {"id": "render", "step": "blender_still", "params": {}}, + ], + "edges": [ + {"from": "missing", "to": "render"}, + ], + }, + }, + ) + + assert response.status_code == 422 + assert "Invalid workflow config" in response.json()["detail"] @pytest.mark.asyncio @@ -596,6 +1601,54 @@ async def test_workflow_run_comparison_endpoint_reports_identical_shadow_output( assert body["observer_output"]["image_height"] == 8 +@pytest.mark.asyncio +async def test_workflow_run_comparison_endpoint_reports_metadata_only_difference_as_matched( + client, + db, + admin_user, + auth_headers, + tmp_path, +): + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + workflow_run = WorkflowRun( + order_line_id=order_line.id, + execution_mode="shadow", + status="completed", + ) + db.add(workflow_run) + await db.flush() + + render_dir = tmp_path / "comparison-metadata" / str(order_line.id) + render_dir.mkdir(parents=True, exist_ok=True) + authoritative_path = render_dir / "authoritative.png" + shadow_path = render_dir / f"authoritative_shadow-{str(workflow_run.id)[:8]}.png" + + image = Image.new("RGBA", (8, 8), (0, 128, 255, 255)) + authoritative_meta = PngImagePlugin.PngInfo() + authoritative_meta.add_text("Date", "2026-04-07 10:38:23") + observer_meta = PngImagePlugin.PngInfo() + observer_meta.add_text("Date", "2026-04-07 10:40:45") + image.save(authoritative_path, pnginfo=authoritative_meta) + image.save(shadow_path, pnginfo=observer_meta) + + order_line.result_path = str(authoritative_path) + order_line.render_status = "completed" + await db.commit() + + response = await client.get( + f"/api/workflows/runs/{workflow_run.id}/comparison", + headers=auth_headers, + ) + + assert response.status_code == 200 + body = response.json() + assert body["status"] == "matched" + assert body["exact_match"] is False + assert body["dimensions_match"] is True + assert body["mean_pixel_delta"] == 0.0 + assert "metadata differs" in body["summary"] + + @pytest.mark.asyncio async def test_workflow_run_comparison_endpoint_reports_missing_shadow_output( client, @@ -633,3 +1686,46 @@ async def test_workflow_run_comparison_endpoint_reports_missing_shadow_output( assert body["exact_match"] is None assert body["observer_output"]["exists"] is False assert body["authoritative_output"]["exists"] is True + + +@pytest.mark.asyncio +async def test_workflow_run_comparison_endpoint_finds_shadow_output_in_step_files_render_dir( + client, + db, + admin_user, + auth_headers, + tmp_path, +): + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + workflow_run = WorkflowRun( + order_line_id=order_line.id, + execution_mode="shadow", + status="completed", + ) + db.add(workflow_run) + await db.flush() + + render_dir = tmp_path / "comparison-step-files" / str(order_line.id) + render_dir.mkdir(parents=True, exist_ok=True) + authoritative_path = render_dir / "authoritative.png" + Image.new("RGBA", (12, 12), (32, 160, 255, 255)).save(authoritative_path) + + step_shadow_dir = Path("/app/uploads/step_files/renders") + step_shadow_dir.mkdir(parents=True, exist_ok=True) + shadow_path = step_shadow_dir / f"line_{order_line.id}_shadow-{str(workflow_run.id)[:8]}.png" + Image.new("RGBA", (12, 12), (32, 160, 255, 255)).save(shadow_path) + + order_line.result_path = str(authoritative_path) + order_line.render_status = "completed" + await db.commit() + + response = await client.get( + f"/api/workflows/runs/{workflow_run.id}/comparison", + headers=auth_headers, + ) + + assert response.status_code == 200 + body = response.json() + assert body["status"] == "matched" + assert body["observer_output"]["exists"] is True + assert body["observer_output"]["path"] == str(shadow_path) diff --git a/scripts/test_render_pipeline.py b/scripts/test_render_pipeline.py index db99ea8..14f180e 100644 --- a/scripts/test_render_pipeline.py +++ b/scripts/test_render_pipeline.py @@ -53,6 +53,9 @@ passed = [] failed = [] warnings = [] +ROLLOUT_PASS_MAX_MEAN_PIXEL_DELTA = 0.0 +ROLLOUT_WARN_MAX_MEAN_PIXEL_DELTA = 0.02 + # --------------------------------------------------------------------------- # Helpers @@ -83,6 +86,55 @@ def section(title: str): print(f"{BLUE}{'='*60}{RESET}") +def evaluate_rollout_gate_from_comparison(comparison: dict) -> dict: + reasons: list[str] = [] + mean_pixel_delta = comparison.get("mean_pixel_delta") + exact_match = comparison.get("exact_match") + dimensions_match = comparison.get("dimensions_match") + status = comparison.get("status") + + authoritative_exists = bool(comparison.get("authoritative_output", {}).get("exists")) + observer_exists = bool(comparison.get("observer_output", {}).get("exists")) + + if not authoritative_exists: + verdict = "fail" + reasons.append("Authoritative legacy output is missing.") + elif not observer_exists: + verdict = "fail" + reasons.append("Observer workflow output is missing.") + elif exact_match: + verdict = "pass" + reasons.append("Observer output matches the authoritative legacy output byte-for-byte.") + elif dimensions_match is False: + verdict = "fail" + reasons.append("Observer output dimensions differ from the authoritative legacy output.") + elif mean_pixel_delta is None: + verdict = "fail" + reasons.append(f"Workflow comparison did not produce a pixel delta (status={status}).") + elif mean_pixel_delta <= ROLLOUT_PASS_MAX_MEAN_PIXEL_DELTA: + verdict = "pass" + reasons.append("Observer output is visually identical within the pass threshold.") + elif mean_pixel_delta <= ROLLOUT_WARN_MAX_MEAN_PIXEL_DELTA: + verdict = "warn" + reasons.append("Observer output differs slightly but remains within the warn threshold.") + else: + verdict = "fail" + reasons.append("Observer output exceeds the rollout parity threshold.") + + if mean_pixel_delta is not None and not exact_match: + reasons.append( + f"Mean pixel delta {mean_pixel_delta:.6f}; " + f"pass<={ROLLOUT_PASS_MAX_MEAN_PIXEL_DELTA:.6f}, " + f"warn<={ROLLOUT_WARN_MAX_MEAN_PIXEL_DELTA:.6f}." + ) + + return { + "verdict": verdict, + "ready": verdict == "pass", + "reasons": reasons, + } + + class APIClient: def __init__(self, host: str, email: str, password: str): self.host = host.rstrip("/") @@ -93,7 +145,7 @@ class APIClient: def _login(self, email: str, password: str): resp = self.session.post( f"{self.host}/api/auth/login", - data={"username": email, "password": password}, + json={"email": email, "password": password}, ) resp.raise_for_status() data = resp.json() @@ -110,6 +162,44 @@ class APIClient: return self.session.delete(f"{self.host}/api{path}", **kwargs) +def build_graph_still_config() -> dict: + return { + "version": 1, + "ui": {"preset": "still_graph", "execution_mode": "graph"}, + "nodes": [ + { + "id": "setup", + "step": "order_line_setup", + "params": {}, + "ui": {"label": "Order Line Setup", "position": {"x": 0, "y": 100}}, + }, + { + "id": "template", + "step": "resolve_template", + "params": {}, + "ui": {"label": "Resolve Template", "position": {"x": 220, "y": 100}}, + }, + { + "id": "render", + "step": "blender_still", + "params": {}, + "ui": {"type": "renderNode", "label": "Still Render", "position": {"x": 440, "y": 100}}, + }, + { + "id": "output", + "step": "output_save", + "params": {}, + "ui": {"type": "outputNode", "label": "Save Output", "position": {"x": 660, "y": 100}}, + }, + ], + "edges": [ + {"from": "setup", "to": "template"}, + {"from": "template", "to": "render"}, + {"from": "render", "to": "output"}, + ], + } + + # --------------------------------------------------------------------------- # Test: Render health endpoint # --------------------------------------------------------------------------- @@ -174,24 +264,34 @@ def test_step_upload(client: APIClient, step_file: Path) -> str | None: cad_file_id = data["cad_file_id"] ok(f"STEP uploaded → cad_file_id={cad_file_id[:8]}... status={data.get('status')}") - # Poll for completed processing + # Poll the existing CAD endpoints. There is no GET /api/cad/{id}; the most + # reliable readiness signal is /objects returning 200 with processing_status. info(f"Waiting for CAD processing (timeout={CAD_PROCESSING_TIMEOUT}s)...") deadline = time.time() + CAD_PROCESSING_TIMEOUT last_status = None while time.time() < deadline: - resp = client.get(f"/cad/{cad_file_id}") - if resp.status_code == 200: - cad = resp.json() + resp_objects = client.get(f"/cad/{cad_file_id}/objects") + if resp_objects.status_code == 200: + cad = resp_objects.json() status = cad.get("processing_status") if status != last_status: info(f" CAD status: {status}") last_status = status if status == "completed": - ok(f"CAD processing completed (thumbnail rendered)") + ok("CAD processing completed (parsed objects available)") return cad_file_id if status == "failed": fail(f"CAD processing FAILED: {cad.get('error_message', 'unknown error')}") return None + + resp_thumb = client.get(f"/cad/{cad_file_id}/thumbnail") + if resp_thumb.status_code == 200: + if last_status != "completed": + info(" CAD status: completed") + last_status = "completed" + ok("CAD processing completed (thumbnail available)") + return cad_file_id + time.sleep(POLL_INTERVAL_SECONDS) fail(f"CAD processing timed out after {CAD_PROCESSING_TIMEOUT}s (last status: {last_status})") @@ -207,17 +307,14 @@ def test_order_render( cad_file_id: str, output_type_ids: list[str], test_label: str, + *, + use_graph_dispatch: bool = False, ) -> bool: """Create a minimal order, submit, dispatch renders, wait for completion.""" section(f"3. Order Render — {test_label}") info(f"Output types: {len(output_type_ids)}") # Get a product that uses this CAD file - resp = client.get(f"/cad/{cad_file_id}") - if resp.status_code != 200: - fail(f"CAD file lookup failed: {resp.status_code}") - return False - # Find or create a product linked to this CAD file product_id = None resp_products = client.get("/products/?limit=100") @@ -245,46 +342,41 @@ def test_order_render( product_id = resp_create.json()["id"] ok(f"Created test product: {product_id[:8]}...") - # Build output_type_selections for one product - ot_selections = [{"product_id": product_id, "output_type_id": ot_id} for ot_id in output_type_ids] - - # Create order via wizard endpoint - resp_order = client.post("/orders/product-order", json={ - "product_id": product_id, - "output_type_selections": [ - {"output_type_id": ot_id} - for ot_id in output_type_ids - ], - }) + resp_order = client.post( + "/orders", + json={ + "notes": f"Render pipeline integration test: {test_label}", + "items": [], + "lines": [ + {"product_id": product_id, "output_type_id": ot_id} + for ot_id in output_type_ids + ], + }, + ) if resp_order.status_code not in (200, 201): - # Fallback: try to find existing submitted order - warn(f"Product order wizard not available ({resp_order.status_code}), looking for existing order lines...") - return _test_existing_renders(client, product_id, output_type_ids) + fail(f"Order creation failed: {resp_order.status_code} {resp_order.text[:300]}") + return False order = resp_order.json() order_id = order["id"] ok(f"Order created: {order.get('order_number')} (id={order_id[:8]}...)") - return _submit_and_wait(client, order_id, output_type_ids) + return _submit_and_wait( + client, + order, + output_type_ids, + use_graph_dispatch=use_graph_dispatch, + ) -def _test_existing_renders(client: APIClient, product_id: str, output_type_ids: list[str]) -> bool: - """Find existing order lines for a product and wait for completion.""" - resp = client.get(f"/orders/?limit=20") - if resp.status_code != 200: - fail("Could not list orders") - return False - orders = resp.json() - if isinstance(orders, dict): - orders = orders.get("items", []) - for order in orders: - if order.get("status") in ("submitted", "processing", "rendering"): - return _submit_and_wait(client, order["id"], output_type_ids) - warn("No suitable existing orders found for render test") - return True # non-blocking warning - - -def _submit_and_wait(client: APIClient, order_id: str, output_type_ids: list[str]) -> bool: +def _submit_and_wait( + client: APIClient, + order: dict, + output_type_ids: list[str], + *, + use_graph_dispatch: bool = False, +) -> bool: + order_id = order["id"] # Submit resp_sub = client.post(f"/orders/{order_id}/submit") if resp_sub.status_code not in (200, 201, 204): @@ -296,14 +388,34 @@ def _submit_and_wait(client: APIClient, order_id: str, output_type_ids: list[str else: ok("Order submitted") - # Dispatch renders - resp_disp = client.post(f"/orders/{order_id}/dispatch-renders") - if resp_disp.status_code not in (200, 201, 204): - fail(f"Dispatch renders failed: {resp_disp.status_code} {resp_disp.text[:200]}") - return False - dispatch_data = resp_disp.json() if resp_disp.content else {} - dispatched = dispatch_data.get("dispatched", "?") - ok(f"Renders dispatched ({dispatched} lines)") + dispatch_run_id = None + if use_graph_dispatch: + lines = order.get("lines", []) + if len(lines) != 1: + fail("Graph mode currently expects exactly one order line per test order") + return False + line_id = lines[0]["id"] + resp_disp = client.post( + "/workflows/dispatch", + json={ + "context_id": line_id, + "config": build_graph_still_config(), + }, + ) + if resp_disp.status_code not in (200, 201): + fail(f"Workflow draft dispatch failed: {resp_disp.status_code} {resp_disp.text[:300]}") + return False + dispatch_data = resp_disp.json() + dispatch_run_id = dispatch_data["workflow_run"]["id"] + ok(f"Graph workflow dispatched (run={dispatch_run_id[:8]}..., tasks={dispatch_data.get('dispatched', '?')})") + else: + resp_disp = client.post(f"/orders/{order_id}/dispatch-renders") + if resp_disp.status_code not in (200, 201, 204): + fail(f"Dispatch renders failed: {resp_disp.status_code} {resp_disp.text[:200]}") + return False + dispatch_data = resp_disp.json() if resp_disp.content else {} + dispatched = dispatch_data.get("dispatched", "?") + ok(f"Renders dispatched ({dispatched} lines)") # Poll for order completion info(f"Waiting for renders to complete (timeout={RENDER_TIMEOUT_SECONDS}s per OT)...") @@ -323,10 +435,20 @@ def _submit_and_wait(client: APIClient, order_id: str, output_type_ids: list[str info(f" {summary}") last_summary = summary - if order_status == "completed": - ok(f"Order completed — all {len(lines)} render(s) done") - # Check individual line results - all_success = True + terminal_states = {"completed", "failed", "cancelled"} + line_states = [state for state in statuses if state] + if line_states and all(state in terminal_states for state in line_states): + all_success = all(state == "completed" for state in line_states) + if order_status == "completed": + ok(f"Order completed — all {len(lines)} render(s) done") + elif all_success: + ok( + f"All {len(lines)} render line(s) completed " + f"(order status remains {order_status})" + ) + else: + fail(f"Order reached terminal line states with order={order_status}") + for line in lines: rs = line.get("render_status") ot_name = line.get("output_type_name") or line.get("output_type", {}).get("name", "?") @@ -334,13 +456,30 @@ def _submit_and_wait(client: APIClient, order_id: str, output_type_ids: list[str ok(f" Line [{ot_name}]: completed") elif rs == "failed": fail(f" Line [{ot_name}]: FAILED") - all_success = False else: warn(f" Line [{ot_name}]: {rs}") + + if all_success and dispatch_run_id: + resp_cmp = client.get(f"/workflows/runs/{dispatch_run_id}/comparison") + if resp_cmp.status_code == 200: + comparison = resp_cmp.json() + rollout_gate = evaluate_rollout_gate_from_comparison(comparison) + verdict = rollout_gate["verdict"] + if verdict == "pass": + ok(" Rollout gate PASS — graph output is ready for workflow-first rollout") + elif verdict == "warn": + warn(" Rollout gate WARN — keep legacy authoritative and review drift") + else: + warn(" Rollout gate FAIL — keep legacy authoritative") + info(f" Comparison status: {comparison.get('status')}, verdict={verdict}") + for reason in rollout_gate["reasons"]: + info(f" {reason}") + else: + warn(f" Comparison lookup failed: {resp_cmp.status_code}") return all_success if order_status == "failed": - fail(f"Order FAILED — check render logs") + fail("Order FAILED — check render logs") return False time.sleep(POLL_INTERVAL_SECONDS) @@ -377,6 +516,7 @@ def main(): parser.add_argument("--health", action="store_true", help="Only run health check") parser.add_argument("--sample", action="store_true", help="Quick sample test (1 STEP, 1 OT)") parser.add_argument("--full", action="store_true", help="Full test (all output types)") + parser.add_argument("--graph", action="store_true", help="Dispatch sample/full renders via /api/workflows/dispatch") parser.add_argument("--step", default=str(SAMPLE_STEP), help="Path to STEP file") args = parser.parse_args() @@ -431,7 +571,13 @@ def main(): output_types[0], ) info(f"Sample test using output type: {ot['name']}") - test_order_render(client, cad_file_id, [ot["id"]], f"Sample [{ot['name']}]") + test_order_render( + client, + cad_file_id, + [ot["id"]], + f"Sample [{ot['name']}]", + use_graph_dispatch=args.graph, + ) elif args.full: # Test each output type individually @@ -439,7 +585,13 @@ def main(): if ot.get("is_animation"): warn(f"Skipping animation output type: {ot['name']} (too slow for full test)") continue - test_order_render(client, cad_file_id, [ot["id"]], ot["name"]) + test_order_render( + client, + cad_file_id, + [ot["id"]], + ot["name"], + use_graph_dispatch=args.graph, + ) _print_summary() sys.exit(0 if not failed else 1)