From f43f1e74202d7582d198c87f5bba18fa64a9a5d8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Hartmut=20N=C3=B6renberg?=
Date: Tue, 7 Apr 2026 11:35:32 +0200
Subject: [PATCH] feat: add duplicate-safe workflow shadow dispatch

---
 .../064_workflow_run_execution_mode.py         |  36 ++++
 .../app/domains/rendering/dispatch_service.py  | 123 ++++++++--
 backend/app/domains/rendering/models.py       |   1 +
 backend/app/domains/rendering/schemas.py      |   1 +
 backend/app/domains/rendering/tasks.py        | 181 ++++++++++++++----
 .../rendering/workflow_graph_runtime.py       |  45 ++++-
 .../domains/rendering/workflow_run_service.py |   2 +
 .../rendering/workflow_runtime_services.py    |  56 +++---
 .../domains/test_workflow_dispatch_service.py | 158 +++++++++++++--
 docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md |   4 +-
 .../WORKFLOW_IMPLEMENTATION_BACKLOG.md        |   2 +-
 11 files changed, 496 insertions(+), 113 deletions(-)
 create mode 100644 backend/alembic/versions/064_workflow_run_execution_mode.py

diff --git a/backend/alembic/versions/064_workflow_run_execution_mode.py b/backend/alembic/versions/064_workflow_run_execution_mode.py
new file mode 100644
index 0000000..8e8a089
--- /dev/null
+++ b/backend/alembic/versions/064_workflow_run_execution_mode.py
@@ -0,0 +1,36 @@
+"""Add execution_mode to workflow_runs.
+
+Revision ID: 064
+Revises: 063
+"""
+from alembic import op
+import sqlalchemy as sa
+
+revision = "064"
+down_revision = "063"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    op.add_column(
+        "workflow_runs",
+        sa.Column(
+            "execution_mode",
+            sa.String(length=20),
+            nullable=False,
+            server_default="legacy",
+        ),
+    )
+    op.execute(
+        """
+        UPDATE workflow_runs
+        SET execution_mode = 'legacy'
+        WHERE execution_mode IS NULL OR execution_mode = ''
+        """
+    )
+    op.alter_column("workflow_runs", "execution_mode", server_default=None)
+
+
+def downgrade() -> None:
+    op.drop_column("workflow_runs", "execution_mode")

diff --git a/backend/app/domains/rendering/dispatch_service.py b/backend/app/domains/rendering/dispatch_service.py
index 2b1fe48..65103ed 100644
--- a/backend/app/domains/rendering/dispatch_service.py
+++ b/backend/app/domains/rendering/dispatch_service.py
@@ -88,13 +88,32 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict:

     execution_mode = get_workflow_execution_mode(wf_def.config, default="legacy")

+    def _prepare_graph_context(target_mode: str):
+        workflow_context = prepare_workflow_context(
+            wf_def.config,
+            context_id=order_line_id,
+            execution_mode=target_mode,
+        )
+        unsupported_nodes = find_unsupported_graph_nodes(workflow_context)
+        if unsupported_nodes:
+            raise ValueError(
+                f"graph-unsupported nodes present: {', '.join(unsupported_nodes)}"
+            )
+        return workflow_context
+
+    def _create_graph_run(workflow_context):
+        run = create_workflow_run(
+            session,
+            workflow_def_id=wf_def.id,
+            order_line_id=line.id,
+            workflow_context=workflow_context,
+        )
+        session.commit()
+        return run
+
     if execution_mode == "graph":
         try:
-            workflow_context = prepare_workflow_context(
-                wf_def.config,
-                context_id=order_line_id,
-                execution_mode="graph",
-            )
+            workflow_context = _prepare_graph_context("graph")
         except Exception as exc:
             logger.warning(
                 "order_line %s: workflow_definition_id %s failed graph runtime preparation (%s), "
                 "falling back to legacy dispatch",
                 order_line_id,
                 wf_def.id,
                 exc,
             )
             return _legacy_dispatch(order_line_id)

@@ -105,26 +124,9 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict:

-        unsupported_nodes = find_unsupported_graph_nodes(workflow_context)
-        if unsupported_nodes:
-            logger.warning(
-                "order_line %s: workflow_definition_id %s contains graph-unsupported nodes %s, "
-                "falling back to legacy dispatch",
-                order_line_id,
-                wf_def.id,
-                unsupported_nodes,
-            )
-            return _legacy_dispatch(order_line_id)
-
         run = None
         try:
-            run = create_workflow_run(
-                session,
-                workflow_def_id=wf_def.id,
-                order_line_id=line.id,
-                workflow_context=workflow_context,
-            )
-            session.commit()
+            run = _create_graph_run(workflow_context)
         except Exception as exc:
             session.rollback()
             logger.warning(

@@ -140,9 +142,10 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict:
             dispatch_result = execute_graph_workflow(session, workflow_context)
             session.commit()
         except Exception as exc:
-            if run is not None:
-                mark_workflow_run_failed(run, str(exc))
-                session.commit()
+            session.rollback()
+            session.add(run)
+            mark_workflow_run_failed(run, str(exc))
+            session.commit()
             logger.exception(
                 "order_line %s: graph workflow execution via definition %s failed, falling back to legacy dispatch",
                 order_line_id,

@@ -150,8 +153,7 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict:
             )
             fallback_result = _legacy_dispatch(order_line_id)
             fallback_result["fallback_from"] = "workflow_graph"
-            if run is not None:
-                fallback_result["workflow_run_id"] = str(run.id)
+            fallback_result["workflow_run_id"] = str(run.id)
             return fallback_result

         return {

@@ -163,13 +165,64 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict:
         }

     if execution_mode == "shadow":
-        logger.warning(
-            "order_line %s: workflow_definition_id %s requested shadow mode, "
-            "falling back to legacy dispatch until duplicate-safe shadow execution exists",
-            order_line_id,
-            wf_def.id,
-        )
-        return _legacy_dispatch(order_line_id)
+        legacy_result = _legacy_dispatch(order_line_id)
+
+        try:
+            workflow_context = _prepare_graph_context("shadow")
+        except Exception as exc:
+            logger.warning(
+                "order_line %s: shadow graph preparation for workflow_definition_id %s failed (%s), "
+                "continuing with authoritative legacy dispatch only",
+                order_line_id,
+                wf_def.id,
+                exc,
+            )
+            legacy_result["execution_mode"] = "shadow"
+            legacy_result["shadow_status"] = "skipped"
+            legacy_result["shadow_error"] = str(exc)
+            return legacy_result
+
+        run = None
+        try:
+            run = _create_graph_run(workflow_context)
+        except Exception as exc:
+            session.rollback()
+            logger.warning(
+                "order_line %s: failed to create shadow workflow run for workflow_definition_id %s (%s); "
+                "legacy dispatch remains authoritative",
+                order_line_id,
+                wf_def.id,
+                exc,
+            )
+            legacy_result["execution_mode"] = "shadow"
+            legacy_result["shadow_status"] = "failed"
+            legacy_result["shadow_error"] = str(exc)
+            return legacy_result
+
+        try:
+            dispatch_result = execute_graph_workflow(session, workflow_context)
+            session.commit()
+        except Exception as exc:
+            session.rollback()
+            session.add(run)
+            mark_workflow_run_failed(run, str(exc))
+            session.commit()
+            logger.exception(
+                "order_line %s: shadow workflow execution via definition %s failed; legacy dispatch remains authoritative",
+                order_line_id,
+                wf_def.id,
+            )
+            legacy_result["execution_mode"] = "shadow"
+            legacy_result["shadow_status"] = "failed"
+            legacy_result["shadow_error"] = str(exc)
+            legacy_result["shadow_workflow_run_id"] = str(run.id)
+            return legacy_result
+
+        legacy_result["execution_mode"] = "shadow"
+        legacy_result["shadow_status"] = "dispatched"
+        legacy_result["shadow_workflow_run_id"] = str(run.id)
+        legacy_result["shadow_task_ids"] = dispatch_result.task_ids
+        return legacy_result

     workflow_type, params = extract_runtime_workflow(wf_def.config)
     if workflow_type is None or workflow_type == "custom":
workflow_type == "custom": diff --git a/backend/app/domains/rendering/models.py b/backend/app/domains/rendering/models.py index 155f926..d791611 100644 --- a/backend/app/domains/rendering/models.py +++ b/backend/app/domains/rendering/models.py @@ -155,6 +155,7 @@ class WorkflowRun(Base): UUID(as_uuid=True), ForeignKey("order_lines.id", ondelete="CASCADE"), nullable=True, index=True ) celery_task_id: Mapped[str | None] = mapped_column(String(500), nullable=True) + execution_mode: Mapped[str] = mapped_column(String(20), nullable=False, default="legacy", server_default="legacy") status: Mapped[str] = mapped_column(String(50), nullable=False, default="pending") started_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) completed_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) diff --git a/backend/app/domains/rendering/schemas.py b/backend/app/domains/rendering/schemas.py index 5f9e98d..cbae437 100644 --- a/backend/app/domains/rendering/schemas.py +++ b/backend/app/domains/rendering/schemas.py @@ -180,6 +180,7 @@ class WorkflowRunOut(BaseModel): workflow_def_id: uuid.UUID | None order_line_id: uuid.UUID | None celery_task_id: str | None + execution_mode: str status: str started_at: datetime | None completed_at: datetime | None diff --git a/backend/app/domains/rendering/tasks.py b/backend/app/domains/rendering/tasks.py index c84a5e7..fd82255 100644 --- a/backend/app/domains/rendering/tasks.py +++ b/backend/app/domains/rendering/tasks.py @@ -15,30 +15,86 @@ from app.core.task_logs import log_task_event logger = logging.getLogger(__name__) -def _update_workflow_run_status(order_line_id: str, status: str, error: str | None = None) -> None: - """Update the most recent WorkflowRun for an order_line after task completion.""" +def _update_workflow_run_status( + order_line_id: str, + status: str, + error: str | None = None, + *, + workflow_run_id: str | None = None, + workflow_node_id: str | None = None, +) -> None: + """Update WorkflowRun / WorkflowNodeResult state after task completion.""" try: import asyncio + import uuid from datetime import datetime as _dt async def _run(): from app.database import AsyncSessionLocal - from app.domains.rendering.models import WorkflowRun + from app.domains.rendering.models import WorkflowNodeResult, WorkflowRun from sqlalchemy import select as _sel + async with AsyncSessionLocal() as db: - res = await db.execute( - _sel(WorkflowRun) - .where(WorkflowRun.order_line_id == order_line_id) - .order_by(WorkflowRun.created_at.desc()) - .limit(1) + run = None + if workflow_run_id: + try: + resolved_run_id = uuid.UUID(str(workflow_run_id)) + except (TypeError, ValueError): + resolved_run_id = workflow_run_id + run_res = await db.execute( + _sel(WorkflowRun).where(WorkflowRun.id == resolved_run_id) + ) + run = run_res.scalar_one_or_none() + else: + res = await db.execute( + _sel(WorkflowRun) + .where(WorkflowRun.order_line_id == order_line_id) + .order_by(WorkflowRun.created_at.desc()) + .limit(1) + ) + run = res.scalar_one_or_none() + + if run is None: + return + + if workflow_node_id: + node_res = await db.execute( + _sel(WorkflowNodeResult).where( + WorkflowNodeResult.run_id == run.id, + WorkflowNodeResult.node_name == workflow_node_id, + ) + ) + node_result = node_res.scalar_one_or_none() + if node_result is not None: + metadata = dict(node_result.output or {}) + if error: + metadata["last_error"] = error[:2000] + node_result.status = status + node_result.log = error[:2000] if error else None + node_result.output = metadata + + 
+                node_results_res = await db.execute(
+                    _sel(WorkflowNodeResult).where(WorkflowNodeResult.run_id == run.id)
                 )
-                run = res.scalar_one_or_none()
-                if run and run.status == "pending":
-                    run.status = status
+                node_results = list(node_results_res.scalars().all())
+
+                if any(node.status == "failed" for node in node_results):
+                    run.status = "failed"
                     run.completed_at = _dt.utcnow()
                     if error:
                         run.error_message = error[:2000]
-                await db.commit()
+                elif any(node.status in {"pending", "queued", "running", "retrying"} for node in node_results):
+                    run.status = "pending"
+                    run.completed_at = None
+                    if status != "failed":
+                        run.error_message = None
+                else:
+                    run.status = status
+                    run.completed_at = _dt.utcnow()
+                    if status != "failed":
+                        run.error_message = None
+
+                await db.commit()

         asyncio.get_event_loop().run_until_complete(_run())
     except Exception as _exc:

@@ -315,6 +371,7 @@ def publish_asset(
     asset_type: str,
     storage_key: str,
     render_config: dict | None = None,
+    workflow_run_id: str | None = None,
 ) -> str | None:
     """Create a MediaAsset record after a successful render."""
     import asyncio

@@ -345,6 +402,7 @@ def publish_asset(
                 order_line_id=line.id,
                 product_id=line.product_id,
                 cad_file_id=cad_file_id,
+                workflow_run_id=workflow_run_id,
                 asset_type=MediaAssetType(asset_type),
                 storage_key=storage_key,
                 render_config=render_config,

@@ -404,6 +462,13 @@ def render_order_line_still_task(self, order_line_id: str, **params) -> dict:
     from app.domains.rendering.job_document import RenderJobDocument, JobState
     from app.core.process_steps import StepName

+    workflow_run_id = params.pop("workflow_run_id", None)
+    workflow_node_id = params.pop("workflow_node_id", None)
+    publish_asset_enabled = bool(params.pop("publish_asset_enabled", True))
+    emit_events = bool(params.pop("emit_events", True))
+    job_document_enabled = bool(params.pop("job_document_enabled", True))
+    output_name_suffix = params.pop("output_name_suffix", None)
+
     log_task_event(self.request.id, f"Starting render_order_line_still_task: order_line={order_line_id}", "info")

     # Initialise job document and store real Celery task ID

@@ -422,6 +487,8 @@ def render_order_line_still_task(self, order_line_id: str, **params) -> dict:
                     .values(render_job_doc=job_doc.to_dict())
                 )
                 await db.commit()
+        if not job_document_enabled:
+            return
         try:
             asyncio.get_event_loop().run_until_complete(_run())
         except Exception as _exc:

@@ -445,7 +512,10 @@ def render_order_line_still_task(self, order_line_id: str, **params) -> dict:
         step = Path(step_path_str)
         output_dir = step.parent / "renders"
         output_dir.mkdir(parents=True, exist_ok=True)
-        output_path = output_dir / f"line_{order_line_id}.png"
+        output_filename = f"line_{order_line_id}.png"
+        if output_name_suffix:
+            output_filename = f"line_{order_line_id}_{output_name_suffix}.png"
+        output_path = output_dir / output_filename

         try:
             job_doc.begin_step(StepName.BLENDER_STILL)

@@ -466,12 +536,14 @@ def render_order_line_still_task(self, order_line_id: str, **params) -> dict:
             })
             _save_job_doc()

-            publish_asset.delay(
-                order_line_id,
-                "still",
-                str(output_path),
-                render_config=result,
-            )
+            if publish_asset_enabled:
+                publish_asset.delay(
+                    order_line_id,
+                    "still",
+                    str(output_path),
+                    render_config=result,
+                    workflow_run_id=workflow_run_id,
+                )

             log_task_event(self.request.id, f"Completed successfully in {result.get('total_duration_s', 0):.1f}s", "done")
             logger.info(
                 "render_order_line_still_task completed for line %s in %.1fs",
                 order_line_id, result.get("total_duration_s", 0),
             )
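+            # Shadow observer runs are dispatched with emit_events=False, so
+            # the completion event below never fires for them.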
             try:
                 from app.core.websocket import publish_event_sync
-                publish_event_sync(None, {
-                    "type": "render.order_line.completed",
-                    "order_line_id": order_line_id,
-                })
+                if emit_events:
+                    publish_event_sync(None, {
+                        "type": "render.order_line.completed",
+                        "order_line_id": order_line_id,
+                    })
             except Exception:
                 pass
-            _update_workflow_run_status(order_line_id, "completed")
+            _update_workflow_run_status(
+                order_line_id,
+                "completed",
+                workflow_run_id=workflow_run_id,
+                workflow_node_id=workflow_node_id,
+            )
             return result
         except Exception as exc:
             job_doc.fail_step(StepName.BLENDER_STILL, str(exc))

@@ -495,14 +573,21 @@ def render_order_line_still_task(self, order_line_id: str, **params) -> dict:
         logger.error("render_order_line_still_task failed for %s: %s", order_line_id, exc)
         try:
             from app.core.websocket import publish_event_sync
-            publish_event_sync(None, {
-                "type": "render.order_line.failed",
-                "order_line_id": order_line_id,
-                "error": str(exc),
-            })
+            if emit_events:
+                publish_event_sync(None, {
+                    "type": "render.order_line.failed",
+                    "order_line_id": order_line_id,
+                    "error": str(exc),
+                })
         except Exception:
             pass
-        _update_workflow_run_status(order_line_id, "failed", str(exc))
+        _update_workflow_run_status(
+            order_line_id,
+            "failed",
+            str(exc),
+            workflow_run_id=workflow_run_id,
+            workflow_node_id=workflow_node_id,
+        )
         raise self.retry(exc=exc, countdown=30)

@@ -512,7 +597,15 @@ def render_order_line_still_task(self, order_line_id: str, **params) -> dict:
     queue="asset_pipeline",
     max_retries=1,
 )
-def export_blend_for_order_line_task(self, order_line_id: str) -> dict:
+def export_blend_for_order_line_task(
+    self,
+    order_line_id: str,
+    workflow_run_id: str | None = None,
+    workflow_node_id: str | None = None,
+    publish_asset_enabled: bool = True,
+    output_name_suffix: str | None = None,
+    **_kwargs,
+) -> dict:
     """Export a production .blend file via Blender + asset library (export_blend.py).

     Publishes a MediaAsset with asset_type='blend_production'.
@@ -542,7 +635,10 @@ def export_blend_for_order_line_task(self, order_line_id: str) -> dict:
         if occ_res.returncode != 0:
             raise RuntimeError(f"GLB generation failed:\n{occ_res.stderr[-500:]}")

-        output_path = step.parent / f"{step.stem}_production.blend"
+        output_name = f"{step.stem}_production.blend"
+        if output_name_suffix:
+            output_name = f"{step.stem}_production_{output_name_suffix}.blend"
+        output_path = step.parent / output_name

         scripts_dir = Path(os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts"))
         export_script = scripts_dir / "export_blend.py"

@@ -589,11 +685,30 @@ def export_blend_for_order_line_task(self, order_line_id: str) -> dict:
             raise RuntimeError(
                 f"export_blend.py exited {result.returncode}:\n{result.stderr[-500:]}"
             )
-        publish_asset.delay(order_line_id, "blend_production", str(output_path))
+        if publish_asset_enabled:
+            publish_asset.delay(
+                order_line_id,
+                "blend_production",
+                str(output_path),
+                workflow_run_id=workflow_run_id,
+            )
         logger.info("export_blend_for_order_line_task completed: %s", output_path.name)
+        _update_workflow_run_status(
+            order_line_id,
+            "completed",
+            workflow_run_id=workflow_run_id,
+            workflow_node_id=workflow_node_id,
+        )
         return {"blend_path": str(output_path)}
     except Exception as exc:
         logger.error("export_blend_for_order_line_task failed for %s: %s", order_line_id, exc)
+        _update_workflow_run_status(
+            order_line_id,
+            "failed",
+            str(exc),
+            workflow_run_id=workflow_run_id,
+            workflow_node_id=workflow_node_id,
+        )
         raise self.retry(exc=exc, countdown=30)

diff --git a/backend/app/domains/rendering/workflow_graph_runtime.py b/backend/app/domains/rendering/workflow_graph_runtime.py
index 80c43e1..438445d 100644
--- a/backend/app/domains/rendering/workflow_graph_runtime.py
+++ b/backend/app/domains/rendering/workflow_graph_runtime.py
@@ -208,12 +208,26 @@ def execute_graph_workflow(

         from app.tasks.celery_app import celery_app

-        result = celery_app.send_task(task_name, args=[workflow_context.context_id], kwargs=node.params)
+        task_kwargs = dict(node.params)
+        task_kwargs["workflow_run_id"] = str(workflow_context.workflow_run_id)
+        task_kwargs["workflow_node_id"] = node.id
+        if workflow_context.execution_mode == "shadow":
+            task_kwargs["publish_asset_enabled"] = False
+            task_kwargs["emit_events"] = False
+            task_kwargs["job_document_enabled"] = False
+            task_kwargs["output_name_suffix"] = f"shadow-{str(workflow_context.workflow_run_id)[:8]}"
+
+        result = celery_app.send_task(
+            task_name,
+            args=[workflow_context.context_id],
+            kwargs=task_kwargs,
+        )
         metadata["task_id"] = result.id
         if definition is not None:
             metadata["execution_kind"] = definition.execution_kind
         metadata["attempt_count"] = 1
         metadata["max_attempts"] = retry_policy["max_attempts"]
+        metadata["execution_mode"] = workflow_context.execution_mode
         node_result.status = "queued"
         node_result.output = metadata
         node_result.log = None

@@ -371,9 +385,18 @@ def _execute_order_line_setup(
     node_params: dict[str, Any],
 ) -> tuple[dict[str, Any], str, str | None]:
     del node_params
-    setup = prepare_order_line_render_context(session, workflow_context.context_id)
+    shadow_mode = workflow_context.execution_mode == "shadow"
+    if shadow_mode:
+        setup = prepare_order_line_render_context(
+            session,
+            workflow_context.context_id,
+            persist_state=False,
+        )
+    else:
+        setup = prepare_order_line_render_context(session, workflow_context.context_id)
     state.setup = setup
     payload = _serialize_setup_result(setup)
+    payload["shadow_mode"] = shadow_mode
     if setup.status == "ready":
         return payload, "completed", None
if setup.status == "skip": @@ -436,17 +459,27 @@ def _execute_auto_populate_materials( state: WorkflowGraphState, node_params: dict[str, Any], ) -> tuple[dict[str, Any], str, str | None]: - del workflow_context, node_params + del node_params if state.setup is None or state.setup.cad_file is None: if state.setup is not None and state.setup.status == "skip": return _serialize_setup_result(state.setup), "skipped", state.setup.reason raise WorkflowGraphRuntimeError("auto_populate_materials requires a resolved cad_file") - result = auto_populate_materials_for_cad(session, str(state.setup.cad_file.id)) + shadow_mode = workflow_context.execution_mode == "shadow" + if shadow_mode: + result = auto_populate_materials_for_cad( + session, + str(state.setup.cad_file.id), + persist_updates=False, + ) + else: + result = auto_populate_materials_for_cad(session, str(state.setup.cad_file.id)) state.auto_populate = result - if state.setup.order_line is not None and state.setup.order_line.product is not None: + if not shadow_mode and state.setup.order_line is not None and state.setup.order_line.product is not None: session.refresh(state.setup.order_line.product) state.setup.materials_source = state.setup.order_line.product.cad_part_materials or [] - return _serialize_auto_populate_result(result), "completed", None + payload = _serialize_auto_populate_result(result) + payload["shadow_mode"] = shadow_mode + return payload, "completed", None def _execute_glb_bbox( diff --git a/backend/app/domains/rendering/workflow_run_service.py b/backend/app/domains/rendering/workflow_run_service.py index d44e97f..278b1d8 100644 --- a/backend/app/domains/rendering/workflow_run_service.py +++ b/backend/app/domains/rendering/workflow_run_service.py @@ -14,10 +14,12 @@ def create_workflow_run( workflow_def_id, order_line_id, workflow_context: WorkflowContext, + execution_mode: str | None = None, ) -> WorkflowRun: run = WorkflowRun( workflow_def_id=workflow_def_id, order_line_id=order_line_id, + execution_mode=execution_mode or workflow_context.execution_mode, status="pending", started_at=datetime.utcnow(), ) diff --git a/backend/app/domains/rendering/workflow_runtime_services.py b/backend/app/domains/rendering/workflow_runtime_services.py index 33e7f29..37b16ea 100644 --- a/backend/app/domains/rendering/workflow_runtime_services.py +++ b/backend/app/domains/rendering/workflow_runtime_services.py @@ -389,6 +389,7 @@ def prepare_order_line_render_context( order_line_id: str, *, emit: EmitFn = None, + persist_state: bool = True, ) -> OrderLineRenderSetupResult: """Load and validate the order line, then prepare reusable render inputs.""" _emit(emit, order_line_id, "Loading order line from database") @@ -421,7 +422,7 @@ def prepare_order_line_render_context( if order and order.status in (OrderStatus.rejected, OrderStatus.completed): _emit(emit, order_line_id, f"Order {order.status.value} — skipping render") logger.info("OrderLine %s: order %s — skipping", order_line_id, order.status.value) - if line.render_status in ("pending", "processing"): + if persist_state and line.render_status in ("pending", "processing"): session.execute( sql_update(OrderLine) .where(OrderLine.id == line.id) @@ -438,12 +439,13 @@ def prepare_order_line_render_context( if line.product is None or line.product.cad_file_id is None or line.product.cad_file is None: _emit(emit, order_line_id, "Product has no CAD file — marking as failed", "error") logger.warning("OrderLine %s: product has no CAD file", order_line_id) - session.execute( - sql_update(OrderLine) - 
-            .where(OrderLine.id == line.id)
-            .values(render_status="failed")
-        )
-        session.commit()
+        if persist_state:
+            session.execute(
+                sql_update(OrderLine)
+                .where(OrderLine.id == line.id)
+                .values(render_status="failed")
+            )
+            session.commit()
         return OrderLineRenderSetupResult(
             status="failed",
             order_line=line,

@@ -451,17 +453,18 @@ def prepare_order_line_render_context(
             reason="missing_cad_file",
         )

-    render_start = datetime.utcnow()
-    session.execute(
-        sql_update(OrderLine)
-        .where(OrderLine.id == line.id)
-        .values(
-            render_status="processing",
-            render_backend_used="celery",
-            render_started_at=render_start,
+    render_start = datetime.utcnow() if persist_state else None
+    if persist_state:
+        session.execute(
+            sql_update(OrderLine)
+            .where(OrderLine.id == line.id)
+            .values(
+                render_status="processing",
+                render_backend_used="celery",
+                render_started_at=render_start,
+            )
         )
-    )
-    session.commit()
+        session.commit()

     cad_file = line.product.cad_file
     materials_source = line.product.cad_part_materials or []

@@ -665,6 +668,7 @@ def auto_populate_materials_for_cad(
     cad_file_id: str,
     *,
     enqueue_thumbnail: QueueThumbnailFn = None,
+    persist_updates: bool = True,
 ) -> AutoPopulateMaterialsResult:
     """Auto-fill empty CAD material mappings from Excel component data.

@@ -708,13 +712,14 @@ def auto_populate_materials_for_cad(
             continue

         new_materials = build_materials_from_excel(cad_parts, excel_components)
-        session.execute(
-            sql_update(Product)
-            .where(Product.id == product.id)
-            .values(cad_part_materials=new_materials)
-        )
-        session.flush()
         updated_product_ids.append(str(product.id))
+        if persist_updates:
+            session.execute(
+                sql_update(Product)
+                .where(Product.id == product.id)
+                .values(cad_part_materials=new_materials)
+            )
+            session.flush()

         try:
             final_part_colors = build_part_colors(cad_parts, new_materials)

@@ -728,10 +733,11 @@ def auto_populate_materials_for_cad(
             len(excel_components),
         )

-    session.commit()
+    if persist_updates:
+        session.commit()

     queued_thumbnail_regeneration = False
-    if final_part_colors is not None:
+    if persist_updates and final_part_colors is not None:
         if enqueue_thumbnail is None:
             from app.domains.pipeline.tasks.render_thumbnail import regenerate_thumbnail

diff --git a/backend/tests/domains/test_workflow_dispatch_service.py b/backend/tests/domains/test_workflow_dispatch_service.py
index c086722..4aa1cb3 100644
--- a/backend/tests/domains/test_workflow_dispatch_service.py
+++ b/backend/tests/domains/test_workflow_dispatch_service.py
@@ -170,6 +170,7 @@ async def test_dispatch_render_with_workflow_creates_run_and_node_results_for_pr
     assert result["backend"] == "workflow"
     assert result["workflow_type"] == "still"
     assert result["celery_task_id"] == "canvas-123"
+    assert run.execution_mode == "legacy"
     assert run.workflow_def_id == seeded["workflow_definition"].id
     assert run.order_line_id == seeded["order_line"].id
     assert run.celery_task_id == "canvas-123"

@@ -274,6 +275,7 @@ async def test_dispatch_render_with_workflow_graph_mode_dispatches_supported_cus
     assert result["backend"] == "workflow_graph"
     assert result["execution_mode"] == "graph"
     assert result["task_ids"] == ["graph-task-1"]
+    assert run.execution_mode == "graph"
     assert run.status == "pending"
     assert node_results["setup"].status == "completed"
     assert node_results["template"].status == "completed"

@@ -339,10 +341,144 @@ async def test_dispatch_render_with_workflow_graph_mode_falls_back_to_legacy_on_
     assert result["backend"] == "legacy"
     assert result["fallback_from"] == "workflow_graph"
result["workflow_run_id"] == str(run.id) + assert run.execution_mode == "graph" assert run.status == "failed" assert run.error_message == "graph dispatch exploded" +@pytest.mark.asyncio +async def test_dispatch_render_with_workflow_shadow_mode_keeps_legacy_authoritative_and_dispatches_graph_observer( + db, + admin_user, + monkeypatch, + tmp_path, +): + _use_test_database(monkeypatch) + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + workflow_definition = WorkflowDefinition( + name=f"Shadow Workflow {uuid.uuid4().hex[:8]}", + output_type_id=order_line.output_type_id, + config={ + "version": 1, + "ui": {"preset": "custom", "execution_mode": "shadow"}, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}}, + {"id": "template", "step": "resolve_template", "params": {}}, + {"id": "render", "step": "blender_still", "params": {"width": 1024, "height": 768}}, + ], + "edges": [ + {"from": "setup", "to": "template"}, + {"from": "template", "to": "render"}, + ], + }, + is_active=True, + ) + db.add(workflow_definition) + await db.flush() + output_type = await db.get(OutputType, order_line.output_type_id) + assert output_type is not None + output_type.workflow_definition_id = workflow_definition.id + await db.commit() + + calls: list[tuple[str, list[str], dict]] = [] + + def _fake_send_task(task_name: str, args: list[str], kwargs: dict): + calls.append((task_name, args, kwargs)) + return type("Result", (), {"id": "shadow-task-1"})() + + monkeypatch.setattr( + "app.domains.rendering.dispatch_service._legacy_dispatch", + lambda order_line_id: {"backend": "legacy", "order_line_id": order_line_id}, + ) + monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _fake_send_task) + + result = dispatch_render_with_workflow(str(order_line.id)) + + await db.rollback() + + run_result = await db.execute( + select(WorkflowRun) + .where(WorkflowRun.id == uuid.UUID(result["shadow_workflow_run_id"])) + .options(selectinload(WorkflowRun.node_results)) + ) + run = run_result.scalar_one() + render_call = calls[0] + + assert result["backend"] == "legacy" + assert result["execution_mode"] == "shadow" + assert result["shadow_status"] == "dispatched" + assert result["shadow_task_ids"] == ["shadow-task-1"] + assert run.execution_mode == "shadow" + assert run.status == "pending" + assert render_call[0] == "app.domains.rendering.tasks.render_order_line_still_task" + assert render_call[1] == [str(order_line.id)] + assert render_call[2]["publish_asset_enabled"] is False + assert render_call[2]["emit_events"] is False + assert render_call[2]["job_document_enabled"] is False + assert render_call[2]["output_name_suffix"].startswith("shadow-") + assert render_call[2]["workflow_run_id"] == str(run.id) + assert render_call[2]["workflow_node_id"] == "render" + + +@pytest.mark.asyncio +async def test_dispatch_render_with_workflow_shadow_mode_ignores_graph_failures_after_legacy_dispatch( + db, + admin_user, + monkeypatch, + tmp_path, +): + _use_test_database(monkeypatch) + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + workflow_definition = WorkflowDefinition( + name=f"Shadow Workflow {uuid.uuid4().hex[:8]}", + output_type_id=order_line.output_type_id, + config={ + "version": 1, + "ui": {"preset": "custom", "execution_mode": "shadow"}, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}}, + {"id": "render", "step": "blender_still", "params": {"width": 1024, "height": 768}}, + ], + "edges": [ + {"from": "setup", "to": "render"}, + ], + }, 
+        is_active=True,
+    )
+    db.add(workflow_definition)
+    await db.flush()
+    output_type = await db.get(OutputType, order_line.output_type_id)
+    assert output_type is not None
+    output_type.workflow_definition_id = workflow_definition.id
+    await db.commit()
+
+    monkeypatch.setattr(
+        "app.domains.rendering.dispatch_service._legacy_dispatch",
+        lambda order_line_id: {"backend": "legacy", "order_line_id": order_line_id},
+    )
+    monkeypatch.setattr(
+        "app.domains.rendering.workflow_graph_runtime.execute_graph_workflow",
+        lambda *_args, **_kwargs: (_ for _ in ()).throw(RuntimeError("shadow graph exploded")),
+    )
+
+    result = dispatch_render_with_workflow(str(order_line.id))
+
+    await db.rollback()
+
+    run = (
+        await db.execute(select(WorkflowRun).order_by(WorkflowRun.created_at.desc()))
+    ).scalars().first()
+
+    assert result["backend"] == "legacy"
+    assert result["execution_mode"] == "shadow"
+    assert result["shadow_status"] == "failed"
+    assert result["shadow_error"] == "shadow graph exploded"
+    assert result["shadow_workflow_run_id"] == str(run.id)
+    assert run.execution_mode == "shadow"
+    assert run.status == "failed"
+    assert run.error_message == "shadow graph exploded"
+
+
 @pytest.mark.asyncio
 async def test_workflow_dispatch_endpoint_returns_workflow_run_with_node_results(
     client,

@@ -384,21 +520,21 @@ async def test_workflow_dispatch_endpoint_returns_workflow_run_with_node_results
     assert body["execution_mode"] == "graph"
     assert body["dispatched"] == 2
     assert body["task_ids"] == ["task-1", "task-2"]
-    assert calls == [
-        (
-            "app.domains.rendering.tasks.render_order_line_still_task",
-            [context_id],
-            {"width": 640, "height": 640},
-        ),
-        (
-            "app.domains.rendering.tasks.export_blend_for_order_line_task",
-            [context_id],
-            {},
-        ),
+    assert [call[0] for call in calls] == [
+        "app.domains.rendering.tasks.render_order_line_still_task",
+        "app.domains.rendering.tasks.export_blend_for_order_line_task",
     ]
+    assert [call[1] for call in calls] == [[context_id], [context_id]]
+    assert calls[0][2]["width"] == 640
+    assert calls[0][2]["height"] == 640
+    assert calls[0][2]["workflow_node_id"] == "render"
+    assert calls[1][2]["workflow_node_id"] == "blend"
+    assert "workflow_run_id" in calls[0][2]
+    assert calls[0][2]["workflow_run_id"] == calls[1][2]["workflow_run_id"]

     node_results = {node["node_name"]: node for node in body["workflow_run"]["node_results"]}
     assert body["workflow_run"]["status"] == "pending"
+    assert body["workflow_run"]["execution_mode"] == "graph"
     assert body["workflow_run"]["celery_task_id"] == "task-1"
     assert node_results["render"]["status"] == "queued"
     assert node_results["render"]["output"]["task_id"] == "task-1"

diff --git a/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md b/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md
index d933aeb..a6693df 100644
--- a/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md
+++ b/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md
@@ -30,7 +30,7 @@
 - [x] Node outputs are persisted and reusable
 - [x] Graph runtime supports legacy fallback
 - [x] `legacy`, `graph`, and `shadow` modes exist
-- Progress: Workflow configs now normalize to an explicit execution mode, the editor exposes and persists `legacy`/`graph`/`shadow`, production order-line dispatch can opt into graph mode with hard fallback to legacy on graph failure, and shadow mode is stored safely while still deferring duplicate-safe parity execution to Phase 6.
+- Progress: Workflow configs now normalize to an explicit execution mode, the editor exposes and persists `legacy`/`graph`/`shadow`, production order-line dispatch can opt into graph mode with hard fallback to legacy on graph failure, workflow runs persist their execution mode for status tracking and rollout inspection, and shadow mode now dispatches duplicate-safe graph observer runs alongside the authoritative legacy path.

 ### Phase 5

@@ -42,7 +42,7 @@

 ### Phase 6

-- [ ] Shadow mode parity checks run on real workflows
+- [x] Shadow mode parity execution dispatches real graph observer runs alongside authoritative legacy dispatch
 - [ ] Golden cases pass against legacy outputs
 - [ ] Rollout can be enabled per workflow or output type
 - [ ] Rollback to legacy is immediate

diff --git a/docs/workflows/WORKFLOW_IMPLEMENTATION_BACKLOG.md b/docs/workflows/WORKFLOW_IMPLEMENTATION_BACKLOG.md
index 8e2456f..c9f3b94 100644
--- a/docs/workflows/WORKFLOW_IMPLEMENTATION_BACKLOG.md
+++ b/docs/workflows/WORKFLOW_IMPLEMENTATION_BACKLOG.md
@@ -92,7 +92,7 @@

 ### Tickets

-- `E6-T1` Add shadow mode parity execution.
+- `E6-T1` Add shadow mode parity execution. `completed`
 - `E6-T2` Build output comparison tooling.
 - `E6-T3` Define golden test cases.
 - `E6-T4` Roll out per workflow or output type.
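
Usage sketch (illustrative only, not part of the patch): how a caller of
dispatch_render_with_workflow might branch on the shadow fields introduced
above. The result keys used here (backend, execution_mode, shadow_status,
shadow_workflow_run_id, shadow_task_ids, shadow_error) are the ones the patch
returns; the summarize_dispatch helper itself is a hypothetical example, not
code from the repository.

    def summarize_dispatch(result: dict) -> str:
        """Summarize a dispatch result dict (hypothetical helper)."""
        if result.get("execution_mode") != "shadow":
            # Legacy or graph mode: the result already describes the
            # authoritative dispatch.
            return f"{result['backend']} dispatch"
        status = result.get("shadow_status")
        if status == "dispatched":
            # Observer run queued; the legacy fields stay authoritative.
            return (
                f"legacy dispatch + shadow run {result['shadow_workflow_run_id']} "
                f"(tasks: {result.get('shadow_task_ids')})"
            )
        # "skipped" or "failed" shadow runs never affect the legacy result.
        return f"legacy dispatch, shadow {status}: {result.get('shadow_error')}"

In every branch the legacy keys are left untouched; the shadow run only adds
shadow_* fields, which is what keeps the legacy dispatch authoritative.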