Files
HartOMat/backend/tests/domains/test_workflow_graph_runtime.py
T

1518 lines
54 KiB
Python

from __future__ import annotations
import os
import uuid
from pathlib import Path
from types import SimpleNamespace
import pytest
from sqlalchemy import create_engine, select, text
from sqlalchemy.orm import Session, selectinload
from app.database import Base
from app.core.process_steps import StepName
from app.domains.auth.models import User, UserRole
from app.domains.materials.models import AssetLibrary
from app.domains.orders.models import Order, OrderLine, OrderStatus
from app.domains.products.models import CadFile, Product
from app.domains.rendering.models import OutputType, RenderTemplate, WorkflowRun
from app.domains.rendering.workflow_executor import prepare_workflow_context
from app.domains.rendering.workflow_graph_runtime import (
_build_task_kwargs,
WorkflowGraphState,
WorkflowGraphRuntimeError,
execute_graph_workflow,
find_unsupported_graph_nodes,
)
from app.domains.rendering.workflow_run_service import create_workflow_run
from app.domains.rendering.workflow_runtime_services import OrderLineRenderSetupResult
import app.models # noqa: F401
from tests.db_test_utils import reset_public_schema_sync, resolve_test_db_url
@pytest.fixture
def sync_session():
    """Yield a synchronous SQLAlchemy session on a freshly reset test schema.

    Setup resets the public schema and creates all tables from
    ``Base.metadata``. Teardown closes the session and resets the schema
    again so no data is left behind for the next test.

    The engine is disposed in an outer ``finally`` so its connection pool is
    released even when schema setup or the teardown reset raises.

    Yields:
        An open ``Session`` bound to the test database engine.
    """
    engine = create_engine(resolve_test_db_url(async_driver=False))
    try:
        with engine.begin() as conn:
            reset_public_schema_sync(conn)
            Base.metadata.create_all(conn)
        session = Session(engine)
        try:
            yield session
        finally:
            # Close first so no open transaction blocks the schema reset.
            session.close()
            with engine.begin() as conn:
                reset_public_schema_sync(conn)
    finally:
        engine.dispose()
def _seed_renderable_order_line(
    session: Session,
    tmp_path: Path,
    *,
    with_blank_materials: bool = False,
) -> OrderLine:
    """Persist a minimal renderable order line and return it.

    Writes a placeholder STEP file under ``tmp_path / "cad"`` and commits a
    user, CAD file, product, output type, order and order line, plus an
    active asset library and an active render template linked to the output
    type.

    Args:
        session: Open SQLAlchemy session used for the inserts and the commit.
        tmp_path: Directory under which the placeholder STEP file is written.
        with_blank_materials: When true, the product's ``cad_part_materials``
            is seeded as an empty list instead of two "Steel raw" entries.

    Returns:
        The committed ``OrderLine``.
    """
    cad_dir = tmp_path / "cad"
    cad_dir.mkdir(parents=True, exist_ok=True)
    step_file = cad_dir / "bearing.step"
    step_file.write_text("STEP", encoding="utf-8")

    if with_blank_materials:
        seeded_part_materials = []
    else:
        seeded_part_materials = [
            {"part_name": "InnerRing", "material": "Steel raw"},
            {"part_name": "OuterRing", "material": "Steel raw"},
        ]

    tester = User(
        id=uuid.uuid4(),
        email=f"graph-{uuid.uuid4().hex[:8]}@test.local",
        password_hash="hash",
        full_name="Graph Runtime Tester",
        role=UserRole.admin,
        is_active=True,
    )
    cad = CadFile(
        id=uuid.uuid4(),
        original_name="bearing.step",
        stored_path=str(step_file),
        file_hash=f"hash-{uuid.uuid4().hex}",
        parsed_objects={"objects": ["InnerRing", "OuterRing"]},
    )
    bearing = Product(
        id=uuid.uuid4(),
        pim_id=f"P-{uuid.uuid4().hex[:8]}",
        name="Bearing A",
        category_key="bearings",
        cad_file_id=cad.id,
        cad_file=cad,
        components=[
            {"part_name": "InnerRing", "material": "Steel"},
            {"part_name": "OuterRing", "material": "Rubber"},
        ],
        cad_part_materials=seeded_part_materials,
    )
    still_output = OutputType(
        id=uuid.uuid4(),
        name=f"Still-{uuid.uuid4().hex[:6]}",
        renderer="blender",
        output_format="png",
        render_settings={"width": 1600, "height": 900},
    )
    parent_order = Order(
        id=uuid.uuid4(),
        order_number=f"ORD-{uuid.uuid4().hex[:8]}",
        status=OrderStatus.processing,
        created_by=tester.id,
    )
    order_line = OrderLine(
        id=uuid.uuid4(),
        order_id=parent_order.id,
        product_id=bearing.id,
        product=bearing,
        output_type_id=still_output.id,
        output_type=still_output,
        render_status="pending",
    )
    session.add_all([tester, cad, bearing, still_output, parent_order, order_line])
    # Flush the core rows before the library and template rows are added.
    session.flush()

    default_library = AssetLibrary(
        id=uuid.uuid4(),
        name="Default Library",
        blend_file_path="/libraries/materials.blend",
        is_active=True,
    )
    session.add(default_library)

    studio_template = RenderTemplate(
        id=uuid.uuid4(),
        name="Bearing Studio",
        category_key="bearings",
        blend_file_path="/templates/bearing.blend",
        original_filename="bearing.blend",
        target_collection="Product",
        material_replace_enabled=True,
        lighting_only=False,
        is_active=True,
        output_types=[still_output],
    )
    session.add(studio_template)

    session.commit()
    return order_line
def _seed_cad_file_only(session: Session, tmp_path: Path) -> CadFile:
    """Persist a standalone CAD file backed by a placeholder STEP file.

    Args:
        session: Open SQLAlchemy session used for the insert and the commit.
        tmp_path: Directory under which ``cad-only/thumbnail.step`` is written.

    Returns:
        The committed ``CadFile``.
    """
    target_dir = tmp_path / "cad-only"
    target_dir.mkdir(parents=True, exist_ok=True)
    step_file = target_dir / "thumbnail.step"
    step_file.write_text("STEP", encoding="utf-8")

    record = CadFile(
        id=uuid.uuid4(),
        original_name="thumbnail.step",
        stored_path=str(step_file),
        file_hash=f"hash-{uuid.uuid4().hex}",
        parsed_objects={"objects": ["Body"]},
    )
    session.add(record)
    session.commit()
    return record
def test_execute_graph_workflow_persists_bridge_outputs_and_queues_render_task(
    sync_session,
    tmp_path,
    monkeypatch,
):
    """Run a six-node order-line graph and verify node results and the queued render.

    The order line is seeded with blank CAD part materials, so the test also
    checks that they are filled from the product components and that a
    thumbnail regeneration request is recorded.
    """
    from app.config import settings

    monkeypatch.setattr(settings, "upload_dir", str(tmp_path / "uploads"))
    queued_thumbnail: list[tuple[str, dict[str, str]]] = []
    line = _seed_renderable_order_line(sync_session, tmp_path, with_blank_materials=True)

    def _record_thumbnail_request(cad_file_id, part_colors):
        queued_thumbnail.append((cad_file_id, part_colors))

    def _stub_bbox(step_path):
        return {
            "dimensions_mm": {"x": 12.5, "y": 20.0, "z": 7.5},
            "bbox_center_mm": {"x": 6.25, "y": 10.0, "z": 3.75},
        }

    def _stub_send_task(task_name, args, kwargs):
        return SimpleNamespace(id=f"task-{len(args)}")

    monkeypatch.setattr(
        "app.domains.pipeline.tasks.render_thumbnail.regenerate_thumbnail.delay",
        _record_thumbnail_request,
    )
    monkeypatch.setattr(
        "app.domains.rendering.workflow_runtime_services.extract_bbox_from_step_cadquery",
        _stub_bbox,
    )
    monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _stub_send_task)

    graph_nodes = [
        {"id": "setup", "step": "order_line_setup", "params": {}},
        {"id": "template", "step": "resolve_template", "params": {}},
        {"id": "materials", "step": "material_map_resolve", "params": {}},
        {"id": "autofill", "step": "auto_populate_materials", "params": {}},
        {"id": "bbox", "step": "glb_bbox", "params": {}},
        {"id": "render", "step": "blender_still", "params": {"width": 1024, "height": 1024}},
    ]
    graph_edges = [
        {"from": "setup", "to": "template"},
        {"from": "template", "to": "materials"},
        {"from": "materials", "to": "autofill"},
        {"from": "autofill", "to": "bbox"},
        {"from": "bbox", "to": "render"},
    ]
    workflow_context = prepare_workflow_context(
        {"version": 1, "nodes": graph_nodes, "edges": graph_edges},
        context_id=str(line.id),
        execution_mode="graph",
    )
    workflow_run = create_workflow_run(
        sync_session,
        workflow_def_id=None,
        order_line_id=line.id,
        workflow_context=workflow_context,
    )

    dispatch_result = execute_graph_workflow(sync_session, workflow_context)
    sync_session.commit()

    run_query = (
        select(WorkflowRun)
        .where(WorkflowRun.id == workflow_run.id)
        .options(selectinload(WorkflowRun.node_results))
    )
    refreshed_run = sync_session.execute(run_query).scalar_one()
    node_results = {result.node_name: result for result in refreshed_run.node_results}
    sync_session.refresh(line.product)

    assert dispatch_result.task_ids == ["task-1"]
    assert refreshed_run.status == "pending"
    assert refreshed_run.celery_task_id == "task-1"
    assert node_results["setup"].status == "completed"
    assert node_results["setup"].output["cad_file_id"] == str(line.product.cad_file_id)
    assert node_results["template"].status == "completed"
    assert node_results["template"].output["template_name"] == "Bearing Studio"
    assert node_results["materials"].status == "completed"
    assert node_results["materials"].output["material_map_count"] == 0
    assert node_results["autofill"].status == "completed"
    assert node_results["autofill"].output["updated_product_count"] == 1
    assert node_results["autofill"].output["queued_thumbnail_regeneration"] is True
    assert node_results["bbox"].status == "completed"
    assert node_results["bbox"].output["has_bbox"] is True
    assert node_results["render"].status == "queued"
    assert node_results["render"].output["task_id"] == "task-1"
    assert line.product.cad_part_materials == [
        {"part_name": "InnerRing", "material": "Steel"},
        {"part_name": "OuterRing", "material": "Rubber"},
    ]
    assert queued_thumbnail == [
        (
            str(line.product.cad_file_id),
            {"InnerRing": "Steel", "OuterRing": "Rubber"},
        )
    ]
def test_execute_graph_workflow_routes_cad_thumbnail_save_using_upstream_threejs_request(
    sync_session,
    tmp_path,
    monkeypatch,
):
    """Verify a ``thumbnail_save`` node queues one task using its upstream three.js params."""
    cad_file = _seed_cad_file_only(sync_session, tmp_path)
    dispatched: list[tuple[str, list[str], dict[str, object]]] = []

    def _capture_send_task(task_name: str, args: list[str], kwargs: dict[str, object]):
        dispatched.append((task_name, args, kwargs))
        return SimpleNamespace(id="task-threejs-thumb")

    monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _capture_send_task)

    graph_definition = {
        "version": 1,
        "nodes": [
            {
                "id": "threejs",
                "step": "threejs_render",
                "params": {"width": 640, "height": 480, "transparent_bg": True},
            },
            {"id": "save", "step": "thumbnail_save", "params": {}},
        ],
        "edges": [
            {"from": "threejs", "to": "save"},
        ],
    }
    workflow_context = prepare_workflow_context(
        graph_definition,
        context_id=str(cad_file.id),
        execution_mode="graph",
    )
    workflow_run = create_workflow_run(
        sync_session,
        workflow_def_id=None,
        order_line_id=None,
        workflow_context=workflow_context,
    )
    assert find_unsupported_graph_nodes(workflow_context) == []

    dispatch_result = execute_graph_workflow(sync_session, workflow_context)
    sync_session.commit()

    run_query = (
        select(WorkflowRun)
        .where(WorkflowRun.id == workflow_run.id)
        .options(selectinload(WorkflowRun.node_results))
    )
    refreshed_run = sync_session.execute(run_query).scalar_one()
    node_results = {result.node_name: result for result in refreshed_run.node_results}

    assert dispatch_result.task_ids == ["task-threejs-thumb"]
    assert len(dispatched) == 1
    sent_name, sent_args, sent_kwargs = dispatched[0]
    assert sent_name == "app.tasks.step_tasks.render_graph_thumbnail"
    assert sent_args == [str(cad_file.id)]
    assert sent_kwargs["renderer"] == "threejs"
    assert sent_kwargs["width"] == 640
    assert sent_kwargs["height"] == 480
    assert sent_kwargs["transparent_bg"] is True
    assert node_results["threejs"].status == "completed"
    assert node_results["threejs"].output["renderer"] == "threejs"
    assert node_results["save"].status == "queued"
    assert node_results["save"].output["predicted_output_path"].endswith(f"{cad_file.id}.png")
def test_execute_graph_workflow_completes_cad_bridge_only_nodes_without_queueing(
    sync_session,
    tmp_path,
    monkeypatch,
):
    """Verify a graph of ``resolve_step_path`` and ``stl_cache_generate`` completes with no task sent."""
    cad_file = _seed_cad_file_only(sync_session, tmp_path)
    dispatched: list[tuple[str, list[str], dict[str, object]]] = []

    def _capture_send_task(task_name: str, args: list[str], kwargs: dict[str, object]):
        # Any call recorded here fails the ``dispatched == []`` assertion below.
        dispatched.append((task_name, args, kwargs))
        return SimpleNamespace(id="unexpected-task")

    monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _capture_send_task)

    graph_definition = {
        "version": 1,
        "nodes": [
            {"id": "resolve", "step": "resolve_step_path", "params": {}},
            {"id": "stl", "step": "stl_cache_generate", "params": {}},
        ],
        "edges": [
            {"from": "resolve", "to": "stl"},
        ],
    }
    workflow_context = prepare_workflow_context(
        graph_definition,
        context_id=str(cad_file.id),
        execution_mode="graph",
    )
    workflow_run = create_workflow_run(
        sync_session,
        workflow_def_id=None,
        order_line_id=None,
        workflow_context=workflow_context,
    )
    assert find_unsupported_graph_nodes(workflow_context) == []

    dispatch_result = execute_graph_workflow(sync_session, workflow_context)
    sync_session.commit()

    run_query = (
        select(WorkflowRun)
        .where(WorkflowRun.id == workflow_run.id)
        .options(selectinload(WorkflowRun.node_results))
    )
    refreshed_run = sync_session.execute(run_query).scalar_one()
    node_results = {result.node_name: result for result in refreshed_run.node_results}

    assert dispatch_result.task_ids == []
    assert dispatched == []
    assert refreshed_run.status == "completed"
    assert refreshed_run.completed_at is not None
    assert node_results["resolve"].status == "completed"
    assert node_results["resolve"].output["cad_file_id"] == str(cad_file.id)
    assert node_results["resolve"].output["processing_status"] == "pending"
    assert node_results["resolve"].output["step_path"].endswith("thumbnail.step")
    assert node_results["stl"].status == "completed"
    assert node_results["stl"].output["cache_mode"] == "compatibility_noop"
    assert node_results["stl"].output["cache_required"] is False
    assert node_results["stl"].output["cad_file_id"] == str(cad_file.id)
def test_execute_graph_workflow_routes_each_thumbnail_save_to_its_connected_request(
    sync_session,
    tmp_path,
    monkeypatch,
):
    """Verify two ``thumbnail_save`` nodes each queue a task with their own upstream render params."""
    cad_file = _seed_cad_file_only(sync_session, tmp_path)
    dispatched: list[tuple[str, list[str], dict[str, object]]] = []

    def _capture_send_task(task_name: str, args: list[str], kwargs: dict[str, object]):
        dispatched.append((task_name, args, kwargs))
        # Task ids are numbered by dispatch order: task-1, task-2, ...
        return SimpleNamespace(id=f"task-{len(dispatched)}")

    monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _capture_send_task)

    graph_definition = {
        "version": 1,
        "nodes": [
            {
                "id": "blender",
                "step": "blender_render",
                "params": {"width": 512, "height": 512, "samples": 64},
            },
            {
                "id": "threejs",
                "step": "threejs_render",
                "params": {"width": 640, "height": 480, "transparent_bg": True},
            },
            {"id": "save_blender", "step": "thumbnail_save", "params": {}},
            {"id": "save_threejs", "step": "thumbnail_save", "params": {}},
        ],
        "edges": [
            {"from": "blender", "to": "save_blender"},
            {"from": "threejs", "to": "save_threejs"},
        ],
    }
    workflow_context = prepare_workflow_context(
        graph_definition,
        context_id=str(cad_file.id),
        execution_mode="graph",
    )
    workflow_run = create_workflow_run(
        sync_session,
        workflow_def_id=None,
        order_line_id=None,
        workflow_context=workflow_context,
    )

    dispatch_result = execute_graph_workflow(sync_session, workflow_context)
    sync_session.commit()

    run_query = (
        select(WorkflowRun)
        .where(WorkflowRun.id == workflow_run.id)
        .options(selectinload(WorkflowRun.node_results))
    )
    refreshed_run = sync_session.execute(run_query).scalar_one()
    node_results = {result.node_name: result for result in refreshed_run.node_results}

    assert dispatch_result.task_ids == ["task-1", "task-2"]
    assert [entry[0] for entry in dispatched] == [
        "app.tasks.step_tasks.render_graph_thumbnail",
        "app.tasks.step_tasks.render_graph_thumbnail",
    ]
    blender_kwargs = dispatched[0][2]
    threejs_kwargs = dispatched[1][2]
    assert blender_kwargs["renderer"] == "blender"
    assert blender_kwargs["width"] == 512
    assert blender_kwargs["height"] == 512
    assert threejs_kwargs["renderer"] == "threejs"
    assert threejs_kwargs["width"] == 640
    assert threejs_kwargs["height"] == 480
    assert threejs_kwargs["transparent_bg"] is True
    assert blender_kwargs["workflow_node_id"] == "save_blender"
    assert threejs_kwargs["workflow_node_id"] == "save_threejs"
    assert node_results["save_blender"].status == "queued"
    assert node_results["save_threejs"].status == "queued"
def test_execute_graph_workflow_merges_legacy_render_context_into_still_task_kwargs(
    sync_session,
    tmp_path,
    monkeypatch,
):
    """Check the kwargs of the still-render task dispatched in shadow mode.

    The expected kwargs combine output-type render settings, order-line
    render overrides, template and asset-library paths, and the stubbed
    render position context.
    """
    from app.config import settings

    monkeypatch.setattr(settings, "upload_dir", str(tmp_path / "uploads"))
    line = _seed_renderable_order_line(sync_session, tmp_path)

    active_library_query = select(AssetLibrary).where(
        AssetLibrary.is_active == True  # noqa: E712
    )
    asset_library = sync_session.execute(active_library_query).scalar_one()
    library_dir = Path(settings.upload_dir) / "asset-libraries"
    asset_library_path = library_dir / f"{asset_library.id}.blend"
    library_dir.mkdir(parents=True, exist_ok=True)
    asset_library_path.write_text("BLEND", encoding="utf-8")

    assert line.output_type is not None
    line.output_type.render_settings = {
        "width": 2048,
        "height": 2048,
        "engine": "cycles",
        "samples": 128,
        "noise_threshold": "0.05",
    }
    line.output_type.transparent_bg = True
    line.output_type.cycles_device = "cuda"
    line.render_overrides = {
        "height": 1440,
        "samples": 96,
        "denoiser": "OPENIMAGEDENOISE",
    }
    sync_session.commit()

    dispatched: list[tuple[str, list[str], dict[str, object]]] = []

    def _capture_send_task(task_name: str, args: list[str], kwargs: dict[str, object]):
        dispatched.append((task_name, args, kwargs))
        return SimpleNamespace(id="task-merged-still")

    def _stub_position_context(_session, _line):
        return SimpleNamespace(
            rotation_x=15.0,
            rotation_y=25.0,
            rotation_z=35.0,
            focal_length_mm=85.0,
            sensor_width_mm=36.0,
        )

    monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _capture_send_task)
    monkeypatch.setattr(
        "app.domains.rendering.workflow_graph_runtime.resolve_render_position_context",
        _stub_position_context,
    )

    graph_definition = {
        "version": 1,
        "nodes": [
            {"id": "setup", "step": "order_line_setup", "params": {}},
            {"id": "template", "step": "resolve_template", "params": {}},
            {"id": "render", "step": "blender_still", "params": {"width": 1024, "samples": 32}},
        ],
        "edges": [
            {"from": "setup", "to": "template"},
            {"from": "template", "to": "render"},
        ],
    }
    workflow_context = prepare_workflow_context(
        graph_definition,
        context_id=str(line.id),
        execution_mode="shadow",
    )
    create_workflow_run(
        sync_session,
        workflow_def_id=None,
        order_line_id=line.id,
        workflow_context=workflow_context,
    )

    dispatch_result = execute_graph_workflow(sync_session, workflow_context)
    sync_session.commit()

    assert dispatch_result.task_ids == ["task-merged-still"]
    assert len(dispatched) == 1
    task_name, args, kwargs = dispatched[0]
    assert task_name == "app.domains.rendering.tasks.render_order_line_still_task"
    assert args == [str(line.id)]
    assert kwargs["width"] == 2048
    assert kwargs["height"] == 1440
    assert kwargs["engine"] == "cycles"
    assert kwargs["samples"] == 96
    assert kwargs["noise_threshold"] == "0.05"
    assert kwargs["denoiser"] == "OPENIMAGEDENOISE"
    assert kwargs["transparent_bg"] is True
    assert kwargs["cycles_device"] == "cuda"
    assert kwargs["template_path"] == "/templates/bearing.blend"
    assert kwargs["target_collection"] == "Product"
    assert kwargs["material_library_path"] == str(asset_library_path)
    assert kwargs["material_map"] is not None
    assert kwargs["part_colors"] == {"InnerRing": "Steel raw", "OuterRing": "Steel raw"}
    assert kwargs["part_names_ordered"] == ["InnerRing", "OuterRing"]
    assert kwargs["rotation_x"] == 15.0
    assert kwargs["rotation_y"] == 25.0
    assert kwargs["rotation_z"] == 35.0
    assert kwargs["focal_length_mm"] == 85.0
    assert kwargs["sensor_width_mm"] == 36.0
    assert kwargs["publish_asset_enabled"] is False
    assert kwargs["emit_events"] is False
    assert kwargs["job_document_enabled"] is False
    assert kwargs["output_name_suffix"].startswith("shadow-")
def test_build_task_kwargs_autoscales_default_samples_via_shared_render_invocation(
    tmp_path,
    monkeypatch,
):
    """Call ``_build_task_kwargs`` on unsaved models and check width, height and samples.

    No database session is involved: the models are built in memory and a
    bare ``object()`` stands in for the session argument.
    """
    cad_dir = tmp_path / "cad"
    cad_dir.mkdir(parents=True, exist_ok=True)
    step_file = cad_dir / "bearing.step"
    step_file.write_text("STEP", encoding="utf-8")

    output_type = OutputType(
        id=uuid.uuid4(),
        name="Still Preview",
        renderer="blender",
        output_format="png",
        render_settings={"width": 1024, "height": 512},
    )
    output_type.invocation_overrides = {"samples": 128, "engine": "cycles"}
    cad_file = CadFile(
        id=uuid.uuid4(),
        original_name="bearing.step",
        stored_path=str(step_file),
        file_hash="hash-graph-1",
        parsed_objects={"objects": ["InnerRing", "OuterRing"]},
    )
    product = Product(
        id=uuid.uuid4(),
        pim_id="P-graph-1",
        name="Bearing G",
        category_key="bearings",
        cad_file_id=cad_file.id,
        cad_file=cad_file,
    )
    order_line = OrderLine(
        id=uuid.uuid4(),
        order_id=uuid.uuid4(),
        product_id=product.id,
        product=product,
        output_type_id=output_type.id,
        output_type=output_type,
    )
    setup_result = OrderLineRenderSetupResult(
        status="ready",
        order_line=order_line,
        cad_file=cad_file,
        part_colors={"InnerRing": "Steel raw"},
    )
    graph_state = WorkflowGraphState(setup=setup_result)
    workflow_context = SimpleNamespace(
        workflow_run_id=uuid.uuid4(),
        execution_mode="shadow",
        ordered_nodes=[],
        edges=[],
    )
    render_node = SimpleNamespace(id="render", step=StepName.BLENDER_STILL, params={})

    def _stub_position_context(_session, _line):
        return SimpleNamespace(
            rotation_x=0.0,
            rotation_y=0.0,
            rotation_z=0.0,
            focal_length_mm=None,
            sensor_width_mm=None,
        )

    monkeypatch.setattr(
        "app.domains.rendering.workflow_graph_runtime.resolve_render_position_context",
        _stub_position_context,
    )

    kwargs = _build_task_kwargs(
        session=object(),
        workflow_context=workflow_context,
        state=graph_state,
        node=render_node,
    )

    assert kwargs["width"] == 1024
    assert kwargs["height"] == 512
    assert kwargs["samples"] == 64
def test_execute_graph_workflow_respects_custom_render_settings_opt_in_for_still_task(
    sync_session,
    tmp_path,
    monkeypatch,
):
    """Verify node params set with ``use_custom_render_settings`` reach the still-task kwargs."""
    line = _seed_renderable_order_line(sync_session, tmp_path)
    assert line.output_type is not None
    line.output_type.render_settings = {
        "width": 2048,
        "height": 2048,
        "engine": "cycles",
        "samples": 128,
    }
    sync_session.commit()

    dispatched: list[tuple[str, list[str], dict[str, object]]] = []

    def _capture_send_task(task_name: str, args: list[str], kwargs: dict[str, object]):
        dispatched.append((task_name, args, kwargs))
        return SimpleNamespace(id="task-custom-still")

    monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _capture_send_task)

    custom_render_params = {
        "use_custom_render_settings": True,
        "width": 1024,
        "height": 768,
        "samples": 32,
        "render_engine": "eevee",
    }
    graph_definition = {
        "version": 1,
        "nodes": [
            {"id": "setup", "step": "order_line_setup", "params": {}},
            {"id": "template", "step": "resolve_template", "params": {}},
            {
                "id": "render",
                "step": "blender_still",
                "params": custom_render_params,
            },
        ],
        "edges": [
            {"from": "setup", "to": "template"},
            {"from": "template", "to": "render"},
        ],
    }
    workflow_context = prepare_workflow_context(
        graph_definition,
        context_id=str(line.id),
        execution_mode="graph",
    )
    create_workflow_run(
        sync_session,
        workflow_def_id=None,
        order_line_id=line.id,
        workflow_context=workflow_context,
    )

    dispatch_result = execute_graph_workflow(sync_session, workflow_context)
    sync_session.commit()

    assert dispatch_result.task_ids == ["task-custom-still"]
    assert len(dispatched) == 1
    task_name, args, kwargs = dispatched[0]
    assert task_name == "app.domains.rendering.tasks.render_order_line_still_task"
    assert args == [str(line.id)]
    assert kwargs["width"] == 1024
    assert kwargs["height"] == 768
    assert kwargs["samples"] == 32
    assert kwargs["render_engine"] == "eevee"
    assert kwargs["engine"] == "cycles"
def test_execute_graph_workflow_preserves_turntable_timing_without_custom_render_settings(
    sync_session,
    tmp_path,
    monkeypatch,
):
    """Check the turntable task kwargs when the node does not opt in to custom render settings.

    The assertions expect width and samples from the output type's render
    settings, while ``fps`` and ``frame_count`` come from the node params.
    """
    line = _seed_renderable_order_line(sync_session, tmp_path)
    assert line.output_type is not None
    line.output_type.render_settings = {
        "width": 2048,
        "height": 2048,
        "engine": "cycles",
        "samples": 128,
        "fps": 30,
        "frame_count": 180,
        "bg_color": "#ffffff",
        "turntable_axis": "world_y",
    }
    line.output_type.transparent_bg = True
    line.output_type.cycles_device = "gpu"
    line.material_override = "Studio White"
    sync_session.commit()

    dispatched: list[tuple[str, list[str], dict[str, object]]] = []

    def _capture_send_task(task_name: str, args: list[str], kwargs: dict[str, object]):
        dispatched.append((task_name, args, kwargs))
        return SimpleNamespace(id="task-turntable")

    def _stub_position_context(_session, _line):
        return SimpleNamespace(
            rotation_x=10.0,
            rotation_y=20.0,
            rotation_z=30.0,
            focal_length_mm=70.0,
            sensor_width_mm=35.0,
        )

    monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _capture_send_task)
    monkeypatch.setattr(
        "app.domains.rendering.workflow_graph_runtime.resolve_render_position_context",
        _stub_position_context,
    )

    turntable_params = {
        "width": 1024,
        "samples": 32,
        "fps": 24,
        "duration_s": 5,
        "frame_count": 120,
    }
    graph_definition = {
        "version": 1,
        "nodes": [
            {"id": "setup", "step": "order_line_setup", "params": {}},
            {"id": "template", "step": "resolve_template", "params": {}},
            {
                "id": "render",
                "step": "blender_turntable",
                "params": turntable_params,
            },
        ],
        "edges": [
            {"from": "setup", "to": "template"},
            {"from": "template", "to": "render"},
        ],
    }
    workflow_context = prepare_workflow_context(
        graph_definition,
        context_id=str(line.id),
        execution_mode="shadow",
    )
    create_workflow_run(
        sync_session,
        workflow_def_id=None,
        order_line_id=line.id,
        workflow_context=workflow_context,
    )

    dispatch_result = execute_graph_workflow(sync_session, workflow_context)
    sync_session.commit()

    assert dispatch_result.task_ids == ["task-turntable"]
    assert len(dispatched) == 1
    task_name, args, kwargs = dispatched[0]
    assert task_name == "app.domains.rendering.tasks.render_turntable_task"
    assert args == [str(line.id)]
    assert kwargs["width"] == 2048
    assert kwargs["height"] == 2048
    assert kwargs["samples"] == 128
    assert kwargs["transparent_bg"] is True
    assert kwargs["cycles_device"] == "gpu"
    assert kwargs["fps"] == 24
    assert kwargs["frame_count"] == 120
    assert kwargs["bg_color"] == "#ffffff"
    assert kwargs["turntable_axis"] == "world_y"
    assert kwargs["rotation_x"] == 10.0
    assert kwargs["rotation_y"] == 20.0
    assert kwargs["rotation_z"] == 30.0
    assert kwargs["focal_length_mm"] == 70.0
    assert kwargs["sensor_width_mm"] == 35.0
    assert kwargs["material_override"] == "Studio White"
    assert kwargs["output_name_suffix"].startswith("shadow-")
def test_execute_graph_workflow_retries_bridge_node_and_persists_attempt_metadata(
    sync_session,
    monkeypatch,
):
    """Verify a setup node that fails once then succeeds records its retry metadata."""
    call_count = 0

    def _flaky_prepare(_session, _context_id):
        # Fail on the first call only; the second call returns a "skip" result.
        nonlocal call_count
        call_count += 1
        if call_count == 1:
            raise RuntimeError("temporary setup failure")
        return OrderLineRenderSetupResult(status="skip", reason="line_cancelled")

    monkeypatch.setattr(
        "app.domains.rendering.workflow_graph_runtime.prepare_order_line_render_context",
        _flaky_prepare,
    )

    graph_definition = {
        "version": 1,
        "nodes": [
            {
                "id": "setup",
                "step": "order_line_setup",
                "params": {"retry_policy": {"max_attempts": 2}},
            },
        ],
        "edges": [],
    }
    workflow_context = prepare_workflow_context(
        graph_definition,
        context_id=str(uuid.uuid4()),
        execution_mode="graph",
    )
    workflow_run = create_workflow_run(
        sync_session,
        workflow_def_id=None,
        order_line_id=None,
        workflow_context=workflow_context,
    )

    dispatch_result = execute_graph_workflow(sync_session, workflow_context)
    sync_session.commit()

    run_query = (
        select(WorkflowRun)
        .where(WorkflowRun.id == workflow_run.id)
        .options(selectinload(WorkflowRun.node_results))
    )
    refreshed_run = sync_session.execute(run_query).scalar_one()
    setup_result = next(
        result for result in refreshed_run.node_results if result.node_name == "setup"
    )

    assert dispatch_result.task_ids == []
    assert refreshed_run.status == "completed"
    assert setup_result.status == "skipped"
    assert setup_result.output["attempt_count"] == 2
    assert setup_result.output["max_attempts"] == 2
    assert setup_result.output["retry_state"] == "recovered"
    assert setup_result.output["last_error"] == "temporary setup failure"
    assert setup_result.output["retry_policy"]["max_attempts"] == 2
def test_execute_graph_workflow_marks_failed_node_with_retry_exhausted_metadata(
    sync_session,
    monkeypatch,
):
    """Verify a setup node that always fails is marked failed with exhausted-retry metadata."""

    def _always_fail(_session, _context_id):
        raise RuntimeError("permanent setup failure")

    monkeypatch.setattr(
        "app.domains.rendering.workflow_graph_runtime.prepare_order_line_render_context",
        _always_fail,
    )

    setup_params = {
        "retry_policy": {"max_attempts": 2},
        "failure_policy": {"fallback_to_legacy": True},
    }
    graph_definition = {
        "version": 1,
        "nodes": [
            {
                "id": "setup",
                "step": "order_line_setup",
                "params": setup_params,
            },
        ],
        "edges": [],
    }
    workflow_context = prepare_workflow_context(
        graph_definition,
        context_id=str(uuid.uuid4()),
        execution_mode="graph",
    )
    workflow_run = create_workflow_run(
        sync_session,
        workflow_def_id=None,
        order_line_id=None,
        workflow_context=workflow_context,
    )

    with pytest.raises(WorkflowGraphRuntimeError, match="permanent setup failure"):
        execute_graph_workflow(sync_session, workflow_context)
    sync_session.commit()

    run_query = (
        select(WorkflowRun)
        .where(WorkflowRun.id == workflow_run.id)
        .options(selectinload(WorkflowRun.node_results))
    )
    refreshed_run = sync_session.execute(run_query).scalar_one()
    setup_result = next(
        result for result in refreshed_run.node_results if result.node_name == "setup"
    )

    assert setup_result.status == "failed"
    assert setup_result.output["attempt_count"] == 2
    assert setup_result.output["max_attempts"] == 2
    assert setup_result.output["retry_exhausted"] is True
    assert setup_result.output["last_error"] == "permanent setup failure"
    assert setup_result.output["failure_policy"]["fallback_to_legacy"] is True
def test_execute_graph_workflow_supports_output_save_bridge_node(
    sync_session,
    tmp_path,
    monkeypatch,
):
    """Verify an ``output_save`` node after a still render completes and records its upstream artifact."""
    line = _seed_renderable_order_line(sync_session, tmp_path)
    dispatched: list[tuple[str, list[str], dict[str, object]]] = []

    def _capture_send_task(task_name, args, kwargs):
        dispatched.append((task_name, args, kwargs))
        return SimpleNamespace(id="task-output-save")

    monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _capture_send_task)

    graph_definition = {
        "version": 1,
        "nodes": [
            {"id": "setup", "step": "order_line_setup", "params": {}},
            {"id": "template", "step": "resolve_template", "params": {}},
            {"id": "render", "step": "blender_still", "params": {"width": 1024, "height": 768}},
            {"id": "output", "step": "output_save", "params": {}},
        ],
        "edges": [
            {"from": "setup", "to": "template"},
            {"from": "template", "to": "render"},
            {"from": "render", "to": "output"},
        ],
    }
    workflow_context = prepare_workflow_context(
        graph_definition,
        context_id=str(line.id),
        execution_mode="graph",
    )
    workflow_run = create_workflow_run(
        sync_session,
        workflow_def_id=None,
        order_line_id=line.id,
        workflow_context=workflow_context,
    )
    assert find_unsupported_graph_nodes(workflow_context) == []

    dispatch_result = execute_graph_workflow(sync_session, workflow_context)
    sync_session.commit()

    run_query = (
        select(WorkflowRun)
        .where(WorkflowRun.id == workflow_run.id)
        .options(selectinload(WorkflowRun.node_results))
    )
    refreshed_run = sync_session.execute(run_query).scalar_one()
    node_results = {result.node_name: result for result in refreshed_run.node_results}

    assert dispatch_result.task_ids == ["task-output-save"]
    assert len(dispatched) == 1
    sent_name, sent_args, sent_kwargs = dispatched[0]
    assert sent_name == "app.domains.rendering.tasks.render_order_line_still_task"
    assert sent_args == [str(line.id)]
    assert sent_kwargs["publish_asset_enabled"] is False
    assert sent_kwargs["graph_authoritative_output_enabled"] is True
    assert sent_kwargs["graph_output_node_ids"] == ["output"]
    assert node_results["render"].status == "queued"
    assert node_results["output"].status == "completed"
    output_payload = node_results["output"].output
    assert output_payload["publication_mode"] == "awaiting_graph_authoritative_save"
    assert output_payload["order_line_id"] == str(line.id)
    assert output_payload["artifact_count"] == 1
    assert output_payload["upstream_artifacts"] == [
        {
            "node_id": "render",
            "artifact_role": "render_output",
            "predicted_output_path": str(
                tmp_path / "cad" / "renders" / f"line_{line.id}.png"
            ),
            "predicted_asset_type": "still",
            "publish_asset_enabled": False,
            "graph_authoritative_output_enabled": True,
            "graph_output_node_ids": ["output"],
            "notify_handoff_enabled": False,
            "task_id": "task-output-save",
        }
    ]
def test_execute_graph_workflow_arms_output_save_handoff_for_export_blend(
    sync_session,
    tmp_path,
    monkeypatch,
):
    """Verify an ``output_save`` node after ``export_blend`` completes and records the blend artifact."""
    line = _seed_renderable_order_line(sync_session, tmp_path)
    dispatched: list[tuple[str, list[str], dict[str, object]]] = []

    def _capture_send_task(task_name, args, kwargs):
        dispatched.append((task_name, args, kwargs))
        return SimpleNamespace(id="task-blend-output-save")

    monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _capture_send_task)

    graph_definition = {
        "version": 1,
        "nodes": [
            {"id": "setup", "step": "order_line_setup", "params": {}},
            {"id": "blend", "step": "export_blend", "params": {}},
            {"id": "output", "step": "output_save", "params": {}},
        ],
        "edges": [
            {"from": "setup", "to": "blend"},
            {"from": "blend", "to": "output"},
        ],
    }
    workflow_context = prepare_workflow_context(
        graph_definition,
        context_id=str(line.id),
        execution_mode="graph",
    )
    workflow_run = create_workflow_run(
        sync_session,
        workflow_def_id=None,
        order_line_id=line.id,
        workflow_context=workflow_context,
    )

    dispatch_result = execute_graph_workflow(sync_session, workflow_context)
    sync_session.commit()

    run_query = (
        select(WorkflowRun)
        .where(WorkflowRun.id == workflow_run.id)
        .options(selectinload(WorkflowRun.node_results))
    )
    refreshed_run = sync_session.execute(run_query).scalar_one()
    node_results = {result.node_name: result for result in refreshed_run.node_results}

    assert dispatch_result.task_ids == ["task-blend-output-save"]
    assert len(dispatched) == 1
    sent_name, sent_args, sent_kwargs = dispatched[0]
    assert sent_name == "app.domains.rendering.tasks.export_blend_for_order_line_task"
    assert sent_args == [str(line.id)]
    assert sent_kwargs["publish_asset_enabled"] is False
    assert sent_kwargs["graph_authoritative_output_enabled"] is True
    assert sent_kwargs["graph_output_node_ids"] == ["output"]
    assert node_results["blend"].status == "queued"
    assert node_results["output"].status == "completed"
    output_payload = node_results["output"].output
    assert output_payload["publication_mode"] == "awaiting_graph_authoritative_save"
    assert output_payload["artifact_count"] == 1
    assert output_payload["upstream_artifacts"] == [
        {
            "node_id": "blend",
            "artifact_role": "blend_export",
            "predicted_output_path": str(tmp_path / "cad" / "bearing_production.blend"),
            "predicted_asset_type": "blend_production",
            "publish_asset_enabled": False,
            "graph_authoritative_output_enabled": True,
            "graph_output_node_ids": ["output"],
            "notify_handoff_enabled": False,
            "task_id": "task-blend-output-save",
        }
    ]
def test_execute_graph_workflow_arms_output_save_handoff_for_turntable(
    sync_session,
    tmp_path,
    monkeypatch,
):
    """Check the output-save handoff for a turntable node in graph mode.

    Runs a ``setup -> turntable -> output`` graph with ``send_task`` patched
    out and verifies that exactly one turntable task is dispatched with
    ``publish_asset_enabled`` off and ``output`` listed as the graph output
    node, and that the ``output`` node result records the expected upstream
    artifact.
    """
    order_line = _seed_renderable_order_line(sync_session, tmp_path)
    dispatched: list[tuple[str, list[str], dict[str, object]]] = []

    def _record_send_task(task_name, args, kwargs):
        # Replacement for the patched send_task: log the call, return a fixed id.
        dispatched.append((task_name, args, kwargs))
        return SimpleNamespace(id="task-turntable-output-save")

    monkeypatch.setattr(
        "app.tasks.celery_app.celery_app.send_task",
        _record_send_task,
    )

    graph_definition = {
        "version": 1,
        "nodes": [
            {"id": "setup", "step": "order_line_setup", "params": {}},
            {"id": "turntable", "step": "blender_turntable", "params": {"fps": 24, "frame_count": 96}},
            {"id": "output", "step": "output_save", "params": {}},
        ],
        "edges": [
            {"from": "setup", "to": "turntable"},
            {"from": "turntable", "to": "output"},
        ],
    }
    workflow_context = prepare_workflow_context(
        graph_definition,
        context_id=str(order_line.id),
        execution_mode="graph",
    )
    workflow_run = create_workflow_run(
        sync_session,
        workflow_def_id=None,
        order_line_id=order_line.id,
        workflow_context=workflow_context,
    )

    dispatch_result = execute_graph_workflow(sync_session, workflow_context)
    sync_session.commit()

    # Reload the run with its node results eagerly loaded.
    run_query = (
        select(WorkflowRun)
        .where(WorkflowRun.id == workflow_run.id)
        .options(selectinload(WorkflowRun.node_results))
    )
    persisted_run = sync_session.execute(run_query).scalar_one()
    results_by_node = {}
    for result in persisted_run.node_results:
        results_by_node[result.node_name] = result

    assert dispatch_result.task_ids == ["task-turntable-output-save"]
    assert len(dispatched) == 1

    sent_task_name, sent_args, sent_kwargs = dispatched[0]
    assert sent_task_name == "app.domains.rendering.tasks.render_turntable_task"
    assert sent_args == [str(order_line.id)]
    assert sent_kwargs["publish_asset_enabled"] is False
    assert sent_kwargs["graph_authoritative_output_enabled"] is True
    assert sent_kwargs["graph_output_node_ids"] == ["output"]
    assert sent_kwargs["workflow_node_id"] == "turntable"

    assert results_by_node["turntable"].status == "queued"
    output_result = results_by_node["output"]
    assert output_result.status == "completed"
    assert output_result.output["publication_mode"] == "awaiting_graph_authoritative_save"
    assert output_result.output["artifact_count"] == 1

    expected_artifact = {
        "node_id": "turntable",
        "artifact_role": "turntable_output",
        "predicted_output_path": str(tmp_path / "cad" / "renders" / "turntable.mp4"),
        "predicted_asset_type": "turntable",
        "publish_asset_enabled": False,
        "graph_authoritative_output_enabled": True,
        "graph_output_node_ids": ["output"],
        "notify_handoff_enabled": False,
        "task_id": "task-turntable-output-save",
    }
    assert output_result.output["upstream_artifacts"] == [expected_artifact]
def test_execute_graph_workflow_routes_output_save_handoffs_per_connected_branch(
    sync_session,
    tmp_path,
    monkeypatch,
):
    """Check that each ``output_save`` node is tied only to its own branch.

    The graph fans out from ``template`` into a still branch and a turntable
    branch, each ending in its own ``output_save`` node. The test verifies
    that two tasks are dispatched, that each one carries only the output node
    id of its branch, and that each output node result lists only the
    artifact of its upstream render node.
    """
    order_line = _seed_renderable_order_line(sync_session, tmp_path)
    dispatched: list[tuple[str, list[str], dict[str, object]]] = []

    def _record_send_task(task_name: str, args: list[str], kwargs: dict[str, object]):
        # Task ids are numbered by dispatch order: task-branch-1, task-branch-2, ...
        dispatched.append((task_name, args, kwargs))
        return SimpleNamespace(id=f"task-branch-{len(dispatched)}")

    monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _record_send_task)

    graph_nodes = [
        {"id": "setup", "step": "order_line_setup", "params": {}},
        {"id": "template", "step": "resolve_template", "params": {}},
        {"id": "still", "step": "blender_still", "params": {"width": 1024, "height": 768}},
        {"id": "turntable", "step": "blender_turntable", "params": {"fps": 24, "frame_count": 96}},
        {"id": "still_output", "step": "output_save", "params": {}},
        {"id": "turntable_output", "step": "output_save", "params": {}},
    ]
    graph_edges = [
        {"from": "setup", "to": "template"},
        {"from": "template", "to": "still"},
        {"from": "template", "to": "turntable"},
        {"from": "still", "to": "still_output"},
        {"from": "turntable", "to": "turntable_output"},
    ]
    workflow_context = prepare_workflow_context(
        {"version": 1, "nodes": graph_nodes, "edges": graph_edges},
        context_id=str(order_line.id),
        execution_mode="graph",
    )
    workflow_run = create_workflow_run(
        sync_session,
        workflow_def_id=None,
        order_line_id=order_line.id,
        workflow_context=workflow_context,
    )

    dispatch_result = execute_graph_workflow(sync_session, workflow_context)
    sync_session.commit()

    run_query = (
        select(WorkflowRun)
        .where(WorkflowRun.id == workflow_run.id)
        .options(selectinload(WorkflowRun.node_results))
    )
    persisted_run = sync_session.execute(run_query).scalar_one()
    results_by_node = {entry.node_name: entry for entry in persisted_run.node_results}

    assert dispatch_result.task_ids == ["task-branch-1", "task-branch-2"]
    assert len(dispatched) == 2

    (still_task, _, still_kwargs), (turntable_task, _, turntable_kwargs) = dispatched
    assert still_task == "app.domains.rendering.tasks.render_order_line_still_task"
    assert still_kwargs["graph_output_node_ids"] == ["still_output"]
    assert turntable_task == "app.domains.rendering.tasks.render_turntable_task"
    assert turntable_kwargs["graph_output_node_ids"] == ["turntable_output"]

    renders_dir = tmp_path / "cad" / "renders"

    still_output = results_by_node["still_output"].output
    assert still_output["artifact_count"] == 1
    expected_still_artifact = {
        "node_id": "still",
        "artifact_role": "render_output",
        "predicted_output_path": str(renders_dir / f"line_{order_line.id}.png"),
        "predicted_asset_type": "still",
        "publish_asset_enabled": False,
        "graph_authoritative_output_enabled": True,
        "graph_output_node_ids": ["still_output"],
        "notify_handoff_enabled": False,
        "task_id": "task-branch-1",
    }
    assert still_output["upstream_artifacts"] == [expected_still_artifact]

    turntable_output = results_by_node["turntable_output"].output
    assert turntable_output["artifact_count"] == 1
    expected_turntable_artifact = {
        "node_id": "turntable",
        "artifact_role": "turntable_output",
        "predicted_output_path": str(renders_dir / "turntable.mp4"),
        "predicted_asset_type": "turntable",
        "publish_asset_enabled": False,
        "graph_authoritative_output_enabled": True,
        "graph_output_node_ids": ["turntable_output"],
        "notify_handoff_enabled": False,
        "task_id": "task-branch-2",
    }
    assert turntable_output["upstream_artifacts"] == [expected_turntable_artifact]
def test_execute_graph_workflow_keeps_self_publish_when_no_output_save_node(
    sync_session,
    tmp_path,
    monkeypatch,
):
    """Check that a graph without ``output_save`` leaves self-publishing on.

    With only ``setup -> template -> render`` in the graph, the dispatched
    still-render task must not have ``publish_asset_enabled`` turned off and
    must not carry graph-authoritative output settings.
    """
    order_line = _seed_renderable_order_line(sync_session, tmp_path)
    dispatched: list[tuple[str, list[str], dict[str, object]]] = []

    def _record_send_task(task_name: str, args: list[str], kwargs: dict[str, object]):
        # Capture the dispatch and answer with a fixed task id.
        dispatched.append((task_name, args, kwargs))
        return SimpleNamespace(id="task-no-output-save")

    monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _record_send_task)

    graph_definition = {
        "version": 1,
        "nodes": [
            {"id": "setup", "step": "order_line_setup", "params": {}},
            {"id": "template", "step": "resolve_template", "params": {}},
            {"id": "render", "step": "blender_still", "params": {"width": 1024, "height": 768}},
        ],
        "edges": [
            {"from": "setup", "to": "template"},
            {"from": "template", "to": "render"},
        ],
    }
    workflow_context = prepare_workflow_context(
        graph_definition,
        context_id=str(order_line.id),
        execution_mode="graph",
    )
    # The run row must exist before execution; its return value is not needed here.
    create_workflow_run(
        sync_session,
        workflow_def_id=None,
        order_line_id=order_line.id,
        workflow_context=workflow_context,
    )

    dispatch_result = execute_graph_workflow(sync_session, workflow_context)
    sync_session.commit()

    assert dispatch_result.task_ids == ["task-no-output-save"]
    assert len(dispatched) == 1

    sent_task_name, _, sent_kwargs = dispatched[0]
    assert sent_task_name == "app.domains.rendering.tasks.render_order_line_still_task"
    # Each key may be absent altogether or hold its "off" value.
    assert sent_kwargs.get("publish_asset_enabled", True) is True
    assert sent_kwargs.get("graph_authoritative_output_enabled") in (None, False)
    assert sent_kwargs.get("graph_output_node_ids") in (None, [])
def test_execute_graph_workflow_arms_notify_handoff_for_graph_render_task(
    sync_session,
    tmp_path,
    monkeypatch,
):
    """Check that a ``notify`` node downstream of a render arms the render task.

    In graph mode the dispatched render task must be told to emit legacy
    notifications and be given the ``notify`` node id. The ``notify`` node
    itself must finish as ``completed`` in ``deferred_to_render_task`` mode
    and name ``render`` as the node it armed.
    """
    order_line = _seed_renderable_order_line(sync_session, tmp_path)
    dispatched: list[tuple[str, list[str], dict[str, object]]] = []

    def _record_send_task(task_name: str, args: list[str], kwargs: dict[str, object]):
        # Capture the dispatch and answer with a fixed task id.
        dispatched.append((task_name, args, kwargs))
        return SimpleNamespace(id="task-notify-handoff")

    monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _record_send_task)

    graph_definition = {
        "version": 1,
        "nodes": [
            {"id": "setup", "step": "order_line_setup", "params": {}},
            {"id": "template", "step": "resolve_template", "params": {}},
            {"id": "render", "step": "blender_still", "params": {}},
            {"id": "notify", "step": "notify", "params": {}},
        ],
        "edges": [
            {"from": "setup", "to": "template"},
            {"from": "template", "to": "render"},
            {"from": "render", "to": "notify"},
        ],
    }
    workflow_context = prepare_workflow_context(
        graph_definition,
        context_id=str(order_line.id),
        execution_mode="graph",
    )
    workflow_run = create_workflow_run(
        sync_session,
        workflow_def_id=None,
        order_line_id=order_line.id,
        workflow_context=workflow_context,
    )

    dispatch_result = execute_graph_workflow(sync_session, workflow_context)
    sync_session.commit()

    run_query = (
        select(WorkflowRun)
        .where(WorkflowRun.id == workflow_run.id)
        .options(selectinload(WorkflowRun.node_results))
    )
    persisted_run = sync_session.execute(run_query).scalar_one()
    results_by_node = {}
    for result in persisted_run.node_results:
        results_by_node[result.node_name] = result

    assert dispatch_result.task_ids == ["task-notify-handoff"]
    assert len(dispatched) == 1

    sent_kwargs = dispatched[0][2]
    assert sent_kwargs["emit_legacy_notifications"] is True
    assert sent_kwargs["graph_notify_node_ids"] == ["notify"]

    assert results_by_node["render"].output["graph_notify_node_ids"] == ["notify"]
    notify_result = results_by_node["notify"]
    assert notify_result.status == "completed"
    assert notify_result.output["notification_mode"] == "deferred_to_render_task"
    assert notify_result.output["armed_node_ids"] == ["render"]
def test_execute_graph_workflow_routes_notify_handoffs_per_connected_branch(
    sync_session,
    tmp_path,
    monkeypatch,
):
    """Check that each ``notify`` node arms only the render on its own branch.

    The graph fans out from ``template`` into a still branch and a turntable
    branch, each followed by its own ``notify`` node. Each dispatched task
    must carry only the notify node id of its branch, and each notify node
    result must list only its upstream render node as armed.
    """
    order_line = _seed_renderable_order_line(sync_session, tmp_path)
    dispatched: list[tuple[str, list[str], dict[str, object]]] = []

    def _record_send_task(task_name: str, args: list[str], kwargs: dict[str, object]):
        # Task ids are numbered by dispatch order.
        dispatched.append((task_name, args, kwargs))
        return SimpleNamespace(id=f"task-notify-branch-{len(dispatched)}")

    monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _record_send_task)

    graph_nodes = [
        {"id": "setup", "step": "order_line_setup", "params": {}},
        {"id": "template", "step": "resolve_template", "params": {}},
        {"id": "still", "step": "blender_still", "params": {}},
        {"id": "turntable", "step": "blender_turntable", "params": {"fps": 24, "frame_count": 96}},
        {"id": "still_notify", "step": "notify", "params": {}},
        {"id": "turntable_notify", "step": "notify", "params": {}},
    ]
    graph_edges = [
        {"from": "setup", "to": "template"},
        {"from": "template", "to": "still"},
        {"from": "template", "to": "turntable"},
        {"from": "still", "to": "still_notify"},
        {"from": "turntable", "to": "turntable_notify"},
    ]
    workflow_context = prepare_workflow_context(
        {"version": 1, "nodes": graph_nodes, "edges": graph_edges},
        context_id=str(order_line.id),
        execution_mode="graph",
    )
    workflow_run = create_workflow_run(
        sync_session,
        workflow_def_id=None,
        order_line_id=order_line.id,
        workflow_context=workflow_context,
    )

    dispatch_result = execute_graph_workflow(sync_session, workflow_context)
    sync_session.commit()

    run_query = (
        select(WorkflowRun)
        .where(WorkflowRun.id == workflow_run.id)
        .options(selectinload(WorkflowRun.node_results))
    )
    persisted_run = sync_session.execute(run_query).scalar_one()
    results_by_node = {entry.node_name: entry for entry in persisted_run.node_results}

    assert dispatch_result.task_ids == ["task-notify-branch-1", "task-notify-branch-2"]
    assert len(dispatched) == 2

    (still_task, _, still_kwargs), (turntable_task, _, turntable_kwargs) = dispatched
    assert still_task == "app.domains.rendering.tasks.render_order_line_still_task"
    assert still_kwargs["emit_legacy_notifications"] is True
    assert still_kwargs["graph_notify_node_ids"] == ["still_notify"]
    assert turntable_task == "app.domains.rendering.tasks.render_turntable_task"
    assert turntable_kwargs["emit_legacy_notifications"] is True
    assert turntable_kwargs["graph_notify_node_ids"] == ["turntable_notify"]

    assert results_by_node["still"].output["graph_notify_node_ids"] == ["still_notify"]
    assert results_by_node["turntable"].output["graph_notify_node_ids"] == ["turntable_notify"]

    still_notify = results_by_node["still_notify"]
    assert still_notify.status == "completed"
    assert still_notify.output["armed_node_ids"] == ["still"]
    turntable_notify = results_by_node["turntable_notify"]
    assert turntable_notify.status == "completed"
    assert turntable_notify.output["armed_node_ids"] == ["turntable"]
def test_execute_graph_workflow_suppresses_notify_node_in_shadow_mode(
    sync_session,
    tmp_path,
    monkeypatch,
):
    """Check that ``shadow`` execution mode suppresses the ``notify`` node.

    The render task is still dispatched, but it must not be asked to emit
    legacy notifications, and the ``notify`` node must be recorded as
    ``skipped`` with notification mode ``shadow_suppressed``.
    """
    line = _seed_renderable_order_line(sync_session, tmp_path)
    send_calls: list[tuple[str, list[str], dict[str, object]]] = []

    def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]):
        # Capture the dispatch and answer with a fixed task id.
        send_calls.append((task_name, args, kwargs))
        return SimpleNamespace(id="task-shadow-notify")

    monkeypatch.setattr("app.tasks.celery_app.celery_app.send_task", _fake_send_task)

    graph_definition = {
        "version": 1,
        "nodes": [
            {"id": "setup", "step": "order_line_setup", "params": {}},
            {"id": "template", "step": "resolve_template", "params": {}},
            {"id": "render", "step": "blender_still", "params": {}},
            {"id": "notify", "step": "notify", "params": {}},
        ],
        "edges": [
            {"from": "setup", "to": "template"},
            {"from": "template", "to": "render"},
            {"from": "render", "to": "notify"},
        ],
    }
    workflow_context = prepare_workflow_context(
        graph_definition,
        context_id=str(line.id),
        execution_mode="shadow",
    )
    run = create_workflow_run(
        sync_session,
        workflow_def_id=None,
        order_line_id=line.id,
        workflow_context=workflow_context,
    )

    dispatch_result = execute_graph_workflow(sync_session, workflow_context)
    sync_session.commit()

    run_query = (
        select(WorkflowRun)
        .where(WorkflowRun.id == run.id)
        .options(selectinload(WorkflowRun.node_results))
    )
    refreshed_run = sync_session.execute(run_query).scalar_one()
    node_results = {}
    for entry in refreshed_run.node_results:
        node_results[entry.node_name] = entry

    assert dispatch_result.task_ids == ["task-shadow-notify"]
    assert len(send_calls) == 1
    # The key may be missing entirely; either way it must not be truthy-True.
    assert send_calls[0][2].get("emit_legacy_notifications", False) is False
    assert node_results["notify"].status == "skipped"
    assert node_results["notify"].output["notification_mode"] == "shadow_suppressed"