diff --git a/backend/app/domains/pipeline/tasks/render_order_line.py b/backend/app/domains/pipeline/tasks/render_order_line.py index fe32d0c..f909deb 100644 --- a/backend/app/domains/pipeline/tasks/render_order_line.py +++ b/backend/app/domains/pipeline/tasks/render_order_line.py @@ -75,6 +75,7 @@ def render_order_line_task(self, order_line_id: str): from sqlalchemy.orm import Session from app.config import settings as app_settings from app.domains.rendering.workflow_runtime_services import ( + build_order_line_render_invocation, emit_order_line_render_notifications, persist_order_line_output, prepare_order_line_render_context, @@ -100,9 +101,6 @@ def render_order_line_task(self, order_line_id: str): line = setup.order_line cad_file = setup.cad_file - materials_source = setup.materials_source - usd_render_path = setup.usd_render_path - glb_reuse_path = setup.glb_reuse_path part_colors = setup.part_colors render_start = setup.render_start @@ -112,10 +110,7 @@ def render_order_line_task(self, order_line_id: str): emit=emit, ) template = template_context.template - material_library = template_context.material_library material_map = template_context.material_map - use_materials = template_context.use_materials - override_mat = template_context.override_material cad_name = cad_file.original_name if cad_file else "?" position_context = resolve_render_position_context(session, line, emit=emit) @@ -125,150 +120,62 @@ def render_order_line_task(self, order_line_id: str): focal_length_mm = position_context.focal_length_mm sensor_width_mm = position_context.sensor_width_mm - emit(order_line_id, f"Starting render for {cad_name} ({len(part_colors)} coloured parts)") - - # Determine if this is an animation output type - is_animation = bool(line.output_type and getattr(line.output_type, 'is_animation', False)) - - # Detect cinematic render type (render_settings.cinematic flag) - is_cinematic = bool( - line.output_type and - line.output_type.render_settings and - line.output_type.render_settings.get("cinematic") + render_invocation = build_order_line_render_invocation( + setup, + template_context=template_context, + position_context=position_context, ) + emit(order_line_id, f"Starting render for {cad_name} ({len(part_colors)} coloured parts)") + if getattr(line, "render_overrides", None): + emit(order_line_id, f"Render overrides active: {line.render_overrides}") + if ( + line.output_type + and line.output_type.render_settings + and render_invocation.samples is not None + and line.output_type.render_settings.get("samples") + and render_invocation.width is not None + and render_invocation.height is not None + ): + base_samples = int(line.output_type.render_settings["samples"]) + if render_invocation.samples < base_samples: + emit( + order_line_id, + f"Auto-scaled samples {base_samples} -> {render_invocation.samples} " + f"for {render_invocation.width}x{render_invocation.height}", + ) - # Determine output format/extension - out_ext = "jpg" - if line.output_type and line.output_type.output_format: - fmt = line.output_type.output_format.lower() - if fmt == "mp4": - out_ext = "mp4" - elif fmt == "webp": - out_ext = "webp" - elif fmt in ("png", "jpg", "jpeg"): - out_ext = "png" if fmt == "png" else "jpg" - - # Build meaningful output filename - import re - def _sanitize(s: str) -> str: - return re.sub(r'[^\w\-.]', '_', s.strip())[:100] - - product_name = line.product.name or line.product.pim_id or "product" - ot_name = line.output_type.name if line.output_type else "render" - filename = f"{_sanitize(product_name)}_{_sanitize(ot_name)}.{out_ext}" - - # Render to per-line output directory - from pathlib import Path as _Path - render_dir = _Path(app_settings.upload_dir) / "renders" / order_line_id - render_dir.mkdir(parents=True, exist_ok=True) - output_path = str(render_dir / filename) - - # Extract per-output-type render settings - render_width = None - render_height = None - render_engine = None - render_samples = None - noise_threshold = "" - denoiser = "" - denoising_input_passes = "" - denoising_prefilter = "" - denoising_quality = "" - denoising_use_gpu = "" - frame_count = 24 - fps = 25 - bg_color = "" - turntable_axis = "world_z" - if line.output_type and line.output_type.render_settings: - rs = line.output_type.render_settings - if rs.get("width"): - render_width = int(rs["width"]) - if rs.get("height"): - render_height = int(rs["height"]) - if rs.get("engine"): - render_engine = rs["engine"] - if rs.get("samples"): - render_samples = int(rs["samples"]) - if rs.get("frame_count"): - frame_count = int(rs["frame_count"]) - if rs.get("fps"): - fps = int(rs["fps"]) - bg_color = rs.get("bg_color", "") - turntable_axis = rs.get("turntable_axis", "world_z") - noise_threshold = str(rs.get("noise_threshold", "")) - denoiser = str(rs.get("denoiser", "")) - denoising_input_passes = str(rs.get("denoising_input_passes", "")) - denoising_prefilter = str(rs.get("denoising_prefilter", "")) - denoising_quality = str(rs.get("denoising_quality", "")) - denoising_use_gpu = str(rs.get("denoising_use_gpu", "")) - - # Auto-scale samples for lower resolutions (thumbnails/previews). - # Only applies when the output type provides both samples and dimensions. - if render_samples and render_width and render_height: - max_dim = max(render_width, render_height) - if max_dim <= 1024: - scaled = max(32, int(render_samples * max_dim / 2048)) - if scaled < render_samples: - emit(order_line_id, f"Auto-scaled samples {render_samples} \u2192 {scaled} for {render_width}x{render_height}") - render_samples = scaled - - transparent_bg = bool(line.output_type and line.output_type.transparent_bg) - cycles_device_val = (line.output_type.cycles_device or "auto") if line.output_type else "auto" - - # Apply per-line render overrides (format, resolution, samples, etc.) - _render_overrides = getattr(line, 'render_overrides', None) - if _render_overrides and isinstance(_render_overrides, dict): - if 'width' in _render_overrides: - render_width = int(_render_overrides['width']) - if 'height' in _render_overrides: - render_height = int(_render_overrides['height']) - if 'samples' in _render_overrides: - render_samples = int(_render_overrides['samples']) - if 'engine' in _render_overrides: - render_engine = _render_overrides['engine'] - if 'frame_count' in _render_overrides: - frame_count = int(_render_overrides['frame_count']) - if 'fps' in _render_overrides: - fps = int(_render_overrides['fps']) - if 'bg_color' in _render_overrides: - bg_color = _render_overrides['bg_color'] - if 'turntable_axis' in _render_overrides: - turntable_axis = _render_overrides['turntable_axis'] - if 'noise_threshold' in _render_overrides: - noise_threshold = str(_render_overrides['noise_threshold']) - if 'denoiser' in _render_overrides: - denoiser = str(_render_overrides['denoiser']) - if 'denoising_input_passes' in _render_overrides: - denoising_input_passes = str(_render_overrides['denoising_input_passes']) - if 'denoising_prefilter' in _render_overrides: - denoising_prefilter = str(_render_overrides['denoising_prefilter']) - if 'denoising_quality' in _render_overrides: - denoising_quality = str(_render_overrides['denoising_quality']) - if 'denoising_use_gpu' in _render_overrides: - denoising_use_gpu = str(_render_overrides['denoising_use_gpu']) - if 'transparent_bg' in _render_overrides: - transparent_bg = bool(_render_overrides['transparent_bg']) - if 'cycles_device' in _render_overrides: - cycles_device_val = _render_overrides['cycles_device'] - emit(order_line_id, f"Render overrides active: {_render_overrides}") - - # Apply output_format override (affects out_ext and filename) - if 'output_format' in _render_overrides: - fmt_override = _render_overrides['output_format'].lower() - if fmt_override == "mp4": - out_ext = "mp4" - elif fmt_override == "webp": - out_ext = "webp" - elif fmt_override in ("png", "jpg", "jpeg"): - out_ext = "png" if fmt_override == "png" else "jpg" - # Rebuild filename with new extension - filename = f"{_sanitize(product_name)}_{_sanitize(ot_name)}.{out_ext}" - output_path = str(render_dir / filename) - - # Build ordered part names list for index-based Blender matching - part_names_ordered = None - if cad_file and cad_file.parsed_objects: - part_names_ordered = cad_file.parsed_objects.get("objects", []) or None - + is_animation = render_invocation.is_animation + is_cinematic = render_invocation.is_cinematic + product_name = render_invocation.product_name + ot_name = render_invocation.output_type_name + output_path = render_invocation.output_path + _Path(output_path).parent.mkdir(parents=True, exist_ok=True) + render_width = render_invocation.width + render_height = render_invocation.height + render_engine = render_invocation.engine + render_samples = render_invocation.samples + frame_count = render_invocation.frame_count + fps = render_invocation.fps + bg_color = render_invocation.bg_color + turntable_axis = render_invocation.turntable_axis + noise_threshold = render_invocation.noise_threshold + denoiser = render_invocation.denoiser + denoising_input_passes = render_invocation.denoising_input_passes + denoising_prefilter = render_invocation.denoising_prefilter + denoising_quality = render_invocation.denoising_quality + denoising_use_gpu = render_invocation.denoising_use_gpu + transparent_bg = render_invocation.transparent_bg + cycles_device_val = render_invocation.cycles_device + part_names_ordered = render_invocation.part_names_ordered + material_override = render_invocation.material_override + target_collection = render_invocation.target_collection + template_path = render_invocation.template_path + material_library_path = render_invocation.material_library_path + camera_orbit = render_invocation.camera_orbit + lighting_only = render_invocation.lighting_only + shadow_catcher = render_invocation.shadow_catcher + usd_path = render_invocation.usd_path + step_path = _Path(cad_file.stored_path) tmpl_info = f" template={template.name}" if template else "" if is_cinematic: @@ -285,33 +192,22 @@ def render_order_line_task(self, order_line_id: str): from app.services.step_processor import _get_all_settings _sys = _get_all_settings() try: - service_data = render_cinematic_to_file( - step_path=_Path(cad_file.stored_path), + cinematic_kwargs = render_invocation.as_cinematic_renderer_kwargs( + step_path=step_path, output_path=_Path(output_path), - width=render_width or 1920, - height=render_height or 1080, - engine=render_engine or _sys.get("blender_engine", "cycles"), - samples=render_samples or int(_sys.get(f"blender_{render_engine or _sys.get('blender_engine','cycles')}_samples", 128)), + default_width=1920, + default_height=1080, + default_engine=_sys.get("blender_engine", "cycles"), + default_samples=int( + _sys.get( + f"blender_{render_engine or _sys.get('blender_engine', 'cycles')}_samples", + 128, + ) + ), smooth_angle=int(_sys.get("blender_smooth_angle", 30)), - cycles_device=cycles_device_val, - transparent_bg=transparent_bg, - part_colors=part_colors or None, - template_path=template.blend_file_path if template else None, - target_collection=template.target_collection if template else "Product", - material_library_path=material_library if use_materials else None, - material_map=material_map, - part_names_ordered=part_names_ordered, - lighting_only=bool(template.lighting_only) if template else False, - shadow_catcher=bool(template.shadow_catcher_enabled) if template else False, - rotation_x=rotation_x, - rotation_y=rotation_y, - rotation_z=rotation_z, - usd_path=usd_render_path, - focal_length_mm=focal_length_mm, - sensor_width_mm=sensor_width_mm, - material_override=override_mat, log_callback=lambda line: emit(order_line_id, line), ) + service_data = render_cinematic_to_file(**cinematic_kwargs) success = True render_log = { "renderer": "blender", @@ -352,37 +248,21 @@ def render_order_line_task(self, order_line_id: str): from app.services.step_processor import _get_all_settings _sys = _get_all_settings() try: - service_data = render_turntable_to_file( - step_path=_Path(cad_file.stored_path), + turntable_kwargs = render_invocation.as_turntable_renderer_kwargs( + step_path=step_path, output_path=_Path(output_path), - frame_count=frame_count, - fps=fps, - width=render_width or 1920, - height=render_height or 1920, - engine=render_engine or _sys.get("blender_engine", "cycles"), - samples=render_samples or int(_sys.get(f"blender_{render_engine or _sys.get('blender_engine','cycles')}_samples", 128)), + default_width=1920, + default_height=1920, + default_engine=_sys.get("blender_engine", "cycles"), + default_samples=int( + _sys.get( + f"blender_{render_engine or _sys.get('blender_engine', 'cycles')}_samples", + 128, + ) + ), smooth_angle=int(_sys.get("blender_smooth_angle", 30)), - cycles_device=cycles_device_val, - transparent_bg=transparent_bg, - bg_color=bg_color, - turntable_axis=turntable_axis, - part_colors=part_colors or None, - template_path=template.blend_file_path if template else None, - target_collection=template.target_collection if template else "Product", - material_library_path=material_library if use_materials else None, - material_map=material_map, - part_names_ordered=part_names_ordered, - lighting_only=bool(template.lighting_only) if template else False, - shadow_catcher=bool(template.shadow_catcher_enabled) if template else False, - rotation_x=rotation_x, - rotation_y=rotation_y, - rotation_z=rotation_z, - camera_orbit=bool(template.camera_orbit) if template else True, - usd_path=usd_render_path, - focal_length_mm=focal_length_mm, - sensor_width_mm=sensor_width_mm, - material_override=override_mat, ) + service_data = render_turntable_to_file(**turntable_kwargs) success = True render_log = { "renderer": "blender", @@ -414,43 +294,18 @@ def render_order_line_task(self, order_line_id: str): logger.error("Turntable render failed for %s: %s", order_line_id, exc) else: # ── Still image path ──────────────────────────────────────── - _render_path_label = "USD → Blender" if usd_render_path else "STEP → GLB → Blender" + _render_path_label = "USD → Blender" if usd_path else "STEP → GLB → Blender" emit(order_line_id, f"Calling renderer ({_render_path_label}) {render_width or 'default'}x{render_height or 'default'}{' [transparent]' if transparent_bg else ''}{f' engine={render_engine}' if render_engine else ''}{f' samples={render_samples}' if render_samples else ''}{tmpl_info}") pl.step_start("blender_still", {"width": render_width, "height": render_height}) from app.services.step_processor import render_to_file success, render_log = render_to_file( - step_path=cad_file.stored_path, - output_path=output_path, - part_colors=part_colors, - width=render_width, - height=render_height, - transparent_bg=transparent_bg, - engine=render_engine, - samples=render_samples, - template_path=template.blend_file_path if template else None, - target_collection=template.target_collection if template else "Product", - material_library_path=material_library if use_materials else None, - material_map=material_map, - part_names_ordered=part_names_ordered, - lighting_only=bool(template.lighting_only) if template else False, - shadow_catcher=bool(template.shadow_catcher_enabled) if template else False, - cycles_device=line.output_type.cycles_device if line.output_type else None, - rotation_x=rotation_x, - rotation_y=rotation_y, - rotation_z=rotation_z, - focal_length_mm=focal_length_mm, - sensor_width_mm=sensor_width_mm, - material_override=override_mat, - job_id=order_line_id, - order_line_id=order_line_id, - noise_threshold=noise_threshold, - denoiser=denoiser, - denoising_input_passes=denoising_input_passes, - denoising_prefilter=denoising_prefilter, - denoising_quality=denoising_quality, - denoising_use_gpu=denoising_use_gpu, - usd_path=usd_render_path, + **render_invocation.as_still_renderer_kwargs( + step_path=cad_file.stored_path, + output_path=output_path, + job_id=order_line_id, + order_line_id=order_line_id, + ) ) if success: pl.step_done("blender_still") diff --git a/backend/app/domains/rendering/workflow_graph_runtime.py b/backend/app/domains/rendering/workflow_graph_runtime.py index 438445d..a51bf7b 100644 --- a/backend/app/domains/rendering/workflow_graph_runtime.py +++ b/backend/app/domains/rendering/workflow_graph_runtime.py @@ -2,6 +2,7 @@ from __future__ import annotations import logging import time +import uuid from dataclasses import dataclass, field from datetime import datetime from pathlib import Path @@ -10,7 +11,9 @@ from typing import Any from sqlalchemy import select from sqlalchemy.orm import Session, selectinload +from app.config import settings from app.core.process_steps import StepName +from app.domains.products.models import CadFile from app.domains.rendering.models import WorkflowNodeResult, WorkflowRun from app.domains.rendering.workflow_executor import STEP_TASK_MAP, WorkflowContext, WorkflowDispatchResult from app.domains.rendering.workflow_node_registry import get_node_definition @@ -21,10 +24,12 @@ from app.domains.rendering.workflow_runtime_services import ( OrderLineRenderSetupResult, TemplateResolutionResult, auto_populate_materials_for_cad, + build_order_line_render_invocation, prepare_order_line_render_context, resolve_cad_bbox, resolve_order_line_material_map, resolve_order_line_template_context, + resolve_render_position_context, ) logger = logging.getLogger(__name__) @@ -37,6 +42,7 @@ class WorkflowGraphRuntimeError(RuntimeError): @dataclass(slots=True) class WorkflowGraphState: setup: OrderLineRenderSetupResult | None = None + cad_file: CadFile | None = None template: TemplateResolutionResult | None = None materials: MaterialResolutionResult | None = None auto_populate: AutoPopulateMaterialsResult | None = None @@ -52,6 +58,119 @@ _ORDER_LINE_RENDER_STEPS = { StepName.NOTIFY, } +_STILL_TASK_KEYS = { + "width", + "height", + "engine", + "samples", + "smooth_angle", + "cycles_device", + "transparent_bg", + "part_colors", + "template_path", + "target_collection", + "material_library_path", + "material_map", + "part_names_ordered", + "lighting_only", + "shadow_catcher", + "rotation_x", + "rotation_y", + "rotation_z", + "noise_threshold", + "denoiser", + "denoising_input_passes", + "denoising_prefilter", + "denoising_quality", + "denoising_use_gpu", + "usd_path", + "focal_length_mm", + "sensor_width_mm", + "material_override", + "render_engine", + "resolution", +} + +_TURNTABLE_TASK_KEYS = { + "output_name", + "engine", + "samples", + "smooth_angle", + "cycles_device", + "transparent_bg", + "width", + "height", + "frame_count", + "fps", + "turntable_degrees", + "turntable_axis", + "bg_color", + "template_path", + "target_collection", + "material_library_path", + "material_map", + "part_names_ordered", + "lighting_only", + "shadow_catcher", + "camera_orbit", + "rotation_x", + "rotation_y", + "rotation_z", + "focal_length_mm", + "sensor_width_mm", + "material_override", +} + +_THUMBNAIL_TASK_KEYS = { + "renderer", + "render_engine", + "samples", + "width", + "height", + "transparent_bg", +} + +_AUTHORITATIVE_RENDER_SETTING_KEYS = { + "render_engine", + "engine", + "samples", + "width", + "height", + "transparent_bg", + "cycles_device", + "noise_threshold", + "denoiser", + "denoising_input_passes", + "denoising_prefilter", + "denoising_quality", + "denoising_use_gpu", + "camera_orbit", + "focal_length_mm", + "sensor_width_mm", + "bg_color", +} + + +def _filter_graph_render_overrides(step: StepName, params: dict[str, Any]) -> dict[str, Any]: + normalized = dict(params) + use_custom_render_settings = bool(normalized.pop("use_custom_render_settings", False)) + if use_custom_render_settings: + return normalized + + filtered = dict(normalized) + for key in _AUTHORITATIVE_RENDER_SETTING_KEYS: + if key in filtered: + filtered.pop(key, None) + + if step == StepName.BLENDER_TURNTABLE: + # Turntable timing remains workflow-specific even when render quality inherits from the output type. + for key in ("fps", "duration_s", "frame_count", "turntable_degrees", "turntable_axis"): + value = normalized.get(key) + if value not in (None, ""): + filtered[key] = value + + return filtered + def find_unsupported_graph_nodes(workflow_context: WorkflowContext) -> list[str]: unsupported: list[str] = [] @@ -119,6 +238,7 @@ def execute_graph_workflow( session=session, workflow_context=workflow_context, state=state, + node=node, node_params=node.params, ) except Exception as exc: @@ -208,14 +328,12 @@ def execute_graph_workflow( from app.tasks.celery_app import celery_app - task_kwargs = dict(node.params) - task_kwargs["workflow_run_id"] = str(workflow_context.workflow_run_id) - task_kwargs["workflow_node_id"] = node.id - if workflow_context.execution_mode == "shadow": - task_kwargs["publish_asset_enabled"] = False - task_kwargs["emit_events"] = False - task_kwargs["job_document_enabled"] = False - task_kwargs["output_name_suffix"] = f"shadow-{str(workflow_context.workflow_run_id)[:8]}" + task_kwargs = _build_task_kwargs( + session=session, + workflow_context=workflow_context, + state=state, + node=node, + ) result = celery_app.send_task( task_name, @@ -228,10 +346,19 @@ def execute_graph_workflow( metadata["attempt_count"] = 1 metadata["max_attempts"] = retry_policy["max_attempts"] metadata["execution_mode"] = workflow_context.execution_mode + predicted_output = _predict_task_output_metadata( + workflow_context=workflow_context, + state=state, + node=node, + task_kwargs=task_kwargs, + ) + if predicted_output: + metadata.update(predicted_output) node_result.status = "queued" node_result.output = metadata node_result.log = None node_result.duration_s = None + state.node_outputs[node.id] = dict(metadata) session.flush() task_ids.append(result.id) node_task_ids[node.id] = result.id @@ -377,13 +504,330 @@ def _serialize_bbox_result(result: BBoxResolutionResult) -> dict[str, Any]: } +def _serialize_cad_file_result(cad_file: CadFile) -> dict[str, Any]: + parsed_objects = cad_file.parsed_objects or {} + objects = parsed_objects.get("objects") + object_count = len(objects) if isinstance(objects, list) else None + return { + "cad_file_id": str(cad_file.id), + "step_path": cad_file.stored_path, + "original_name": cad_file.original_name, + "processing_status": cad_file.processing_status.value if getattr(cad_file, "processing_status", None) else None, + "object_count": object_count, + "has_parsed_objects": bool(parsed_objects), + "gltf_path": cad_file.gltf_path, + } + + +def _workflow_node_ids(workflow_context: WorkflowContext, step: StepName) -> list[str]: + return [node.id for node in workflow_context.ordered_nodes if node.step == step] + + +def _workflow_node_map(workflow_context: WorkflowContext) -> dict[str, Any]: + return {node.id: node for node in workflow_context.ordered_nodes} + + +def _upstream_node_ids(workflow_context: WorkflowContext, node_id: str) -> list[str]: + return [edge.from_node for edge in workflow_context.edges if edge.to_node == node_id] + + +def _downstream_node_ids(workflow_context: WorkflowContext, node_id: str) -> list[str]: + return [edge.to_node for edge in workflow_context.edges if edge.from_node == node_id] + + +def _connected_node_ids_by_step( + workflow_context: WorkflowContext, + *, + node_id: str, + step: StepName, + direction: str, +) -> list[str]: + node_map = _workflow_node_map(workflow_context) + if direction == "upstream": + candidate_ids = _upstream_node_ids(workflow_context, node_id) + elif direction == "downstream": + candidate_ids = _downstream_node_ids(workflow_context, node_id) + else: + raise ValueError(f"Unsupported graph direction: {direction}") + return [ + candidate_id + for candidate_id in candidate_ids + if node_map.get(candidate_id) is not None and node_map[candidate_id].step == step + ] + + +def _connected_upstream_artifacts( + workflow_context: WorkflowContext, + state: WorkflowGraphState, + node_id: str, +) -> list[dict[str, Any]]: + preferred_upstream_ids = set(_upstream_node_ids(workflow_context, node_id)) + artifacts = _collect_upstream_artifacts(state) + if not preferred_upstream_ids: + return [] + return [artifact for artifact in artifacts if artifact["node_id"] in preferred_upstream_ids] + + +def _predict_task_output_metadata( + *, + workflow_context: WorkflowContext, + state: WorkflowGraphState, + node, + task_kwargs: dict[str, Any], +) -> dict[str, Any]: + if node.step == StepName.THUMBNAIL_SAVE: + renderer = str(task_kwargs.get("renderer") or "blender") + output_format = "png" if renderer == "threejs" or bool(task_kwargs.get("transparent_bg")) else "jpg" + output_dir = Path(settings.upload_dir) / "thumbnails" + return { + "artifact_role": "thumbnail_output", + "predicted_output_path": str(output_dir / f"{workflow_context.context_id}.{output_format}"), + "predicted_asset_type": "thumbnail", + "publish_asset_enabled": True, + "graph_authoritative_output_enabled": True, + "graph_output_node_ids": [node.id], + "notify_handoff_enabled": False, + } + + if state.setup is None or state.setup.order_line is None or state.setup.cad_file is None: + return {} + + step_path = Path(state.setup.cad_file.stored_path) + output_name_suffix = task_kwargs.get("output_name_suffix") + order_line_id = str(state.setup.order_line.id) + + if node.step == StepName.BLENDER_STILL: + output_dir = step_path.parent / "renders" + output_filename = f"line_{order_line_id}.png" + if output_name_suffix: + output_filename = f"line_{order_line_id}_{output_name_suffix}.png" + return { + "artifact_role": "render_output", + "predicted_output_path": str(output_dir / output_filename), + "predicted_asset_type": "still", + "publish_asset_enabled": bool(task_kwargs.get("publish_asset_enabled", True)), + "graph_authoritative_output_enabled": bool( + task_kwargs.get("graph_authoritative_output_enabled", False) + ), + "graph_output_node_ids": list(task_kwargs.get("graph_output_node_ids") or []), + "notify_handoff_enabled": bool(task_kwargs.get("emit_legacy_notifications", False)), + "graph_notify_node_ids": list(task_kwargs.get("graph_notify_node_ids") or []), + } + + if node.step == StepName.EXPORT_BLEND: + output_filename = f"{step_path.stem}_production.blend" + if output_name_suffix: + output_filename = f"{step_path.stem}_production_{output_name_suffix}.blend" + return { + "artifact_role": "blend_export", + "predicted_output_path": str(step_path.parent / output_filename), + "predicted_asset_type": "blend_production", + "publish_asset_enabled": bool(task_kwargs.get("publish_asset_enabled", True)), + "graph_authoritative_output_enabled": bool( + task_kwargs.get("graph_authoritative_output_enabled", False) + ), + "graph_output_node_ids": list(task_kwargs.get("graph_output_node_ids") or []), + "notify_handoff_enabled": bool(task_kwargs.get("emit_legacy_notifications", False)), + "graph_notify_node_ids": list(task_kwargs.get("graph_notify_node_ids") or []), + } + + if node.step == StepName.BLENDER_TURNTABLE: + output_name = str(task_kwargs.get("output_name") or "turntable") + output_name_suffix = task_kwargs.get("output_name_suffix") + if output_name_suffix: + output_name = f"{output_name}_{output_name_suffix}" + output_dir = task_kwargs.get("output_dir") + predicted_output_path = None + if isinstance(output_dir, str) and output_dir.strip(): + predicted_output_path = str(Path(output_dir) / f"{output_name}.mp4") + else: + predicted_output_path = str(step_path.parent / "renders" / f"{output_name}.mp4") + return { + "artifact_role": "turntable_output", + "predicted_output_path": predicted_output_path, + "predicted_asset_type": "turntable", + "publish_asset_enabled": bool(task_kwargs.get("publish_asset_enabled", True)), + "graph_authoritative_output_enabled": bool( + task_kwargs.get("graph_authoritative_output_enabled", False) + ), + "graph_output_node_ids": list(task_kwargs.get("graph_output_node_ids") or []), + "notify_handoff_enabled": bool(task_kwargs.get("emit_legacy_notifications", False)), + "graph_notify_node_ids": list(task_kwargs.get("graph_notify_node_ids") or []), + } + + return {} + + +def _collect_upstream_artifacts(state: WorkflowGraphState) -> list[dict[str, Any]]: + artifacts: list[dict[str, Any]] = [] + for node_id, output in state.node_outputs.items(): + predicted_output_path = output.get("predicted_output_path") + artifact_role = output.get("artifact_role") + if not artifact_role and not predicted_output_path: + continue + artifacts.append( + { + "node_id": node_id, + "artifact_role": artifact_role, + "predicted_output_path": predicted_output_path, + "predicted_asset_type": output.get("predicted_asset_type"), + "publish_asset_enabled": bool(output.get("publish_asset_enabled", False)), + "graph_authoritative_output_enabled": bool( + output.get("graph_authoritative_output_enabled", False) + ), + "graph_output_node_ids": list(output.get("graph_output_node_ids") or []), + "notify_handoff_enabled": bool(output.get("notify_handoff_enabled", False)), + "task_id": output.get("task_id"), + **( + {"graph_notify_node_ids": list(output.get("graph_notify_node_ids") or [])} + if output.get("graph_notify_node_ids") + else {} + ), + } + ) + return artifacts + + +def _resolve_cad_file_context( + session: Session, + workflow_context: WorkflowContext, + state: WorkflowGraphState, +) -> CadFile: + if state.cad_file is not None: + return state.cad_file + + try: + cad_file_id = workflow_context.context_id + except AttributeError as exc: + raise WorkflowGraphRuntimeError("cad_file context_id is missing") from exc + + try: + parsed_cad_file_id = uuid.UUID(cad_file_id) + except ValueError as exc: + raise WorkflowGraphRuntimeError(f"cad_file context is not a valid UUID: {cad_file_id}") from exc + + cad_file = session.get(CadFile, parsed_cad_file_id) + if cad_file is None: + raise WorkflowGraphRuntimeError(f"cad_file context not found: {cad_file_id}") + + state.cad_file = cad_file + return cad_file + + +def _resolve_thumbnail_request( + workflow_context: WorkflowContext, + state: WorkflowGraphState, + node_id: str, +) -> dict[str, Any] | None: + preferred_upstream_ids = set(_upstream_node_ids(workflow_context, node_id)) + if preferred_upstream_ids: + for upstream_node in reversed(workflow_context.ordered_nodes): + if upstream_node.id not in preferred_upstream_ids: + continue + output = state.node_outputs.get(upstream_node.id) + if output and output.get("thumbnail_request") is True: + return output + for output in reversed(list(state.node_outputs.values())): + if output.get("thumbnail_request") is True: + return output + return None + + +def _build_task_kwargs( + *, + session: Session, + workflow_context: WorkflowContext, + state: WorkflowGraphState, + node, +) -> dict[str, Any]: + task_kwargs = dict(node.params) + connected_output_node_ids: list[str] = [] + connected_notify_node_ids: list[str] = [] + render_defaults: dict[str, Any] = {} + + if state.setup is not None and state.setup.is_ready and state.setup.order_line is not None: + render_invocation = build_order_line_render_invocation( + state.setup, + template_context=state.template, + position_context=resolve_render_position_context(session, state.setup.order_line), + material_context=state.materials, + ) + render_defaults = render_invocation.task_defaults() + + if node.step == StepName.BLENDER_STILL: + task_kwargs = _filter_graph_render_overrides(StepName.BLENDER_STILL, task_kwargs) + task_kwargs = { + key: value + for key, value in { + **render_defaults, + **task_kwargs, + }.items() + if key in _STILL_TASK_KEYS + } + elif node.step == StepName.BLENDER_TURNTABLE: + task_kwargs = _filter_graph_render_overrides(StepName.BLENDER_TURNTABLE, task_kwargs) + task_kwargs = { + key: value + for key, value in { + **render_defaults, + **task_kwargs, + }.items() + if key in _TURNTABLE_TASK_KEYS + } + elif node.step == StepName.THUMBNAIL_SAVE: + thumbnail_request = _resolve_thumbnail_request(workflow_context, state, node.id) or {} + task_kwargs = { + key: value + for key, value in { + **thumbnail_request, + **task_kwargs, + }.items() + if key in _THUMBNAIL_TASK_KEYS + } + + task_kwargs["workflow_run_id"] = str(workflow_context.workflow_run_id) + task_kwargs["workflow_node_id"] = node.id + if workflow_context.execution_mode == "graph" and node.step in { + StepName.BLENDER_STILL, + StepName.EXPORT_BLEND, + StepName.BLENDER_TURNTABLE, + }: + connected_output_node_ids = _connected_node_ids_by_step( + workflow_context, + node_id=node.id, + step=StepName.OUTPUT_SAVE, + direction="downstream", + ) + connected_notify_node_ids = _connected_node_ids_by_step( + workflow_context, + node_id=node.id, + step=StepName.NOTIFY, + direction="downstream", + ) + if connected_output_node_ids: + task_kwargs["publish_asset_enabled"] = False + task_kwargs["graph_authoritative_output_enabled"] = True + task_kwargs["graph_output_node_ids"] = connected_output_node_ids + if connected_notify_node_ids: + task_kwargs["emit_legacy_notifications"] = True + task_kwargs["graph_notify_node_ids"] = connected_notify_node_ids + if workflow_context.execution_mode == "shadow": + task_kwargs["publish_asset_enabled"] = False + task_kwargs["emit_events"] = False + task_kwargs["job_document_enabled"] = False + task_kwargs["output_name_suffix"] = f"shadow-{str(workflow_context.workflow_run_id)[:8]}" + return task_kwargs + + def _execute_order_line_setup( *, session: Session, workflow_context: WorkflowContext, state: WorkflowGraphState, + node, node_params: dict[str, Any], ) -> tuple[dict[str, Any], str, str | None]: + del node del node_params shadow_mode = workflow_context.execution_mode == "shadow" if shadow_mode: @@ -409,8 +853,10 @@ def _execute_resolve_template( session: Session, workflow_context: WorkflowContext, state: WorkflowGraphState, + node, node_params: dict[str, Any], ) -> tuple[dict[str, Any], str, str | None]: + del node del workflow_context, node_params if state.setup is None or not state.setup.is_ready: if state.setup is not None and state.setup.status == "skip": @@ -426,8 +872,10 @@ def _execute_material_map_resolve( session: Session, workflow_context: WorkflowContext, state: WorkflowGraphState, + node, node_params: dict[str, Any], ) -> tuple[dict[str, Any], str, str | None]: + del node del session, workflow_context, node_params if state.setup is None or not state.setup.is_ready: if state.setup is not None and state.setup.status == "skip": @@ -457,8 +905,10 @@ def _execute_auto_populate_materials( session: Session, workflow_context: WorkflowContext, state: WorkflowGraphState, + node, node_params: dict[str, Any], ) -> tuple[dict[str, Any], str, str | None]: + del node del node_params if state.setup is None or state.setup.cad_file is None: if state.setup is not None and state.setup.status == "skip": @@ -487,8 +937,10 @@ def _execute_glb_bbox( session: Session, workflow_context: WorkflowContext, state: WorkflowGraphState, + node, node_params: dict[str, Any], ) -> tuple[dict[str, Any], str, str | None]: + del node del session, workflow_context if state.setup is None or state.setup.cad_file is None: if state.setup is not None and state.setup.status == "skip": @@ -510,10 +962,198 @@ def _execute_glb_bbox( return _serialize_bbox_result(result), "completed", None +def _execute_resolve_step_path( + *, + session: Session, + workflow_context: WorkflowContext, + state: WorkflowGraphState, + node, + node_params: dict[str, Any], +) -> tuple[dict[str, Any], str, str | None]: + del node + del node_params + cad_file = _resolve_cad_file_context(session, workflow_context, state) + return _serialize_cad_file_result(cad_file), "completed", None + + +def _execute_stl_cache_generate( + *, + session: Session, + workflow_context: WorkflowContext, + state: WorkflowGraphState, + node, + node_params: dict[str, Any], +) -> tuple[dict[str, Any], str, str | None]: + del node + del node_params + cad_file = _resolve_cad_file_context(session, workflow_context, state) + step_path = Path(cad_file.stored_path) + stl_dir = step_path.parent / "stl_cache" + payload = _serialize_cad_file_result(cad_file) + payload.update( + { + "cache_mode": "compatibility_noop", + "cache_required": False, + "stl_cache_dir": str(stl_dir), + "reason": "HartOMat CAD graph uses direct OCC/GLB export instead of legacy STL cache generation.", + } + ) + return payload, "completed", None + + +def _execute_thumbnail_render_request( + *, + session: Session, + workflow_context: WorkflowContext, + state: WorkflowGraphState, + node, + node_params: dict[str, Any], + renderer: str, +) -> tuple[dict[str, Any], str, str | None]: + del node + cad_file = _resolve_cad_file_context(session, workflow_context, state) + payload: dict[str, Any] = { + "cad_file_id": str(cad_file.id), + "step_path": cad_file.stored_path, + "renderer": renderer, + "thumbnail_request": True, + } + for key in ("width", "height", "transparent_bg", "render_engine", "samples"): + value = node_params.get(key) + if value not in (None, ""): + payload[key] = value + return payload, "completed", None + + +def _execute_blender_thumbnail_render( + *, + session: Session, + workflow_context: WorkflowContext, + state: WorkflowGraphState, + node, + node_params: dict[str, Any], +) -> tuple[dict[str, Any], str, str | None]: + return _execute_thumbnail_render_request( + session=session, + workflow_context=workflow_context, + state=state, + node=node, + node_params=node_params, + renderer="blender", + ) + + +def _execute_threejs_thumbnail_render( + *, + session: Session, + workflow_context: WorkflowContext, + state: WorkflowGraphState, + node, + node_params: dict[str, Any], +) -> tuple[dict[str, Any], str, str | None]: + return _execute_thumbnail_render_request( + session=session, + workflow_context=workflow_context, + state=state, + node=node, + node_params=node_params, + renderer="threejs", + ) + + +def _execute_output_save( + *, + session: Session, + workflow_context: WorkflowContext, + state: WorkflowGraphState, + node, + node_params: dict[str, Any], +) -> tuple[dict[str, Any], str, str | None]: + del session, node_params + if state.setup is None or state.setup.order_line is None: + raise WorkflowGraphRuntimeError("output_save requires an order_line_setup result") + + if state.setup.status == "skip": + return _serialize_setup_result(state.setup), "skipped", state.setup.reason + if not state.setup.is_ready: + return _serialize_setup_result(state.setup), "failed", state.setup.reason or "output_save_blocked" + + order_line = state.setup.order_line + payload: dict[str, Any] = { + "order_line_id": str(order_line.id), + "authoritative_result_path": order_line.result_path, + "shadow_mode": workflow_context.execution_mode == "shadow", + } + upstream_artifacts = _connected_upstream_artifacts(workflow_context, state, node.id) + if workflow_context.execution_mode == "shadow": + payload["publication_mode"] = "shadow_observer_only" + elif any(artifact["publish_asset_enabled"] for artifact in upstream_artifacts): + payload["publication_mode"] = "deferred_to_render_task" + else: + payload["publication_mode"] = "awaiting_graph_authoritative_save" + if upstream_artifacts: + payload["artifact_count"] = len(upstream_artifacts) + payload["upstream_artifacts"] = upstream_artifacts + if state.template is not None and state.template.template is not None: + payload["template_name"] = state.template.template.name + if state.materials is not None: + payload["material_map_count"] = len(state.materials.material_map or {}) + return payload, "completed", None + + +def _execute_notify( + *, + session: Session, + workflow_context: WorkflowContext, + state: WorkflowGraphState, + node, + node_params: dict[str, Any], +) -> tuple[dict[str, Any], str, str | None]: + del session, node_params + if state.setup is None or state.setup.order_line is None: + raise WorkflowGraphRuntimeError("notify requires an order_line_setup result") + + if state.setup.status == "skip": + return _serialize_setup_result(state.setup), "skipped", state.setup.reason + if not state.setup.is_ready: + return _serialize_setup_result(state.setup), "failed", state.setup.reason or "notify_blocked" + + payload: dict[str, Any] = { + "order_line_id": str(state.setup.order_line.id), + "shadow_mode": workflow_context.execution_mode == "shadow", + "channel": "audit_log", + } + + if workflow_context.execution_mode == "shadow": + payload["notification_mode"] = "shadow_suppressed" + return payload, "skipped", "shadow mode suppresses user notifications" + + connected_artifacts = _connected_upstream_artifacts(workflow_context, state, node.id) + armed_node_ids = [ + artifact["node_id"] + for artifact in connected_artifacts + if artifact["notify_handoff_enabled"] + ] + if not armed_node_ids: + payload["notification_mode"] = "not_armed" + return payload, "skipped", "No graph render task is configured for notification handoff" + + payload["notification_mode"] = "deferred_to_render_task" + payload["armed_node_ids"] = armed_node_ids + payload["armed_node_count"] = len(armed_node_ids) + return payload, "completed", None + + _BRIDGE_EXECUTORS = { + StepName.RESOLVE_STEP_PATH: _execute_resolve_step_path, + StepName.BLENDER_RENDER: _execute_blender_thumbnail_render, + StepName.THREEJS_RENDER: _execute_threejs_thumbnail_render, StepName.ORDER_LINE_SETUP: _execute_order_line_setup, StepName.RESOLVE_TEMPLATE: _execute_resolve_template, StepName.MATERIAL_MAP_RESOLVE: _execute_material_map_resolve, StepName.AUTO_POPULATE_MATERIALS: _execute_auto_populate_materials, StepName.GLB_BBOX: _execute_glb_bbox, + StepName.STL_CACHE_GENERATE: _execute_stl_cache_generate, + StepName.OUTPUT_SAVE: _execute_output_save, + StepName.NOTIFY: _execute_notify, } diff --git a/backend/app/domains/rendering/workflow_runtime_services.py b/backend/app/domains/rendering/workflow_runtime_services.py index 37b16ea..cfefbee 100644 --- a/backend/app/domains/rendering/workflow_runtime_services.py +++ b/backend/app/domains/rendering/workflow_runtime_services.py @@ -1,7 +1,9 @@ from __future__ import annotations import logging +import re import shutil +import uuid from dataclasses import dataclass, field from datetime import datetime from pathlib import Path @@ -11,10 +13,17 @@ from sqlalchemy import select, update as sql_update from sqlalchemy.orm import Session, joinedload from app.config import settings as app_settings +from app.core.render_paths import resolve_result_path, result_path_to_storage_key from app.domains.media.models import MediaAsset, MediaAssetType from app.domains.orders.models import Order, OrderLine, OrderStatus from app.domains.products.models import CadFile, Product -from app.domains.rendering.models import GlobalRenderPosition, ProductRenderPosition, RenderTemplate +from app.domains.rendering.output_type_contracts import merge_output_type_invocation_overrides +from app.domains.rendering.models import ( + GlobalRenderPosition, + ProductRenderPosition, + RenderTemplate, + WorkflowRun, +) from app.services.material_service import resolve_material_map from app.services.step_processor import build_part_colors from app.services.template_service import ( @@ -108,6 +117,216 @@ class OutputSaveResult: asset_type: MediaAssetType | None = None +@dataclass(slots=True) +class OrderLineRenderInvocation: + product_name: str + output_type_name: str + output_extension: str + output_filename: str + output_path: str + is_animation: bool + is_cinematic: bool + width: int | None = None + height: int | None = None + engine: str | None = None + samples: int | None = None + frame_count: int = 24 + fps: int = 25 + bg_color: str = "" + turntable_axis: str = "world_z" + noise_threshold: str = "" + denoiser: str = "" + denoising_input_passes: str = "" + denoising_prefilter: str = "" + denoising_quality: str = "" + denoising_use_gpu: str = "" + transparent_bg: bool = False + cycles_device: str = "auto" + part_colors: dict[str, str] = field(default_factory=dict) + part_names_ordered: list[str] | None = None + template_path: str | None = None + target_collection: str = "Product" + material_library_path: str | None = None + material_map: dict[str, str] | None = None + lighting_only: bool = False + shadow_catcher: bool = False + camera_orbit: bool = True + rotation_x: float = 0.0 + rotation_y: float = 0.0 + rotation_z: float = 0.0 + focal_length_mm: float | None = None + sensor_width_mm: float | None = None + usd_path: str | None = None + material_override: str | None = None + + def task_defaults(self) -> dict[str, Any]: + payload: dict[str, Any] = { + "transparent_bg": self.transparent_bg, + "cycles_device": self.cycles_device, + "part_colors": self.part_colors, + "target_collection": self.target_collection, + "lighting_only": self.lighting_only, + "shadow_catcher": self.shadow_catcher, + "camera_orbit": self.camera_orbit, + "rotation_x": self.rotation_x, + "rotation_y": self.rotation_y, + "rotation_z": self.rotation_z, + "frame_count": self.frame_count, + "fps": self.fps, + "bg_color": self.bg_color, + "turntable_axis": self.turntable_axis, + "noise_threshold": self.noise_threshold, + "denoiser": self.denoiser, + "denoising_input_passes": self.denoising_input_passes, + "denoising_prefilter": self.denoising_prefilter, + "denoising_quality": self.denoising_quality, + "denoising_use_gpu": self.denoising_use_gpu, + } + optional_values = { + "width": self.width, + "height": self.height, + "engine": self.engine, + "samples": self.samples, + "template_path": self.template_path, + "material_library_path": self.material_library_path, + "material_map": self.material_map, + "part_names_ordered": self.part_names_ordered, + "focal_length_mm": self.focal_length_mm, + "sensor_width_mm": self.sensor_width_mm, + "usd_path": self.usd_path, + "material_override": self.material_override, + } + for key, value in optional_values.items(): + if value not in (None, ""): + payload[key] = value + return payload + + def as_still_renderer_kwargs( + self, + *, + step_path: str, + output_path: str, + job_id: str | None = None, + order_line_id: str | None = None, + ) -> dict[str, Any]: + return { + "step_path": step_path, + "output_path": output_path, + "part_colors": self.part_colors or None, + "width": self.width, + "height": self.height, + "transparent_bg": self.transparent_bg, + "engine": self.engine, + "samples": self.samples, + "template_path": self.template_path, + "target_collection": self.target_collection, + "material_library_path": self.material_library_path, + "material_map": self.material_map, + "part_names_ordered": self.part_names_ordered, + "lighting_only": self.lighting_only, + "shadow_catcher": self.shadow_catcher, + "cycles_device": self.cycles_device, + "rotation_x": self.rotation_x, + "rotation_y": self.rotation_y, + "rotation_z": self.rotation_z, + "job_id": job_id, + "noise_threshold": self.noise_threshold, + "denoiser": self.denoiser, + "denoising_input_passes": self.denoising_input_passes, + "denoising_prefilter": self.denoising_prefilter, + "denoising_quality": self.denoising_quality, + "denoising_use_gpu": self.denoising_use_gpu, + "order_line_id": order_line_id, + "usd_path": self.usd_path, + "focal_length_mm": self.focal_length_mm, + "sensor_width_mm": self.sensor_width_mm, + "material_override": self.material_override, + } + + def as_turntable_renderer_kwargs( + self, + *, + step_path: Path, + output_path: Path, + smooth_angle: int, + default_width: int, + default_height: int, + default_engine: str, + default_samples: int, + ) -> dict[str, Any]: + return { + "step_path": step_path, + "output_path": output_path, + "frame_count": self.frame_count, + "fps": self.fps, + "width": self.width or default_width, + "height": self.height or default_height, + "engine": self.engine or default_engine, + "samples": self.samples or default_samples, + "smooth_angle": smooth_angle, + "cycles_device": self.cycles_device, + "transparent_bg": self.transparent_bg, + "bg_color": self.bg_color, + "turntable_axis": self.turntable_axis, + "part_colors": self.part_colors or None, + "template_path": self.template_path, + "target_collection": self.target_collection, + "material_library_path": self.material_library_path, + "material_map": self.material_map, + "part_names_ordered": self.part_names_ordered, + "lighting_only": self.lighting_only, + "shadow_catcher": self.shadow_catcher, + "rotation_x": self.rotation_x, + "rotation_y": self.rotation_y, + "rotation_z": self.rotation_z, + "camera_orbit": self.camera_orbit, + "usd_path": self.usd_path, + "focal_length_mm": self.focal_length_mm, + "sensor_width_mm": self.sensor_width_mm, + "material_override": self.material_override, + } + + def as_cinematic_renderer_kwargs( + self, + *, + step_path: Path, + output_path: Path, + smooth_angle: int, + default_width: int, + default_height: int, + default_engine: str, + default_samples: int, + log_callback: Callable[[str], None] | None = None, + ) -> dict[str, Any]: + return { + "step_path": step_path, + "output_path": output_path, + "width": self.width or default_width, + "height": self.height or default_height, + "engine": self.engine or default_engine, + "samples": self.samples or default_samples, + "smooth_angle": smooth_angle, + "cycles_device": self.cycles_device, + "transparent_bg": self.transparent_bg, + "part_colors": self.part_colors or None, + "template_path": self.template_path, + "target_collection": self.target_collection, + "material_library_path": self.material_library_path, + "material_map": self.material_map, + "part_names_ordered": self.part_names_ordered, + "lighting_only": self.lighting_only, + "shadow_catcher": self.shadow_catcher, + "rotation_x": self.rotation_x, + "rotation_y": self.rotation_y, + "rotation_z": self.rotation_z, + "usd_path": self.usd_path, + "focal_length_mm": self.focal_length_mm, + "sensor_width_mm": self.sensor_width_mm, + "material_override": self.material_override, + "log_callback": log_callback, + } + + def _emit(emit: EmitFn, order_line_id: str, message: str, level: str | None = None) -> None: if emit is None: return @@ -118,14 +337,42 @@ def _emit(emit: EmitFn, order_line_id: str, message: str, level: str | None = No def _resolve_asset_path(storage_key: str | None) -> Path | None: - if not storage_key: - return None - candidate = Path(app_settings.upload_dir) / storage_key - if candidate.exists(): - return candidate + return resolve_result_path(storage_key) + + +def _usd_master_refresh_reason(cad_file: CadFile) -> str | None: + resolved = cad_file.resolved_material_assignments + if not isinstance(resolved, dict) or not resolved: + return "missing resolved material assignments" + + canonical_materials: list[str] = [] + for meta in resolved.values(): + if not isinstance(meta, dict): + continue + canonical = meta.get("canonical_material") + if isinstance(canonical, str) and canonical.strip(): + canonical_materials.append(canonical.strip()) + + if not canonical_materials: + return "missing canonical material metadata" + + if any(material.upper().startswith("SCHAEFFLER_") for material in canonical_materials): + return "legacy Schaeffler material metadata" + return None +def _queue_usd_master_refresh(cad_file_id: str) -> bool: + try: + from app.tasks.step_tasks import generate_usd_master_task + + generate_usd_master_task.delay(cad_file_id) + return True + except Exception: + logger.exception("render_order_line: failed to queue usd_master refresh for cad %s", cad_file_id) + return False + + def extract_bbox_from_glb(glb_path: str) -> dict[str, dict[str, float]] | None: """Extract a bounding box from a GLB file in meters and convert to mm.""" try: @@ -207,8 +454,7 @@ def resolve_cad_bbox( def _normalize_storage_key(output_path: str) -> str: - upload_prefix = str(app_settings.upload_dir).rstrip("/") + "/" - return output_path[len(upload_prefix):] if output_path.startswith(upload_prefix) else output_path + return result_path_to_storage_key(output_path) or output_path def _resolve_output_asset_type(output_path: str) -> MediaAssetType: @@ -218,6 +464,8 @@ def _resolve_output_asset_type(output_path: str) -> MediaAssetType: def _resolve_output_mime_type(output_path: str) -> str: extension = output_path.rsplit(".", 1)[-1].lower() if "." in output_path else "bin" + if extension == "blend": + return "application/x-blender" if extension in ("mp4", "webm"): return "video/mp4" if extension == "webp": @@ -227,6 +475,333 @@ def _resolve_output_mime_type(output_path: str) -> str: return "image/png" +def _sanitize_public_output_name(value: str) -> str: + sanitized = re.sub(r"[^\w\-.]", "_", value.strip()) + return sanitized[:100] or "output" + + +def _coerce_int(value: Any) -> int | None: + if value in (None, ""): + return None + try: + return int(value) + except (TypeError, ValueError): + return None + + +def _coerce_bool(value: Any) -> bool: + if isinstance(value, bool): + return value + if isinstance(value, str): + return value.strip().lower() in {"1", "true", "yes", "on"} + return bool(value) + + +def _resolve_render_output_extension(line: OrderLine) -> str: + output_type = line.output_type + output_extension = "jpg" + if output_type is not None and output_type.output_format: + fmt = str(output_type.output_format).lower() + if fmt == "mp4": + output_extension = "mp4" + elif fmt == "webp": + output_extension = "webp" + elif fmt in {"png", "jpg", "jpeg"}: + output_extension = "png" if fmt == "png" else "jpg" + + render_overrides = getattr(line, "render_overrides", None) + if isinstance(render_overrides, dict) and render_overrides.get("output_format") not in (None, ""): + override = str(render_overrides["output_format"]).lower() + if override == "mp4": + return "mp4" + if override == "webp": + return "webp" + if override in {"png", "jpg", "jpeg"}: + return "png" if override == "png" else "jpg" + return output_extension + + +def _scale_render_samples_for_resolution( + samples: int | None, + width: int | None, + height: int | None, +) -> int | None: + if samples is None or width is None or height is None: + return samples + max_dim = max(width, height) + if max_dim > 1024: + return samples + scaled = max(32, int(samples * max_dim / 2048)) + return scaled if scaled < samples else samples + + +def build_order_line_render_invocation( + setup: OrderLineRenderSetupResult, + *, + template_context: TemplateResolutionResult | None = None, + position_context: RenderPositionContext | None = None, + material_context: MaterialResolutionResult | None = None, + emit: EmitFn = None, +) -> OrderLineRenderInvocation: + if not setup.is_ready or setup.order_line is None or setup.cad_file is None: + raise ValueError("build_order_line_render_invocation requires a ready order-line setup") + + line = setup.order_line + cad_file = setup.cad_file + output_type = line.output_type + position = position_context or RenderPositionContext() + render_settings = ( + merge_output_type_invocation_overrides( + output_type.render_settings, + getattr(output_type, "invocation_overrides", None), + ) + if output_type is not None + else {} + ) + + width = _coerce_int(render_settings.get("width")) + height = _coerce_int(render_settings.get("height")) + samples = _coerce_int(render_settings.get("samples")) + frame_count = _coerce_int(render_settings.get("frame_count")) or 24 + fps = _coerce_int(render_settings.get("fps")) or 25 + engine = render_settings.get("engine") + bg_color = str(render_settings.get("bg_color", "")) + turntable_axis = str(render_settings.get("turntable_axis", "world_z")) + noise_threshold = str(render_settings.get("noise_threshold", "")) + denoiser = str(render_settings.get("denoiser", "")) + denoising_input_passes = str(render_settings.get("denoising_input_passes", "")) + denoising_prefilter = str(render_settings.get("denoising_prefilter", "")) + denoising_quality = str(render_settings.get("denoising_quality", "")) + denoising_use_gpu = str(render_settings.get("denoising_use_gpu", "")) + transparent_bg = bool(output_type and output_type.transparent_bg) + cycles_device = (output_type.cycles_device or "auto") if output_type is not None else "auto" + + render_overrides = getattr(line, "render_overrides", None) + if isinstance(render_overrides, dict): + width = _coerce_int(render_overrides.get("width")) or width + height = _coerce_int(render_overrides.get("height")) or height + samples = _coerce_int(render_overrides.get("samples")) or samples + frame_count = _coerce_int(render_overrides.get("frame_count")) or frame_count + fps = _coerce_int(render_overrides.get("fps")) or fps + engine = render_overrides.get("engine") or engine + if render_overrides.get("bg_color") not in (None, ""): + bg_color = str(render_overrides["bg_color"]) + if render_overrides.get("turntable_axis") not in (None, ""): + turntable_axis = str(render_overrides["turntable_axis"]) + if render_overrides.get("noise_threshold") not in (None, ""): + noise_threshold = str(render_overrides["noise_threshold"]) + if render_overrides.get("denoiser") not in (None, ""): + denoiser = str(render_overrides["denoiser"]) + if render_overrides.get("denoising_input_passes") not in (None, ""): + denoising_input_passes = str(render_overrides["denoising_input_passes"]) + if render_overrides.get("denoising_prefilter") not in (None, ""): + denoising_prefilter = str(render_overrides["denoising_prefilter"]) + if render_overrides.get("denoising_quality") not in (None, ""): + denoising_quality = str(render_overrides["denoising_quality"]) + if render_overrides.get("denoising_use_gpu") not in (None, ""): + denoising_use_gpu = str(render_overrides["denoising_use_gpu"]) + if "transparent_bg" in render_overrides: + transparent_bg = _coerce_bool(render_overrides["transparent_bg"]) + if render_overrides.get("cycles_device") not in (None, ""): + cycles_device = str(render_overrides["cycles_device"]) + _emit(emit, str(line.id), f"Render overrides active: {render_overrides}") + + scaled_samples = _scale_render_samples_for_resolution(samples, width, height) + if ( + samples is not None + and scaled_samples is not None + and scaled_samples < samples + and width is not None + and height is not None + ): + _emit( + emit, + str(line.id), + f"Auto-scaled samples {samples} -> {scaled_samples} for {width}x{height}", + ) + samples = scaled_samples + + part_names_ordered = None + if cad_file.parsed_objects: + part_names = cad_file.parsed_objects.get("objects", []) + part_names_ordered = part_names or None + + product_name = line.product.name or line.product.pim_id or "product" + output_type_name = output_type.name if output_type is not None else "render" + output_extension = _resolve_render_output_extension(line) + output_filename = ( + f"{_sanitize_public_output_name(product_name)}_" + f"{_sanitize_public_output_name(output_type_name)}.{output_extension}" + ) + output_dir = Path(app_settings.upload_dir) / "renders" / str(line.id) + + material_map = None + use_materials = False + material_override = None + if template_context is not None: + material_map = template_context.material_map + use_materials = template_context.use_materials + material_override = template_context.override_material + if material_context is not None: + material_map = material_context.material_map + use_materials = material_context.use_materials + material_override = material_context.override_material + + return OrderLineRenderInvocation( + product_name=product_name, + output_type_name=output_type_name, + output_extension=output_extension, + output_filename=output_filename, + output_path=str(output_dir / output_filename), + is_animation=bool(output_type and output_type.is_animation), + is_cinematic=bool(output_type and render_settings.get("cinematic")), + width=width, + height=height, + engine=str(engine) if engine not in (None, "") else None, + samples=samples, + frame_count=frame_count, + fps=fps, + bg_color=bg_color, + turntable_axis=turntable_axis, + noise_threshold=noise_threshold, + denoiser=denoiser, + denoising_input_passes=denoising_input_passes, + denoising_prefilter=denoising_prefilter, + denoising_quality=denoising_quality, + denoising_use_gpu=denoising_use_gpu, + transparent_bg=transparent_bg, + cycles_device=cycles_device, + part_colors=dict(setup.part_colors or {}), + part_names_ordered=part_names_ordered, + template_path=template_context.template.blend_file_path if template_context and template_context.template else None, + target_collection=( + template_context.template.target_collection + if template_context and template_context.template and template_context.template.target_collection + else "Product" + ), + material_library_path=( + template_context.material_library if template_context and use_materials else None + ), + material_map=material_map, + lighting_only=bool(template_context.template.lighting_only) if template_context and template_context.template else False, + shadow_catcher=( + bool(template_context.template.shadow_catcher_enabled) + if template_context and template_context.template + else False + ), + camera_orbit=bool(template_context.template.camera_orbit) if template_context and template_context.template else True, + rotation_x=position.rotation_x, + rotation_y=position.rotation_y, + rotation_z=position.rotation_z, + focal_length_mm=position.focal_length_mm, + sensor_width_mm=position.sensor_width_mm, + usd_path=str(setup.usd_render_path) if setup.usd_render_path is not None else None, + material_override=material_override, + ) + + +def _canonical_public_output_path(line: OrderLine, output_path: str) -> str: + source_path = Path(output_path) + upload_root = Path(app_settings.upload_dir) + + try: + source_path.relative_to(upload_root / "renders") + return str(source_path) + except ValueError: + pass + + extension = source_path.suffix or ".bin" + product_name = None + if line.product is not None: + product_name = getattr(line.product, "name", None) or getattr(line.product, "pim_id", None) + output_type_name = getattr(line.output_type, "name", None) if line.output_type is not None else None + filename = f"{_sanitize_public_output_name(product_name or 'product')}_{_sanitize_public_output_name(output_type_name or 'render')}{extension}" + return str(upload_root / "renders" / str(line.id) / filename) + + +def _materialize_public_output(line: OrderLine, output_path: str) -> str: + canonical_path = Path(_canonical_public_output_path(line, output_path)) + source_path = Path(output_path) + canonical_path.parent.mkdir(parents=True, exist_ok=True) + if source_path != canonical_path: + shutil.copy2(source_path, canonical_path) + return str(canonical_path) + + +def _resolve_existing_workflow_run_id(session: Session, workflow_run_id: str | None) -> uuid.UUID | None: + if workflow_run_id in (None, ""): + return None + try: + candidate = uuid.UUID(str(workflow_run_id)) + except (TypeError, ValueError): + return None + existing = session.get(WorkflowRun, candidate) + return existing.id if existing is not None else None + + +def persist_order_line_media_asset( + session: Session, + line: OrderLine, + *, + success: bool, + output_path: str, + asset_type: MediaAssetType, + render_log: dict[str, Any] | None = None, + workflow_run_id: str | None = None, +) -> OutputSaveResult: + """Persist a non-primary workflow artifact as a MediaAsset without mutating order-line result fields.""" + status: Literal["completed", "failed"] = "completed" if success else "failed" + + asset_id: str | None = None + storage_key: str | None = None + resolved_workflow_run_id = _resolve_existing_workflow_run_id(session, workflow_run_id) + + if success: + storage_key = _normalize_storage_key(output_path) + output_file = Path(output_path) + existing_asset = session.execute( + select(MediaAsset).where(MediaAsset.storage_key == storage_key).limit(1) + ).scalar_one_or_none() + if existing_asset is None: + asset = MediaAsset( + tenant_id=line.product.cad_file.tenant_id if (line.product and line.product.cad_file) else None, + product_id=line.product_id, + cad_file_id=line.product.cad_file_id if line.product is not None else None, + order_line_id=line.id, + workflow_run_id=resolved_workflow_run_id, + asset_type=asset_type, + storage_key=storage_key, + mime_type=_resolve_output_mime_type(output_path), + file_size_bytes=output_file.stat().st_size if output_file.exists() else None, + render_config=render_log if isinstance(render_log, dict) else None, + ) + session.add(asset) + session.flush() + asset_id = str(asset.id) + else: + existing_asset.asset_type = asset_type + existing_asset.order_line_id = line.id + existing_asset.product_id = line.product_id + existing_asset.cad_file_id = line.product.cad_file_id if line.product is not None else None + existing_asset.mime_type = _resolve_output_mime_type(output_path) + existing_asset.file_size_bytes = output_file.stat().st_size if output_file.exists() else None + if isinstance(render_log, dict): + existing_asset.render_config = render_log + if resolved_workflow_run_id is not None: + existing_asset.workflow_run_id = resolved_workflow_run_id + session.flush() + asset_id = str(existing_asset.id) + + session.commit() + return OutputSaveResult( + status=status, + result_path=output_path if success else None, + asset_id=asset_id, + storage_key=storage_key, + asset_type=asset_type if success else None, + ) + + def _extract_render_error(render_log: dict[str, Any] | None) -> str | None: if not isinstance(render_log, dict): return None @@ -319,28 +894,43 @@ def persist_order_line_output( output_path: str, render_log: dict[str, Any] | None, render_completed_at: datetime | None = None, + workflow_run_id: str | None = None, ) -> OutputSaveResult: """Persist the render result for an order line and publish the media asset if needed.""" status: Literal["completed", "failed"] = "completed" if success else "failed" completed_at = render_completed_at or datetime.utcnow() + persisted_output_path = output_path line.render_status = status line.render_completed_at = completed_at line.render_log = render_log - line.result_path = output_path if success else None + if success: + persisted_output_path = _materialize_public_output(line, output_path) + line.result_path = persisted_output_path if success else None session.flush() asset_id: str | None = None storage_key: str | None = None asset_type: MediaAssetType | None = None + resolved_workflow_run_id = _resolve_existing_workflow_run_id(session, workflow_run_id) if success: - storage_key = _normalize_storage_key(output_path) - asset_type = _resolve_output_asset_type(output_path) + storage_key = _normalize_storage_key(persisted_output_path) + asset_type = _resolve_output_asset_type(persisted_output_path) + output_file = Path(persisted_output_path) existing_asset = session.execute( select(MediaAsset).where(MediaAsset.storage_key == storage_key).limit(1) ).scalar_one_or_none() if existing_asset is None: - output_file = Path(output_path) + existing_asset = session.execute( + select(MediaAsset) + .where( + MediaAsset.order_line_id == line.id, + MediaAsset.asset_type == asset_type, + ) + .order_by(MediaAsset.created_at.desc()) + .limit(1) + ).scalar_one_or_none() + if existing_asset is None: render_config = None if isinstance(render_log, dict): render_config = { @@ -360,9 +950,10 @@ def persist_order_line_output( tenant_id=line.product.cad_file.tenant_id if (line.product and line.product.cad_file) else None, order_line_id=line.id, product_id=line.product_id, + workflow_run_id=resolved_workflow_run_id, asset_type=asset_type, storage_key=storage_key, - mime_type=_resolve_output_mime_type(output_path), + mime_type=_resolve_output_mime_type(persisted_output_path), file_size_bytes=output_file.stat().st_size if output_file.exists() else None, width=None, height=None, @@ -372,9 +963,41 @@ def persist_order_line_output( session.flush() asset_id = str(asset.id) else: + existing_asset.order_line_id = line.id + existing_asset.product_id = line.product_id + existing_asset.asset_type = asset_type + existing_asset.storage_key = storage_key + existing_asset.mime_type = _resolve_output_mime_type(persisted_output_path) + existing_asset.file_size_bytes = output_file.stat().st_size if output_file.exists() else None + if line.product is not None: + existing_asset.cad_file_id = line.product.cad_file_id + if isinstance(render_log, dict): + existing_asset.render_config = { + key: render_log[key] + for key in ( + "renderer", + "engine_used", + "engine", + "samples", + "device_used", + "compute_type", + "total_duration_s", + ) + if key in render_log + } + if resolved_workflow_run_id is not None: + existing_asset.workflow_run_id = resolved_workflow_run_id + session.flush() asset_id = str(existing_asset.id) session.commit() + if line.order_id is not None: + try: + from app.domains.orders.service import check_order_completion + + check_order_completion(str(line.order_id)) + except Exception: + logger.exception("Failed to check order completion for order_line %s", line.id) return OutputSaveResult( status=status, result_path=line.result_path, @@ -480,13 +1103,29 @@ def prepare_order_line_render_context( .limit(1) ).scalar_one_or_none() if usd_asset: - usd_render_path = _resolve_asset_path(usd_asset.storage_key) - if usd_render_path: - logger.info( - "render_order_line: using usd_master %s for cad %s", - usd_render_path.name, + refresh_reason = _usd_master_refresh_reason(cad_file) + if refresh_reason is not None: + logger.warning( + "render_order_line: ignoring stale usd_master for cad %s (%s)", cad_file.id, + refresh_reason, ) + _emit( + emit, + order_line_id, + f"Existing USD master is stale ({refresh_reason}) — falling back to GLB/STEP", + "warning", + ) + if _queue_usd_master_refresh(str(cad_file.id)): + _emit(emit, order_line_id, "Queued USD master regeneration in background") + else: + usd_render_path = _resolve_asset_path(usd_asset.storage_key) + if usd_render_path: + logger.info( + "render_order_line: using usd_master %s for cad %s", + usd_render_path.name, + cad_file.id, + ) glb_reuse_path = None if not usd_render_path: diff --git a/backend/tests/domains/test_workflow_graph_runtime.py b/backend/tests/domains/test_workflow_graph_runtime.py index 653d9c1..de761f0 100644 --- a/backend/tests/domains/test_workflow_graph_runtime.py +++ b/backend/tests/domains/test_workflow_graph_runtime.py @@ -10,29 +10,32 @@ from sqlalchemy import create_engine, select, text from sqlalchemy.orm import Session, selectinload from app.database import Base +from app.core.process_steps import StepName from app.domains.auth.models import User, UserRole from app.domains.materials.models import AssetLibrary from app.domains.orders.models import Order, OrderLine, OrderStatus from app.domains.products.models import CadFile, Product from app.domains.rendering.models import OutputType, RenderTemplate, WorkflowRun from app.domains.rendering.workflow_executor import prepare_workflow_context -from app.domains.rendering.workflow_graph_runtime import WorkflowGraphRuntimeError, execute_graph_workflow +from app.domains.rendering.workflow_graph_runtime import ( + _build_task_kwargs, + WorkflowGraphState, + WorkflowGraphRuntimeError, + execute_graph_workflow, + find_unsupported_graph_nodes, +) from app.domains.rendering.workflow_run_service import create_workflow_run from app.domains.rendering.workflow_runtime_services import OrderLineRenderSetupResult import app.models # noqa: F401 - - -TEST_DB_URL = os.environ.get( - "TEST_DATABASE_URL", - "postgresql+asyncpg://hartomat:hartomat@localhost:5432/hartomat_test", -).replace("+asyncpg", "") +from tests.db_test_utils import reset_public_schema_sync, resolve_test_db_url @pytest.fixture def sync_session(): - engine = create_engine(TEST_DB_URL) + engine = create_engine(resolve_test_db_url(async_driver=False)) with engine.begin() as conn: + reset_public_schema_sync(conn) Base.metadata.create_all(conn) session = Session(engine) @@ -41,8 +44,7 @@ def sync_session(): finally: session.close() with engine.begin() as conn: - conn.execute(text("DROP SCHEMA public CASCADE")) - conn.execute(text("CREATE SCHEMA public")) + reset_public_schema_sync(conn) engine.dispose() @@ -143,6 +145,23 @@ def _seed_renderable_order_line( return line +def _seed_cad_file_only(session: Session, tmp_path: Path) -> CadFile: + step_path = tmp_path / "cad-only" / "thumbnail.step" + step_path.parent.mkdir(parents=True, exist_ok=True) + step_path.write_text("STEP", encoding="utf-8") + + cad_file = CadFile( + id=uuid.uuid4(), + original_name="thumbnail.step", + stored_path=str(step_path), + file_hash=f"hash-{uuid.uuid4().hex}", + parsed_objects={"objects": ["Body"]}, + ) + session.add(cad_file) + session.commit() + return cad_file + + def test_execute_graph_workflow_persists_bridge_outputs_and_queues_render_task( sync_session, tmp_path, @@ -242,6 +261,583 @@ def test_execute_graph_workflow_persists_bridge_outputs_and_queues_render_task( ] +def test_execute_graph_workflow_routes_cad_thumbnail_save_using_upstream_threejs_request( + sync_session, + tmp_path, + monkeypatch, +): + cad_file = _seed_cad_file_only(sync_session, tmp_path) + send_calls: list[tuple[str, list[str], dict[str, object]]] = [] + + def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): + send_calls.append((task_name, args, kwargs)) + return SimpleNamespace(id="task-threejs-thumb") + + monkeypatch.setattr( + "app.tasks.celery_app.celery_app.send_task", + _fake_send_task, + ) + + workflow_context = prepare_workflow_context( + { + "version": 1, + "nodes": [ + { + "id": "threejs", + "step": "threejs_render", + "params": {"width": 640, "height": 480, "transparent_bg": True}, + }, + {"id": "save", "step": "thumbnail_save", "params": {}}, + ], + "edges": [ + {"from": "threejs", "to": "save"}, + ], + }, + context_id=str(cad_file.id), + execution_mode="graph", + ) + run = create_workflow_run( + sync_session, + workflow_def_id=None, + order_line_id=None, + workflow_context=workflow_context, + ) + + assert find_unsupported_graph_nodes(workflow_context) == [] + + dispatch_result = execute_graph_workflow(sync_session, workflow_context) + sync_session.commit() + + refreshed_run = sync_session.execute( + select(WorkflowRun) + .where(WorkflowRun.id == run.id) + .options(selectinload(WorkflowRun.node_results)) + ).scalar_one() + node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} + + assert dispatch_result.task_ids == ["task-threejs-thumb"] + assert len(send_calls) == 1 + assert send_calls[0][0] == "app.tasks.step_tasks.render_graph_thumbnail" + assert send_calls[0][1] == [str(cad_file.id)] + assert send_calls[0][2]["renderer"] == "threejs" + assert send_calls[0][2]["width"] == 640 + assert send_calls[0][2]["height"] == 480 + assert send_calls[0][2]["transparent_bg"] is True + assert node_results["threejs"].status == "completed" + assert node_results["threejs"].output["renderer"] == "threejs" + assert node_results["save"].status == "queued" + assert node_results["save"].output["predicted_output_path"].endswith(f"{cad_file.id}.png") + + +def test_execute_graph_workflow_completes_cad_bridge_only_nodes_without_queueing( + sync_session, + tmp_path, + monkeypatch, +): + cad_file = _seed_cad_file_only(sync_session, tmp_path) + send_calls: list[tuple[str, list[str], dict[str, object]]] = [] + + def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): + send_calls.append((task_name, args, kwargs)) + return SimpleNamespace(id="unexpected-task") + + monkeypatch.setattr( + "app.tasks.celery_app.celery_app.send_task", + _fake_send_task, + ) + + workflow_context = prepare_workflow_context( + { + "version": 1, + "nodes": [ + {"id": "resolve", "step": "resolve_step_path", "params": {}}, + {"id": "stl", "step": "stl_cache_generate", "params": {}}, + ], + "edges": [ + {"from": "resolve", "to": "stl"}, + ], + }, + context_id=str(cad_file.id), + execution_mode="graph", + ) + run = create_workflow_run( + sync_session, + workflow_def_id=None, + order_line_id=None, + workflow_context=workflow_context, + ) + + assert find_unsupported_graph_nodes(workflow_context) == [] + + dispatch_result = execute_graph_workflow(sync_session, workflow_context) + sync_session.commit() + + refreshed_run = sync_session.execute( + select(WorkflowRun) + .where(WorkflowRun.id == run.id) + .options(selectinload(WorkflowRun.node_results)) + ).scalar_one() + node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} + + assert dispatch_result.task_ids == [] + assert send_calls == [] + assert refreshed_run.status == "completed" + assert refreshed_run.completed_at is not None + assert node_results["resolve"].status == "completed" + assert node_results["resolve"].output["cad_file_id"] == str(cad_file.id) + assert node_results["resolve"].output["processing_status"] == "pending" + assert node_results["resolve"].output["step_path"].endswith("thumbnail.step") + assert node_results["stl"].status == "completed" + assert node_results["stl"].output["cache_mode"] == "compatibility_noop" + assert node_results["stl"].output["cache_required"] is False + assert node_results["stl"].output["cad_file_id"] == str(cad_file.id) + + +def test_execute_graph_workflow_routes_each_thumbnail_save_to_its_connected_request( + sync_session, + tmp_path, + monkeypatch, +): + cad_file = _seed_cad_file_only(sync_session, tmp_path) + send_calls: list[tuple[str, list[str], dict[str, object]]] = [] + + def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): + send_calls.append((task_name, args, kwargs)) + return SimpleNamespace(id=f"task-{len(send_calls)}") + + monkeypatch.setattr( + "app.tasks.celery_app.celery_app.send_task", + _fake_send_task, + ) + + workflow_context = prepare_workflow_context( + { + "version": 1, + "nodes": [ + { + "id": "blender", + "step": "blender_render", + "params": {"width": 512, "height": 512, "samples": 64}, + }, + { + "id": "threejs", + "step": "threejs_render", + "params": {"width": 640, "height": 480, "transparent_bg": True}, + }, + {"id": "save_blender", "step": "thumbnail_save", "params": {}}, + {"id": "save_threejs", "step": "thumbnail_save", "params": {}}, + ], + "edges": [ + {"from": "blender", "to": "save_blender"}, + {"from": "threejs", "to": "save_threejs"}, + ], + }, + context_id=str(cad_file.id), + execution_mode="graph", + ) + run = create_workflow_run( + sync_session, + workflow_def_id=None, + order_line_id=None, + workflow_context=workflow_context, + ) + + dispatch_result = execute_graph_workflow(sync_session, workflow_context) + sync_session.commit() + + refreshed_run = sync_session.execute( + select(WorkflowRun) + .where(WorkflowRun.id == run.id) + .options(selectinload(WorkflowRun.node_results)) + ).scalar_one() + node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} + + assert dispatch_result.task_ids == ["task-1", "task-2"] + assert [call[0] for call in send_calls] == [ + "app.tasks.step_tasks.render_graph_thumbnail", + "app.tasks.step_tasks.render_graph_thumbnail", + ] + assert send_calls[0][2]["renderer"] == "blender" + assert send_calls[0][2]["width"] == 512 + assert send_calls[0][2]["height"] == 512 + assert send_calls[1][2]["renderer"] == "threejs" + assert send_calls[1][2]["width"] == 640 + assert send_calls[1][2]["height"] == 480 + assert send_calls[1][2]["transparent_bg"] is True + assert send_calls[0][2]["workflow_node_id"] == "save_blender" + assert send_calls[1][2]["workflow_node_id"] == "save_threejs" + assert node_results["save_blender"].status == "queued" + assert node_results["save_threejs"].status == "queued" + + +def test_execute_graph_workflow_merges_legacy_render_context_into_still_task_kwargs( + sync_session, + tmp_path, + monkeypatch, +): + from app.config import settings + + monkeypatch.setattr(settings, "upload_dir", str(tmp_path / "uploads")) + + line = _seed_renderable_order_line(sync_session, tmp_path) + asset_library = sync_session.execute( + select(AssetLibrary).where(AssetLibrary.is_active == True) # noqa: E712 + ).scalar_one() + asset_library_path = Path(settings.upload_dir) / "asset-libraries" / f"{asset_library.id}.blend" + asset_library_path.parent.mkdir(parents=True, exist_ok=True) + asset_library_path.write_text("BLEND", encoding="utf-8") + + assert line.output_type is not None + line.output_type.render_settings = { + "width": 2048, + "height": 2048, + "engine": "cycles", + "samples": 128, + "noise_threshold": "0.05", + } + line.output_type.transparent_bg = True + line.output_type.cycles_device = "cuda" + line.render_overrides = { + "height": 1440, + "samples": 96, + "denoiser": "OPENIMAGEDENOISE", + } + sync_session.commit() + + send_calls: list[tuple[str, list[str], dict[str, object]]] = [] + + def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): + send_calls.append((task_name, args, kwargs)) + return SimpleNamespace(id="task-merged-still") + + monkeypatch.setattr( + "app.tasks.celery_app.celery_app.send_task", + _fake_send_task, + ) + monkeypatch.setattr( + "app.domains.rendering.workflow_graph_runtime.resolve_render_position_context", + lambda _session, _line: SimpleNamespace( + rotation_x=15.0, + rotation_y=25.0, + rotation_z=35.0, + focal_length_mm=85.0, + sensor_width_mm=36.0, + ), + ) + + workflow_context = prepare_workflow_context( + { + "version": 1, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}}, + {"id": "template", "step": "resolve_template", "params": {}}, + {"id": "render", "step": "blender_still", "params": {"width": 1024, "samples": 32}}, + ], + "edges": [ + {"from": "setup", "to": "template"}, + {"from": "template", "to": "render"}, + ], + }, + context_id=str(line.id), + execution_mode="shadow", + ) + create_workflow_run( + sync_session, + workflow_def_id=None, + order_line_id=line.id, + workflow_context=workflow_context, + ) + + dispatch_result = execute_graph_workflow(sync_session, workflow_context) + sync_session.commit() + + assert dispatch_result.task_ids == ["task-merged-still"] + assert len(send_calls) == 1 + + task_name, args, kwargs = send_calls[0] + assert task_name == "app.domains.rendering.tasks.render_order_line_still_task" + assert args == [str(line.id)] + assert kwargs["width"] == 2048 + assert kwargs["height"] == 1440 + assert kwargs["engine"] == "cycles" + assert kwargs["samples"] == 96 + assert kwargs["noise_threshold"] == "0.05" + assert kwargs["denoiser"] == "OPENIMAGEDENOISE" + assert kwargs["transparent_bg"] is True + assert kwargs["cycles_device"] == "cuda" + assert kwargs["template_path"] == "/templates/bearing.blend" + assert kwargs["target_collection"] == "Product" + assert kwargs["material_library_path"] == str(asset_library_path) + assert kwargs["material_map"] is not None + assert kwargs["part_colors"] == {"InnerRing": "Steel raw", "OuterRing": "Steel raw"} + assert kwargs["part_names_ordered"] == ["InnerRing", "OuterRing"] + assert kwargs["rotation_x"] == 15.0 + assert kwargs["rotation_y"] == 25.0 + assert kwargs["rotation_z"] == 35.0 + assert kwargs["focal_length_mm"] == 85.0 + assert kwargs["sensor_width_mm"] == 36.0 + assert kwargs["publish_asset_enabled"] is False + assert kwargs["emit_events"] is False + assert kwargs["job_document_enabled"] is False + assert kwargs["output_name_suffix"].startswith("shadow-") + + +def test_build_task_kwargs_autoscales_default_samples_via_shared_render_invocation( + tmp_path, + monkeypatch, +): + step_path = tmp_path / "cad" / "bearing.step" + step_path.parent.mkdir(parents=True, exist_ok=True) + step_path.write_text("STEP", encoding="utf-8") + + output_type = OutputType( + id=uuid.uuid4(), + name="Still Preview", + renderer="blender", + output_format="png", + render_settings={"width": 1024, "height": 512}, + ) + output_type.invocation_overrides = {"samples": 128, "engine": "cycles"} + cad_file = CadFile( + id=uuid.uuid4(), + original_name="bearing.step", + stored_path=str(step_path), + file_hash="hash-graph-1", + parsed_objects={"objects": ["InnerRing", "OuterRing"]}, + ) + product = Product( + id=uuid.uuid4(), + pim_id="P-graph-1", + name="Bearing G", + category_key="bearings", + cad_file_id=cad_file.id, + cad_file=cad_file, + ) + line = OrderLine( + id=uuid.uuid4(), + order_id=uuid.uuid4(), + product_id=product.id, + product=product, + output_type_id=output_type.id, + output_type=output_type, + ) + state = WorkflowGraphState( + setup=OrderLineRenderSetupResult( + status="ready", + order_line=line, + cad_file=cad_file, + part_colors={"InnerRing": "Steel raw"}, + ) + ) + workflow_context = SimpleNamespace( + workflow_run_id=uuid.uuid4(), + execution_mode="shadow", + ordered_nodes=[], + edges=[], + ) + node = SimpleNamespace(id="render", step=StepName.BLENDER_STILL, params={}) + + monkeypatch.setattr( + "app.domains.rendering.workflow_graph_runtime.resolve_render_position_context", + lambda _session, _line: SimpleNamespace( + rotation_x=0.0, + rotation_y=0.0, + rotation_z=0.0, + focal_length_mm=None, + sensor_width_mm=None, + ), + ) + + kwargs = _build_task_kwargs( + session=object(), + workflow_context=workflow_context, + state=state, + node=node, + ) + + assert kwargs["width"] == 1024 + assert kwargs["height"] == 512 + assert kwargs["samples"] == 64 + + +def test_execute_graph_workflow_respects_custom_render_settings_opt_in_for_still_task( + sync_session, + tmp_path, + monkeypatch, +): + line = _seed_renderable_order_line(sync_session, tmp_path) + assert line.output_type is not None + line.output_type.render_settings = { + "width": 2048, + "height": 2048, + "engine": "cycles", + "samples": 128, + } + sync_session.commit() + + send_calls: list[tuple[str, list[str], dict[str, object]]] = [] + + def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): + send_calls.append((task_name, args, kwargs)) + return SimpleNamespace(id="task-custom-still") + + monkeypatch.setattr( + "app.tasks.celery_app.celery_app.send_task", + _fake_send_task, + ) + + workflow_context = prepare_workflow_context( + { + "version": 1, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}}, + {"id": "template", "step": "resolve_template", "params": {}}, + { + "id": "render", + "step": "blender_still", + "params": { + "use_custom_render_settings": True, + "width": 1024, + "height": 768, + "samples": 32, + "render_engine": "eevee", + }, + }, + ], + "edges": [ + {"from": "setup", "to": "template"}, + {"from": "template", "to": "render"}, + ], + }, + context_id=str(line.id), + execution_mode="graph", + ) + create_workflow_run( + sync_session, + workflow_def_id=None, + order_line_id=line.id, + workflow_context=workflow_context, + ) + + dispatch_result = execute_graph_workflow(sync_session, workflow_context) + sync_session.commit() + + assert dispatch_result.task_ids == ["task-custom-still"] + assert len(send_calls) == 1 + + task_name, args, kwargs = send_calls[0] + assert task_name == "app.domains.rendering.tasks.render_order_line_still_task" + assert args == [str(line.id)] + assert kwargs["width"] == 1024 + assert kwargs["height"] == 768 + assert kwargs["samples"] == 32 + assert kwargs["render_engine"] == "eevee" + assert kwargs["engine"] == "cycles" + + +def test_execute_graph_workflow_preserves_turntable_timing_without_custom_render_settings( + sync_session, + tmp_path, + monkeypatch, +): + line = _seed_renderable_order_line(sync_session, tmp_path) + assert line.output_type is not None + line.output_type.render_settings = { + "width": 2048, + "height": 2048, + "engine": "cycles", + "samples": 128, + "fps": 30, + "frame_count": 180, + "bg_color": "#ffffff", + "turntable_axis": "world_y", + } + line.output_type.transparent_bg = True + line.output_type.cycles_device = "gpu" + line.material_override = "Studio White" + sync_session.commit() + + send_calls: list[tuple[str, list[str], dict[str, object]]] = [] + + def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): + send_calls.append((task_name, args, kwargs)) + return SimpleNamespace(id="task-turntable") + + monkeypatch.setattr( + "app.tasks.celery_app.celery_app.send_task", + _fake_send_task, + ) + monkeypatch.setattr( + "app.domains.rendering.workflow_graph_runtime.resolve_render_position_context", + lambda _session, _line: SimpleNamespace( + rotation_x=10.0, + rotation_y=20.0, + rotation_z=30.0, + focal_length_mm=70.0, + sensor_width_mm=35.0, + ), + ) + + workflow_context = prepare_workflow_context( + { + "version": 1, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}}, + {"id": "template", "step": "resolve_template", "params": {}}, + { + "id": "render", + "step": "blender_turntable", + "params": { + "width": 1024, + "samples": 32, + "fps": 24, + "duration_s": 5, + "frame_count": 120, + }, + }, + ], + "edges": [ + {"from": "setup", "to": "template"}, + {"from": "template", "to": "render"}, + ], + }, + context_id=str(line.id), + execution_mode="shadow", + ) + create_workflow_run( + sync_session, + workflow_def_id=None, + order_line_id=line.id, + workflow_context=workflow_context, + ) + + dispatch_result = execute_graph_workflow(sync_session, workflow_context) + sync_session.commit() + + assert dispatch_result.task_ids == ["task-turntable"] + assert len(send_calls) == 1 + + task_name, args, kwargs = send_calls[0] + assert task_name == "app.domains.rendering.tasks.render_turntable_task" + assert args == [str(line.id)] + assert kwargs["width"] == 2048 + assert kwargs["height"] == 2048 + assert kwargs["samples"] == 128 + assert kwargs["transparent_bg"] is True + assert kwargs["cycles_device"] == "gpu" + assert kwargs["fps"] == 24 + assert kwargs["frame_count"] == 120 + assert kwargs["bg_color"] == "#ffffff" + assert kwargs["turntable_axis"] == "world_y" + assert kwargs["rotation_x"] == 10.0 + assert kwargs["rotation_y"] == 20.0 + assert kwargs["rotation_z"] == 30.0 + assert kwargs["focal_length_mm"] == 70.0 + assert kwargs["sensor_width_mm"] == 35.0 + assert kwargs["material_override"] == "Studio White" + assert kwargs["output_name_suffix"].startswith("shadow-") + + def test_execute_graph_workflow_retries_bridge_node_and_persists_attempt_metadata( sync_session, monkeypatch, @@ -353,3 +949,569 @@ def test_execute_graph_workflow_marks_failed_node_with_retry_exhausted_metadata( assert setup_result.output["retry_exhausted"] is True assert setup_result.output["last_error"] == "permanent setup failure" assert setup_result.output["failure_policy"]["fallback_to_legacy"] is True + + +def test_execute_graph_workflow_supports_output_save_bridge_node( + sync_session, + tmp_path, + monkeypatch, +): + line = _seed_renderable_order_line(sync_session, tmp_path) + send_calls: list[tuple[str, list[str], dict[str, object]]] = [] + + monkeypatch.setattr( + "app.tasks.celery_app.celery_app.send_task", + lambda task_name, args, kwargs: send_calls.append((task_name, args, kwargs)) + or SimpleNamespace(id="task-output-save"), + ) + + workflow_context = prepare_workflow_context( + { + "version": 1, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}}, + {"id": "template", "step": "resolve_template", "params": {}}, + {"id": "render", "step": "blender_still", "params": {"width": 1024, "height": 768}}, + {"id": "output", "step": "output_save", "params": {}}, + ], + "edges": [ + {"from": "setup", "to": "template"}, + {"from": "template", "to": "render"}, + {"from": "render", "to": "output"}, + ], + }, + context_id=str(line.id), + execution_mode="graph", + ) + run = create_workflow_run( + sync_session, + workflow_def_id=None, + order_line_id=line.id, + workflow_context=workflow_context, + ) + + assert find_unsupported_graph_nodes(workflow_context) == [] + + dispatch_result = execute_graph_workflow(sync_session, workflow_context) + sync_session.commit() + + refreshed_run = sync_session.execute( + select(WorkflowRun) + .where(WorkflowRun.id == run.id) + .options(selectinload(WorkflowRun.node_results)) + ).scalar_one() + node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} + + assert dispatch_result.task_ids == ["task-output-save"] + assert len(send_calls) == 1 + assert send_calls[0][0] == "app.domains.rendering.tasks.render_order_line_still_task" + assert send_calls[0][1] == [str(line.id)] + assert send_calls[0][2]["publish_asset_enabled"] is False + assert send_calls[0][2]["graph_authoritative_output_enabled"] is True + assert send_calls[0][2]["graph_output_node_ids"] == ["output"] + assert node_results["render"].status == "queued" + assert node_results["output"].status == "completed" + assert node_results["output"].output["publication_mode"] == "awaiting_graph_authoritative_save" + assert node_results["output"].output["order_line_id"] == str(line.id) + assert node_results["output"].output["artifact_count"] == 1 + assert node_results["output"].output["upstream_artifacts"] == [ + { + "node_id": "render", + "artifact_role": "render_output", + "predicted_output_path": str( + tmp_path / "cad" / "renders" / f"line_{line.id}.png" + ), + "predicted_asset_type": "still", + "publish_asset_enabled": False, + "graph_authoritative_output_enabled": True, + "graph_output_node_ids": ["output"], + "notify_handoff_enabled": False, + "task_id": "task-output-save", + } + ] + + +def test_execute_graph_workflow_arms_output_save_handoff_for_export_blend( + sync_session, + tmp_path, + monkeypatch, +): + line = _seed_renderable_order_line(sync_session, tmp_path) + send_calls: list[tuple[str, list[str], dict[str, object]]] = [] + + monkeypatch.setattr( + "app.tasks.celery_app.celery_app.send_task", + lambda task_name, args, kwargs: send_calls.append((task_name, args, kwargs)) + or SimpleNamespace(id="task-blend-output-save"), + ) + + workflow_context = prepare_workflow_context( + { + "version": 1, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}}, + {"id": "blend", "step": "export_blend", "params": {}}, + {"id": "output", "step": "output_save", "params": {}}, + ], + "edges": [ + {"from": "setup", "to": "blend"}, + {"from": "blend", "to": "output"}, + ], + }, + context_id=str(line.id), + execution_mode="graph", + ) + run = create_workflow_run( + sync_session, + workflow_def_id=None, + order_line_id=line.id, + workflow_context=workflow_context, + ) + + dispatch_result = execute_graph_workflow(sync_session, workflow_context) + sync_session.commit() + + refreshed_run = sync_session.execute( + select(WorkflowRun) + .where(WorkflowRun.id == run.id) + .options(selectinload(WorkflowRun.node_results)) + ).scalar_one() + node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} + + assert dispatch_result.task_ids == ["task-blend-output-save"] + assert len(send_calls) == 1 + assert send_calls[0][0] == "app.domains.rendering.tasks.export_blend_for_order_line_task" + assert send_calls[0][1] == [str(line.id)] + assert send_calls[0][2]["publish_asset_enabled"] is False + assert send_calls[0][2]["graph_authoritative_output_enabled"] is True + assert send_calls[0][2]["graph_output_node_ids"] == ["output"] + assert node_results["blend"].status == "queued" + assert node_results["output"].status == "completed" + assert node_results["output"].output["publication_mode"] == "awaiting_graph_authoritative_save" + assert node_results["output"].output["artifact_count"] == 1 + assert node_results["output"].output["upstream_artifacts"] == [ + { + "node_id": "blend", + "artifact_role": "blend_export", + "predicted_output_path": str(tmp_path / "cad" / "bearing_production.blend"), + "predicted_asset_type": "blend_production", + "publish_asset_enabled": False, + "graph_authoritative_output_enabled": True, + "graph_output_node_ids": ["output"], + "notify_handoff_enabled": False, + "task_id": "task-blend-output-save", + } + ] + + +def test_execute_graph_workflow_arms_output_save_handoff_for_turntable( + sync_session, + tmp_path, + monkeypatch, +): + line = _seed_renderable_order_line(sync_session, tmp_path) + send_calls: list[tuple[str, list[str], dict[str, object]]] = [] + + monkeypatch.setattr( + "app.tasks.celery_app.celery_app.send_task", + lambda task_name, args, kwargs: send_calls.append((task_name, args, kwargs)) + or SimpleNamespace(id="task-turntable-output-save"), + ) + + workflow_context = prepare_workflow_context( + { + "version": 1, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}}, + {"id": "turntable", "step": "blender_turntable", "params": {"fps": 24, "frame_count": 96}}, + {"id": "output", "step": "output_save", "params": {}}, + ], + "edges": [ + {"from": "setup", "to": "turntable"}, + {"from": "turntable", "to": "output"}, + ], + }, + context_id=str(line.id), + execution_mode="graph", + ) + run = create_workflow_run( + sync_session, + workflow_def_id=None, + order_line_id=line.id, + workflow_context=workflow_context, + ) + + dispatch_result = execute_graph_workflow(sync_session, workflow_context) + sync_session.commit() + + refreshed_run = sync_session.execute( + select(WorkflowRun) + .where(WorkflowRun.id == run.id) + .options(selectinload(WorkflowRun.node_results)) + ).scalar_one() + node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} + + assert dispatch_result.task_ids == ["task-turntable-output-save"] + assert len(send_calls) == 1 + assert send_calls[0][0] == "app.domains.rendering.tasks.render_turntable_task" + assert send_calls[0][1] == [str(line.id)] + assert send_calls[0][2]["publish_asset_enabled"] is False + assert send_calls[0][2]["graph_authoritative_output_enabled"] is True + assert send_calls[0][2]["graph_output_node_ids"] == ["output"] + assert send_calls[0][2]["workflow_node_id"] == "turntable" + assert node_results["turntable"].status == "queued" + assert node_results["output"].status == "completed" + assert node_results["output"].output["publication_mode"] == "awaiting_graph_authoritative_save" + assert node_results["output"].output["artifact_count"] == 1 + assert node_results["output"].output["upstream_artifacts"] == [ + { + "node_id": "turntable", + "artifact_role": "turntable_output", + "predicted_output_path": str(tmp_path / "cad" / "renders" / "turntable.mp4"), + "predicted_asset_type": "turntable", + "publish_asset_enabled": False, + "graph_authoritative_output_enabled": True, + "graph_output_node_ids": ["output"], + "notify_handoff_enabled": False, + "task_id": "task-turntable-output-save", + } + ] + + +def test_execute_graph_workflow_routes_output_save_handoffs_per_connected_branch( + sync_session, + tmp_path, + monkeypatch, +): + line = _seed_renderable_order_line(sync_session, tmp_path) + send_calls: list[tuple[str, list[str], dict[str, object]]] = [] + + def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): + send_calls.append((task_name, args, kwargs)) + return SimpleNamespace(id=f"task-branch-{len(send_calls)}") + + monkeypatch.setattr( + "app.tasks.celery_app.celery_app.send_task", + _fake_send_task, + ) + + workflow_context = prepare_workflow_context( + { + "version": 1, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}}, + {"id": "template", "step": "resolve_template", "params": {}}, + {"id": "still", "step": "blender_still", "params": {"width": 1024, "height": 768}}, + {"id": "turntable", "step": "blender_turntable", "params": {"fps": 24, "frame_count": 96}}, + {"id": "still_output", "step": "output_save", "params": {}}, + {"id": "turntable_output", "step": "output_save", "params": {}}, + ], + "edges": [ + {"from": "setup", "to": "template"}, + {"from": "template", "to": "still"}, + {"from": "template", "to": "turntable"}, + {"from": "still", "to": "still_output"}, + {"from": "turntable", "to": "turntable_output"}, + ], + }, + context_id=str(line.id), + execution_mode="graph", + ) + run = create_workflow_run( + sync_session, + workflow_def_id=None, + order_line_id=line.id, + workflow_context=workflow_context, + ) + + dispatch_result = execute_graph_workflow(sync_session, workflow_context) + sync_session.commit() + + refreshed_run = sync_session.execute( + select(WorkflowRun) + .where(WorkflowRun.id == run.id) + .options(selectinload(WorkflowRun.node_results)) + ).scalar_one() + node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} + + assert dispatch_result.task_ids == ["task-branch-1", "task-branch-2"] + assert len(send_calls) == 2 + assert send_calls[0][0] == "app.domains.rendering.tasks.render_order_line_still_task" + assert send_calls[0][2]["graph_output_node_ids"] == ["still_output"] + assert send_calls[1][0] == "app.domains.rendering.tasks.render_turntable_task" + assert send_calls[1][2]["graph_output_node_ids"] == ["turntable_output"] + assert node_results["still_output"].output["artifact_count"] == 1 + assert node_results["still_output"].output["upstream_artifacts"] == [ + { + "node_id": "still", + "artifact_role": "render_output", + "predicted_output_path": str(tmp_path / "cad" / "renders" / f"line_{line.id}.png"), + "predicted_asset_type": "still", + "publish_asset_enabled": False, + "graph_authoritative_output_enabled": True, + "graph_output_node_ids": ["still_output"], + "notify_handoff_enabled": False, + "task_id": "task-branch-1", + } + ] + assert node_results["turntable_output"].output["artifact_count"] == 1 + assert node_results["turntable_output"].output["upstream_artifacts"] == [ + { + "node_id": "turntable", + "artifact_role": "turntable_output", + "predicted_output_path": str(tmp_path / "cad" / "renders" / "turntable.mp4"), + "predicted_asset_type": "turntable", + "publish_asset_enabled": False, + "graph_authoritative_output_enabled": True, + "graph_output_node_ids": ["turntable_output"], + "notify_handoff_enabled": False, + "task_id": "task-branch-2", + } + ] + + +def test_execute_graph_workflow_keeps_self_publish_when_no_output_save_node( + sync_session, + tmp_path, + monkeypatch, +): + line = _seed_renderable_order_line(sync_session, tmp_path) + send_calls: list[tuple[str, list[str], dict[str, object]]] = [] + + def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): + send_calls.append((task_name, args, kwargs)) + return SimpleNamespace(id="task-no-output-save") + + monkeypatch.setattr( + "app.tasks.celery_app.celery_app.send_task", + _fake_send_task, + ) + + workflow_context = prepare_workflow_context( + { + "version": 1, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}}, + {"id": "template", "step": "resolve_template", "params": {}}, + {"id": "render", "step": "blender_still", "params": {"width": 1024, "height": 768}}, + ], + "edges": [ + {"from": "setup", "to": "template"}, + {"from": "template", "to": "render"}, + ], + }, + context_id=str(line.id), + execution_mode="graph", + ) + create_workflow_run( + sync_session, + workflow_def_id=None, + order_line_id=line.id, + workflow_context=workflow_context, + ) + + dispatch_result = execute_graph_workflow(sync_session, workflow_context) + sync_session.commit() + + assert dispatch_result.task_ids == ["task-no-output-save"] + assert len(send_calls) == 1 + assert send_calls[0][0] == "app.domains.rendering.tasks.render_order_line_still_task" + assert send_calls[0][2].get("publish_asset_enabled", True) is True + assert send_calls[0][2].get("graph_authoritative_output_enabled") in (None, False) + assert send_calls[0][2].get("graph_output_node_ids") in (None, []) + + +def test_execute_graph_workflow_arms_notify_handoff_for_graph_render_task( + sync_session, + tmp_path, + monkeypatch, +): + line = _seed_renderable_order_line(sync_session, tmp_path) + + send_calls: list[tuple[str, list[str], dict[str, object]]] = [] + + def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): + send_calls.append((task_name, args, kwargs)) + return SimpleNamespace(id="task-notify-handoff") + + monkeypatch.setattr( + "app.tasks.celery_app.celery_app.send_task", + _fake_send_task, + ) + + workflow_context = prepare_workflow_context( + { + "version": 1, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}}, + {"id": "template", "step": "resolve_template", "params": {}}, + {"id": "render", "step": "blender_still", "params": {}}, + {"id": "notify", "step": "notify", "params": {}}, + ], + "edges": [ + {"from": "setup", "to": "template"}, + {"from": "template", "to": "render"}, + {"from": "render", "to": "notify"}, + ], + }, + context_id=str(line.id), + execution_mode="graph", + ) + run = create_workflow_run( + sync_session, + workflow_def_id=None, + order_line_id=line.id, + workflow_context=workflow_context, + ) + + dispatch_result = execute_graph_workflow(sync_session, workflow_context) + sync_session.commit() + + refreshed_run = sync_session.execute( + select(WorkflowRun) + .where(WorkflowRun.id == run.id) + .options(selectinload(WorkflowRun.node_results)) + ).scalar_one() + node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} + + assert dispatch_result.task_ids == ["task-notify-handoff"] + assert len(send_calls) == 1 + assert send_calls[0][2]["emit_legacy_notifications"] is True + assert send_calls[0][2]["graph_notify_node_ids"] == ["notify"] + assert node_results["render"].output["graph_notify_node_ids"] == ["notify"] + assert node_results["notify"].status == "completed" + assert node_results["notify"].output["notification_mode"] == "deferred_to_render_task" + assert node_results["notify"].output["armed_node_ids"] == ["render"] + + +def test_execute_graph_workflow_routes_notify_handoffs_per_connected_branch( + sync_session, + tmp_path, + monkeypatch, +): + line = _seed_renderable_order_line(sync_session, tmp_path) + + send_calls: list[tuple[str, list[str], dict[str, object]]] = [] + + def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): + send_calls.append((task_name, args, kwargs)) + return SimpleNamespace(id=f"task-notify-branch-{len(send_calls)}") + + monkeypatch.setattr( + "app.tasks.celery_app.celery_app.send_task", + _fake_send_task, + ) + + workflow_context = prepare_workflow_context( + { + "version": 1, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}}, + {"id": "template", "step": "resolve_template", "params": {}}, + {"id": "still", "step": "blender_still", "params": {}}, + {"id": "turntable", "step": "blender_turntable", "params": {"fps": 24, "frame_count": 96}}, + {"id": "still_notify", "step": "notify", "params": {}}, + {"id": "turntable_notify", "step": "notify", "params": {}}, + ], + "edges": [ + {"from": "setup", "to": "template"}, + {"from": "template", "to": "still"}, + {"from": "template", "to": "turntable"}, + {"from": "still", "to": "still_notify"}, + {"from": "turntable", "to": "turntable_notify"}, + ], + }, + context_id=str(line.id), + execution_mode="graph", + ) + run = create_workflow_run( + sync_session, + workflow_def_id=None, + order_line_id=line.id, + workflow_context=workflow_context, + ) + + dispatch_result = execute_graph_workflow(sync_session, workflow_context) + sync_session.commit() + + refreshed_run = sync_session.execute( + select(WorkflowRun) + .where(WorkflowRun.id == run.id) + .options(selectinload(WorkflowRun.node_results)) + ).scalar_one() + node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} + + assert dispatch_result.task_ids == ["task-notify-branch-1", "task-notify-branch-2"] + assert len(send_calls) == 2 + assert send_calls[0][0] == "app.domains.rendering.tasks.render_order_line_still_task" + assert send_calls[0][2]["emit_legacy_notifications"] is True + assert send_calls[0][2]["graph_notify_node_ids"] == ["still_notify"] + assert send_calls[1][0] == "app.domains.rendering.tasks.render_turntable_task" + assert send_calls[1][2]["emit_legacy_notifications"] is True + assert send_calls[1][2]["graph_notify_node_ids"] == ["turntable_notify"] + assert node_results["still"].output["graph_notify_node_ids"] == ["still_notify"] + assert node_results["turntable"].output["graph_notify_node_ids"] == ["turntable_notify"] + assert node_results["still_notify"].status == "completed" + assert node_results["still_notify"].output["armed_node_ids"] == ["still"] + assert node_results["turntable_notify"].status == "completed" + assert node_results["turntable_notify"].output["armed_node_ids"] == ["turntable"] + + +def test_execute_graph_workflow_suppresses_notify_node_in_shadow_mode( + sync_session, + tmp_path, + monkeypatch, +): + line = _seed_renderable_order_line(sync_session, tmp_path) + + send_calls: list[tuple[str, list[str], dict[str, object]]] = [] + + def _fake_send_task(task_name: str, args: list[str], kwargs: dict[str, object]): + send_calls.append((task_name, args, kwargs)) + return SimpleNamespace(id="task-shadow-notify") + + monkeypatch.setattr( + "app.tasks.celery_app.celery_app.send_task", + _fake_send_task, + ) + + workflow_context = prepare_workflow_context( + { + "version": 1, + "nodes": [ + {"id": "setup", "step": "order_line_setup", "params": {}}, + {"id": "template", "step": "resolve_template", "params": {}}, + {"id": "render", "step": "blender_still", "params": {}}, + {"id": "notify", "step": "notify", "params": {}}, + ], + "edges": [ + {"from": "setup", "to": "template"}, + {"from": "template", "to": "render"}, + {"from": "render", "to": "notify"}, + ], + }, + context_id=str(line.id), + execution_mode="shadow", + ) + run = create_workflow_run( + sync_session, + workflow_def_id=None, + order_line_id=line.id, + workflow_context=workflow_context, + ) + + dispatch_result = execute_graph_workflow(sync_session, workflow_context) + sync_session.commit() + + refreshed_run = sync_session.execute( + select(WorkflowRun) + .where(WorkflowRun.id == run.id) + .options(selectinload(WorkflowRun.node_results)) + ).scalar_one() + node_results = {node_result.node_name: node_result for node_result in refreshed_run.node_results} + + assert dispatch_result.task_ids == ["task-shadow-notify"] + assert len(send_calls) == 1 + assert send_calls[0][2].get("emit_legacy_notifications", False) is False + assert node_results["notify"].status == "skipped" + assert node_results["notify"].output["notification_mode"] == "shadow_suppressed" diff --git a/backend/tests/domains/test_workflow_runtime_services.py b/backend/tests/domains/test_workflow_runtime_services.py index 29ad629..d10c079 100644 --- a/backend/tests/domains/test_workflow_runtime_services.py +++ b/backend/tests/domains/test_workflow_runtime_services.py @@ -17,28 +17,30 @@ from app.domains.products.models import CadFile, Product from app.domains.rendering.models import OutputType, RenderTemplate from app.domains.rendering.workflow_runtime_services import ( auto_populate_materials_for_cad, + build_order_line_render_invocation, emit_order_line_render_notifications, + MaterialResolutionResult, + OrderLineRenderSetupResult, + persist_order_line_media_asset, persist_order_line_output, resolve_cad_bbox, prepare_order_line_render_context, resolve_order_line_material_map, resolve_order_line_template_context, + RenderPositionContext, + TemplateResolutionResult, ) from app.domains.tenants.models import Tenant import app.models # noqa: F401 - - -TEST_DB_URL = os.environ.get( - "TEST_DATABASE_URL", - "postgresql+asyncpg://hartomat:hartomat@localhost:5432/hartomat_test", -).replace("+asyncpg", "") +from tests.db_test_utils import reset_public_schema_sync, resolve_test_db_url @pytest.fixture def sync_session(): - engine = create_engine(TEST_DB_URL) + engine = create_engine(resolve_test_db_url(async_driver=False)) with engine.begin() as conn: + reset_public_schema_sync(conn) Base.metadata.create_all(conn) session = Session(engine) @@ -47,8 +49,7 @@ def sync_session(): finally: session.close() with engine.begin() as conn: - conn.execute(text("DROP SCHEMA public CASCADE")) - conn.execute(text("CREATE SCHEMA public")) + reset_public_schema_sync(conn) engine.dispose() @@ -121,6 +122,13 @@ def test_prepare_order_line_render_context_marks_line_processing_and_prefers_usd upload_dir.mkdir(parents=True, exist_ok=True) line = _seed_order_line_graph(sync_session, tmp_path) + line.product.cad_file.resolved_material_assignments = { + "inner_ring": { + "source_name": "InnerRing", + "prim_path": "/Root/Assembly/inner_ring", + "canonical_material": "HARTOMAT_010101_Steel-Bare", + } + } usd_asset_path = upload_dir / "usd" / "bearing.usd" usd_asset_path.parent.mkdir(parents=True, exist_ok=True) usd_asset_path.write_text("USD", encoding="utf-8") @@ -159,6 +167,82 @@ def test_prepare_order_line_render_context_marks_line_processing_and_prefers_usd assert any("Using USD master for render" in message for message in messages) +def test_prepare_order_line_render_context_queues_refresh_for_legacy_usd(sync_session, tmp_path, monkeypatch): + from app.config import settings + + monkeypatch.setattr(settings, "upload_dir", str(tmp_path / "uploads")) + upload_dir = Path(settings.upload_dir) + upload_dir.mkdir(parents=True, exist_ok=True) + + line = _seed_order_line_graph(sync_session, tmp_path) + line.product.cad_file.resolved_material_assignments = { + "inner_ring": { + "source_name": "InnerRing", + "prim_path": "/Root/Assembly/inner_ring", + "canonical_material": "SCHAEFFLER_010101_Steel-Bare", + } + } + + usd_asset_path = upload_dir / "usd" / "bearing.usd" + usd_asset_path.parent.mkdir(parents=True, exist_ok=True) + usd_asset_path.write_text("USD", encoding="utf-8") + glb_asset_path = upload_dir / "step_files" / "bearing_thumbnail.glb" + glb_asset_path.parent.mkdir(parents=True, exist_ok=True) + glb_asset_path.write_text("GLB", encoding="utf-8") + + sync_session.add_all( + [ + MediaAsset( + id=uuid.uuid4(), + cad_file_id=line.product.cad_file_id, + product_id=line.product_id, + asset_type=MediaAssetType.usd_master, + storage_key="usd/bearing.usd", + ), + MediaAsset( + id=uuid.uuid4(), + cad_file_id=line.product.cad_file_id, + product_id=line.product_id, + asset_type=MediaAssetType.gltf_geometry, + storage_key="step_files/bearing_thumbnail.glb", + ), + ] + ) + sync_session.commit() + + queued: list[str] = [] + messages: list[str] = [] + + class _Task: + @staticmethod + def delay(cad_file_id: str) -> None: + queued.append(cad_file_id) + + monkeypatch.setattr( + "app.tasks.step_tasks.generate_usd_master_task", + _Task(), + ) + + result = prepare_order_line_render_context( + sync_session, + str(line.id), + emit=lambda order_line_id, message, level=None: messages.append(message), + ) + + sync_session.refresh(line) + + expected_glb = tmp_path / "parts" / "bearing_thumbnail.glb" + assert result.is_ready + assert result.usd_render_path is None + assert result.glb_reuse_path == expected_glb + assert expected_glb.exists() + assert queued == [str(line.product.cad_file_id)] + assert any("stale" in message for message in messages) + assert any("Queued USD master regeneration" in message for message in messages) + assert any("Reusing cached GLB geometry" in message for message in messages) + assert line.render_status == "processing" + + def test_prepare_order_line_render_context_skips_closed_orders(sync_session, tmp_path, monkeypatch): from app.config import settings @@ -175,18 +259,262 @@ def test_prepare_order_line_render_context_skips_closed_orders(sync_session, tmp assert line.render_status == "cancelled" +def test_build_order_line_render_invocation_applies_output_and_line_overrides(tmp_path): + step_path = tmp_path / "parts" / "bearing.step" + step_path.parent.mkdir(parents=True, exist_ok=True) + step_path.write_text("STEP", encoding="utf-8") + + output_type = OutputType( + id=uuid.uuid4(), + name="Still Preview", + renderer="blender", + output_format="png", + render_settings={"width": 1600, "height": 900}, + transparent_bg=False, + cycles_device="cpu", + ) + output_type.invocation_overrides = { + "engine": "cycles", + "samples": 128, + "bg_color": "#202020", + "turntable_axis": "world_y", + "noise_threshold": "0.05", + } + cad_file = CadFile( + id=uuid.uuid4(), + original_name="bearing.step", + stored_path=str(step_path), + file_hash="hash-1", + parsed_objects={"objects": ["InnerRing", "OuterRing"]}, + ) + product = Product( + id=uuid.uuid4(), + pim_id="P-1000", + name="Bearing A", + category_key="bearings", + cad_file_id=cad_file.id, + cad_file=cad_file, + ) + line = OrderLine( + id=uuid.uuid4(), + order_id=uuid.uuid4(), + product_id=product.id, + product=product, + output_type_id=output_type.id, + output_type=output_type, + render_overrides={ + "height": 800, + "samples": 48, + "transparent_bg": True, + "cycles_device": "cuda", + "denoiser": "OPENIMAGEDENOISE", + "output_format": "webp", + }, + ) + setup = OrderLineRenderSetupResult( + status="ready", + order_line=line, + cad_file=cad_file, + part_colors={"InnerRing": "Steel raw", "OuterRing": "Steel raw"}, + ) + template = RenderTemplate( + id=uuid.uuid4(), + name="Studio", + blend_file_path="/templates/studio.blend", + original_filename="studio.blend", + target_collection="Assembly", + lighting_only=True, + shadow_catcher_enabled=True, + camera_orbit=False, + ) + invocation = build_order_line_render_invocation( + setup, + template_context=TemplateResolutionResult( + template=template, + material_library="/libraries/materials.blend", + material_map={"InnerRing": "SteelPolished"}, + use_materials=True, + override_material="Studio White", + category_key="bearings", + output_type_id=str(output_type.id), + ), + position_context=RenderPositionContext( + rotation_x=12.0, + rotation_y=24.0, + rotation_z=36.0, + focal_length_mm=50.0, + sensor_width_mm=36.0, + ), + ) + + assert invocation.output_extension == "webp" + assert invocation.output_filename.endswith(".webp") + assert invocation.width == 1600 + assert invocation.height == 800 + assert invocation.engine == "cycles" + assert invocation.samples == 48 + assert invocation.noise_threshold == "0.05" + assert invocation.denoiser == "OPENIMAGEDENOISE" + assert invocation.transparent_bg is True + assert invocation.cycles_device == "cuda" + assert invocation.bg_color == "#202020" + assert invocation.turntable_axis == "world_y" + assert invocation.template_path == "/templates/studio.blend" + assert invocation.target_collection == "Assembly" + assert invocation.material_library_path == "/libraries/materials.blend" + assert invocation.material_map == {"InnerRing": "SteelPolished"} + assert invocation.material_override == "Studio White" + assert invocation.lighting_only is True + assert invocation.shadow_catcher is True + assert invocation.camera_orbit is False + assert invocation.part_names_ordered == ["InnerRing", "OuterRing"] + assert invocation.rotation_x == 12.0 + assert invocation.focal_length_mm == 50.0 + + still_kwargs = invocation.as_still_renderer_kwargs( + step_path=str(step_path), + output_path=str(tmp_path / "renders" / "bearing.webp"), + job_id="job-1", + order_line_id="line-1", + ) + + assert still_kwargs["step_path"] == str(step_path) + assert still_kwargs["output_path"].endswith("bearing.webp") + assert still_kwargs["width"] == 1600 + assert still_kwargs["height"] == 800 + assert still_kwargs["engine"] == "cycles" + assert still_kwargs["samples"] == 48 + assert still_kwargs["cycles_device"] == "cuda" + assert still_kwargs["material_library_path"] == "/libraries/materials.blend" + assert still_kwargs["material_override"] == "Studio White" + assert still_kwargs["job_id"] == "job-1" + assert still_kwargs["order_line_id"] == "line-1" + + +def test_build_order_line_render_invocation_autoscales_samples_and_prefers_material_context( + tmp_path, +): + step_path = tmp_path / "parts" / "bearing.step" + step_path.parent.mkdir(parents=True, exist_ok=True) + step_path.write_text("STEP", encoding="utf-8") + + output_type = OutputType( + id=uuid.uuid4(), + name="Still Preview", + renderer="blender", + output_format="png", + render_settings={"width": 1024, "height": 512}, + ) + output_type.invocation_overrides = {"samples": 128, "engine": "eevee"} + cad_file = CadFile( + id=uuid.uuid4(), + original_name="bearing.step", + stored_path=str(step_path), + file_hash="hash-2", + parsed_objects={"objects": ["InnerRing", "OuterRing"]}, + ) + product = Product( + id=uuid.uuid4(), + pim_id="P-1001", + name="Bearing B", + category_key="bearings", + cad_file_id=cad_file.id, + cad_file=cad_file, + ) + line = OrderLine( + id=uuid.uuid4(), + order_id=uuid.uuid4(), + product_id=product.id, + product=product, + output_type_id=output_type.id, + output_type=output_type, + ) + setup = OrderLineRenderSetupResult( + status="ready", + order_line=line, + cad_file=cad_file, + part_colors={"InnerRing": "Steel raw"}, + ) + template = RenderTemplate( + id=uuid.uuid4(), + name="Studio", + blend_file_path="/templates/studio.blend", + original_filename="studio.blend", + target_collection="Product", + ) + invocation = build_order_line_render_invocation( + setup, + template_context=TemplateResolutionResult( + template=template, + material_library="/libraries/materials.blend", + material_map={"InnerRing": "TemplateSteel"}, + use_materials=True, + override_material="Template White", + category_key="bearings", + output_type_id=str(output_type.id), + ), + material_context=MaterialResolutionResult( + material_map={"InnerRing": "ResolvedSteel"}, + use_materials=False, + override_material="Resolved White", + source_material_count=2, + resolved_material_count=1, + ), + ) + + assert invocation.engine == "eevee" + assert invocation.samples == 64 + assert invocation.material_map == {"InnerRing": "ResolvedSteel"} + assert invocation.material_override == "Resolved White" + assert invocation.material_library_path is None + + turntable_kwargs = invocation.as_turntable_renderer_kwargs( + step_path=step_path, + output_path=tmp_path / "renders" / "bearing.mp4", + smooth_angle=30, + default_width=1920, + default_height=1920, + default_engine="cycles", + default_samples=256, + ) + cinematic_kwargs = invocation.as_cinematic_renderer_kwargs( + step_path=step_path, + output_path=tmp_path / "renders" / "bearing-cinematic.mp4", + smooth_angle=30, + default_width=1920, + default_height=1080, + default_engine="cycles", + default_samples=256, + ) + + assert turntable_kwargs["width"] == 1024 + assert turntable_kwargs["height"] == 512 + assert turntable_kwargs["engine"] == "eevee" + assert turntable_kwargs["samples"] == 64 + assert turntable_kwargs["material_map"] == {"InnerRing": "ResolvedSteel"} + assert turntable_kwargs["material_library_path"] is None + assert cinematic_kwargs["width"] == 1024 + assert cinematic_kwargs["height"] == 512 + assert cinematic_kwargs["engine"] == "eevee" + assert cinematic_kwargs["samples"] == 64 + assert cinematic_kwargs["material_override"] == "Resolved White" + + def test_resolve_order_line_template_context_uses_exact_template_and_override(sync_session, tmp_path, monkeypatch): from app.config import settings monkeypatch.setattr(settings, "upload_dir", str(tmp_path / "uploads")) line = _seed_order_line_graph(sync_session, tmp_path) line.material_override = "HARTOMAT_OVERRIDE" + material_library_path = tmp_path / "libraries" / "materials.blend" + material_library_path.parent.mkdir(parents=True, exist_ok=True) + material_library_path.write_text("BLEND", encoding="utf-8") sync_session.add( AssetLibrary( id=uuid.uuid4(), name="Default Library", - blend_file_path="/libraries/materials.blend", + blend_file_path=str(material_library_path), is_active=True, ) ) @@ -215,7 +543,7 @@ def test_resolve_order_line_template_context_uses_exact_template_and_override(sy assert result.template is not None assert result.template.name == "Bearing Studio" - assert result.material_library == "/libraries/materials.blend" + assert result.material_library == str(material_library_path) assert result.override_material == "HARTOMAT_OVERRIDE" assert result.use_materials is True assert result.material_map == { @@ -522,6 +850,79 @@ def test_persist_order_line_output_reuses_existing_asset(sync_session, tmp_path, assert len(assets) == 1 +def test_persist_order_line_output_canonicalizes_step_file_outputs(sync_session, tmp_path, monkeypatch): + from app.config import settings + + upload_dir = tmp_path / "uploads" + monkeypatch.setattr(settings, "upload_dir", str(upload_dir)) + line = _seed_order_line_graph(sync_session, tmp_path) + step_render_path = upload_dir / "step_files" / "renders" / f"line_{line.id}.png" + step_render_path.parent.mkdir(parents=True, exist_ok=True) + step_render_path.write_text("PNGDATA", encoding="utf-8") + + existing = MediaAsset( + id=uuid.uuid4(), + order_line_id=line.id, + product_id=line.product_id, + asset_type=MediaAssetType.still, + storage_key=f"renders/{line.id}/bearing.png", + ) + sync_session.add(existing) + sync_session.commit() + + result = persist_order_line_output( + sync_session, + line, + success=True, + output_path=str(step_render_path), + render_log={"renderer": "blender", "engine_used": "cycles"}, + workflow_run_id=str(uuid.uuid4()), + ) + + sync_session.refresh(line) + expected_path = Path(result.result_path or "") + asset = sync_session.execute( + select(MediaAsset).where(MediaAsset.id == existing.id) + ).scalar_one() + + assert expected_path.exists() + assert expected_path.read_text(encoding="utf-8") == "PNGDATA" + assert expected_path.parent == upload_dir / "renders" / str(line.id) + assert expected_path.name.startswith("Bearing_A_Still-") + assert expected_path.suffix == ".png" + assert result.result_path == str(expected_path) + assert result.storage_key == f"renders/{line.id}/{expected_path.name}" + assert line.result_path == str(expected_path) + assert result.asset_id == str(existing.id) + assert asset.storage_key == f"renders/{line.id}/{expected_path.name}" + + +def test_persist_order_line_output_checks_order_completion(sync_session, tmp_path, monkeypatch): + from app.config import settings + + upload_dir = tmp_path / "uploads" + monkeypatch.setattr(settings, "upload_dir", str(upload_dir)) + line = _seed_order_line_graph(sync_session, tmp_path) + rendered = tmp_path / "rendered.png" + rendered.write_text("PNGDATA", encoding="utf-8") + + calls: list[str] = [] + monkeypatch.setattr( + "app.domains.orders.service.check_order_completion", + lambda order_id: calls.append(order_id) or True, + ) + + persist_order_line_output( + sync_session, + line, + success=True, + output_path=str(rendered), + render_log={"renderer": "blender"}, + ) + + assert calls == [str(line.order_id)] + + def test_persist_order_line_output_marks_failure_without_result_path(sync_session, tmp_path): line = _seed_order_line_graph(sync_session, tmp_path) @@ -547,6 +948,47 @@ def test_persist_order_line_output_marks_failure_without_result_path(sync_sessio assert assets == [] +def test_persist_order_line_media_asset_creates_blend_asset_without_touching_order_line(sync_session, tmp_path, monkeypatch): + from app.config import settings + + upload_dir = tmp_path / "uploads" + monkeypatch.setattr(settings, "upload_dir", str(upload_dir)) + line = _seed_order_line_graph(sync_session, tmp_path) + line.render_status = "completed" + line.result_path = str(upload_dir / "renders" / str(line.id) / "bearing.png") + sync_session.commit() + + output_path = upload_dir / "exports" / str(line.id) / "bearing_production.blend" + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text("BLENDDATA", encoding="utf-8") + + result = persist_order_line_media_asset( + sync_session, + line, + success=True, + output_path=str(output_path), + asset_type=MediaAssetType.blend_production, + render_log={"artifact_type": "blend_production"}, + ) + + sync_session.refresh(line) + asset = sync_session.execute( + select(MediaAsset).where(MediaAsset.storage_key == f"exports/{line.id}/bearing_production.blend") + ).scalar_one_or_none() + + assert result.status == "completed" + assert result.result_path == str(output_path) + assert result.storage_key == f"exports/{line.id}/bearing_production.blend" + assert result.asset_type == MediaAssetType.blend_production + assert line.render_status == "completed" + assert line.result_path == str(upload_dir / "renders" / str(line.id) / "bearing.png") + assert asset is not None + assert asset.asset_type == MediaAssetType.blend_production + assert asset.mime_type == "application/x-blender" + assert asset.file_size_bytes == output_path.stat().st_size + assert asset.render_config == {"artifact_type": "blend_production"} + + def test_emit_order_line_render_notifications_emits_websocket_and_activity( sync_session, tmp_path,