From f9d4da52b9e3b6918ebf3b3356c52c6cab1d7c8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hartmut=20N=C3=B6renberg?= Date: Tue, 7 Apr 2026 10:56:45 +0200 Subject: [PATCH] feat: add graph workflow fallback and retry metadata --- .../app/domains/rendering/dispatch_service.py | 94 ++++++++++- .../rendering/workflow_config_utils.py | 11 ++ .../rendering/workflow_graph_runtime.py | 156 ++++++++++++++---- .../app/domains/rendering/workflow_schema.py | 3 + .../domains/test_workflow_dispatch_service.py | 123 ++++++++++++++ .../domains/test_workflow_graph_runtime.py | 116 ++++++++++++- docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md | 4 +- .../WORKFLOW_IMPLEMENTATION_BACKLOG.md | 4 +- frontend/src/api/workflows.ts | 1 + 9 files changed, 473 insertions(+), 39 deletions(-) diff --git a/backend/app/domains/rendering/dispatch_service.py b/backend/app/domains/rendering/dispatch_service.py index ef630f5..2b1fe48 100644 --- a/backend/app/domains/rendering/dispatch_service.py +++ b/backend/app/domains/rendering/dispatch_service.py @@ -32,7 +32,15 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict: from app.config import settings from app.domains.orders.models import OrderLine from app.domains.rendering.models import OutputType, WorkflowDefinition + from app.domains.rendering.workflow_config_utils import ( + extract_runtime_workflow, + get_workflow_execution_mode, + ) from app.domains.rendering.workflow_executor import prepare_workflow_context + from app.domains.rendering.workflow_graph_runtime import ( + execute_graph_workflow, + find_unsupported_graph_nodes, + ) from app.domains.rendering.workflow_run_service import create_workflow_run, mark_workflow_run_failed engine = create_engine( @@ -78,7 +86,90 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict: ) return _legacy_dispatch(order_line_id) - from app.domains.rendering.workflow_config_utils import extract_runtime_workflow + execution_mode = get_workflow_execution_mode(wf_def.config, default="legacy") + + if execution_mode == "graph": + try: + workflow_context = prepare_workflow_context( + wf_def.config, + context_id=order_line_id, + execution_mode="graph", + ) + except Exception as exc: + logger.warning( + "order_line %s: workflow_definition_id %s failed graph runtime preparation (%s), " + "falling back to legacy dispatch", + order_line_id, + wf_def.id, + exc, + ) + return _legacy_dispatch(order_line_id) + + unsupported_nodes = find_unsupported_graph_nodes(workflow_context) + if unsupported_nodes: + logger.warning( + "order_line %s: workflow_definition_id %s contains graph-unsupported nodes %s, " + "falling back to legacy dispatch", + order_line_id, + wf_def.id, + unsupported_nodes, + ) + return _legacy_dispatch(order_line_id) + + run = None + try: + run = create_workflow_run( + session, + workflow_def_id=wf_def.id, + order_line_id=line.id, + workflow_context=workflow_context, + ) + session.commit() + except Exception as exc: + session.rollback() + logger.warning( + "order_line %s: failed to create graph workflow run for workflow_definition_id %s (%s), " + "falling back to legacy dispatch", + order_line_id, + wf_def.id, + exc, + ) + return _legacy_dispatch(order_line_id) + + try: + dispatch_result = execute_graph_workflow(session, workflow_context) + session.commit() + except Exception as exc: + if run is not None: + mark_workflow_run_failed(run, str(exc)) + session.commit() + logger.exception( + "order_line %s: graph workflow execution via definition %s failed, falling back to legacy dispatch", + order_line_id, + wf_def.id, + ) + fallback_result = _legacy_dispatch(order_line_id) + fallback_result["fallback_from"] = "workflow_graph" + if run is not None: + fallback_result["workflow_run_id"] = str(run.id) + return fallback_result + + return { + "backend": "workflow_graph", + "execution_mode": "graph", + "workflow_run_id": str(run.id), + "celery_task_id": dispatch_result.task_ids[0] if dispatch_result.task_ids else None, + "task_ids": dispatch_result.task_ids, + } + + if execution_mode == "shadow": + logger.warning( + "order_line %s: workflow_definition_id %s requested shadow mode, " + "falling back to legacy dispatch until duplicate-safe shadow execution exists", + order_line_id, + wf_def.id, + ) + return _legacy_dispatch(order_line_id) workflow_type, params = extract_runtime_workflow(wf_def.config) if workflow_type is None or workflow_type == "custom": @@ -178,6 +269,7 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict: return { "backend": "workflow", "workflow_type": workflow_type, + "execution_mode": "legacy", "workflow_run_id": str(run.id), "celery_task_id": celery_task_id, } diff --git a/backend/app/domains/rendering/workflow_config_utils.py b/backend/app/domains/rendering/workflow_config_utils.py index bef4bd8..1490369 100644 --- a/backend/app/domains/rendering/workflow_config_utils.py +++ b/backend/app/domains/rendering/workflow_config_utils.py @@ -17,6 +17,8 @@ _PRESET_TYPES = { "custom", } +_EXECUTION_MODES = {"legacy", "graph", "shadow"} + _NODE_TYPE_TO_STEP: dict[str, str] = { "inputNode": StepName.RESOLVE_STEP_PATH.value, "convertNode": StepName.STL_CACHE_GENERATE.value, @@ -243,6 +245,15 @@ def get_workflow_preset_type(config: dict[str, Any]) -> str | None: return None +def get_workflow_execution_mode(config: dict[str, Any], *, default: str = "legacy") -> str: + canonical = canonicalize_workflow_config(config) + ui = canonical.get("ui") or {} + mode = ui.get("execution_mode") + if mode in _EXECUTION_MODES: + return mode + return default + + def extract_runtime_workflow(config: dict[str, Any]) -> tuple[str | None, dict[str, Any]]: canonical = canonicalize_workflow_config(config) preset = get_workflow_preset_type(canonical) diff --git a/backend/app/domains/rendering/workflow_graph_runtime.py b/backend/app/domains/rendering/workflow_graph_runtime.py index 89fea1e..80c43e1 100644 --- a/backend/app/domains/rendering/workflow_graph_runtime.py +++ b/backend/app/domains/rendering/workflow_graph_runtime.py @@ -53,6 +53,17 @@ _ORDER_LINE_RENDER_STEPS = { } +def find_unsupported_graph_nodes(workflow_context: WorkflowContext) -> list[str]: + unsupported: list[str] = [] + for node in workflow_context.ordered_nodes: + if node.step in _BRIDGE_EXECUTORS: + continue + if STEP_TASK_MAP.get(node.step) is not None: + continue + unsupported.append(node.id) + return unsupported + + def execute_graph_workflow( session: Session, workflow_context: WorkflowContext, @@ -82,48 +93,102 @@ def execute_graph_workflow( ) continue + retry_policy = _retry_policy(node.params) + failure_policy = _failure_policy(node.params) metadata = _base_output(node_result.output, node) + metadata["retry_policy"] = retry_policy + metadata["failure_policy"] = failure_policy definition = get_node_definition(node.step) bridge_executor = _BRIDGE_EXECUTORS.get(node.step) if bridge_executor is not None: - started = time.perf_counter() - node_result.status = "running" - node_result.output = dict(metadata) - session.flush() - try: - payload, status, log_message = bridge_executor( - session=session, - workflow_context=workflow_context, - state=state, - node_params=node.params, - ) - except Exception as exc: - node_result.status = "failed" - node_result.log = str(exc)[:2000] - node_result.duration_s = round(time.perf_counter() - started, 4) - node_result.output = dict(metadata) + max_attempts = retry_policy["max_attempts"] + last_error: str | None = None + + for attempt in range(1, max_attempts + 1): + started = time.perf_counter() + attempt_output = dict(metadata) + attempt_output["attempt_count"] = attempt + attempt_output["max_attempts"] = max_attempts + node_result.status = "running" + node_result.output = attempt_output session.flush() - raise WorkflowGraphRuntimeError( - f"Node '{node.id}' ({node.step.value}) failed: {exc}" - ) from exc - if payload: - metadata.update(payload) - state.node_outputs[node.id] = payload + try: + payload, status, log_message = bridge_executor( + session=session, + workflow_context=workflow_context, + state=state, + node_params=node.params, + ) + except Exception as exc: + last_error = str(exc)[:2000] + if attempt < max_attempts: + retry_output = dict(attempt_output) + retry_output["last_error"] = last_error + retry_output["retry_state"] = "retrying" + node_result.status = "retrying" + node_result.log = f"Attempt {attempt}/{max_attempts} failed: {last_error}" + node_result.output = retry_output + node_result.duration_s = round(time.perf_counter() - started, 4) + session.flush() + continue - node_result.status = status - node_result.log = log_message - node_result.output = dict(metadata) - node_result.duration_s = round(time.perf_counter() - started, 4) - session.flush() + failed_output = dict(attempt_output) + failed_output["last_error"] = last_error + failed_output["retry_exhausted"] = True + node_result.status = "failed" + node_result.log = last_error + node_result.duration_s = round(time.perf_counter() - started, 4) + node_result.output = failed_output + session.flush() + raise WorkflowGraphRuntimeError( + f"Node '{node.id}' ({node.step.value}) failed: {exc}" + ) from exc - if status == "failed": - raise WorkflowGraphRuntimeError( - f"Node '{node.id}' ({node.step.value}) failed: {log_message or 'unknown error'}" - ) - if status == "skipped": - skipped_node_ids.append(node.id) + if payload: + metadata.update(payload) + state.node_outputs[node.id] = payload + + final_output = dict(metadata) + final_output["attempt_count"] = attempt + final_output["max_attempts"] = max_attempts + if last_error is not None: + final_output["last_error"] = last_error + final_output["retry_state"] = "recovered" + + node_result.status = status + node_result.log = log_message + node_result.output = final_output + node_result.duration_s = round(time.perf_counter() - started, 4) + session.flush() + + if status == "failed": + last_error = (log_message or "unknown error")[:2000] + if attempt < max_attempts: + retry_output = dict(final_output) + retry_output["last_error"] = last_error + retry_output["retry_state"] = "retrying" + node_result.status = "retrying" + node_result.log = f"Attempt {attempt}/{max_attempts} failed: {last_error}" + node_result.output = retry_output + session.flush() + continue + + failed_output = dict(final_output) + failed_output["last_error"] = last_error + failed_output["retry_exhausted"] = True + node_result.status = "failed" + node_result.log = last_error + node_result.output = failed_output + session.flush() + raise WorkflowGraphRuntimeError( + f"Node '{node.id}' ({node.step.value}) failed: {last_error}" + ) + + if status == "skipped": + skipped_node_ids.append(node.id) + break continue task_name = STEP_TASK_MAP.get(node.step) @@ -147,6 +212,8 @@ def execute_graph_workflow( metadata["task_id"] = result.id if definition is not None: metadata["execution_kind"] = definition.execution_kind + metadata["attempt_count"] = 1 + metadata["max_attempts"] = retry_policy["max_attempts"] node_result.status = "queued" node_result.output = metadata node_result.log = None @@ -203,6 +270,29 @@ def _base_output(existing: dict[str, Any] | None, node) -> dict[str, Any]: return metadata +def _retry_policy(node_params: dict[str, Any]) -> dict[str, Any]: + raw = node_params.get("retry_policy") + if not isinstance(raw, dict): + raw = {} + try: + max_attempts = int(raw.get("max_attempts", 1)) + except (TypeError, ValueError): + max_attempts = 1 + return { + "max_attempts": max(1, min(max_attempts, 5)), + } + + +def _failure_policy(node_params: dict[str, Any]) -> dict[str, Any]: + raw = node_params.get("failure_policy") + if not isinstance(raw, dict): + raw = {} + return { + "halt_workflow": bool(raw.get("halt_workflow", True)), + "fallback_to_legacy": bool(raw.get("fallback_to_legacy", False)), + } + + def _serialize_setup_result(result: OrderLineRenderSetupResult) -> dict[str, Any]: payload: dict[str, Any] = { "setup_status": result.status, diff --git a/backend/app/domains/rendering/workflow_schema.py b/backend/app/domains/rendering/workflow_schema.py index 1780a97..04e416f 100644 --- a/backend/app/domains/rendering/workflow_schema.py +++ b/backend/app/domains/rendering/workflow_schema.py @@ -16,6 +16,8 @@ Example config:: ] } """ +from typing import Literal + from pydantic import BaseModel, Field, field_validator, model_validator from app.core.process_steps import StepName @@ -49,6 +51,7 @@ class WorkflowEdge(BaseModel): class WorkflowUI(BaseModel): preset: str | None = None + execution_mode: Literal["legacy", "graph", "shadow"] | None = None class WorkflowConfig(BaseModel): diff --git a/backend/tests/domains/test_workflow_dispatch_service.py b/backend/tests/domains/test_workflow_dispatch_service.py index 3b7e5ce..c086722 100644 --- a/backend/tests/domains/test_workflow_dispatch_service.py +++ b/backend/tests/domains/test_workflow_dispatch_service.py @@ -220,6 +220,129 @@ async def test_dispatch_render_with_workflow_falls_back_when_workflow_runtime_pr assert runs == [] +@pytest.mark.asyncio +async def test_dispatch_render_with_workflow_graph_mode_dispatches_supported_custom_workflow( + db, + admin_user, + monkeypatch, + tmp_path, +): + _use_test_database(monkeypatch) + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + workflow_definition = WorkflowDefinition( + name=f"Graph Workflow {uuid.uuid4().hex[:8]}", + output_type_id=order_line.output_type_id, + config={ + "version": 1, + "ui": {"preset": "custom", "execution_mode": "graph"}, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}}, + {"id": "template", "step": "resolve_template", "params": {}}, + {"id": "render", "step": "blender_still", "params": {"width": 1024, "height": 768}}, + ], + "edges": [ + {"from": "setup", "to": "template"}, + {"from": "template", "to": "render"}, + ], + }, + is_active=True, + ) + db.add(workflow_definition) + await db.flush() + output_type = await db.get(OutputType, order_line.output_type_id) + assert output_type is not None + output_type.workflow_definition_id = workflow_definition.id + await db.commit() + + monkeypatch.setattr( + "app.tasks.celery_app.celery_app.send_task", + lambda task_name, args, kwargs: type("Result", (), {"id": "graph-task-1"})(), + ) + + result = dispatch_render_with_workflow(str(order_line.id)) + + await db.rollback() + + run_result = await db.execute( + select(WorkflowRun) + .where(WorkflowRun.id == uuid.UUID(result["workflow_run_id"])) + .options(selectinload(WorkflowRun.node_results)) + ) + run = run_result.scalar_one() + node_results = {node_result.node_name: node_result for node_result in run.node_results} + + assert result["backend"] == "workflow_graph" + assert result["execution_mode"] == "graph" + assert result["task_ids"] == ["graph-task-1"] + assert run.status == "pending" + assert node_results["setup"].status == "completed" + assert node_results["template"].status == "completed" + assert node_results["render"].status == "queued" + + +@pytest.mark.asyncio +async def test_dispatch_render_with_workflow_graph_mode_falls_back_to_legacy_on_graph_failure( + db, + admin_user, + monkeypatch, + tmp_path, +): + _use_test_database(monkeypatch) + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + workflow_definition = WorkflowDefinition( + name=f"Graph Workflow {uuid.uuid4().hex[:8]}", + output_type_id=order_line.output_type_id, + config={ + "version": 1, + "ui": {"preset": "custom", "execution_mode": "graph"}, + "nodes": [ + { + "id": "setup", + "step": "order_line_setup", + "params": {"failure_policy": {"fallback_to_legacy": True}}, + }, + {"id": "render", "step": "blender_still", "params": {"width": 1024, "height": 768}}, + ], + "edges": [ + {"from": "setup", "to": "render"}, + ], + }, + is_active=True, + ) + db.add(workflow_definition) + await db.flush() + output_type = await db.get(OutputType, order_line.output_type_id) + assert output_type is not None + output_type.workflow_definition_id = workflow_definition.id + await db.commit() + + monkeypatch.setattr( + "app.domains.rendering.workflow_graph_runtime.execute_graph_workflow", + lambda *_args, **_kwargs: (_ for _ in ()).throw(RuntimeError("graph dispatch exploded")), + ) + monkeypatch.setattr( + "app.domains.rendering.dispatch_service._legacy_dispatch", + lambda order_line_id: {"backend": "legacy", "order_line_id": order_line_id}, + ) + + result = dispatch_render_with_workflow(str(order_line.id)) + + await db.rollback() + + runs = ( + await db.execute( + select(WorkflowRun).options(selectinload(WorkflowRun.node_results)).order_by(WorkflowRun.created_at.desc()) + ) + ).scalars().all() + run = runs[0] + + assert result["backend"] == "legacy" + assert result["fallback_from"] == "workflow_graph" + assert result["workflow_run_id"] == str(run.id) + assert run.status == "failed" + assert run.error_message == "graph dispatch exploded" + + @pytest.mark.asyncio async def test_workflow_dispatch_endpoint_returns_workflow_run_with_node_results( client, diff --git a/backend/tests/domains/test_workflow_graph_runtime.py b/backend/tests/domains/test_workflow_graph_runtime.py index 0d3c140..653d9c1 100644 --- a/backend/tests/domains/test_workflow_graph_runtime.py +++ b/backend/tests/domains/test_workflow_graph_runtime.py @@ -16,8 +16,9 @@ from app.domains.orders.models import Order, OrderLine, OrderStatus from app.domains.products.models import CadFile, Product from app.domains.rendering.models import OutputType, RenderTemplate, WorkflowRun from app.domains.rendering.workflow_executor import prepare_workflow_context -from app.domains.rendering.workflow_graph_runtime import execute_graph_workflow +from app.domains.rendering.workflow_graph_runtime import WorkflowGraphRuntimeError, execute_graph_workflow from app.domains.rendering.workflow_run_service import create_workflow_run +from app.domains.rendering.workflow_runtime_services import OrderLineRenderSetupResult import app.models # noqa: F401 @@ -239,3 +240,116 @@ def test_execute_graph_workflow_persists_bridge_outputs_and_queues_render_task( {"InnerRing": "Steel", "OuterRing": "Rubber"}, ) ] + + +def test_execute_graph_workflow_retries_bridge_node_and_persists_attempt_metadata( + sync_session, + monkeypatch, +): + attempts = {"count": 0} + + def _flaky_prepare(_session, _context_id): + attempts["count"] += 1 + if attempts["count"] == 1: + raise RuntimeError("temporary setup failure") + return OrderLineRenderSetupResult(status="skip", reason="line_cancelled") + + monkeypatch.setattr( + "app.domains.rendering.workflow_graph_runtime.prepare_order_line_render_context", + _flaky_prepare, + ) + + workflow_context = prepare_workflow_context( + { + "version": 1, + "nodes": [ + { + "id": "setup", + "step": "order_line_setup", + "params": {"retry_policy": {"max_attempts": 2}}, + }, + ], + "edges": [], + }, + context_id=str(uuid.uuid4()), + execution_mode="graph", + ) + run = create_workflow_run( + sync_session, + workflow_def_id=None, + order_line_id=None, + workflow_context=workflow_context, + ) + + dispatch_result = execute_graph_workflow(sync_session, workflow_context) + sync_session.commit() + + refreshed_run = sync_session.execute( + select(WorkflowRun) + .where(WorkflowRun.id == run.id) + .options(selectinload(WorkflowRun.node_results)) + ).scalar_one() + setup_result = next(node for node in refreshed_run.node_results if node.node_name == "setup") + + assert dispatch_result.task_ids == [] + assert refreshed_run.status == "completed" + assert setup_result.status == "skipped" + assert setup_result.output["attempt_count"] == 2 + assert setup_result.output["max_attempts"] == 2 + assert setup_result.output["retry_state"] == "recovered" + assert setup_result.output["last_error"] == "temporary setup failure" + assert setup_result.output["retry_policy"]["max_attempts"] == 2 + + +def test_execute_graph_workflow_marks_failed_node_with_retry_exhausted_metadata( + sync_session, + monkeypatch, +): + monkeypatch.setattr( + "app.domains.rendering.workflow_graph_runtime.prepare_order_line_render_context", + lambda _session, _context_id: (_ for _ in ()).throw(RuntimeError("permanent setup failure")), + ) + + workflow_context = prepare_workflow_context( + { + "version": 1, + "nodes": [ + { + "id": "setup", + "step": "order_line_setup", + "params": { + "retry_policy": {"max_attempts": 2}, + "failure_policy": {"fallback_to_legacy": True}, + }, + }, + ], + "edges": [], + }, + context_id=str(uuid.uuid4()), + execution_mode="graph", + ) + run = create_workflow_run( + sync_session, + workflow_def_id=None, + order_line_id=None, + workflow_context=workflow_context, + ) + + with pytest.raises(WorkflowGraphRuntimeError, match="permanent setup failure"): + execute_graph_workflow(sync_session, workflow_context) + + sync_session.commit() + + refreshed_run = sync_session.execute( + select(WorkflowRun) + .where(WorkflowRun.id == run.id) + .options(selectinload(WorkflowRun.node_results)) + ).scalar_one() + setup_result = next(node for node in refreshed_run.node_results if node.node_name == "setup") + + assert setup_result.status == "failed" + assert setup_result.output["attempt_count"] == 2 + assert setup_result.output["max_attempts"] == 2 + assert setup_result.output["retry_exhausted"] is True + assert setup_result.output["last_error"] == "permanent setup failure" + assert setup_result.output["failure_policy"]["fallback_to_legacy"] is True diff --git a/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md b/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md index c3c4f99..40e8fe9 100644 --- a/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md +++ b/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md @@ -28,9 +28,9 @@ - [x] Workflow context introduced - [x] Node outputs are persisted and reusable -- [ ] Graph runtime supports legacy fallback +- [x] Graph runtime supports legacy fallback - [ ] `legacy`, `graph`, and `shadow` modes exist -- Progress: Graph dispatch now executes the extracted bridge nodes (`order_line_setup`, `resolve_template`, `material_map_resolve`, `auto_populate_materials`, `glb_bbox`) synchronously, persists per-node outputs/logs/durations onto `WorkflowNodeResult`, and continues to queue render/export nodes through Celery without changing the legacy preset dispatcher. +- Progress: Graph dispatch now persists retry/failure policy metadata per node, retries bridge-node failures within the configured attempt budget, and the production order-line dispatcher can opt into explicit graph mode with a hard fallback back to legacy dispatch if graph execution preparation or runtime fails. ### Phase 5 diff --git a/docs/workflows/WORKFLOW_IMPLEMENTATION_BACKLOG.md b/docs/workflows/WORKFLOW_IMPLEMENTATION_BACKLOG.md index c9f38b9..0f95214 100644 --- a/docs/workflows/WORKFLOW_IMPLEMENTATION_BACKLOG.md +++ b/docs/workflows/WORKFLOW_IMPLEMENTATION_BACKLOG.md @@ -73,9 +73,9 @@ - `E4-T1` Introduce `WorkflowContext`. `completed` - `E4-T2` Refactor executor to process nodes against context and node outputs. `completed` - `E4-T3` Persist node-level run records, logs, timings, and outputs. `completed` -- `E4-T4` Support retry and failure policies. +- `E4-T4` Support retry and failure policies. `completed` - `E4-T5` Add execution mode switch: `legacy`, `graph`, `shadow`. -- `E4-T6` Add hard fallback to legacy dispatch on graph failure. +- `E4-T6` Add hard fallback to legacy dispatch on graph failure. `completed` ## Epic 5: Editor Parity diff --git a/frontend/src/api/workflows.ts b/frontend/src/api/workflows.ts index 2c45532..e1b3219 100644 --- a/frontend/src/api/workflows.ts +++ b/frontend/src/api/workflows.ts @@ -51,6 +51,7 @@ export interface WorkflowEdge { export interface WorkflowUi { preset?: WorkflowPresetType + execution_mode?: 'legacy' | 'graph' | 'shadow' } export interface WorkflowCreate {