from __future__ import annotations import os import uuid from pathlib import Path from types import SimpleNamespace import pytest from sqlalchemy import select, text from sqlalchemy.orm import Session, selectinload from app.core.render_paths import build_order_line_export_path, build_order_line_step_render_path from app.core.process_steps import StepName from app.domains.auth.models import User, UserRole from app.domains.materials.models import AssetLibrary from app.domains.orders.models import Order, OrderLine, OrderStatus from app.domains.products.models import CadFile, Product from app.domains.rendering.models import OutputType, RenderTemplate, WorkflowRun from app.domains.rendering.workflow_executor import prepare_workflow_context from app.domains.rendering.workflow_graph_runtime import ( _build_task_kwargs, WorkflowGraphState, WorkflowGraphRuntimeError, execute_graph_workflow, find_unsupported_graph_nodes, ) from app.domains.rendering.workflow_run_service import create_workflow_run from app.domains.rendering.workflow_runtime_services import OrderLineRenderSetupResult from tests.db_test_utils import sync_test_session as sync_test_session_ctx @pytest.fixture def sync_session(): with sync_test_session_ctx() as session: yield session def _seed_renderable_order_line( session: Session, tmp_path: Path, *, with_blank_materials: bool = False, ) -> OrderLine: step_path = tmp_path / "cad" / "bearing.step" step_path.parent.mkdir(parents=True, exist_ok=True) step_path.write_text("STEP", encoding="utf-8") user = User( id=uuid.uuid4(), email=f"graph-{uuid.uuid4().hex[:8]}@test.local", password_hash="hash", full_name="Graph Runtime Tester", role=UserRole.admin, is_active=True, ) cad_file = CadFile( id=uuid.uuid4(), original_name="bearing.step", stored_path=str(step_path), file_hash=f"hash-{uuid.uuid4().hex}", parsed_objects={"objects": ["InnerRing", "OuterRing"]}, ) product = Product( id=uuid.uuid4(), pim_id=f"P-{uuid.uuid4().hex[:8]}", name="Bearing A", category_key="bearings", cad_file_id=cad_file.id, cad_file=cad_file, components=[ {"part_name": "InnerRing", "material": "Steel"}, {"part_name": "OuterRing", "material": "Rubber"}, ], cad_part_materials=( [] if with_blank_materials else [ {"part_name": "InnerRing", "material": "Steel raw"}, {"part_name": "OuterRing", "material": "Steel raw"}, ] ), ) output_type = OutputType( id=uuid.uuid4(), name=f"Still-{uuid.uuid4().hex[:6]}", renderer="blender", output_format="png", render_settings={"width": 1600, "height": 900}, ) order = Order( id=uuid.uuid4(), order_number=f"ORD-{uuid.uuid4().hex[:8]}", status=OrderStatus.processing, created_by=user.id, ) line = OrderLine( id=uuid.uuid4(), order_id=order.id, product_id=product.id, product=product, output_type_id=output_type.id, output_type=output_type, render_status="pending", ) session.add_all([user, cad_file, product, output_type, order, line]) session.flush() session.add( AssetLibrary( id=uuid.uuid4(), name="Default Library", blend_file_path="/libraries/materials.blend", is_active=True, ) ) session.add( RenderTemplate( id=uuid.uuid4(), name="Bearing Studio", category_key="bearings", blend_file_path="/templates/bearing.blend", original_filename="bearing.blend", target_collection="Product", material_replace_enabled=True, lighting_only=False, workflow_input_schema=[ { "key": "studio_variant", "label": "Studio Variant", "type": "select", "section": "Template Inputs", "default": "default", "options": [ {"value": "default", "label": "Default"}, {"value": "warm", "label": "Warm"}, ], } ], is_active=True, output_types=[output_type], ) ) session.commit() return line def _seed_cad_file_only(session: Session, tmp_path: Path) -> CadFile: step_path = tmp_path / "cad-only" / "thumbnail.step" step_path.parent.mkdir(parents=True, exist_ok=True) step_path.write_text("STEP", encoding="utf-8") cad_file = CadFile( id=uuid.uuid4(), original_name="thumbnail.step", stored_path=str(step_path), file_hash=f"hash-{uuid.uuid4().hex}", parsed_objects={"objects": ["Body"]}, ) session.add(cad_file) session.commit() return cad_file def test_execute_graph_workflow_persists_bridge_outputs_and_queues_render_task( sync_session, tmp_path, monkeypatch, ): from app.config import settings monkeypatch.setattr(settings, "upload_dir", str(tmp_path / "uploads")) queued_thumbnail: list[tuple[str, dict[str, str]]] = [] line = _seed_renderable_order_line(sync_session, tmp_path, with_blank_materials=True) monkeypatch.setattr( "app.domains.pipeline.tasks.render_thumbnail.regenerate_thumbnail.delay", lambda cad_file_id, part_colors: queued_thumbnail.append((cad_file_id, part_colors)), ) monkeypatch.setattr( "app.domains.rendering.workflow_runtime_services.extract_bbox_from_step_cadquery", lambda step_path: { "dimensions_mm": {"x": 12.5, "y": 20.0, "z": 7.5}, "bbox_center_mm": {"x": 6.25, "y": 10.0, "z": 3.75}, }, ) monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", lambda task_name, args, kwargs: SimpleNamespace(id=f"task-{len(args)}"), ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, {"id": "materials", "step": "material_map_resolve", "params": {}}, {"id": "autofill", "step": "auto_populate_materials", "params": {}}, {"id": "bbox", "step": "glb_bbox", "params": {}}, {"id": "render", "step": "blender_still", "params": {"width": 1024, "height": 1024}}, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "materials"}, {"from": "materials", "to": "autofill"}, {"from": "autofill", "to": "bbox"}, {"from": "bbox", "to": "render"}, ], }, context_id=str(line.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} sync_session.refresh(line.product) assert dispatch_result.task_ids == ["task-1"] assert refreshed_run.status == "pending" assert refreshed_run.celery_task_id == "task-1" assert node_results["setup"].status == "completed" assert node_results["setup"].output["cad_file_id"] == str(line.product.cad_file_id) assert node_results["template"].status == "completed" assert node_results["template"].output["template_name"] == "Bearing Studio" assert node_results["materials"].status == "completed" assert node_results["materials"].output["material_map_count"] == 0 assert node_results["autofill"].status == "completed" assert node_results["autofill"].output["updated_product_count"] == 1 assert node_results["autofill"].output["queued_thumbnail_regeneration"] is True assert node_results["bbox"].status == "completed" assert node_results["bbox"].output["has_bbox"] is True assert node_results["render"].status == "queued" assert node_results["render"].output["task_id"] == "task-1" assert line.product.cad_part_materials == [ {"part_name": "InnerRing", "material": "Steel"}, {"part_name": "OuterRing", "material": "Rubber"}, ] assert queued_thumbnail == [ ( str(line.product.cad_file_id), {"InnerRing": "Steel", "OuterRing": "Rubber"}, ) ] def test_execute_graph_workflow_routes_cad_thumbnail_save_using_upstream_threejs_request( sync_session, tmp_path, monkeypatch, ): cad_file = _seed_cad_file_only(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id="task-threejs-thumb") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ { "id": "threejs", "step": "threejs_render", "params": {"width": 640, "height": 480, "transparent_bg": True}, }, {"id": "save", "step": "thumbnail_save", "params": {}}, ], "edges": [ {"from": "threejs", "to": "save"}, ], }, context_id=str(cad_file.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=None, workflow_context=workflow_context, ) assert find_unsupported_graph_nodes(workflow_context) == [] dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == ["task-threejs-thumb"] assert len(send_calls) == 1 assert send_calls[0][0] == "app.tasks.step_tasks.render_graph_thumbnail" assert send_calls[0][1] == [str(cad_file.id)] assert send_calls[0][2]["renderer"] == "threejs" assert send_calls[0][2]["width"] == 640 assert send_calls[0][2]["height"] == 480 assert send_calls[0][2]["transparent_bg"] is True assert node_results["threejs"].status == "completed" assert node_results["threejs"].output["renderer"] == "threejs" assert node_results["save"].status == "queued" assert node_results["save"].output["predicted_output_path"].endswith(f"{cad_file.id}.png") def test_execute_graph_workflow_serializes_template_schema_and_template_inputs( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) template = sync_session.execute(select(RenderTemplate)).unique().scalar_one() monkeypatch.setattr( "app.domains.rendering.workflow_runtime_services.resolve_material_map", lambda raw_map: {key: f"resolved:{value}" for key, value in raw_map.items()}, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, { "id": "template", "step": "resolve_template", "params": { "template_id_override": str(template.id), "template_input__studio_variant": "warm", }, }, ], "edges": [ {"from": "setup", "to": "template"}, ], }, context_id=str(line.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == [] assert node_results["template"].status == "completed" assert node_results["template"].output["workflow_input_schema"] == template.workflow_input_schema assert node_results["template"].output["template_inputs"] == {"studio_variant": "warm"} assert node_results["template"].output["template_input_count"] == 1 def test_execute_graph_workflow_passes_template_inputs_to_still_task( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) template = sync_session.execute(select(RenderTemplate)).unique().scalar_one() send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id="task-still-template-inputs") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, { "id": "template", "step": "resolve_template", "params": { "template_id_override": str(template.id), "template_input__studio_variant": "warm", }, }, {"id": "render", "step": "blender_still", "params": {}}, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "render"}, ], }, context_id=str(line.id), execution_mode="graph", ) create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() assert dispatch_result.task_ids == ["task-still-template-inputs"] assert len(send_calls) == 1 assert send_calls[0][0] == "app.domains.rendering.tasks.render_order_line_still_task" assert send_calls[0][1] == [str(line.id)] assert send_calls[0][2]["template_inputs"] == {"studio_variant": "warm"} def test_execute_graph_workflow_passes_template_inputs_and_duration_to_turntable_task( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) template = sync_session.execute(select(RenderTemplate)).unique().scalar_one() send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id="task-turntable-template-inputs") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, { "id": "template", "step": "resolve_template", "params": { "template_id_override": str(template.id), "template_input__studio_variant": "warm", }, }, { "id": "render", "step": "blender_turntable", "params": { "fps": 12, "duration_s": 7, "frame_count": 999, }, }, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "render"}, ], }, context_id=str(line.id), execution_mode="graph", ) create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() assert dispatch_result.task_ids == ["task-turntable-template-inputs"] assert len(send_calls) == 1 assert send_calls[0][0] == "app.domains.rendering.tasks.render_turntable_task" assert send_calls[0][1] == [str(line.id)] assert send_calls[0][2]["template_inputs"] == {"studio_variant": "warm"} assert send_calls[0][2]["duration_s"] == 7.0 assert send_calls[0][2]["fps"] == 12 assert send_calls[0][2]["frame_count"] == 84 def test_execute_graph_workflow_completes_cad_bridge_only_nodes_without_queueing( sync_session, tmp_path, monkeypatch, ): cad_file = _seed_cad_file_only(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id="unexpected-task") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "resolve", "step": "resolve_step_path", "params": {}}, {"id": "stl", "step": "stl_cache_generate", "params": {}}, ], "edges": [ {"from": "resolve", "to": "stl"}, ], }, context_id=str(cad_file.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=None, workflow_context=workflow_context, ) assert find_unsupported_graph_nodes(workflow_context) == [] dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == [] assert send_calls == [] assert refreshed_run.status == "completed" assert refreshed_run.completed_at is not None assert node_results["resolve"].status == "completed" assert node_results["resolve"].output["cad_file_id"] == str(cad_file.id) assert node_results["resolve"].output["processing_status"] == "pending" assert node_results["resolve"].output["step_path"].endswith("thumbnail.step") assert node_results["stl"].status == "completed" assert node_results["stl"].output["cache_mode"] == "compatibility_noop" assert node_results["stl"].output["cache_required"] is False assert node_results["stl"].output["cad_file_id"] == str(cad_file.id) def test_execute_graph_workflow_routes_each_thumbnail_save_to_its_connected_request( sync_session, tmp_path, monkeypatch, ): cad_file = _seed_cad_file_only(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id=f"task-{len(send_calls)}") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ { "id": "blender", "step": "blender_render", "params": {"width": 512, "height": 512, "samples": 64}, }, { "id": "threejs", "step": "threejs_render", "params": {"width": 640, "height": 480, "transparent_bg": True}, }, {"id": "save_blender", "step": "thumbnail_save", "params": {}}, {"id": "save_threejs", "step": "thumbnail_save", "params": {}}, ], "edges": [ {"from": "blender", "to": "save_blender"}, {"from": "threejs", "to": "save_threejs"}, ], }, context_id=str(cad_file.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=None, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == ["task-1", "task-2"] assert [call[0] for call in send_calls] == [ "app.tasks.step_tasks.render_graph_thumbnail", "app.tasks.step_tasks.render_graph_thumbnail", ] assert send_calls[0][2]["renderer"] == "blender" assert send_calls[0][2]["width"] == 512 assert send_calls[0][2]["height"] == 512 assert send_calls[1][2]["renderer"] == "threejs" assert send_calls[1][2]["width"] == 640 assert send_calls[1][2]["height"] == 480 assert send_calls[1][2]["transparent_bg"] is True assert send_calls[0][2]["workflow_node_id"] == "save_blender" assert send_calls[1][2]["workflow_node_id"] == "save_threejs" assert node_results["save_blender"].status == "queued" assert node_results["save_threejs"].status == "queued" def test_execute_graph_workflow_merges_legacy_render_context_into_still_task_kwargs( sync_session, tmp_path, monkeypatch, ): from app.config import settings monkeypatch.setattr(settings, "upload_dir", str(tmp_path / "uploads")) line = _seed_renderable_order_line(sync_session, tmp_path) asset_library = sync_session.execute( select(AssetLibrary).where(AssetLibrary.is_active == True) # noqa: E712 ).scalar_one() asset_library_path = Path(settings.upload_dir) / "asset-libraries" / f"{asset_library.id}.blend" asset_library_path.parent.mkdir(parents=True, exist_ok=True) asset_library_path.write_text("BLEND", encoding="utf-8") assert line.output_type is not None line.output_type.render_settings = { "width": 2048, "height": 2048, "engine": "cycles", "samples": 128, "noise_threshold": "0.05", } line.output_type.transparent_bg = True line.output_type.cycles_device = "cuda" line.render_overrides = { "height": 1440, "samples": 96, "denoiser": "OPENIMAGEDENOISE", } sync_session.commit() send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id="task-merged-still") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) monkeypatch.setattr( "app.domains.rendering.workflow_graph_runtime.resolve_render_position_context", lambda _session, _line: SimpleNamespace( rotation_x=15.0, rotation_y=25.0, rotation_z=35.0, focal_length_mm=85.0, sensor_width_mm=36.0, ), ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, {"id": "render", "step": "blender_still", "params": {"width": 1024, "samples": 32}}, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "render"}, ], }, context_id=str(line.id), execution_mode="shadow", ) create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() assert dispatch_result.task_ids == ["task-merged-still"] assert len(send_calls) == 1 task_name, args, kwargs = send_calls[0] assert task_name == "app.domains.rendering.tasks.render_order_line_still_task" assert args == [str(line.id)] assert kwargs["width"] == 2048 assert kwargs["height"] == 1440 assert kwargs["engine"] == "cycles" assert kwargs["samples"] == 96 assert kwargs["noise_threshold"] == "0.05" assert kwargs["denoiser"] == "OPENIMAGEDENOISE" assert kwargs["transparent_bg"] is True assert kwargs["cycles_device"] == "cuda" assert kwargs["template_path"] == "/templates/bearing.blend" assert kwargs["target_collection"] == "Product" assert kwargs["material_library_path"] == str(asset_library_path) assert kwargs["material_map"] is not None assert kwargs["part_colors"] == {"InnerRing": "Steel raw", "OuterRing": "Steel raw"} assert kwargs["part_names_ordered"] == ["InnerRing", "OuterRing"] assert kwargs["rotation_x"] == 15.0 assert kwargs["rotation_y"] == 25.0 assert kwargs["rotation_z"] == 35.0 assert kwargs["focal_length_mm"] == 85.0 assert kwargs["sensor_width_mm"] == 36.0 assert kwargs["publish_asset_enabled"] is False assert kwargs["emit_events"] is False assert kwargs["job_document_enabled"] is False assert kwargs["output_name_suffix"].startswith("shadow-") def test_build_task_kwargs_autoscales_default_samples_via_shared_render_invocation( tmp_path, monkeypatch, ): step_path = tmp_path / "cad" / "bearing.step" step_path.parent.mkdir(parents=True, exist_ok=True) step_path.write_text("STEP", encoding="utf-8") output_type = OutputType( id=uuid.uuid4(), name="Still Preview", renderer="blender", output_format="png", render_settings={"width": 1024, "height": 512}, ) output_type.invocation_overrides = {"samples": 128, "engine": "cycles"} cad_file = CadFile( id=uuid.uuid4(), original_name="bearing.step", stored_path=str(step_path), file_hash="hash-graph-1", parsed_objects={"objects": ["InnerRing", "OuterRing"]}, ) product = Product( id=uuid.uuid4(), pim_id="P-graph-1", name="Bearing G", category_key="bearings", cad_file_id=cad_file.id, cad_file=cad_file, ) line = OrderLine( id=uuid.uuid4(), order_id=uuid.uuid4(), product_id=product.id, product=product, output_type_id=output_type.id, output_type=output_type, ) state = WorkflowGraphState( setup=OrderLineRenderSetupResult( status="ready", order_line=line, cad_file=cad_file, part_colors={"InnerRing": "Steel raw"}, ) ) workflow_context = SimpleNamespace( workflow_run_id=uuid.uuid4(), execution_mode="shadow", ordered_nodes=[], edges=[], ) node = SimpleNamespace(id="render", step=StepName.BLENDER_STILL, params={}) monkeypatch.setattr( "app.domains.rendering.workflow_graph_runtime.resolve_render_position_context", lambda _session, _line: SimpleNamespace( rotation_x=0.0, rotation_y=0.0, rotation_z=0.0, focal_length_mm=None, sensor_width_mm=None, ), ) kwargs = _build_task_kwargs( session=object(), workflow_context=workflow_context, state=state, node=node, ) assert kwargs["width"] == 1024 assert kwargs["height"] == 512 assert kwargs["samples"] == 64 def test_build_task_kwargs_ignores_authoritative_still_overrides_without_opt_in( tmp_path, monkeypatch, ): step_path = tmp_path / "cad" / "bearing.step" step_path.parent.mkdir(parents=True, exist_ok=True) step_path.write_text("STEP", encoding="utf-8") output_type = OutputType( id=uuid.uuid4(), name="Still Preview", renderer="blender", output_format="png", render_settings={ "width": 2048, "height": 1536, "engine": "cycles", "samples": 128, "noise_threshold": "0.05", }, transparent_bg=True, cycles_device="cuda", ) cad_file = CadFile( id=uuid.uuid4(), original_name="bearing.step", stored_path=str(step_path), file_hash="hash-graph-2", parsed_objects={"objects": ["InnerRing", "OuterRing"]}, ) product = Product( id=uuid.uuid4(), pim_id="P-graph-2", name="Bearing G2", category_key="bearings", cad_file_id=cad_file.id, cad_file=cad_file, ) line = OrderLine( id=uuid.uuid4(), order_id=uuid.uuid4(), product_id=product.id, product=product, output_type_id=output_type.id, output_type=output_type, ) state = WorkflowGraphState( setup=OrderLineRenderSetupResult( status="ready", order_line=line, cad_file=cad_file, part_colors={"InnerRing": "Steel raw"}, ) ) workflow_context = SimpleNamespace( workflow_run_id=uuid.uuid4(), execution_mode="graph", ordered_nodes=[], edges=[], ) node = SimpleNamespace( id="render", step=StepName.BLENDER_STILL, params={ "width": 1024, "height": 768, "samples": 16, "render_engine": "eevee", "transparent_bg": False, "cycles_device": "cpu", "noise_threshold": "0.2", }, ) monkeypatch.setattr( "app.domains.rendering.workflow_graph_runtime.resolve_render_position_context", lambda _session, _line: SimpleNamespace( rotation_x=0.0, rotation_y=0.0, rotation_z=0.0, focal_length_mm=None, sensor_width_mm=None, ), ) kwargs = _build_task_kwargs( session=object(), workflow_context=workflow_context, state=state, node=node, ) assert kwargs["width"] == 2048 assert kwargs["height"] == 1536 assert kwargs["engine"] == "cycles" assert kwargs["samples"] == 128 assert kwargs["transparent_bg"] is True assert kwargs["cycles_device"] == "cuda" assert kwargs["noise_threshold"] == "0.05" assert "render_engine" not in kwargs def test_execute_graph_workflow_respects_custom_render_settings_opt_in_for_still_task( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) assert line.output_type is not None line.output_type.render_settings = { "width": 2048, "height": 2048, "engine": "cycles", "samples": 128, } sync_session.commit() send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id="task-custom-still") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, { "id": "render", "step": "blender_still", "params": { "use_custom_render_settings": True, "width": 1024, "height": 768, "samples": 32, "render_engine": "eevee", }, }, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "render"}, ], }, context_id=str(line.id), execution_mode="graph", ) create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() assert dispatch_result.task_ids == ["task-custom-still"] assert len(send_calls) == 1 task_name, args, kwargs = send_calls[0] assert task_name == "app.domains.rendering.tasks.render_order_line_still_task" assert args == [str(line.id)] assert kwargs["width"] == 1024 assert kwargs["height"] == 768 assert kwargs["samples"] == 32 assert kwargs["render_engine"] == "eevee" assert kwargs["engine"] == "cycles" def test_execute_graph_workflow_preserves_turntable_timing_without_custom_render_settings( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) assert line.output_type is not None line.output_type.render_settings = { "width": 2048, "height": 2048, "engine": "cycles", "samples": 128, "fps": 30, "frame_count": 180, "bg_color": "#ffffff", "turntable_axis": "world_y", } line.output_type.transparent_bg = True line.output_type.cycles_device = "gpu" line.material_override = "Studio White" sync_session.commit() send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id="task-turntable") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) monkeypatch.setattr( "app.domains.rendering.workflow_graph_runtime.resolve_render_position_context", lambda _session, _line: SimpleNamespace( rotation_x=10.0, rotation_y=20.0, rotation_z=30.0, focal_length_mm=70.0, sensor_width_mm=35.0, ), ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, { "id": "render", "step": "blender_turntable", "params": { "width": 1024, "samples": 32, "fps": 24, "duration_s": 5, "frame_count": 120, }, }, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "render"}, ], }, context_id=str(line.id), execution_mode="shadow", ) create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() assert dispatch_result.task_ids == ["task-turntable"] assert len(send_calls) == 1 task_name, args, kwargs = send_calls[0] assert task_name == "app.domains.rendering.tasks.render_turntable_task" assert args == [str(line.id)] assert kwargs["width"] == 2048 assert kwargs["height"] == 2048 assert kwargs["samples"] == 128 assert kwargs["transparent_bg"] is True assert kwargs["cycles_device"] == "gpu" assert kwargs["fps"] == 24 assert kwargs["frame_count"] == 120 assert kwargs["bg_color"] == "#ffffff" assert kwargs["turntable_axis"] == "world_y" assert kwargs["rotation_x"] == 10.0 assert kwargs["rotation_y"] == 20.0 assert kwargs["rotation_z"] == 30.0 assert kwargs["focal_length_mm"] == 70.0 assert kwargs["sensor_width_mm"] == 35.0 assert kwargs["material_override"] == "Studio White" assert kwargs["output_name_suffix"].startswith("shadow-") def test_execute_graph_workflow_respects_custom_render_settings_opt_in_for_turntable_task( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) assert line.output_type is not None line.output_type.render_settings = { "width": 2048, "height": 2048, "engine": "cycles", "samples": 128, "fps": 30, "frame_count": 180, } sync_session.commit() send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id="task-custom-turntable") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, { "id": "render", "step": "blender_turntable", "params": { "use_custom_render_settings": True, "width": 1024, "height": 768, "samples": 32, "render_engine": "eevee", "fps": 12, "duration_s": 6, }, }, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "render"}, ], }, context_id=str(line.id), execution_mode="graph", ) create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() assert dispatch_result.task_ids == ["task-custom-turntable"] assert len(send_calls) == 1 task_name, args, kwargs = send_calls[0] assert task_name == "app.domains.rendering.tasks.render_turntable_task" assert args == [str(line.id)] assert kwargs["width"] == 1024 assert kwargs["height"] == 768 assert kwargs["samples"] == 32 assert kwargs["render_engine"] == "eevee" assert kwargs["engine"] == "cycles" assert kwargs["fps"] == 12 assert kwargs["duration_s"] == 6.0 assert kwargs["frame_count"] == 72 def test_execute_graph_workflow_preserves_template_camera_orbit_without_custom_render_settings( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) template = sync_session.execute(select(RenderTemplate)).unique().scalar_one() template.camera_orbit = False assert line.output_type is not None line.output_type.render_settings = { "width": 2048, "height": 2048, "engine": "cycles", "samples": 128, "fps": 30, "frame_count": 180, } sync_session.commit() send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id="task-turntable-camera-orbit") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, { "id": "render", "step": "blender_turntable", "params": { "fps": 24, "frame_count": 120, }, }, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "render"}, ], }, context_id=str(line.id), execution_mode="graph", ) create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() assert dispatch_result.task_ids == ["task-turntable-camera-orbit"] assert len(send_calls) == 1 assert send_calls[0][2]["camera_orbit"] is False def test_execute_graph_workflow_serializes_template_override_modes( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) template = sync_session.execute(select(RenderTemplate)).unique().scalar_one() template.target_collection = "TemplateCollection" template.material_replace_enabled = False template.lighting_only = False template.shadow_catcher_enabled = False template.camera_orbit = True sync_session.commit() workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, { "id": "template", "step": "resolve_template", "params": { "target_collection": "NodeCollection", "material_library_path": "/libraries/materials.blend", "material_replace_mode": "enabled", "lighting_only_mode": "enabled", "shadow_catcher_mode": "enabled", "camera_orbit_mode": "disabled", }, }, ], "edges": [ {"from": "setup", "to": "template"}, ], }, context_id=str(line.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == [] assert node_results["template"].status == "completed" assert node_results["template"].output["target_collection"] == "NodeCollection" assert node_results["template"].output["use_materials"] is True assert node_results["template"].output["lighting_only"] is True assert node_results["template"].output["shadow_catcher"] is True assert node_results["template"].output["camera_orbit"] is False def test_execute_graph_workflow_retries_bridge_node_and_persists_attempt_metadata( sync_session, monkeypatch, ): attempts = {"count": 0} def _flaky_prepare(_session, _context_id): attempts["count"] += 1 if attempts["count"] == 1: raise RuntimeError("temporary setup failure") return OrderLineRenderSetupResult(status="skip", reason="line_cancelled") monkeypatch.setattr( "app.domains.rendering.workflow_graph_runtime.prepare_order_line_render_context", _flaky_prepare, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ { "id": "setup", "step": "order_line_setup", "params": {"retry_policy": {"max_attempts": 2}}, }, ], "edges": [], }, context_id=str(uuid.uuid4()), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=None, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() setup_result = next(node for node in refreshed_run.node_results if node.node_name == "setup") assert dispatch_result.task_ids == [] assert refreshed_run.status == "completed" assert setup_result.status == "skipped" assert setup_result.output["attempt_count"] == 2 assert setup_result.output["max_attempts"] == 2 assert setup_result.output["retry_state"] == "recovered" assert setup_result.output["last_error"] == "temporary setup failure" assert setup_result.output["retry_policy"]["max_attempts"] == 2 def test_execute_graph_workflow_marks_failed_node_with_retry_exhausted_metadata( sync_session, monkeypatch, ): monkeypatch.setattr( "app.domains.rendering.workflow_graph_runtime.prepare_order_line_render_context", lambda _session, _context_id: (_ for _ in ()).throw(RuntimeError("permanent setup failure")), ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ { "id": "setup", "step": "order_line_setup", "params": { "retry_policy": {"max_attempts": 2}, "failure_policy": {"fallback_to_legacy": True}, }, }, ], "edges": [], }, context_id=str(uuid.uuid4()), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=None, workflow_context=workflow_context, ) with pytest.raises(WorkflowGraphRuntimeError, match="permanent setup failure"): execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() setup_result = next(node for node in refreshed_run.node_results if node.node_name == "setup") assert setup_result.status == "failed" assert setup_result.output["attempt_count"] == 2 assert setup_result.output["max_attempts"] == 2 assert setup_result.output["retry_exhausted"] is True assert setup_result.output["last_error"] == "permanent setup failure" assert setup_result.output["failure_policy"]["fallback_to_legacy"] is True def test_execute_graph_workflow_supports_output_save_bridge_node( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", lambda task_name, args, kwargs: send_calls.append((task_name, args, kwargs)) or SimpleNamespace(id="task-output-save"), ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, {"id": "render", "step": "blender_still", "params": {"width": 1024, "height": 768}}, {"id": "output", "step": "output_save", "params": {}}, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "render"}, {"from": "render", "to": "output"}, ], }, context_id=str(line.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) assert find_unsupported_graph_nodes(workflow_context) == [] dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == ["task-output-save"] assert len(send_calls) == 1 assert send_calls[0][0] == "app.domains.rendering.tasks.render_order_line_still_task" assert send_calls[0][1] == [str(line.id)] assert send_calls[0][2]["publish_asset_enabled"] is False assert send_calls[0][2]["graph_authoritative_output_enabled"] is True assert send_calls[0][2]["graph_output_node_ids"] == ["output"] assert node_results["render"].status == "queued" assert node_results["output"].status == "pending" assert node_results["output"].output["publication_mode"] == "awaiting_graph_authoritative_save" assert node_results["output"].output["order_line_id"] == str(line.id) assert node_results["output"].output["handoff_state"] == "armed" assert node_results["output"].output["handoff_node_ids"] == ["render"] assert node_results["output"].output["artifact_count"] == 1 assert node_results["output"].output["upstream_artifacts"] == [ { "node_id": "render", "artifact_role": "render_output", "predicted_output_path": str( build_order_line_step_render_path( line.product.cad_file.stored_path, str(line.id), f"line_{line.id}.png", ) ), "predicted_asset_type": "still", "publish_asset_enabled": False, "graph_authoritative_output_enabled": True, "graph_output_node_ids": ["output"], "notify_handoff_enabled": False, "task_id": "task-output-save", } ] def test_execute_graph_workflow_arms_output_save_handoff_for_export_blend( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", lambda task_name, args, kwargs: send_calls.append((task_name, args, kwargs)) or SimpleNamespace(id="task-blend-output-save"), ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "blend", "step": "export_blend", "params": {}}, {"id": "output", "step": "output_save", "params": {}}, ], "edges": [ {"from": "setup", "to": "blend"}, {"from": "blend", "to": "output"}, ], }, context_id=str(line.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == ["task-blend-output-save"] assert len(send_calls) == 1 assert send_calls[0][0] == "app.domains.rendering.tasks.export_blend_for_order_line_task" assert send_calls[0][1] == [str(line.id)] assert send_calls[0][2]["publish_asset_enabled"] is False assert send_calls[0][2]["graph_authoritative_output_enabled"] is True assert send_calls[0][2]["graph_output_node_ids"] == ["output"] assert node_results["blend"].status == "queued" assert node_results["output"].status == "pending" assert node_results["output"].output["publication_mode"] == "awaiting_graph_authoritative_save" assert node_results["output"].output["handoff_state"] == "armed" assert node_results["output"].output["handoff_node_ids"] == ["blend"] assert node_results["output"].output["artifact_count"] == 1 assert node_results["output"].output["upstream_artifacts"] == [ { "node_id": "blend", "artifact_role": "blend_export", "predicted_output_path": str(build_order_line_export_path(str(line.id), "bearing_production.blend")), "predicted_asset_type": "blend_production", "publish_asset_enabled": False, "graph_authoritative_output_enabled": True, "graph_output_node_ids": ["output"], "notify_handoff_enabled": False, "task_id": "task-blend-output-save", } ] def test_execute_graph_workflow_arms_output_save_handoff_for_turntable( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", lambda task_name, args, kwargs: send_calls.append((task_name, args, kwargs)) or SimpleNamespace(id="task-turntable-output-save"), ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "turntable", "step": "blender_turntable", "params": {"fps": 24, "frame_count": 96}}, {"id": "output", "step": "output_save", "params": {}}, ], "edges": [ {"from": "setup", "to": "turntable"}, {"from": "turntable", "to": "output"}, ], }, context_id=str(line.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == ["task-turntable-output-save"] assert len(send_calls) == 1 assert send_calls[0][0] == "app.domains.rendering.tasks.render_turntable_task" assert send_calls[0][1] == [str(line.id)] assert send_calls[0][2]["publish_asset_enabled"] is False assert send_calls[0][2]["graph_authoritative_output_enabled"] is True assert send_calls[0][2]["graph_output_node_ids"] == ["output"] assert send_calls[0][2]["workflow_node_id"] == "turntable" assert node_results["turntable"].status == "queued" assert node_results["output"].status == "pending" assert node_results["output"].output["publication_mode"] == "awaiting_graph_authoritative_save" assert node_results["output"].output["handoff_state"] == "armed" assert node_results["output"].output["handoff_node_ids"] == ["turntable"] assert node_results["output"].output["artifact_count"] == 1 assert node_results["output"].output["upstream_artifacts"] == [ { "node_id": "turntable", "artifact_role": "turntable_output", "predicted_output_path": str( build_order_line_step_render_path(line.product.cad_file.stored_path, str(line.id), "turntable.mp4") ), "predicted_asset_type": "turntable", "publish_asset_enabled": False, "graph_authoritative_output_enabled": True, "graph_output_node_ids": ["output"], "notify_handoff_enabled": False, "task_id": "task-turntable-output-save", } ] def test_execute_graph_workflow_arms_shadow_output_save_handoff_for_turntable( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", lambda task_name, args, kwargs: send_calls.append((task_name, args, kwargs)) or SimpleNamespace(id="task-shadow-turntable-output-save"), ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "turntable", "step": "blender_turntable", "params": {"fps": 24, "frame_count": 96}}, {"id": "output", "step": "output_save", "params": {}}, ], "edges": [ {"from": "setup", "to": "turntable"}, {"from": "turntable", "to": "output"}, ], }, context_id=str(line.id), execution_mode="shadow", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == ["task-shadow-turntable-output-save"] assert len(send_calls) == 1 assert send_calls[0][0] == "app.domains.rendering.tasks.render_turntable_task" assert send_calls[0][1] == [str(line.id)] assert send_calls[0][2]["publish_asset_enabled"] is False assert send_calls[0][2]["observer_output_enabled"] is True assert send_calls[0][2]["graph_output_node_ids"] == ["output"] assert "graph_authoritative_output_enabled" not in send_calls[0][2] assert node_results["turntable"].status == "queued" assert node_results["output"].status == "pending" assert node_results["output"].output["publication_mode"] == "shadow_observer_only" assert node_results["output"].output["handoff_state"] == "armed" assert node_results["output"].output["handoff_node_ids"] == ["turntable"] assert node_results["output"].output["artifact_count"] == 1 assert node_results["output"].output["upstream_artifacts"] == [ { "node_id": "turntable", "artifact_role": "turntable_output", "predicted_output_path": str( build_order_line_step_render_path( line.product.cad_file.stored_path, str(line.id), f"turntable_shadow-{str(run.id)[:8]}.mp4", ) ), "predicted_asset_type": "turntable", "publish_asset_enabled": False, "graph_authoritative_output_enabled": False, "graph_output_node_ids": ["output"], "notify_handoff_enabled": False, "task_id": "task-shadow-turntable-output-save", } ] def test_execute_graph_workflow_routes_shadow_render_tasks_to_light_queue_when_available( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object], dict[str, object]]] = [] monkeypatch.setattr( "app.domains.rendering.workflow_graph_runtime._inspect_active_worker_queues", lambda timeout=1.0: {"asset_pipeline", "asset_pipeline_light"}, ) def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object], **task_options): send_calls.append((task_name, args, kwargs, task_options)) return SimpleNamespace(id="task-shadow-light-queue") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "turntable", "step": "blender_turntable", "params": {"fps": 24, "frame_count": 96}}, {"id": "output", "step": "output_save", "params": {}}, ], "edges": [ {"from": "setup", "to": "turntable"}, {"from": "turntable", "to": "output"}, ], }, context_id=str(line.id), execution_mode="shadow", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == ["task-shadow-light-queue"] assert len(send_calls) == 1 assert send_calls[0][0] == "app.domains.rendering.tasks.render_turntable_task" assert send_calls[0][3]["queue"] == "asset_pipeline_light" assert node_results["turntable"].output["task_queue"] == "asset_pipeline_light" def test_execute_graph_workflow_routes_output_save_handoffs_per_connected_branch( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id=f"task-branch-{len(send_calls)}") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, {"id": "still", "step": "blender_still", "params": {"width": 1024, "height": 768}}, {"id": "turntable", "step": "blender_turntable", "params": {"fps": 24, "frame_count": 96}}, {"id": "still_output", "step": "output_save", "params": {}}, {"id": "turntable_output", "step": "output_save", "params": {}}, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "still"}, {"from": "template", "to": "turntable"}, {"from": "still", "to": "still_output"}, {"from": "turntable", "to": "turntable_output"}, ], }, context_id=str(line.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == ["task-branch-1", "task-branch-2"] assert len(send_calls) == 2 assert send_calls[0][0] == "app.domains.rendering.tasks.render_order_line_still_task" assert send_calls[0][2]["graph_output_node_ids"] == ["still_output"] assert send_calls[1][0] == "app.domains.rendering.tasks.render_turntable_task" assert send_calls[1][2]["graph_output_node_ids"] == ["turntable_output"] assert node_results["still_output"].status == "pending" assert node_results["still_output"].output["handoff_state"] == "armed" assert node_results["still_output"].output["handoff_node_ids"] == ["still"] assert node_results["still_output"].output["artifact_count"] == 1 assert node_results["still_output"].output["upstream_artifacts"] == [ { "node_id": "still", "artifact_role": "render_output", "predicted_output_path": str( build_order_line_step_render_path( line.product.cad_file.stored_path, str(line.id), f"line_{line.id}.png", ) ), "predicted_asset_type": "still", "publish_asset_enabled": False, "graph_authoritative_output_enabled": True, "graph_output_node_ids": ["still_output"], "notify_handoff_enabled": False, "task_id": "task-branch-1", } ] assert node_results["turntable_output"].status == "pending" assert node_results["turntable_output"].output["handoff_state"] == "armed" assert node_results["turntable_output"].output["handoff_node_ids"] == ["turntable"] assert node_results["turntable_output"].output["artifact_count"] == 1 assert node_results["turntable_output"].output["upstream_artifacts"] == [ { "node_id": "turntable", "artifact_role": "turntable_output", "predicted_output_path": str( build_order_line_step_render_path(line.product.cad_file.stored_path, str(line.id), "turntable.mp4") ), "predicted_asset_type": "turntable", "publish_asset_enabled": False, "graph_authoritative_output_enabled": True, "graph_output_node_ids": ["turntable_output"], "notify_handoff_enabled": False, "task_id": "task-branch-2", } ] def test_execute_graph_workflow_keeps_self_publish_when_no_output_save_node( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id="task-no-output-save") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, {"id": "render", "step": "blender_still", "params": {"width": 1024, "height": 768}}, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "render"}, ], }, context_id=str(line.id), execution_mode="graph", ) create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() assert dispatch_result.task_ids == ["task-no-output-save"] assert len(send_calls) == 1 assert send_calls[0][0] == "app.domains.rendering.tasks.render_order_line_still_task" assert send_calls[0][2].get("publish_asset_enabled", True) is True assert send_calls[0][2].get("graph_authoritative_output_enabled") in (None, False) assert send_calls[0][2].get("graph_output_node_ids") in (None, []) def test_execute_graph_workflow_arms_notify_handoff_for_graph_render_task( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id="task-notify-handoff") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, {"id": "render", "step": "blender_still", "params": {}}, {"id": "notify", "step": "notify", "params": {}}, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "render"}, {"from": "render", "to": "notify"}, ], }, context_id=str(line.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == ["task-notify-handoff"] assert len(send_calls) == 1 assert send_calls[0][2]["emit_legacy_notifications"] is True assert send_calls[0][2]["graph_notify_node_ids"] == ["notify"] assert node_results["render"].output["graph_notify_node_ids"] == ["notify"] assert node_results["notify"].status == "pending" assert node_results["notify"].output["notification_mode"] == "deferred_to_render_task" assert node_results["notify"].output["armed_node_ids"] == ["render"] assert node_results["notify"].output["handoff_state"] == "armed" def test_execute_graph_workflow_routes_notify_handoffs_per_connected_branch( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id=f"task-notify-branch-{len(send_calls)}") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, {"id": "still", "step": "blender_still", "params": {}}, {"id": "turntable", "step": "blender_turntable", "params": {"fps": 24, "frame_count": 96}}, {"id": "still_notify", "step": "notify", "params": {}}, {"id": "turntable_notify", "step": "notify", "params": {}}, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "still"}, {"from": "template", "to": "turntable"}, {"from": "still", "to": "still_notify"}, {"from": "turntable", "to": "turntable_notify"}, ], }, context_id=str(line.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == ["task-notify-branch-1", "task-notify-branch-2"] assert len(send_calls) == 2 assert send_calls[0][0] == "app.domains.rendering.tasks.render_order_line_still_task" assert send_calls[0][2]["emit_legacy_notifications"] is True assert send_calls[0][2]["graph_notify_node_ids"] == ["still_notify"] assert send_calls[1][0] == "app.domains.rendering.tasks.render_turntable_task" assert send_calls[1][2]["emit_legacy_notifications"] is True assert send_calls[1][2]["graph_notify_node_ids"] == ["turntable_notify"] assert node_results["still"].output["graph_notify_node_ids"] == ["still_notify"] assert node_results["turntable"].output["graph_notify_node_ids"] == ["turntable_notify"] assert node_results["still_notify"].status == "pending" assert node_results["still_notify"].output["notification_mode"] == "deferred_to_render_task" assert node_results["still_notify"].output["armed_node_ids"] == ["still"] assert node_results["still_notify"].output["handoff_state"] == "armed" assert node_results["turntable_notify"].status == "pending" assert node_results["turntable_notify"].output["notification_mode"] == "deferred_to_render_task" assert node_results["turntable_notify"].output["armed_node_ids"] == ["turntable"] assert node_results["turntable_notify"].output["handoff_state"] == "armed" def test_execute_graph_workflow_suppresses_notify_node_in_shadow_mode( sync_session, tmp_path, monkeypatch, ): line = _seed_renderable_order_line(sync_session, tmp_path) send_calls: list[tuple[str, list[str], dict[str, object]]] = [] def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): send_calls.append((task_name, args, kwargs)) return SimpleNamespace(id="task-shadow-notify") monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", _fake_send_task, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, {"id": "render", "step": "blender_still", "params": {}}, {"id": "notify", "step": "notify", "params": {}}, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "render"}, {"from": "render", "to": "notify"}, ], }, context_id=str(line.id), execution_mode="shadow", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} assert dispatch_result.task_ids == ["task-shadow-notify"] assert len(send_calls) == 1 assert send_calls[0][2].get("emit_legacy_notifications", False) is False assert node_results["notify"].status == "skipped" assert node_results["notify"].output["notification_mode"] == "shadow_suppressed"