diff --git a/backend/app/domains/rendering/workflow_graph_runtime.py b/backend/app/domains/rendering/workflow_graph_runtime.py new file mode 100644 index 0000000..89fea1e --- /dev/null +++ b/backend/app/domains/rendering/workflow_graph_runtime.py @@ -0,0 +1,396 @@ +from __future__ import annotations + +import logging +import time +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Any + +from sqlalchemy import select +from sqlalchemy.orm import Session, selectinload + +from app.core.process_steps import StepName +from app.domains.rendering.models import WorkflowNodeResult, WorkflowRun +from app.domains.rendering.workflow_executor import STEP_TASK_MAP, WorkflowContext, WorkflowDispatchResult +from app.domains.rendering.workflow_node_registry import get_node_definition +from app.domains.rendering.workflow_runtime_services import ( + AutoPopulateMaterialsResult, + BBoxResolutionResult, + MaterialResolutionResult, + OrderLineRenderSetupResult, + TemplateResolutionResult, + auto_populate_materials_for_cad, + prepare_order_line_render_context, + resolve_cad_bbox, + resolve_order_line_material_map, + resolve_order_line_template_context, +) + +logger = logging.getLogger(__name__) + + +class WorkflowGraphRuntimeError(RuntimeError): + pass + + +@dataclass(slots=True) +class WorkflowGraphState: + setup: OrderLineRenderSetupResult | None = None + template: TemplateResolutionResult | None = None + materials: MaterialResolutionResult | None = None + auto_populate: AutoPopulateMaterialsResult | None = None + bbox: BBoxResolutionResult | None = None + node_outputs: dict[str, dict[str, Any]] = field(default_factory=dict) + + +_ORDER_LINE_RENDER_STEPS = { + StepName.BLENDER_STILL, + StepName.BLENDER_TURNTABLE, + StepName.EXPORT_BLEND, + StepName.OUTPUT_SAVE, + StepName.NOTIFY, +} + + +def execute_graph_workflow( + session: Session, + workflow_context: WorkflowContext, +) -> WorkflowDispatchResult: + if workflow_context.workflow_run_id is None: + raise ValueError("workflow_context.workflow_run_id is required for graph execution") + + run = session.execute( + select(WorkflowRun) + .where(WorkflowRun.id == workflow_context.workflow_run_id) + .options(selectinload(WorkflowRun.node_results)) + ).scalar_one() + + node_results = {node_result.node_name: node_result for node_result in run.node_results} + state = WorkflowGraphState() + task_ids: list[str] = [] + node_task_ids: dict[str, str] = {} + skipped_node_ids: list[str] = [] + + for node in workflow_context.ordered_nodes: + node_result = node_results.get(node.id) + if node_result is None: + logger.warning( + "[WORKFLOW] Missing WorkflowNodeResult row for node %s on run %s", + node.id, + run.id, + ) + continue + + metadata = _base_output(node_result.output, node) + definition = get_node_definition(node.step) + bridge_executor = _BRIDGE_EXECUTORS.get(node.step) + + if bridge_executor is not None: + started = time.perf_counter() + node_result.status = "running" + node_result.output = dict(metadata) + session.flush() + try: + payload, status, log_message = bridge_executor( + session=session, + workflow_context=workflow_context, + state=state, + node_params=node.params, + ) + except Exception as exc: + node_result.status = "failed" + node_result.log = str(exc)[:2000] + node_result.duration_s = round(time.perf_counter() - started, 4) + node_result.output = dict(metadata) + session.flush() + raise WorkflowGraphRuntimeError( + f"Node '{node.id}' ({node.step.value}) failed: {exc}" + ) from exc + + if payload: + metadata.update(payload) + state.node_outputs[node.id] = payload + + node_result.status = status + node_result.log = log_message + node_result.output = dict(metadata) + node_result.duration_s = round(time.perf_counter() - started, 4) + session.flush() + + if status == "failed": + raise WorkflowGraphRuntimeError( + f"Node '{node.id}' ({node.step.value}) failed: {log_message or 'unknown error'}" + ) + if status == "skipped": + skipped_node_ids.append(node.id) + continue + + task_name = STEP_TASK_MAP.get(node.step) + if task_name is not None: + if node.step in _ORDER_LINE_RENDER_STEPS and state.setup is not None and not state.setup.is_ready: + metadata["blocked_by"] = "order_line_setup" + node_result.status = "skipped" + node_result.output = metadata + node_result.log = ( + f"Skipped because order_line_setup did not complete successfully " + f"({state.setup.status})" + ) + node_result.duration_s = None + session.flush() + skipped_node_ids.append(node.id) + continue + + from app.tasks.celery_app import celery_app + + result = celery_app.send_task(task_name, args=[workflow_context.context_id], kwargs=node.params) + metadata["task_id"] = result.id + if definition is not None: + metadata["execution_kind"] = definition.execution_kind + node_result.status = "queued" + node_result.output = metadata + node_result.log = None + node_result.duration_s = None + session.flush() + task_ids.append(result.id) + node_task_ids[node.id] = result.id + logger.info( + "[WORKFLOW] Dispatched node %r (step=%s, mode=%s, run=%s) -> Celery task %s", + node.id, + node.step, + workflow_context.execution_mode, + workflow_context.workflow_run_id, + result.id, + ) + continue + + metadata["execution_kind"] = definition.execution_kind if definition is not None else "bridge" + node_result.status = "skipped" + node_result.output = metadata + node_result.log = f"Graph runtime not implemented for step '{node.step.value}'" + node_result.duration_s = None + session.flush() + skipped_node_ids.append(node.id) + + run.celery_task_id = task_ids[0] if task_ids else None + if any(node_result.status == "failed" for node_result in run.node_results): + run.status = "failed" + run.completed_at = datetime.utcnow() + elif task_ids: + run.status = "pending" + run.completed_at = None + else: + run.status = "completed" + run.completed_at = datetime.utcnow() + session.flush() + + return WorkflowDispatchResult( + context=workflow_context, + task_ids=task_ids, + node_task_ids=node_task_ids, + skipped_node_ids=skipped_node_ids, + ) + + +def _base_output(existing: dict[str, Any] | None, node) -> dict[str, Any]: + metadata = dict(existing or {}) + metadata.setdefault("step", node.step.value) + if node.ui and node.ui.label: + metadata.setdefault("label", node.ui.label) + definition = get_node_definition(node.step) + if definition is not None: + metadata.setdefault("execution_kind", definition.execution_kind) + return metadata + + +def _serialize_setup_result(result: OrderLineRenderSetupResult) -> dict[str, Any]: + payload: dict[str, Any] = { + "setup_status": result.status, + "reason": result.reason, + "materials_source_count": len(result.materials_source or []), + "part_colors_count": len(result.part_colors or {}), + "usd_render_path": str(result.usd_render_path) if result.usd_render_path else None, + "glb_reuse_path": str(result.glb_reuse_path) if result.glb_reuse_path else None, + } + if result.order_line is not None: + payload["order_line_id"] = str(result.order_line.id) + payload["product_id"] = str(result.order_line.product_id) if result.order_line.product_id else None + payload["output_type_id"] = str(result.order_line.output_type_id) if result.order_line.output_type_id else None + if result.order is not None: + payload["order_id"] = str(result.order.id) + payload["order_status"] = result.order.status.value if getattr(result.order, "status", None) else None + if result.cad_file is not None: + payload["cad_file_id"] = str(result.cad_file.id) + payload["step_path"] = result.cad_file.stored_path + return payload + + +def _serialize_template_result(result: TemplateResolutionResult) -> dict[str, Any]: + return { + "template_id": str(result.template.id) if result.template is not None else None, + "template_name": result.template.name if result.template is not None else None, + "template_path": result.template.blend_file_path if result.template is not None else None, + "material_library": result.material_library, + "material_map": result.material_map, + "material_map_count": len(result.material_map or {}), + "use_materials": result.use_materials, + "override_material": result.override_material, + "category_key": result.category_key, + "output_type_id": result.output_type_id, + } + + +def _serialize_material_result(result: MaterialResolutionResult) -> dict[str, Any]: + return { + "material_map": result.material_map, + "material_map_count": len(result.material_map or {}), + "use_materials": result.use_materials, + "override_material": result.override_material, + "source_material_count": result.source_material_count, + "resolved_material_count": result.resolved_material_count, + } + + +def _serialize_auto_populate_result(result: AutoPopulateMaterialsResult) -> dict[str, Any]: + return { + "cad_file_id": result.cad_file_id, + "updated_product_ids": result.updated_product_ids, + "updated_product_count": len(result.updated_product_ids), + "queued_thumbnail_regeneration": result.queued_thumbnail_regeneration, + "part_colors": result.part_colors, + "part_colors_count": len(result.part_colors or {}), + "cad_parts": result.cad_parts, + } + + +def _serialize_bbox_result(result: BBoxResolutionResult) -> dict[str, Any]: + return { + "bbox_data": result.bbox_data, + "has_bbox": result.has_bbox, + "source_kind": result.source_kind, + "step_path": result.step_path, + "glb_path": result.glb_path, + } + + +def _execute_order_line_setup( + *, + session: Session, + workflow_context: WorkflowContext, + state: WorkflowGraphState, + node_params: dict[str, Any], +) -> tuple[dict[str, Any], str, str | None]: + del node_params + setup = prepare_order_line_render_context(session, workflow_context.context_id) + state.setup = setup + payload = _serialize_setup_result(setup) + if setup.status == "ready": + return payload, "completed", None + if setup.status == "skip": + return payload, "skipped", setup.reason + return payload, "failed", setup.reason or "order_line_setup_failed" + + +def _execute_resolve_template( + *, + session: Session, + workflow_context: WorkflowContext, + state: WorkflowGraphState, + node_params: dict[str, Any], +) -> tuple[dict[str, Any], str, str | None]: + del workflow_context, node_params + if state.setup is None or not state.setup.is_ready: + if state.setup is not None and state.setup.status == "skip": + return _serialize_setup_result(state.setup), "skipped", state.setup.reason + raise WorkflowGraphRuntimeError("resolve_template requires a ready order_line_setup result") + result = resolve_order_line_template_context(session, state.setup) + state.template = result + return _serialize_template_result(result), "completed", None + + +def _execute_material_map_resolve( + *, + session: Session, + workflow_context: WorkflowContext, + state: WorkflowGraphState, + node_params: dict[str, Any], +) -> tuple[dict[str, Any], str, str | None]: + del session, workflow_context, node_params + if state.setup is None or not state.setup.is_ready: + if state.setup is not None and state.setup.status == "skip": + return _serialize_setup_result(state.setup), "skipped", state.setup.reason + raise WorkflowGraphRuntimeError("material_map_resolve requires a ready order_line_setup result") + + line = state.setup.order_line + cad_file = state.setup.cad_file + if line is None: + raise WorkflowGraphRuntimeError("material_map_resolve requires an order line") + + material_library = state.template.material_library if state.template is not None else None + template = state.template.template if state.template is not None else None + result = resolve_order_line_material_map( + line, + cad_file, + state.setup.materials_source, + material_library=material_library, + template=template, + ) + state.materials = result + return _serialize_material_result(result), "completed", None + + +def _execute_auto_populate_materials( + *, + session: Session, + workflow_context: WorkflowContext, + state: WorkflowGraphState, + node_params: dict[str, Any], +) -> tuple[dict[str, Any], str, str | None]: + del workflow_context, node_params + if state.setup is None or state.setup.cad_file is None: + if state.setup is not None and state.setup.status == "skip": + return _serialize_setup_result(state.setup), "skipped", state.setup.reason + raise WorkflowGraphRuntimeError("auto_populate_materials requires a resolved cad_file") + result = auto_populate_materials_for_cad(session, str(state.setup.cad_file.id)) + state.auto_populate = result + if state.setup.order_line is not None and state.setup.order_line.product is not None: + session.refresh(state.setup.order_line.product) + state.setup.materials_source = state.setup.order_line.product.cad_part_materials or [] + return _serialize_auto_populate_result(result), "completed", None + + +def _execute_glb_bbox( + *, + session: Session, + workflow_context: WorkflowContext, + state: WorkflowGraphState, + node_params: dict[str, Any], +) -> tuple[dict[str, Any], str, str | None]: + del session, workflow_context + if state.setup is None or state.setup.cad_file is None: + if state.setup is not None and state.setup.status == "skip": + return _serialize_setup_result(state.setup), "skipped", state.setup.reason + raise WorkflowGraphRuntimeError("glb_bbox requires a resolved cad_file") + + step_path = state.setup.cad_file.stored_path + glb_path = node_params.get("glb_path") + if glb_path is None and state.setup.glb_reuse_path is not None: + glb_path = str(state.setup.glb_reuse_path) + elif glb_path is None: + step_file = Path(step_path) + fallback_glb = step_file.parent / f"{step_file.stem}_thumbnail.glb" + if fallback_glb.exists(): + glb_path = str(fallback_glb) + + result = resolve_cad_bbox(step_path, glb_path=glb_path) + state.bbox = result + return _serialize_bbox_result(result), "completed", None + + +_BRIDGE_EXECUTORS = { + StepName.ORDER_LINE_SETUP: _execute_order_line_setup, + StepName.RESOLVE_TEMPLATE: _execute_resolve_template, + StepName.MATERIAL_MAP_RESOLVE: _execute_material_map_resolve, + StepName.AUTO_POPULATE_MATERIALS: _execute_auto_populate_materials, + StepName.GLB_BBOX: _execute_glb_bbox, +} diff --git a/backend/app/domains/rendering/workflow_router.py b/backend/app/domains/rendering/workflow_router.py index 4dd1948..e660bba 100644 --- a/backend/app/domains/rendering/workflow_router.py +++ b/backend/app/domains/rendering/workflow_router.py @@ -228,12 +228,9 @@ async def dispatch_workflow_endpoint( the caller can track progress. """ from pydantic import ValidationError as _ValidationError - from app.domains.rendering.workflow_executor import ( - dispatch_prepared_workflow, - prepare_workflow_context, - ) + from app.domains.rendering.workflow_executor import prepare_workflow_context + from app.domains.rendering.workflow_graph_runtime import execute_graph_workflow from app.domains.rendering.workflow_run_service import ( - apply_graph_dispatch_result, create_workflow_run, mark_workflow_run_failed, ) @@ -269,7 +266,9 @@ async def dispatch_workflow_endpoint( await db.commit() try: - dispatch_result = dispatch_prepared_workflow(workflow_context) + dispatch_result = await db.run_sync( + lambda sync_session: execute_graph_workflow(sync_session, workflow_context) + ) except Exception as exc: failed_result = await db.execute( select(WorkflowRun) @@ -280,14 +279,6 @@ async def dispatch_workflow_endpoint( mark_workflow_run_failed(failed_run, str(exc)) await db.commit() raise - - run_result = await db.execute( - select(WorkflowRun) - .where(WorkflowRun.id == run_id) - .options(selectinload(WorkflowRun.node_results)) - ) - run = run_result.scalar_one() - apply_graph_dispatch_result(run, workflow_context, dispatch_result) await db.commit() refreshed_result = await db.execute( diff --git a/backend/tests/domains/test_workflow_dispatch_service.py b/backend/tests/domains/test_workflow_dispatch_service.py index c1fee7c..3b7e5ce 100644 --- a/backend/tests/domains/test_workflow_dispatch_service.py +++ b/backend/tests/domains/test_workflow_dispatch_service.py @@ -1,6 +1,7 @@ from __future__ import annotations import uuid +from pathlib import Path import pytest from sqlalchemy import select @@ -8,7 +9,7 @@ from sqlalchemy.orm import selectinload from app.config import settings from app.domains.orders.models import Order, OrderLine -from app.domains.products.models import Product +from app.domains.products.models import CadFile, Product from app.domains.rendering.dispatch_service import dispatch_render_with_workflow from app.domains.rendering.models import OutputType, WorkflowDefinition, WorkflowRun from app.domains.rendering.workflow_config_utils import build_preset_workflow_config @@ -70,6 +71,47 @@ async def _seed_order_line( } +async def _seed_renderable_order_line( + db, + admin_user, + tmp_path: Path, +) -> OrderLine: + step_path = tmp_path / "dispatch" / "product.step" + step_path.parent.mkdir(parents=True, exist_ok=True) + step_path.write_text("STEP", encoding="utf-8") + + cad_file = CadFile( + original_name="product.step", + stored_path=str(step_path), + file_hash=f"hash-{uuid.uuid4().hex}", + parsed_objects={"objects": ["Body"]}, + ) + product = Product( + pim_id=f"PIM-{uuid.uuid4().hex[:8]}", + name="Dispatch Product", + category_key="dispatch", + cad_file=cad_file, + cad_part_materials=[{"part_name": "Body", "material": "Steel"}], + ) + output_type = OutputType( + name=f"Workflow Output {uuid.uuid4().hex[:8]}", + render_backend="auto", + ) + order = Order( + order_number=f"WF-{uuid.uuid4().hex[:10]}", + created_by=admin_user.id, + ) + order_line = OrderLine( + order=order, + product=product, + output_type=output_type, + ) + db.add_all([cad_file, product, output_type, order, order_line]) + await db.commit() + await db.refresh(order_line) + return order_line + + @pytest.mark.asyncio async def test_dispatch_render_with_workflow_falls_back_to_legacy_without_workflow_definition( db, @@ -182,9 +224,13 @@ async def test_dispatch_render_with_workflow_falls_back_when_workflow_runtime_pr async def test_workflow_dispatch_endpoint_returns_workflow_run_with_node_results( client, db, + admin_user, auth_headers, + tmp_path, monkeypatch, ): + monkeypatch.setattr(settings, "upload_dir", str(tmp_path / "uploads")) + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) workflow_definition = WorkflowDefinition( name=f"Dispatch Workflow {uuid.uuid4().hex[:8]}", config=build_preset_workflow_config("still_with_exports", {"width": 640, "height": 640}), @@ -200,7 +246,7 @@ async def test_workflow_dispatch_endpoint_returns_workflow_run_with_node_results calls.append((task_name, args, kwargs)) return type("Result", (), {"id": f"task-{len(calls)}"})() - context_id = str(uuid.uuid4()) + context_id = str(order_line.id) monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _fake_send_task) response = await client.post( f"/api/workflows/{workflow_definition.id}/dispatch", @@ -235,6 +281,8 @@ async def test_workflow_dispatch_endpoint_returns_workflow_run_with_node_results assert node_results["render"]["output"]["task_id"] == "task-1" assert node_results["blend"]["status"] == "queued" assert node_results["blend"]["output"]["task_id"] == "task-2" - assert node_results["setup"]["status"] == "skipped" - assert node_results["template"]["status"] == "skipped" + assert node_results["setup"]["status"] == "completed" + assert node_results["setup"]["output"]["order_line_id"] == str(order_line.id) + assert node_results["template"]["status"] == "completed" + assert node_results["template"]["output"]["use_materials"] is False assert node_results["output"]["status"] == "skipped" diff --git a/backend/tests/domains/test_workflow_graph_runtime.py b/backend/tests/domains/test_workflow_graph_runtime.py new file mode 100644 index 0000000..0d3c140 --- /dev/null +++ b/backend/tests/domains/test_workflow_graph_runtime.py @@ -0,0 +1,241 @@ +from __future__ import annotations + +import os +import uuid +from pathlib import Path +from types import SimpleNamespace + +import pytest +from sqlalchemy import create_engine, select, text +from sqlalchemy.orm import Session, selectinload + +from app.database import Base +from app.domains.auth.models import User, UserRole +from app.domains.materials.models import AssetLibrary +from app.domains.orders.models import Order, OrderLine, OrderStatus +from app.domains.products.models import CadFile, Product +from app.domains.rendering.models import OutputType, RenderTemplate, WorkflowRun +from app.domains.rendering.workflow_executor import prepare_workflow_context +from app.domains.rendering.workflow_graph_runtime import execute_graph_workflow +from app.domains.rendering.workflow_run_service import create_workflow_run + +import app.models # noqa: F401 + + +TEST_DB_URL = os.environ.get( + "TEST_DATABASE_URL", + "postgresql+asyncpg://hartomat:hartomat@localhost:5432/hartomat_test", +).replace("+asyncpg", "") + + +@pytest.fixture +def sync_session(): + engine = create_engine(TEST_DB_URL) + with engine.begin() as conn: + Base.metadata.create_all(conn) + + session = Session(engine) + try: + yield session + finally: + session.close() + with engine.begin() as conn: + conn.execute(text("DROP SCHEMA public CASCADE")) + conn.execute(text("CREATE SCHEMA public")) + engine.dispose() + + +def _seed_renderable_order_line( + session: Session, + tmp_path: Path, + *, + with_blank_materials: bool = False, +) -> OrderLine: + step_path = tmp_path / "cad" / "bearing.step" + step_path.parent.mkdir(parents=True, exist_ok=True) + step_path.write_text("STEP", encoding="utf-8") + + user = User( + id=uuid.uuid4(), + email=f"graph-{uuid.uuid4().hex[:8]}@test.local", + password_hash="hash", + full_name="Graph Runtime Tester", + role=UserRole.admin, + is_active=True, + ) + cad_file = CadFile( + id=uuid.uuid4(), + original_name="bearing.step", + stored_path=str(step_path), + file_hash=f"hash-{uuid.uuid4().hex}", + parsed_objects={"objects": ["InnerRing", "OuterRing"]}, + ) + product = Product( + id=uuid.uuid4(), + pim_id=f"P-{uuid.uuid4().hex[:8]}", + name="Bearing A", + category_key="bearings", + cad_file_id=cad_file.id, + cad_file=cad_file, + components=[ + {"part_name": "InnerRing", "material": "Steel"}, + {"part_name": "OuterRing", "material": "Rubber"}, + ], + cad_part_materials=( + [] + if with_blank_materials + else [ + {"part_name": "InnerRing", "material": "Steel raw"}, + {"part_name": "OuterRing", "material": "Steel raw"}, + ] + ), + ) + output_type = OutputType( + id=uuid.uuid4(), + name=f"Still-{uuid.uuid4().hex[:6]}", + renderer="blender", + output_format="png", + render_settings={"width": 1600, "height": 900}, + ) + order = Order( + id=uuid.uuid4(), + order_number=f"ORD-{uuid.uuid4().hex[:8]}", + status=OrderStatus.processing, + created_by=user.id, + ) + line = OrderLine( + id=uuid.uuid4(), + order_id=order.id, + product_id=product.id, + product=product, + output_type_id=output_type.id, + output_type=output_type, + render_status="pending", + ) + + session.add_all([user, cad_file, product, output_type, order, line]) + session.flush() + + session.add( + AssetLibrary( + id=uuid.uuid4(), + name="Default Library", + blend_file_path="/libraries/materials.blend", + is_active=True, + ) + ) + session.add( + RenderTemplate( + id=uuid.uuid4(), + name="Bearing Studio", + category_key="bearings", + blend_file_path="/templates/bearing.blend", + original_filename="bearing.blend", + target_collection="Product", + material_replace_enabled=True, + lighting_only=False, + is_active=True, + output_types=[output_type], + ) + ) + session.commit() + return line + + +def test_execute_graph_workflow_persists_bridge_outputs_and_queues_render_task( + sync_session, + tmp_path, + monkeypatch, +): + from app.config import settings + + monkeypatch.setattr(settings, "upload_dir", str(tmp_path / "uploads")) + queued_thumbnail: list[tuple[str, dict[str, str]]] = [] + + line = _seed_renderable_order_line(sync_session, tmp_path, with_blank_materials=True) + + monkeypatch.setattr( + "app.domains.pipeline.tasks.render_thumbnail.regenerate_thumbnail.delay", + lambda cad_file_id, part_colors: queued_thumbnail.append((cad_file_id, part_colors)), + ) + monkeypatch.setattr( + "app.domains.rendering.workflow_runtime_services.extract_bbox_from_step_cadquery", + lambda step_path: { + "dimensions_mm": {"x": 12.5, "y": 20.0, "z": 7.5}, + "bbox_center_mm": {"x": 6.25, "y": 10.0, "z": 3.75}, + }, + ) + monkeypatch.setattr( + "app.tasks.celery_app.celery_app.send_task", + lambda task_name, args, kwargs: SimpleNamespace(id=f"task-{len(args)}"), + ) + + workflow_context = prepare_workflow_context( + { + "version": 1, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}}, + {"id": "template", "step": "resolve_template", "params": {}}, + {"id": "materials", "step": "material_map_resolve", "params": {}}, + {"id": "autofill", "step": "auto_populate_materials", "params": {}}, + {"id": "bbox", "step": "glb_bbox", "params": {}}, + {"id": "render", "step": "blender_still", "params": {"width": 1024, "height": 1024}}, + ], + "edges": [ + {"from": "setup", "to": "template"}, + {"from": "template", "to": "materials"}, + {"from": "materials", "to": "autofill"}, + {"from": "autofill", "to": "bbox"}, + {"from": "bbox", "to": "render"}, + ], + }, + context_id=str(line.id), + execution_mode="graph", + ) + run = create_workflow_run( + sync_session, + workflow_def_id=None, + order_line_id=line.id, + workflow_context=workflow_context, + ) + + dispatch_result = execute_graph_workflow(sync_session, workflow_context) + sync_session.commit() + + refreshed_run = sync_session.execute( + select(WorkflowRun) + .where(WorkflowRun.id == run.id) + .options(selectinload(WorkflowRun.node_results)) + ).scalar_one() + node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} + + sync_session.refresh(line.product) + + assert dispatch_result.task_ids == ["task-1"] + assert refreshed_run.status == "pending" + assert refreshed_run.celery_task_id == "task-1" + + assert node_results["setup"].status == "completed" + assert node_results["setup"].output["cad_file_id"] == str(line.product.cad_file_id) + assert node_results["template"].status == "completed" + assert node_results["template"].output["template_name"] == "Bearing Studio" + assert node_results["materials"].status == "completed" + assert node_results["materials"].output["material_map_count"] == 0 + assert node_results["autofill"].status == "completed" + assert node_results["autofill"].output["updated_product_count"] == 1 + assert node_results["autofill"].output["queued_thumbnail_regeneration"] is True + assert node_results["bbox"].status == "completed" + assert node_results["bbox"].output["has_bbox"] is True + assert node_results["render"].status == "queued" + assert node_results["render"].output["task_id"] == "task-1" + + assert line.product.cad_part_materials == [ + {"part_name": "InnerRing", "material": "Steel"}, + {"part_name": "OuterRing", "material": "Rubber"}, + ] + assert queued_thumbnail == [ + ( + str(line.product.cad_file_id), + {"InnerRing": "Steel", "OuterRing": "Rubber"}, + ) + ] diff --git a/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md b/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md index a21e4b2..c3c4f99 100644 --- a/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md +++ b/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md @@ -27,10 +27,10 @@ ### Phase 4 - [x] Workflow context introduced -- [ ] Node outputs are persisted and reusable +- [x] Node outputs are persisted and reusable - [ ] Graph runtime supports legacy fallback - [ ] `legacy`, `graph`, and `shadow` modes exist -- Progress: Phase 4 foundation now persists `WorkflowRun` and initial `WorkflowNodeResult` records for both linked workflow dispatch and `/api/workflows/{id}/dispatch`, while keeping the legacy preset dispatcher as the safe default fallback. +- Progress: Graph dispatch now executes the extracted bridge nodes (`order_line_setup`, `resolve_template`, `material_map_resolve`, `auto_populate_materials`, `glb_bbox`) synchronously, persists per-node outputs/logs/durations onto `WorkflowNodeResult`, and continues to queue render/export nodes through Celery without changing the legacy preset dispatcher. ### Phase 5 diff --git a/docs/workflows/WORKFLOW_IMPLEMENTATION_BACKLOG.md b/docs/workflows/WORKFLOW_IMPLEMENTATION_BACKLOG.md index f548fab..c9f38b9 100644 --- a/docs/workflows/WORKFLOW_IMPLEMENTATION_BACKLOG.md +++ b/docs/workflows/WORKFLOW_IMPLEMENTATION_BACKLOG.md @@ -71,8 +71,8 @@ ### Tickets - `E4-T1` Introduce `WorkflowContext`. `completed` -- `E4-T2` Refactor executor to process nodes against context and node outputs. -- `E4-T3` Persist node-level run records, logs, timings, and outputs. +- `E4-T2` Refactor executor to process nodes against context and node outputs. `completed` +- `E4-T3` Persist node-level run records, logs, timings, and outputs. `completed` - `E4-T4` Support retry and failure policies. - `E4-T5` Add execution mode switch: `legacy`, `graph`, `shadow`. - `E4-T6` Add hard fallback to legacy dispatch on graph failure. diff --git a/docs/workflows/WORKFLOW_MIGRATION_PLAN.md b/docs/workflows/WORKFLOW_MIGRATION_PLAN.md index 0a30609..8e36c7d 100644 --- a/docs/workflows/WORKFLOW_MIGRATION_PLAN.md +++ b/docs/workflows/WORKFLOW_MIGRATION_PLAN.md @@ -80,6 +80,7 @@ Notes: - Introduce `WorkflowContext` and node-by-node execution with persistent run state. - Support node outputs and artifact handoff across edges. - Keep `legacy`, `graph`, and `shadow` execution modes. +- Current slice: graph dispatch executes extracted bridge nodes for order-line setup, template/material resolution, auto-material population, and bounding-box resolution before queueing render/export tasks. ### Phase 5: Workflow Editor Parity