from __future__ import annotations import os import uuid from pathlib import Path from types import SimpleNamespace import pytest from sqlalchemy import create_engine, select, text from sqlalchemy.orm import Session, selectinload from app.database import Base from app.core.process_steps import StepName from app.domains.auth.models import User, UserRole from app.domains.materials.models import AssetLibrary from app.domains.orders.models import Order, OrderLine, OrderStatus from app.domains.products.models import CadFile, Product from app.domains.rendering.models import OutputType, RenderTemplate, WorkflowRun from app.domains.rendering.workflow_executor import prepare_workflow_context from app.domains.rendering.workflow_graph_runtime import ( _build_task_kwargs, WorkflowGraphState, WorkflowGraphRuntimeError, execute_graph_workflow, find_unsupported_graph_nodes, ) from app.domains.rendering.workflow_run_service import create_workflow_run from app.domains.rendering.workflow_runtime_services import OrderLineRenderSetupResult import app.models # noqa: F401 from tests.db_test_utils import reset_public_schema_sync, resolve_test_db_url @pytest.fixture def sync_session(): engine = create_engine(resolve_test_db_url(async_driver=False)) with engine.begin() as conn: reset_public_schema_sync(conn) Base.metadata.create_all(conn) session = Session(engine) try: yield session finally: session.close() with engine.begin() as conn: reset_public_schema_sync(conn) engine.dispose() def _seed_renderable_order_line( session: Session, tmp_path: Path, *, with_blank_materials: bool = False, ) -> OrderLine: step_path = tmp_path / "cad" / "bearing.step" step_path.parent.mkdir(parents=True, exist_ok=True) step_path.write_text("STEP", encoding="utf-8") user = User( id=uuid.uuid4(), email=f"graph-{uuid.uuid4().hex[:8]}@test.local", password_hash="hash", full_name="Graph Runtime Tester", role=UserRole.admin, is_active=True, ) cad_file = CadFile( id=uuid.uuid4(), original_name="bearing.step", stored_path=str(step_path), file_hash=f"hash-{uuid.uuid4().hex}", parsed_objects={"objects": ["InnerRing", "OuterRing"]}, ) product = Product( id=uuid.uuid4(), pim_id=f"P-{uuid.uuid4().hex[:8]}", name="Bearing A", category_key="bearings", cad_file_id=cad_file.id, cad_file=cad_file, components=[ {"part_name": "InnerRing", "material": "Steel"}, {"part_name": "OuterRing", "material": "Rubber"}, ], cad_part_materials=( [] if with_blank_materials else [ {"part_name": "InnerRing", "material": "Steel raw"}, {"part_name": "OuterRing", "material": "Steel raw"}, ] ), ) output_type = OutputType( id=uuid.uuid4(), name=f"Still-{uuid.uuid4().hex[:6]}", renderer="blender", output_format="png", render_settings={"width": 1600, "height": 900}, ) order = Order( id=uuid.uuid4(), order_number=f"ORD-{uuid.uuid4().hex[:8]}", status=OrderStatus.processing, created_by=user.id, ) line = OrderLine( id=uuid.uuid4(), order_id=order.id, product_id=product.id, product=product, output_type_id=output_type.id, output_type=output_type, render_status="pending", ) session.add_all([user, cad_file, product, output_type, order, line]) session.flush() session.add( AssetLibrary( id=uuid.uuid4(), name="Default Library", blend_file_path="/libraries/materials.blend", is_active=True, ) ) session.add( RenderTemplate( id=uuid.uuid4(), name="Bearing Studio", category_key="bearings", blend_file_path="/templates/bearing.blend", original_filename="bearing.blend", target_collection="Product", material_replace_enabled=True, lighting_only=False, is_active=True, output_types=[output_type], ) ) session.commit() return line def _seed_cad_file_only(session: Session, tmp_path: Path) -> CadFile: step_path = tmp_path / "cad-only" / "thumbnail.step" step_path.parent.mkdir(parents=True, exist_ok=True) step_path.write_text("STEP", encoding="utf-8") cad_file = CadFile( id=uuid.uuid4(), original_name="thumbnail.step", stored_path=str(step_path), file_hash=f"hash-{uuid.uuid4().hex}", parsed_objects={"objects": ["Body"]}, ) session.add(cad_file) session.commit() return cad_file def test_execute_graph_workflow_persists_bridge_outputs_and_queues_render_task( sync_session, tmp_path, monkeypatch, ): from app.config import settings monkeypatch.setattr(settings, "upload_dir", str(tmp_path / "uploads")) queued_thumbnail: list[tuple[str, dict[str, str]]] = [] line = _seed_renderable_order_line(sync_session, tmp_path, with_blank_materials=True) monkeypatch.setattr( "app.domains.pipeline.tasks.render_thumbnail.regenerate_thumbnail.delay", lambda cad_file_id, part_colors: queued_thumbnail.append((cad_file_id, part_colors)), ) monkeypatch.setattr( "app.domains.rendering.workflow_runtime_services.extract_bbox_from_step_cadquery", lambda step_path: { "dimensions_mm": {"x": 12.5, "y": 20.0, "z": 7.5}, "bbox_center_mm": {"x": 6.25, "y": 10.0, "z": 3.75}, }, ) monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", lambda task_name, args, kwargs: SimpleNamespace(id=f"task-{len(args)}"), ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, {"id": "materials", "step": "material_map_resolve", "params": {}}, {"id": "autofill", "step": "auto_populate_materials", "params": {}}, {"id": "bbox", "step": "glb_bbox", "params": {}}, {"id": "render", "step": "blender_still", "params": {"width": 1024, "height": 1024}}, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "materials"}, {"from": "materials", "to": "autofill"}, {"from": "autofill", "to": "bbox"}, {"from": "bbox", "to": "render"}, ], }, context_id=str(line.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} sync_session.refresh(line.product) assert dispatch_result.task_ids == ["task-1"] assert refreshed_run.status == "pending" assert refreshed_run.celery_task_id == "task-1" assert node_results["setup"].status == "completed" assert node_results["setup"].output["cad_file_id"] == str(line.product.cad_file_id) assert node_results["template"].status == "completed" assert node_results["template"].output["template_name"] == "Bearing Studio" assert node_results["materials"].status == "completed" assert node_results["materials"].output["material_map_count"] == 0 assert node_results["autofill"].status == "completed" assert node_results["autofill"].output["updated_product_count"] == 1 assert node_results["autofill"].output["queued_thumbnail_regeneration"] is True assert node_results["bbox"].status == "completed" assert node_results["bbox"].output["has_bbox"] is True assert node_results["render"].status == "queued" assert node_results["render"].output["task_id"] == "task-1" assert line.product.cad_part_materials == [ {"part_name": "InnerRing", "material": "Steel"}, {"part_name": "OuterRing", "material": "Rubber"}, ] assert queued_thumbnail == [ ( str(line.product.cad_file_id), {"InnerRing": "Steel", "OuterRing": "Rubber"}, ) ] def test_execute_graph_workflow_routes_cad_thumbnail_save_using_upstream_threejs_request( sync_session, tmp_path, monkeypatch, ): cad_file = _seed_cad_file_only(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id="task-threejs-thumb") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ { "id": "threejs", "step": "threejs_render", "params": {"width": 640, "height": 480, "transparent_bg": True}, }, {"id": "save", "step": "thumbnail_save", "params": {}}, ], "edges": [ {"from": "threejs", "to": "save"}, ], }, context_id=str(cad_file.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=None, workflow_context=workflow_context, ) assert find_unsupported_graph_nodes(workflow_context) == [] dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == ["task-threejs-thumb"] assert len(send_calls) == 1 assert send_calls[0][0] == "app.tasks.step_tasks.render_graph_thumbnail" assert send_calls[0][1] == [str(cad_file.id)] assert send_calls[0][2]["renderer"] == "threejs" assert send_calls[0][2]["width"] == 640 assert send_calls[0][2]["height"] == 480 assert send_calls[0][2]["transparent_bg"] is True assert node_results["threejs"].status == "completed" assert node_results["threejs"].output["renderer"] == "threejs" assert node_results["save"].status == "queued" assert node_results["save"].output["predicted_output_path"].endswith(f"{cad_file.id}.png") def test_execute_graph_workflow_completes_cad_bridge_only_nodes_without_queueing( sync_session, tmp_path, monkeypatch, ): cad_file = _seed_cad_file_only(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id="unexpected-task") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "resolve", "step": "resolve_step_path", "params": {}}, {"id": "stl", "step": "stl_cache_generate", "params": {}}, ], "edges": [ {"from": "resolve", "to": "stl"}, ], }, context_id=str(cad_file.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=None, workflow_context=workflow_context, ) assert find_unsupported_graph_nodes(workflow_context) == [] dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == [] assert send_calls == [] assert refreshed_run.status == "completed" assert refreshed_run.completed_at is not None assert node_results["resolve"].status == "completed" assert node_results["resolve"].output["cad_file_id"] == str(cad_file.id) assert node_results["resolve"].output["processing_status"] == "pending" assert node_results["resolve"].output["step_path"].endswith("thumbnail.step") assert node_results["stl"].status == "completed" assert node_results["stl"].output["cache_mode"] == "compatibility_noop" assert node_results["stl"].output["cache_required"] is False assert node_results["stl"].output["cad_file_id"] == str(cad_file.id) def test_execute_graph_workflow_routes_each_thumbnail_save_to_its_connected_request( sync_session, tmp_path, monkeypatch, ): cad_file = _seed_cad_file_only(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id=f"task-{len(send_calls)}") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ { "id": "blender", "step": "blender_render", "params": {"width": 512, "height": 512, "samples": 64}, }, { "id": "threejs", "step": "threejs_render", "params": {"width": 640, "height": 480, "transparent_bg": True}, }, {"id": "save_blender", "step": "thumbnail_save", "params": {}}, {"id": "save_threejs", "step": "thumbnail_save", "params": {}}, ], "edges": [ {"from": "blender", "to": "save_blender"}, {"from": "threejs", "to": "save_threejs"}, ], }, context_id=str(cad_file.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=None, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == ["task-1", "task-2"] assert [call[0] for call in send_calls] == [ "app.tasks.step_tasks.render_graph_thumbnail", "app.tasks.step_tasks.render_graph_thumbnail", ] assert send_calls[0][2]["renderer"] == "blender" assert send_calls[0][2]["width"] == 512 assert send_calls[0][2]["height"] == 512 assert send_calls[1][2]["renderer"] == "threejs" assert send_calls[1][2]["width"] == 640 assert send_calls[1][2]["height"] == 480 assert send_calls[1][2]["transparent_bg"] is True assert send_calls[0][2]["workflow_node_id"] == "save_blender" assert send_calls[1][2]["workflow_node_id"] == "save_threejs" assert node_results["save_blender"].status == "queued" assert node_results["save_threejs"].status == "queued" def test_execute_graph_workflow_merges_legacy_render_context_into_still_task_kwargs( sync_session, tmp_path, monkeypatch, ): from app.config import settings monkeypatch.setattr(settings, "upload_dir", str(tmp_path / "uploads")) line = _seed_renderable_order_line(sync_session, tmp_path) asset_library = sync_session.execute( select(AssetLibrary).where(AssetLibrary.is_active == True) # noqa: E712 ).scalar_one() asset_library_path = Path(settings.upload_dir) / "asset-libraries" / f"{asset_library.id}.blend" asset_library_path.parent.mkdir(parents=True, exist_ok=True) asset_library_path.write_text("BLEND", encoding="utf-8") assert line.output_type is not None line.output_type.render_settings = { "width": 2048, "height": 2048, "engine": "cycles", "samples": 128, "noise_threshold": "0.05", } line.output_type.transparent_bg = True line.output_type.cycles_device = "cuda" line.render_overrides = { "height": 1440, "samples": 96, "denoiser": "OPENIMAGEDENOISE", } sync_session.commit() send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id="task-merged-still") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) monkeypatch.setattr( "app.domains.rendering.workflow_graph_runtime.resolve_render_position_context", lambda _session, _line: SimpleNamespace( rotation_x=15.0, rotation_y=25.0, rotation_z=35.0, focal_length_mm=85.0, sensor_width_mm=36.0, ), ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, {"id": "render", "step": "blender_still", "params": {"width": 1024, "samples": 32}}, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "render"}, ], }, context_id=str(line.id), execution_mode="shadow", ) create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() assert dispatch_result.task_ids == ["task-merged-still"] assert len(send_calls) == 1 task_name, args, kwargs = send_calls[0] assert task_name == "app.domains.rendering.tasks.render_order_line_still_task" assert args == [str(line.id)] assert kwargs["width"] == 2048 assert kwargs["height"] == 1440 assert kwargs["engine"] == "cycles" assert kwargs["samples"] == 96 assert kwargs["noise_threshold"] == "0.05" assert kwargs["denoiser"] == "OPENIMAGEDENOISE" assert kwargs["transparent_bg"] is True assert kwargs["cycles_device"] == "cuda" assert kwargs["template_path"] == "/templates/bearing.blend" assert kwargs["target_collection"] == "Product" assert kwargs["material_library_path"] == str(asset_library_path) assert kwargs["material_map"] is not None assert kwargs["part_colors"] == {"InnerRing": "Steel raw", "OuterRing": "Steel raw"} assert kwargs["part_names_ordered"] == ["InnerRing", "OuterRing"] assert kwargs["rotation_x"] == 15.0 assert kwargs["rotation_y"] == 25.0 assert kwargs["rotation_z"] == 35.0 assert kwargs["focal_length_mm"] == 85.0 assert kwargs["sensor_width_mm"] == 36.0 assert kwargs["publish_asset_enabled"] is False assert kwargs["emit_events"] is False assert kwargs["job_document_enabled"] is False assert kwargs["output_name_suffix"].startswith("shadow-") def test_build_task_kwargs_autoscales_default_samples_via_shared_render_invocation( tmp_path, monkeypatch, ): step_path = tmp_path / "cad" / "bearing.step" step_path.parent.mkdir(parents=True, exist_ok=True) step_path.write_text("STEP", encoding="utf-8") output_type = OutputType( id=uuid.uuid4(), name="Still Preview", renderer="blender", output_format="png", render_settings={"width": 1024, "height": 512}, ) output_type.invocation_overrides = {"samples": 128, "engine": "cycles"} cad_file = CadFile( id=uuid.uuid4(), original_name="bearing.step", stored_path=str(step_path), file_hash="hash-graph-1", parsed_objects={"objects": ["InnerRing", "OuterRing"]}, ) product = Product( id=uuid.uuid4(), pim_id="P-graph-1", name="Bearing G", category_key="bearings", cad_file_id=cad_file.id, cad_file=cad_file, ) line = OrderLine( id=uuid.uuid4(), order_id=uuid.uuid4(), product_id=product.id, product=product, output_type_id=output_type.id, output_type=output_type, ) state = WorkflowGraphState( setup=OrderLineRenderSetupResult( status="ready", order_line=line, cad_file=cad_file, part_colors={"InnerRing": "Steel raw"}, ) ) workflow_context = SimpleNamespace( workflow_run_id=uuid.uuid4(), execution_mode="shadow", ordered_nodes=[], edges=[], ) node = SimpleNamespace(id="render", step=StepName.BLENDER_STILL, params={}) monkeypatch.setattr( "app.domains.rendering.workflow_graph_runtime.resolve_render_position_context", lambda _session, _line: SimpleNamespace( rotation_x=0.0, rotation_y=0.0, rotation_z=0.0, focal_length_mm=None, sensor_width_mm=None, ), ) kwargs = _build_task_kwargs( session=object(), workflow_context=workflow_context, state=state, node=node, ) assert kwargs["width"] == 1024 assert kwargs["height"] == 512 assert kwargs["samples"] == 64 def test_execute_graph_workflow_respects_custom_render_settings_opt_in_for_still_task( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) assert line.output_type is not None line.output_type.render_settings = { "width": 2048, "height": 2048, "engine": "cycles", "samples": 128, } sync_session.commit() send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id="task-custom-still") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, { "id": "render", "step": "blender_still", "params": { "use_custom_render_settings": True, "width": 1024, "height": 768, "samples": 32, "render_engine": "eevee", }, }, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "render"}, ], }, context_id=str(line.id), execution_mode="graph", ) create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() assert dispatch_result.task_ids == ["task-custom-still"] assert len(send_calls) == 1 task_name, args, kwargs = send_calls[0] assert task_name == "app.domains.rendering.tasks.render_order_line_still_task" assert args == [str(line.id)] assert kwargs["width"] == 1024 assert kwargs["height"] == 768 assert kwargs["samples"] == 32 assert kwargs["render_engine"] == "eevee" assert kwargs["engine"] == "cycles" def test_execute_graph_workflow_preserves_turntable_timing_without_custom_render_settings( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) assert line.output_type is not None line.output_type.render_settings = { "width": 2048, "height": 2048, "engine": "cycles", "samples": 128, "fps": 30, "frame_count": 180, "bg_color": "#ffffff", "turntable_axis": "world_y", } line.output_type.transparent_bg = True line.output_type.cycles_device = "gpu" line.material_override = "Studio White" sync_session.commit() send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id="task-turntable") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) monkeypatch.setattr( "app.domains.rendering.workflow_graph_runtime.resolve_render_position_context", lambda _session, _line: SimpleNamespace( rotation_x=10.0, rotation_y=20.0, rotation_z=30.0, focal_length_mm=70.0, sensor_width_mm=35.0, ), ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, { "id": "render", "step": "blender_turntable", "params": { "width": 1024, "samples": 32, "fps": 24, "duration_s": 5, "frame_count": 120, }, }, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "render"}, ], }, context_id=str(line.id), execution_mode="shadow", ) create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() assert dispatch_result.task_ids == ["task-turntable"] assert len(send_calls) == 1 task_name, args, kwargs = send_calls[0] assert task_name == "app.domains.rendering.tasks.render_turntable_task" assert args == [str(line.id)] assert kwargs["width"] == 2048 assert kwargs["height"] == 2048 assert kwargs["samples"] == 128 assert kwargs["transparent_bg"] is True assert kwargs["cycles_device"] == "gpu" assert kwargs["fps"] == 24 assert kwargs["frame_count"] == 120 assert kwargs["bg_color"] == "#ffffff" assert kwargs["turntable_axis"] == "world_y" assert kwargs["rotation_x"] == 10.0 assert kwargs["rotation_y"] == 20.0 assert kwargs["rotation_z"] == 30.0 assert kwargs["focal_length_mm"] == 70.0 assert kwargs["sensor_width_mm"] == 35.0 assert kwargs["material_override"] == "Studio White" assert kwargs["output_name_suffix"].startswith("shadow-") def test_execute_graph_workflow_retries_bridge_node_and_persists_attempt_metadata( sync_session, monkeypatch, ): attempts = {"count": 0} def _flaky_prepare(_session, _context_id): attempts["count"] += 1 if attempts["count"] == 1: raise RuntimeError("temporary setup failure") return OrderLineRenderSetupResult(status="skip", reason="line_cancelled") monkeypatch.setattr( "app.domains.rendering.workflow_graph_runtime.prepare_order_line_render_context", _flaky_prepare, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ { "id": "setup", "step": "order_line_setup", "params": {"retry_policy": {"max_attempts": 2}}, }, ], "edges": [], }, context_id=str(uuid.uuid4()), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=None, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() setup_result = next(node for node in refreshed_run.node_results if node.node_name == "setup") assert dispatch_result.task_ids == [] assert refreshed_run.status == "completed" assert setup_result.status == "skipped" assert setup_result.output["attempt_count"] == 2 assert setup_result.output["max_attempts"] == 2 assert setup_result.output["retry_state"] == "recovered" assert setup_result.output["last_error"] == "temporary setup failure" assert setup_result.output["retry_policy"]["max_attempts"] == 2 def test_execute_graph_workflow_marks_failed_node_with_retry_exhausted_metadata( sync_session, monkeypatch, ): monkeypatch.setattr( "app.domains.rendering.workflow_graph_runtime.prepare_order_line_render_context", lambda _session, _context_id: (_ for _ in ()).throw(RuntimeError("permanent setup failure")), ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ { "id": "setup", "step": "order_line_setup", "params": { "retry_policy": {"max_attempts": 2}, "failure_policy": {"fallback_to_legacy": True}, }, }, ], "edges": [], }, context_id=str(uuid.uuid4()), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=None, workflow_context=workflow_context, ) with pytest.raises(WorkflowGraphRuntimeError, match="permanent setup failure"): execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() setup_result = next(node for node in refreshed_run.node_results if node.node_name == "setup") assert setup_result.status == "failed" assert setup_result.output["attempt_count"] == 2 assert setup_result.output["max_attempts"] == 2 assert setup_result.output["retry_exhausted"] is True assert setup_result.output["last_error"] == "permanent setup failure" assert setup_result.output["failure_policy"]["fallback_to_legacy"] is True def test_execute_graph_workflow_supports_output_save_bridge_node( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", lambda task_name, args, kwargs: send_calls.append((task_name, args, kwargs)) or SimpleNamespace(id="task-output-save"), ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, {"id": "render", "step": "blender_still", "params": {"width": 1024, "height": 768}}, {"id": "output", "step": "output_save", "params": {}}, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "render"}, {"from": "render", "to": "output"}, ], }, context_id=str(line.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) assert find_unsupported_graph_nodes(workflow_context) == [] dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == ["task-output-save"] assert len(send_calls) == 1 assert send_calls[0][0] == "app.domains.rendering.tasks.render_order_line_still_task" assert send_calls[0][1] == [str(line.id)] assert send_calls[0][2]["publish_asset_enabled"] is False assert send_calls[0][2]["graph_authoritative_output_enabled"] is True assert send_calls[0][2]["graph_output_node_ids"] == ["output"] assert node_results["render"].status == "queued" assert node_results["output"].status == "completed" assert node_results["output"].output["publication_mode"] == "awaiting_graph_authoritative_save" assert node_results["output"].output["order_line_id"] == str(line.id) assert node_results["output"].output["artifact_count"] == 1 assert node_results["output"].output["upstream_artifacts"] == [ { "node_id": "render", "artifact_role": "render_output", "predicted_output_path": str( tmp_path / "cad" / "renders" / f"line_{line.id}.png" ), "predicted_asset_type": "still", "publish_asset_enabled": False, "graph_authoritative_output_enabled": True, "graph_output_node_ids": ["output"], "notify_handoff_enabled": False, "task_id": "task-output-save", } ] def test_execute_graph_workflow_arms_output_save_handoff_for_export_blend( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", lambda task_name, args, kwargs: send_calls.append((task_name, args, kwargs)) or SimpleNamespace(id="task-blend-output-save"), ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "blend", "step": "export_blend", "params": {}}, {"id": "output", "step": "output_save", "params": {}}, ], "edges": [ {"from": "setup", "to": "blend"}, {"from": "blend", "to": "output"}, ], }, context_id=str(line.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == ["task-blend-output-save"] assert len(send_calls) == 1 assert send_calls[0][0] == "app.domains.rendering.tasks.export_blend_for_order_line_task" assert send_calls[0][1] == [str(line.id)] assert send_calls[0][2]["publish_asset_enabled"] is False assert send_calls[0][2]["graph_authoritative_output_enabled"] is True assert send_calls[0][2]["graph_output_node_ids"] == ["output"] assert node_results["blend"].status == "queued" assert node_results["output"].status == "completed" assert node_results["output"].output["publication_mode"] == "awaiting_graph_authoritative_save" assert node_results["output"].output["artifact_count"] == 1 assert node_results["output"].output["upstream_artifacts"] == [ { "node_id": "blend", "artifact_role": "blend_export", "predicted_output_path": str(tmp_path / "cad" / "bearing_production.blend"), "predicted_asset_type": "blend_production", "publish_asset_enabled": False, "graph_authoritative_output_enabled": True, "graph_output_node_ids": ["output"], "notify_handoff_enabled": False, "task_id": "task-blend-output-save", } ] def test_execute_graph_workflow_arms_output_save_handoff_for_turntable( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", lambda task_name, args, kwargs: send_calls.append((task_name, args, kwargs)) or SimpleNamespace(id="task-turntable-output-save"), ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "turntable", "step": "blender_turntable", "params": {"fps": 24, "frame_count": 96}}, {"id": "output", "step": "output_save", "params": {}}, ], "edges": [ {"from": "setup", "to": "turntable"}, {"from": "turntable", "to": "output"}, ], }, context_id=str(line.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == ["task-turntable-output-save"] assert len(send_calls) == 1 assert send_calls[0][0] == "app.domains.rendering.tasks.render_turntable_task" assert send_calls[0][1] == [str(line.id)] assert send_calls[0][2]["publish_asset_enabled"] is False assert send_calls[0][2]["graph_authoritative_output_enabled"] is True assert send_calls[0][2]["graph_output_node_ids"] == ["output"] assert send_calls[0][2]["workflow_node_id"] == "turntable" assert node_results["turntable"].status == "queued" assert node_results["output"].status == "completed" assert node_results["output"].output["publication_mode"] == "awaiting_graph_authoritative_save" assert node_results["output"].output["artifact_count"] == 1 assert node_results["output"].output["upstream_artifacts"] == [ { "node_id": "turntable", "artifact_role": "turntable_output", "predicted_output_path": str(tmp_path / "cad" / "renders" / "turntable.mp4"), "predicted_asset_type": "turntable", "publish_asset_enabled": False, "graph_authoritative_output_enabled": True, "graph_output_node_ids": ["output"], "notify_handoff_enabled": False, "task_id": "task-turntable-output-save", } ] def test_execute_graph_workflow_routes_output_save_handoffs_per_connected_branch( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id=f"task-branch-{len(send_calls)}") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, {"id": "still", "step": "blender_still", "params": {"width": 1024, "height": 768}}, {"id": "turntable", "step": "blender_turntable", "params": {"fps": 24, "frame_count": 96}}, {"id": "still_output", "step": "output_save", "params": {}}, {"id": "turntable_output", "step": "output_save", "params": {}}, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "still"}, {"from": "template", "to": "turntable"}, {"from": "still", "to": "still_output"}, {"from": "turntable", "to": "turntable_output"}, ], }, context_id=str(line.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == ["task-branch-1", "task-branch-2"] assert len(send_calls) == 2 assert send_calls[0][0] == "app.domains.rendering.tasks.render_order_line_still_task" assert send_calls[0][2]["graph_output_node_ids"] == ["still_output"] assert send_calls[1][0] == "app.domains.rendering.tasks.render_turntable_task" assert send_calls[1][2]["graph_output_node_ids"] == ["turntable_output"] assert node_results["still_output"].output["artifact_count"] == 1 assert node_results["still_output"].output["upstream_artifacts"] == [ { "node_id": "still", "artifact_role": "render_output", "predicted_output_path": str(tmp_path / "cad" / "renders" / f"line_{line.id}.png"), "predicted_asset_type": "still", "publish_asset_enabled": False, "graph_authoritative_output_enabled": True, "graph_output_node_ids": ["still_output"], "notify_handoff_enabled": False, "task_id": "task-branch-1", } ] assert node_results["turntable_output"].output["artifact_count"] == 1 assert node_results["turntable_output"].output["upstream_artifacts"] == [ { "node_id": "turntable", "artifact_role": "turntable_output", "predicted_output_path": str(tmp_path / "cad" / "renders" / "turntable.mp4"), "predicted_asset_type": "turntable", "publish_asset_enabled": False, "graph_authoritative_output_enabled": True, "graph_output_node_ids": ["turntable_output"], "notify_handoff_enabled": False, "task_id": "task-branch-2", } ] def test_execute_graph_workflow_keeps_self_publish_when_no_output_save_node( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id="task-no-output-save") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, {"id": "render", "step": "blender_still", "params": {"width": 1024, "height": 768}}, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "render"}, ], }, context_id=str(line.id), execution_mode="graph", ) create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() assert dispatch_result.task_ids == ["task-no-output-save"] assert len(send_calls) == 1 assert send_calls[0][0] == "app.domains.rendering.tasks.render_order_line_still_task" assert send_calls[0][2].get("publish_asset_enabled", True) is True assert send_calls[0][2].get("graph_authoritative_output_enabled") in (None, False) assert send_calls[0][2].get("graph_output_node_ids") in (None, []) def test_execute_graph_workflow_arms_notify_handoff_for_graph_render_task( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id="task-notify-handoff") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, {"id": "render", "step": "blender_still", "params": {}}, {"id": "notify", "step": "notify", "params": {}}, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "render"}, {"from": "render", "to": "notify"}, ], }, context_id=str(line.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == ["task-notify-handoff"] assert len(send_calls) == 1 assert send_calls[0][2]["emit_legacy_notifications"] is True assert send_calls[0][2]["graph_notify_node_ids"] == ["notify"] assert node_results["render"].output["graph_notify_node_ids"] == ["notify"] assert node_results["notify"].status == "completed" assert node_results["notify"].output["notification_mode"] == "deferred_to_render_task" assert node_results["notify"].output["armed_node_ids"] == ["render"] def test_execute_graph_workflow_routes_notify_handoffs_per_connected_branch( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id=f"task-notify-branch-{len(send_calls)}") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, {"id": "still", "step": "blender_still", "params": {}}, {"id": "turntable", "step": "blender_turntable", "params": {"fps": 24, "frame_count": 96}}, {"id": "still_notify", "step": "notify", "params": {}}, {"id": "turntable_notify", "step": "notify", "params": {}}, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "still"}, {"from": "template", "to": "turntable"}, {"from": "still", "to": "still_notify"}, {"from": "turntable", "to": "turntable_notify"}, ], }, context_id=str(line.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == ["task-notify-branch-1", "task-notify-branch-2"] assert len(send_calls) == 2 assert send_calls[0][0] == "app.domains.rendering.tasks.render_order_line_still_task" assert send_calls[0][2]["emit_legacy_notifications"] is True assert send_calls[0][2]["graph_notify_node_ids"] == ["still_notify"] assert send_calls[1][0] == "app.domains.rendering.tasks.render_turntable_task" assert send_calls[1][2]["emit_legacy_notifications"] is True assert send_calls[1][2]["graph_notify_node_ids"] == ["turntable_notify"] assert node_results["still"].output["graph_notify_node_ids"] == ["still_notify"] assert node_results["turntable"].output["graph_notify_node_ids"] == ["turntable_notify"] assert node_results["still_notify"].status == "completed" assert node_results["still_notify"].output["armed_node_ids"] == ["still"] assert node_results["turntable_notify"].status == "completed" assert node_results["turntable_notify"].output["armed_node_ids"] == ["turntable"] def test_execute_graph_workflow_suppresses_notify_node_in_shadow_mode( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id="task-shadow-notify") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, {"id": "render", "step": "blender_still", "params": {}}, {"id": "notify", "step": "notify", "params": {}}, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "render"}, {"from": "render", "to": "notify"}, ], }, context_id=str(line.id), execution_mode="shadow", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == ["task-shadow-notify"] assert len(send_calls) == 1 assert send_calls[0][2].get("emit_legacy_notifications", False) is False assert node_results["notify"].status == "skipped" assert node_results["notify"].output["notification_mode"] == "shadow_suppressed"