from __future__ import annotations import os import uuid from pathlib import Path from types import SimpleNamespace import pytest from sqlalchemy import create_engine, select, text from sqlalchemy.orm import Session, selectinload from app.database import Base from app.domains.auth.models import User, UserRole from app.domains.materials.models import AssetLibrary from app.domains.orders.models import Order, OrderLine, OrderStatus from app.domains.products.models import CadFile, Product from app.domains.rendering.models import OutputType, RenderTemplate, WorkflowRun from app.domains.rendering.workflow_executor import prepare_workflow_context from app.domains.rendering.workflow_graph_runtime import WorkflowGraphRuntimeError, execute_graph_workflow from app.domains.rendering.workflow_run_service import create_workflow_run from app.domains.rendering.workflow_runtime_services import OrderLineRenderSetupResult import app.models # noqa: F401 TEST_DB_URL = os.environ.get( "TEST_DATABASE_URL", "postgresql+asyncpg://hartomat:hartomat@localhost:5432/hartomat_test", ).replace("+asyncpg", "") @pytest.fixture def sync_session(): engine = create_engine(TEST_DB_URL) with engine.begin() as conn: Base.metadata.create_all(conn) session = Session(engine) try: yield session finally: session.close() with engine.begin() as conn: conn.execute(text("DROP SCHEMA public CASCADE")) conn.execute(text("CREATE SCHEMA public")) engine.dispose() def _seed_renderable_order_line( session: Session, tmp_path: Path, *, with_blank_materials: bool = False, ) -> OrderLine: step_path = tmp_path / "cad" / "bearing.step" step_path.parent.mkdir(parents=True, exist_ok=True) step_path.write_text("STEP", encoding="utf-8") user = User( id=uuid.uuid4(), email=f"graph-{uuid.uuid4().hex[:8]}@test.local", password_hash="hash", full_name="Graph Runtime Tester", role=UserRole.admin, is_active=True, ) cad_file = CadFile( id=uuid.uuid4(), original_name="bearing.step", stored_path=str(step_path), file_hash=f"hash-{uuid.uuid4().hex}", parsed_objects={"objects": ["InnerRing", "OuterRing"]}, ) product = Product( id=uuid.uuid4(), pim_id=f"P-{uuid.uuid4().hex[:8]}", name="Bearing A", category_key="bearings", cad_file_id=cad_file.id, cad_file=cad_file, components=[ {"part_name": "InnerRing", "material": "Steel"}, {"part_name": "OuterRing", "material": "Rubber"}, ], cad_part_materials=( [] if with_blank_materials else [ {"part_name": "InnerRing", "material": "Steel raw"}, {"part_name": "OuterRing", "material": "Steel raw"}, ] ), ) output_type = OutputType( id=uuid.uuid4(), name=f"Still-{uuid.uuid4().hex[:6]}", renderer="blender", output_format="png", render_settings={"width": 1600, "height": 900}, ) order = Order( id=uuid.uuid4(), order_number=f"ORD-{uuid.uuid4().hex[:8]}", status=OrderStatus.processing, created_by=user.id, ) line = OrderLine( id=uuid.uuid4(), order_id=order.id, product_id=product.id, product=product, output_type_id=output_type.id, output_type=output_type, render_status="pending", ) session.add_all([user, cad_file, product, output_type, order, line]) session.flush() session.add( AssetLibrary( id=uuid.uuid4(), name="Default Library", blend_file_path="/libraries/materials.blend", is_active=True, ) ) session.add( RenderTemplate( id=uuid.uuid4(), name="Bearing Studio", category_key="bearings", blend_file_path="/templates/bearing.blend", original_filename="bearing.blend", target_collection="Product", material_replace_enabled=True, lighting_only=False, is_active=True, output_types=[output_type], ) ) session.commit() return line def test_execute_graph_workflow_persists_bridge_outputs_and_queues_render_task( sync_session, tmp_path, monkeypatch, ): from app.config import settings monkeypatch.setattr(settings, "upload_dir", str(tmp_path / "uploads")) queued_thumbnail: list[tuple[str, dict[str, str]]] = [] line = _seed_renderable_order_line(sync_session, tmp_path, with_blank_materials=True) monkeypatch.setattr( "app.domains.pipeline.tasks.render_thumbnail.regenerate_thumbnail.delay", lambda cad_file_id, part_colors: queued_thumbnail.append((cad_file_id, part_colors)), ) monkeypatch.setattr( "app.domains.rendering.workflow_runtime_services.extract_bbox_from_step_cadquery", lambda step_path: { "dimensions_mm": {"x": 12.5, "y": 20.0, "z": 7.5}, "bbox_center_mm": {"x": 6.25, "y": 10.0, "z": 3.75}, }, ) monkeypatch.setattr( "app.tasks.celery_app.celery_app.send_task", lambda task_name, args, kwargs: SimpleNamespace(id=f"task-{len(args)}"), ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ {"id": "setup", "step": "order_line_setup", "params": {}}, {"id": "template", "step": "resolve_template", "params": {}}, {"id": "materials", "step": "material_map_resolve", "params": {}}, {"id": "autofill", "step": "auto_populate_materials", "params": {}}, {"id": "bbox", "step": "glb_bbox", "params": {}}, {"id": "render", "step": "blender_still", "params": {"width": 1024, "height": 1024}}, ], "edges": [ {"from": "setup", "to": "template"}, {"from": "template", "to": "materials"}, {"from": "materials", "to": "autofill"}, {"from": "autofill", "to": "bbox"}, {"from": "bbox", "to": "render"}, ], }, context_id=str(line.id), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=line.id, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} sync_session.refresh(line.product) assert dispatch_result.task_ids == ["task-1"] assert refreshed_run.status == "pending" assert refreshed_run.celery_task_id == "task-1" assert node_results["setup"].status == "completed" assert node_results["setup"].output["cad_file_id"] == str(line.product.cad_file_id) assert node_results["template"].status == "completed" assert node_results["template"].output["template_name"] == "Bearing Studio" assert node_results["materials"].status == "completed" assert node_results["materials"].output["material_map_count"] == 0 assert node_results["autofill"].status == "completed" assert node_results["autofill"].output["updated_product_count"] == 1 assert node_results["autofill"].output["queued_thumbnail_regeneration"] is True assert node_results["bbox"].status == "completed" assert node_results["bbox"].output["has_bbox"] is True assert node_results["render"].status == "queued" assert node_results["render"].output["task_id"] == "task-1" assert line.product.cad_part_materials == [ {"part_name": "InnerRing", "material": "Steel"}, {"part_name": "OuterRing", "material": "Rubber"}, ] assert queued_thumbnail == [ ( str(line.product.cad_file_id), {"InnerRing": "Steel", "OuterRing": "Rubber"}, ) ] def test_execute_graph_workflow_retries_bridge_node_and_persists_attempt_metadata( sync_session, monkeypatch, ): attempts = {"count": 0} def _flaky_prepare(_session, _context_id): attempts["count"] += 1 if attempts["count"] == 1: raise RuntimeError("temporary setup failure") return OrderLineRenderSetupResult(status="skip", reason="line_cancelled") monkeypatch.setattr( "app.domains.rendering.workflow_graph_runtime.prepare_order_line_render_context", _flaky_prepare, ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ { "id": "setup", "step": "order_line_setup", "params": {"retry_policy": {"max_attempts": 2}}, }, ], "edges": [], }, context_id=str(uuid.uuid4()), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=None, workflow_context=workflow_context, ) dispatch_result = execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() setup_result = next(node for node in refreshed_run.node_results if node.node_name == "setup") assert dispatch_result.task_ids == [] assert refreshed_run.status == "completed" assert setup_result.status == "skipped" assert setup_result.output["attempt_count"] == 2 assert setup_result.output["max_attempts"] == 2 assert setup_result.output["retry_state"] == "recovered" assert setup_result.output["last_error"] == "temporary setup failure" assert setup_result.output["retry_policy"]["max_attempts"] == 2 def test_execute_graph_workflow_marks_failed_node_with_retry_exhausted_metadata( sync_session, monkeypatch, ): monkeypatch.setattr( "app.domains.rendering.workflow_graph_runtime.prepare_order_line_render_context", lambda _session, _context_id: (_ for _ in ()).throw(RuntimeError("permanent setup failure")), ) workflow_context = prepare_workflow_context( { "version": 1, "nodes": [ { "id": "setup", "step": "order_line_setup", "params": { "retry_policy": {"max_attempts": 2}, "failure_policy": {"fallback_to_legacy": True}, }, }, ], "edges": [], }, context_id=str(uuid.uuid4()), execution_mode="graph", ) run = create_workflow_run( sync_session, workflow_def_id=None, order_line_id=None, workflow_context=workflow_context, ) with pytest.raises(WorkflowGraphRuntimeError, match="permanent setup failure"): execute_graph_workflow(sync_session, workflow_context) sync_session.commit() refreshed_run = sync_session.execute( select(WorkflowRun) .where(WorkflowRun.id == run.id) .options(selectinload(WorkflowRun.node_results)) ).scalar_one() setup_result = next(node for node in refreshed_run.node_results if node.node_name == "setup") assert setup_result.status == "failed" assert setup_result.output["attempt_count"] == 2 assert setup_result.output["max_attempts"] == 2 assert setup_result.output["retry_exhausted"] is True assert setup_result.output["last_error"] == "permanent setup failure" assert setup_result.output["failure_policy"]["fallback_to_legacy"] is True