"""Rendering domain tasks — Celery tasks for Blender-based rendering. These tasks run on the `asset_pipeline` queue in the render-worker container, which has Blender and cadquery available. Phase A2: Initial implementation replacing the blender-renderer HTTP service. Phase B: This module will be expanded as part of the Domain-Driven restructure. """ import logging import uuid from pathlib import Path from app.core.render_paths import ( build_order_line_export_path, build_order_line_step_render_path, ensure_group_writable_dir, ) from app.tasks.celery_app import celery_app from app.core.task_logs import log_task_event logger = logging.getLogger(__name__) _RENDER_STILL_CONTROL_PARAM_KEYS = { "workflow_run_id", "workflow_node_id", "publish_asset_enabled", "observer_output_enabled", "graph_authoritative_output_enabled", "graph_output_node_ids", "graph_notify_node_ids", "emit_events", "job_document_enabled", "emit_legacy_notifications", "output_name_suffix", } def _normalize_render_output_extension(value: object) -> str | None: if value in (None, ""): return None normalized = str(value).strip().lower() if normalized in {"jpeg", "jpg"}: return "jpg" if normalized in {"png", "webp", "mp4", "blend"}: return normalized return None def _resolve_order_line_still_output_extension( order_line_id: str, params: dict | None = None, ) -> str: override_extension = _normalize_render_output_extension((params or {}).get("output_format")) if override_extension in {"png", "jpg", "webp"}: return override_extension try: from sqlalchemy import select from sqlalchemy.orm import selectinload from app.core.db_utils import get_sync_session from app.domains.orders.models import OrderLine from app.domains.rendering.workflow_runtime_services import _resolve_render_output_extension with get_sync_session() as db: line = db.execute( select(OrderLine) .options(selectinload(OrderLine.output_type)) .where(OrderLine.id == order_line_id) ).scalar_one_or_none() if line is None: return "png" resolved = _resolve_render_output_extension(line) return resolved if resolved in {"png", "jpg", "webp"} else "png" except Exception as exc: logger.warning( "Failed to resolve still output extension for order_line %s: %s", order_line_id, exc, ) return "png" def _normalize_order_line_still_params(params: dict) -> dict: """Map legacy workflow/editor params onto render_still kwargs.""" normalized = dict(params) normalized.pop("use_custom_render_settings", None) legacy_engine = normalized.pop("render_engine", None) if legacy_engine is not None and normalized.get("engine") is None: normalized["engine"] = legacy_engine resolution = normalized.pop("resolution", None) if ( isinstance(resolution, (list, tuple)) and len(resolution) == 2 ): normalized.setdefault("width", int(resolution[0])) normalized.setdefault("height", int(resolution[1])) usd_path = normalized.get("usd_path") if isinstance(usd_path, str) and usd_path.strip(): normalized["usd_path"] = Path(usd_path) for key in _RENDER_STILL_CONTROL_PARAM_KEYS: normalized.pop(key, None) return normalized def _update_workflow_run_status( order_line_id: str, status: str, error: str | None = None, *, workflow_run_id: str | None = None, workflow_node_id: str | None = None, ) -> None: """Update WorkflowRun / WorkflowNodeResult state after task completion.""" try: import uuid from datetime import datetime as _dt from sqlalchemy import select as _sel from app.core.db_utils import get_sync_session from app.domains.rendering.models import WorkflowNodeResult, WorkflowRun with get_sync_session() as db: run = None if workflow_run_id: try: resolved_run_id = uuid.UUID(str(workflow_run_id)) except (TypeError, ValueError): resolved_run_id = workflow_run_id run = db.execute( _sel(WorkflowRun).where(WorkflowRun.id == resolved_run_id) ).scalar_one_or_none() else: run = db.execute( _sel(WorkflowRun) .where(WorkflowRun.order_line_id == order_line_id) .order_by(WorkflowRun.created_at.desc()) .limit(1) ).scalar_one_or_none() if run is None: return if workflow_node_id: node_result = db.execute( _sel(WorkflowNodeResult).where( WorkflowNodeResult.run_id == run.id, WorkflowNodeResult.node_name == workflow_node_id, ) ).scalar_one_or_none() if node_result is not None: metadata = dict(node_result.output or {}) if error: metadata["last_error"] = error[:2000] node_result.status = status node_result.log = error[:2000] if error else None node_result.output = metadata node_results = list( db.execute( _sel(WorkflowNodeResult).where(WorkflowNodeResult.run_id == run.id) ).scalars().all() ) if any(node.status == "failed" for node in node_results): run.status = "failed" run.completed_at = _dt.utcnow() if error: run.error_message = error[:2000] elif any(node.status in {"pending", "queued", "running", "retrying"} for node in node_results): run.status = "pending" run.completed_at = None if status != "failed": run.error_message = None else: run.status = status run.completed_at = _dt.utcnow() if status != "failed": run.error_message = None except Exception as _exc: logger.warning("Failed to update WorkflowRun status for line %s: %s", order_line_id, _exc) def _mark_workflow_node_running( order_line_id: str, *, workflow_run_id: str | None = None, workflow_node_id: str | None = None, task_id: str | None = None, ) -> None: if not workflow_node_id: return try: from app.core.db_utils import get_sync_session from app.domains.rendering.models import WorkflowNodeResult from sqlalchemy import select as _sel with get_sync_session() as db: import uuid from app.domains.rendering.models import WorkflowRun run = None if workflow_run_id: try: resolved_run_id = uuid.UUID(str(workflow_run_id)) except (TypeError, ValueError): resolved_run_id = workflow_run_id run = db.execute( _sel(WorkflowRun).where(WorkflowRun.id == resolved_run_id) ).scalar_one_or_none() else: run = db.execute( _sel(WorkflowRun) .where(WorkflowRun.order_line_id == order_line_id) .order_by(WorkflowRun.created_at.desc()) .limit(1) ).scalar_one_or_none() if run is None: return node_result = db.execute( _sel(WorkflowNodeResult).where( WorkflowNodeResult.run_id == run.id, WorkflowNodeResult.node_name == workflow_node_id, ) ).scalar_one_or_none() if node_result is None: return metadata = dict(node_result.output or {}) if task_id: metadata["task_id"] = task_id metadata["runtime_state"] = "running" node_result.status = "running" node_result.log = None node_result.output = metadata run.status = "pending" run.completed_at = None except Exception as _exc: logger.warning( "Failed to mark WorkflowNodeResult running for line %s node %s: %s", order_line_id, workflow_node_id, _exc, ) def _emit_graph_render_notifications( order_line_id: str, *, success: bool, render_log: dict | None = None, ) -> None: try: from sqlalchemy import create_engine, select from sqlalchemy.orm import Session, joinedload from app.config import settings as app_settings from app.domains.orders.models import OrderLine from app.domains.products.models import Product from app.domains.rendering.workflow_runtime_services import ( emit_order_line_render_notifications, ) engine = create_engine(app_settings.database_url_sync) try: with Session(engine) as session: line = session.execute( select(OrderLine) .where(OrderLine.id == order_line_id) .options( joinedload(OrderLine.product).joinedload(Product.cad_file), joinedload(OrderLine.output_type), ) ).scalar_one_or_none() if line is None: return tenant_id = None if line.product and line.product.cad_file and line.product.cad_file.tenant_id: tenant_id = str(line.product.cad_file.tenant_id) emit_order_line_render_notifications( success=success, order_line_id=order_line_id, tenant_id=tenant_id, product_name=line.product.name if line.product else "unknown", output_type_name=line.output_type.name if line.output_type else "unknown", render_log=render_log if isinstance(render_log, dict) else None, session=session, line=line, ) finally: engine.dispose() except Exception as exc: logger.exception( "Failed to emit graph render notifications for order_line %s: %s", order_line_id, exc, ) def _update_graph_output_nodes( *, workflow_run_id: str | None, output_node_ids: list[str], status: str, output_updates: dict | None = None, error: str | None = None, ) -> None: if workflow_run_id is None or not output_node_ids: return import uuid from sqlalchemy import create_engine, select from sqlalchemy.orm import Session from app.config import settings as app_settings from app.domains.rendering.models import WorkflowNodeResult try: resolved_run_id = uuid.UUID(str(workflow_run_id)) except (TypeError, ValueError): resolved_run_id = workflow_run_id engine = create_engine(app_settings.database_url_sync) try: with Session(engine) as session: for node_id in output_node_ids: node_result = session.execute( select(WorkflowNodeResult).where( WorkflowNodeResult.run_id == resolved_run_id, WorkflowNodeResult.node_name == node_id, ) ).scalar_one_or_none() if node_result is None: continue metadata = dict(node_result.output or {}) if output_updates: metadata.update(output_updates) if error: metadata["last_error"] = error[:2000] node_result.status = status node_result.log = error[:2000] if error else None node_result.output = metadata session.commit() finally: engine.dispose() def _update_graph_notify_nodes( *, workflow_run_id: str | None, notify_node_ids: list[str], status: str, output_updates: dict | None = None, error: str | None = None, ) -> None: if workflow_run_id is None or not notify_node_ids: return import uuid from sqlalchemy import create_engine, select from sqlalchemy.orm import Session from app.config import settings as app_settings from app.domains.rendering.models import WorkflowNodeResult try: resolved_run_id = uuid.UUID(str(workflow_run_id)) except (TypeError, ValueError): resolved_run_id = workflow_run_id engine = create_engine(app_settings.database_url_sync) try: with Session(engine) as session: for node_id in notify_node_ids: node_result = session.execute( select(WorkflowNodeResult).where( WorkflowNodeResult.run_id == resolved_run_id, WorkflowNodeResult.node_name == node_id, ) ).scalar_one_or_none() if node_result is None: continue metadata = dict(node_result.output or {}) if output_updates: metadata.update(output_updates) if error: metadata["last_error"] = error[:2000] node_result.status = status node_result.log = error[:2000] if error else None node_result.output = metadata session.commit() finally: engine.dispose() def _finalize_graph_notify_nodes( *, workflow_run_id: str | None = None, notify_node_ids: list[str] | None = None, success: bool, render_node_id: str | None = None, error: str | None = None, ) -> None: notify_node_ids = list(notify_node_ids or []) if workflow_run_id is None or not notify_node_ids: return if success: _update_graph_notify_nodes( workflow_run_id=workflow_run_id, notify_node_ids=notify_node_ids, status="completed", output_updates={ "notification_mode": "completed_via_render_task", "completed_by_node_id": render_node_id, "render_success": True, }, ) else: _update_graph_notify_nodes( workflow_run_id=workflow_run_id, notify_node_ids=notify_node_ids, status="failed", output_updates={ "notification_mode": "failed_via_render_task", "completed_by_node_id": render_node_id, "render_success": False, }, error=error, ) def _finalize_graph_still_output( order_line_id: str, *, success: bool, output_path: str, render_log: dict | None = None, workflow_run_id: str | None = None, output_node_ids: list[str] | None = None, render_node_id: str | None = None, error: str | None = None, ) -> None: output_node_ids = list(output_node_ids or []) if workflow_run_id is None or not output_node_ids: return from sqlalchemy import create_engine, select from sqlalchemy.orm import Session, joinedload from app.config import settings as app_settings from app.domains.orders.models import OrderLine from app.domains.products.models import Product from app.domains.rendering.workflow_runtime_services import persist_order_line_output engine = create_engine(app_settings.database_url_sync) try: with Session(engine) as session: line = session.execute( select(OrderLine) .where(OrderLine.id == order_line_id) .options(joinedload(OrderLine.product).joinedload(Product.cad_file)) ).scalar_one_or_none() if line is None: return persisted = persist_order_line_output( session, line, success=success, output_path=output_path, render_log=render_log if isinstance(render_log, dict) else None, workflow_run_id=workflow_run_id, ) finally: engine.dispose() if success: _update_graph_output_nodes( workflow_run_id=workflow_run_id, output_node_ids=output_node_ids, status="completed", output_updates={ "publication_mode": "graph_authoritative", "authoritative_result_path": persisted.result_path, "persisted_result_path": persisted.result_path, "asset_id": persisted.asset_id, "storage_key": persisted.storage_key, "asset_type": persisted.asset_type.value if persisted.asset_type is not None else None, "completed_by_node_id": render_node_id, }, ) else: _update_graph_output_nodes( workflow_run_id=workflow_run_id, output_node_ids=output_node_ids, status="failed", output_updates={ "publication_mode": "blocked_by_render_failure", "authoritative_result_path": None, "persisted_result_path": None, "completed_by_node_id": render_node_id, }, error=error, ) def _finalize_graph_blend_output( order_line_id: str, *, success: bool, output_path: str, render_log: dict | None = None, workflow_run_id: str | None = None, output_node_ids: list[str] | None = None, render_node_id: str | None = None, error: str | None = None, ) -> None: output_node_ids = list(output_node_ids or []) if workflow_run_id is None or not output_node_ids: return from sqlalchemy import create_engine, select from sqlalchemy.orm import Session, joinedload from app.config import settings as app_settings from app.domains.media.models import MediaAssetType from app.domains.orders.models import OrderLine from app.domains.products.models import Product from app.domains.rendering.workflow_runtime_services import persist_order_line_media_asset engine = create_engine(app_settings.database_url_sync) try: with Session(engine) as session: line = session.execute( select(OrderLine) .where(OrderLine.id == order_line_id) .options(joinedload(OrderLine.product).joinedload(Product.cad_file)) ).scalar_one_or_none() if line is None: return persisted = persist_order_line_media_asset( session, line, success=success, output_path=output_path, asset_type=MediaAssetType.blend_production, render_log=render_log if isinstance(render_log, dict) else None, workflow_run_id=workflow_run_id, ) finally: engine.dispose() if success: _update_graph_output_nodes( workflow_run_id=workflow_run_id, output_node_ids=output_node_ids, status="completed", output_updates={ "publication_mode": "graph_authoritative", "authoritative_result_path": persisted.result_path, "persisted_result_path": persisted.result_path, "asset_id": persisted.asset_id, "storage_key": persisted.storage_key, "asset_type": persisted.asset_type.value if persisted.asset_type is not None else None, "completed_by_node_id": render_node_id, }, ) else: _update_graph_output_nodes( workflow_run_id=workflow_run_id, output_node_ids=output_node_ids, status="failed", output_updates={ "publication_mode": "blocked_by_render_failure", "authoritative_result_path": None, "persisted_result_path": None, "asset_id": None, "storage_key": None, "asset_type": None, "completed_by_node_id": render_node_id, }, error=error, ) def _finalize_graph_turntable_output( order_line_id: str, *, success: bool, output_path: str, render_log: dict | None = None, workflow_run_id: str | None = None, output_node_ids: list[str] | None = None, render_node_id: str | None = None, error: str | None = None, ) -> None: _finalize_graph_still_output( order_line_id, success=success, output_path=output_path, render_log=render_log, workflow_run_id=workflow_run_id, output_node_ids=output_node_ids, render_node_id=render_node_id, error=error, ) def _finalize_observer_media_output( order_line_id: str, *, asset_type: str, success: bool, output_path: str, render_log: dict | None = None, workflow_run_id: str | None = None, output_node_ids: list[str] | None = None, render_node_id: str | None = None, error: str | None = None, ) -> None: if workflow_run_id is None: return from app.domains.media.models import MediaAssetType resolved_asset_type = MediaAssetType(asset_type) output_node_ids = list(output_node_ids or []) if success: _update_graph_output_nodes( workflow_run_id=workflow_run_id, output_node_ids=output_node_ids, status="completed", output_updates={ "publication_mode": "shadow_observer_only", "observer_result_path": output_path, "persisted_result_path": output_path, "asset_id": None, "storage_key": None, "asset_type": resolved_asset_type.value, "completed_by_node_id": render_node_id, }, ) else: _update_graph_output_nodes( workflow_run_id=workflow_run_id, output_node_ids=output_node_ids, status="failed", output_updates={ "publication_mode": "shadow_observer_failed", "observer_result_path": None, "persisted_result_path": None, "asset_id": None, "storage_key": None, "asset_type": resolved_asset_type.value, "completed_by_node_id": render_node_id, }, error=error, ) def _finalize_shadow_still_output( order_line_id: str, *, success: bool, output_path: str, render_log: dict | None = None, workflow_run_id: str | None = None, output_node_ids: list[str] | None = None, render_node_id: str | None = None, error: str | None = None, ) -> None: _finalize_observer_media_output( order_line_id, asset_type="still", success=success, output_path=output_path, render_log=render_log, workflow_run_id=workflow_run_id, output_node_ids=output_node_ids, render_node_id=render_node_id, error=error, ) def _finalize_shadow_turntable_output( order_line_id: str, *, success: bool, output_path: str, render_log: dict | None = None, workflow_run_id: str | None = None, output_node_ids: list[str] | None = None, render_node_id: str | None = None, error: str | None = None, ) -> None: _finalize_observer_media_output( order_line_id, asset_type="turntable", success=success, output_path=output_path, render_log=render_log, workflow_run_id=workflow_run_id, output_node_ids=output_node_ids, render_node_id=render_node_id, error=error, ) def _finalize_shadow_blend_output( order_line_id: str, *, success: bool, output_path: str, render_log: dict | None = None, workflow_run_id: str | None = None, output_node_ids: list[str] | None = None, render_node_id: str | None = None, error: str | None = None, ) -> None: _finalize_observer_media_output( order_line_id, asset_type="blend_production", success=success, output_path=output_path, render_log=render_log, workflow_run_id=workflow_run_id, output_node_ids=output_node_ids, render_node_id=render_node_id, error=error, ) @celery_app.task( bind=True, name="app.domains.rendering.tasks.render_still_task", queue="asset_pipeline", max_retries=2, ) def render_still_task( self, step_path: str, output_path: str, engine: str = "cycles", samples: int | None = None, smooth_angle: int = 30, cycles_device: str = "gpu", width: int = 512, height: int = 512, transparent_bg: bool = False, template_path: str | None = None, target_collection: str = "Product", material_library_path: str | None = None, material_map: dict | None = None, part_names_ordered: list | None = None, lighting_only: bool = False, shadow_catcher: bool = False, rotation_x: float = 0.0, rotation_y: float = 0.0, rotation_z: float = 0.0, noise_threshold: str = "", denoiser: str = "", denoising_input_passes: str = "", denoising_prefilter: str = "", denoising_quality: str = "", denoising_use_gpu: str = "", mesh_attributes: dict | None = None, template_inputs: dict | None = None, ) -> dict: """Render a STEP file to a still PNG via Blender subprocess. Returns render metadata dict on success. Retries up to 2 times on failure (30s countdown). """ log_task_event(self.request.id, f"Starting render_still_task: {Path(step_path).name}", "info") try: from app.services.render_blender import render_still result = render_still( step_path=Path(step_path), output_path=Path(output_path), engine=engine, samples=samples, smooth_angle=smooth_angle, cycles_device=cycles_device, width=width, height=height, transparent_bg=transparent_bg, template_path=template_path, target_collection=target_collection, material_library_path=material_library_path, material_map=material_map, part_names_ordered=part_names_ordered, lighting_only=lighting_only, shadow_catcher=shadow_catcher, rotation_x=rotation_x, rotation_y=rotation_y, rotation_z=rotation_z, noise_threshold=noise_threshold, denoiser=denoiser, denoising_input_passes=denoising_input_passes, denoising_prefilter=denoising_prefilter, denoising_quality=denoising_quality, denoising_use_gpu=denoising_use_gpu, mesh_attributes=mesh_attributes or {}, template_inputs=template_inputs, ) log_task_event(self.request.id, f"Completed successfully in {result.get('total_duration_s', 0):.1f}s", "done") logger.info( "render_still_task completed: %s → %s in %.1fs", Path(step_path).name, Path(output_path).name, result.get("total_duration_s", 0), ) try: from app.core.websocket import publish_event_sync publish_event_sync(None, { "type": "render.still.completed", "step_path": Path(step_path).name, "output": Path(output_path).name, }) except Exception: pass return result except Exception as exc: log_task_event(self.request.id, f"Failed: {exc}", "error") logger.error("render_still_task failed for %s: %s", step_path, exc) try: from app.core.websocket import publish_event_sync publish_event_sync(None, { "type": "render.still.failed", "step_path": Path(step_path).name, "error": str(exc), }) except Exception: pass raise self.retry(exc=exc, countdown=30) @celery_app.task( bind=True, name="app.domains.rendering.tasks.render_turntable_task", queue="asset_pipeline", max_retries=2, ) def render_turntable_task( self, context_id_or_step_path: str, output_dir: str | None = None, output_name: str = "turntable", engine: str = "cycles", render_engine: str | None = None, samples: int = 64, smooth_angle: int = 30, cycles_device: str = "gpu", transparent_bg: bool = False, width: int = 1920, height: int = 1080, frame_count: int = 120, fps: int = 30, duration_s: float | None = None, turntable_degrees: float = 360.0, turntable_axis: str = "world_z", bg_color: str = "", template_path: str | None = None, target_collection: str = "Product", material_library_path: str | None = None, material_map: dict | None = None, part_names_ordered: list | None = None, lighting_only: bool = False, shadow_catcher: bool = False, camera_orbit: bool = True, rotation_x: float = 0.0, rotation_y: float = 0.0, rotation_z: float = 0.0, focal_length_mm: float | None = None, sensor_width_mm: float | None = None, material_override: str | None = None, template_inputs: dict | None = None, workflow_run_id: str | None = None, workflow_node_id: str | None = None, publish_asset_enabled: bool = True, observer_output_enabled: bool = False, graph_authoritative_output_enabled: bool = False, graph_output_node_ids: list[str] | None = None, graph_notify_node_ids: list[str] | None = None, emit_legacy_notifications: bool = False, emit_events: bool = True, job_document_enabled: bool = True, output_name_suffix: str | None = None, ) -> dict: """Render a STEP file as a turntable animation (frames + FFmpeg composite). Returns render metadata dict on success. """ del job_document_enabled import json import os import shutil import subprocess import sys from app.services.render_blender import find_blender graph_output_node_ids = list(graph_output_node_ids or []) graph_notify_node_ids = list(graph_notify_node_ids or []) order_line_id: str | None = None step_path = context_id_or_step_path try: uuid.UUID(str(context_id_or_step_path)) except (TypeError, ValueError): resolved_order_line_context = False else: resolved_order_line_context = True if resolved_order_line_context: order_line_id = context_id_or_step_path step_path, _cad_file_id = _resolve_step_path_for_order_line(order_line_id) if not step_path: raise RuntimeError(f"Cannot resolve STEP path for order_line {order_line_id}") step = Path(step_path) canonical_output_dir = build_order_line_step_render_path( step, order_line_id, "placeholder.mp4", ensure_exists=True, ) if output_dir and Path(output_dir) != canonical_output_dir.parent: logger.warning( "render_turntable_task overriding non-canonical output_dir=%s with %s for order_line=%s", output_dir, canonical_output_dir.parent, order_line_id, ) output_dir = str(canonical_output_dir.parent) elif output_dir is None: raise RuntimeError("render_turntable_task requires output_dir when invoked with a STEP path") else: step = Path(step_path) if render_engine not in (None, ""): engine = str(render_engine) if duration_s not in (None, ""): try: normalized_duration_s = float(duration_s) except (TypeError, ValueError): normalized_duration_s = None if normalized_duration_s is not None and normalized_duration_s > 0 and fps > 0: frame_count = max(1, int(round(normalized_duration_s * fps))) if output_name_suffix: output_name = f"{output_name}_{output_name_suffix}" log_task_event(self.request.id, f"Starting render_turntable_task: {step.name}", "info") if order_line_id: _mark_workflow_node_running( order_line_id, workflow_run_id=workflow_run_id, workflow_node_id=workflow_node_id, task_id=self.request.id, ) blender_bin = find_blender() if not blender_bin: raise RuntimeError("Blender binary not found in render-worker container") out_dir = Path(output_dir) ensure_group_writable_dir(out_dir) output_mp4 = out_dir / f"{output_name}.mp4" logger.info( "render_turntable_task using output_mp4=%s for order_line=%s workflow_run_id=%s", output_mp4, order_line_id, workflow_run_id, ) scripts_dir = Path(os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts")) turntable_script = scripts_dir / "turntable_render.py" # Turntable output is a production render path, so use render-quality tessellation. from app.services.render_blender import build_tessellated_glb_path, resolve_tessellation_settings linear_deflection, angular_deflection, effective_tessellation_engine = resolve_tessellation_settings("render") glb_path = build_tessellated_glb_path( step, "render", effective_tessellation_engine, linear_deflection, angular_deflection, ) if not glb_path.exists() or glb_path.stat().st_size == 0: occ_script = scripts_dir / "export_step_to_gltf.py" occ_cmd = [ sys.executable, str(occ_script), "--step_path", str(step), "--output_path", str(glb_path), "--linear_deflection", str(linear_deflection), "--angular_deflection", str(angular_deflection), "--tessellation_engine", effective_tessellation_engine, ] occ_result = subprocess.run(occ_cmd, capture_output=True, text=True, timeout=120) if occ_result.returncode != 0: raise RuntimeError( f"export_step_to_gltf.py failed:\n{occ_result.stderr[-500:]}" ) logger.info( "render_turntable_task: GLB generated: %s with render tessellation linear=%s angular=%s engine=%s", glb_path.name, linear_deflection, angular_deflection, effective_tessellation_engine, ) # Build turntable render arguments frames_dir = out_dir / f"_frames_{output_name}" if frames_dir.exists(): shutil.rmtree(frames_dir, ignore_errors=True) ensure_group_writable_dir(frames_dir) cmd = [ blender_bin, "--background", "--python", str(turntable_script), "--", str(glb_path), str(frames_dir), str(frame_count), str(int(turntable_degrees)), str(width), str(height), engine, str(samples), "{}", template_path or "", target_collection, material_library_path or "", json.dumps(material_map) if material_map else "{}", json.dumps(part_names_ordered) if part_names_ordered else "[]", "1" if lighting_only else "0", cycles_device, "1" if shadow_catcher else "0", str(rotation_x), str(rotation_y), str(rotation_z), turntable_axis, bg_color, "1" if transparent_bg else "0", ] if camera_orbit: cmd += ["--camera-orbit"] if focal_length_mm is not None: cmd += ["--focal-length", str(focal_length_mm)] if sensor_width_mm is not None: cmd += ["--sensor-width", str(sensor_width_mm)] if material_override: cmd += ["--material-override", material_override] if template_inputs: cmd += ["--template-inputs", json.dumps(template_inputs)] try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=3600 ) if result.returncode != 0: raise RuntimeError( f"Blender turntable exited {result.returncode}:\n{result.stdout[-2000:]}" ) except Exception as exc: log_task_event(self.request.id, f"Failed: {exc}", "error") logger.error("render_turntable_task failed: %s", exc) try: from app.core.websocket import publish_event_sync if emit_events: publish_event_sync(None, { "type": "render.turntable.failed", "step_path": step.name, "error": str(exc), }) except Exception: pass if graph_authoritative_output_enabled and order_line_id: _finalize_graph_turntable_output( order_line_id, success=False, output_path=str(output_mp4), render_log={"error": str(exc)}, workflow_run_id=workflow_run_id, output_node_ids=graph_output_node_ids, render_node_id=workflow_node_id, error=str(exc), ) elif observer_output_enabled and order_line_id: _finalize_shadow_turntable_output( order_line_id, success=False, output_path=str(output_mp4), render_log={"error": str(exc)}, workflow_run_id=workflow_run_id, output_node_ids=graph_output_node_ids, render_node_id=workflow_node_id, error=str(exc), ) if order_line_id: _update_workflow_run_status( order_line_id, "failed", str(exc), workflow_run_id=workflow_run_id, workflow_node_id=workflow_node_id, ) if emit_legacy_notifications: _emit_graph_render_notifications( order_line_id, success=False, render_log={"error": str(exc)}, ) _finalize_graph_notify_nodes( workflow_run_id=workflow_run_id, notify_node_ids=graph_notify_node_ids, success=False, render_node_id=workflow_node_id, error=str(exc), ) raise self.retry(exc=exc, countdown=60) frame_files = sorted(frames_dir.glob("frame_*.png")) if not frame_files: error_message = f"No frames rendered in {frames_dir}" if result.stdout: error_message = f"{error_message}\nSTDOUT:\n{result.stdout[-2000:]}" if result.stderr: error_message = f"{error_message}\nSTDERR:\n{result.stderr[-2000:]}" if graph_authoritative_output_enabled and order_line_id: _finalize_graph_turntable_output( order_line_id, success=False, output_path=str(output_mp4), render_log={"error": error_message}, workflow_run_id=workflow_run_id, output_node_ids=graph_output_node_ids, render_node_id=workflow_node_id, error=error_message, ) elif observer_output_enabled and order_line_id: _finalize_shadow_turntable_output( order_line_id, success=False, output_path=str(output_mp4), render_log={"error": error_message}, workflow_run_id=workflow_run_id, output_node_ids=graph_output_node_ids, render_node_id=workflow_node_id, error=error_message, ) if order_line_id: _update_workflow_run_status( order_line_id, "failed", error_message, workflow_run_id=workflow_run_id, workflow_node_id=workflow_node_id, ) if emit_legacy_notifications: _emit_graph_render_notifications( order_line_id, success=False, render_log={"error": error_message}, ) _finalize_graph_notify_nodes( workflow_run_id=workflow_run_id, notify_node_ids=graph_notify_node_ids, success=False, render_node_id=workflow_node_id, error=error_message, ) raise RuntimeError(error_message) # FFmpeg composite: frames → MP4 with optional background ffmpeg_cmd = _build_ffmpeg_cmd( frames_dir, output_mp4, fps=fps, bg_color=bg_color, width=width, height=height, ) try: subprocess.run(ffmpeg_cmd, check=True, capture_output=True, text=True, timeout=300) except subprocess.CalledProcessError as exc: error_message = f"FFmpeg composite failed: {exc.stderr[-500:]}" if graph_authoritative_output_enabled and order_line_id: _finalize_graph_turntable_output( order_line_id, success=False, output_path=str(output_mp4), render_log={"error": error_message}, workflow_run_id=workflow_run_id, output_node_ids=graph_output_node_ids, render_node_id=workflow_node_id, error=error_message, ) elif observer_output_enabled and order_line_id: _finalize_shadow_turntable_output( order_line_id, success=False, output_path=str(output_mp4), render_log={"error": error_message}, workflow_run_id=workflow_run_id, output_node_ids=graph_output_node_ids, render_node_id=workflow_node_id, error=error_message, ) if order_line_id: _update_workflow_run_status( order_line_id, "failed", error_message, workflow_run_id=workflow_run_id, workflow_node_id=workflow_node_id, ) if emit_legacy_notifications: _emit_graph_render_notifications( order_line_id, success=False, render_log={"error": error_message}, ) _finalize_graph_notify_nodes( workflow_run_id=workflow_run_id, notify_node_ids=graph_notify_node_ids, success=False, render_node_id=workflow_node_id, error=error_message, ) raise RuntimeError(error_message) log_task_event(self.request.id, "Completed successfully", "done") try: from app.core.websocket import publish_event_sync if emit_events: publish_event_sync(None, { "type": "render.turntable.completed", "step_path": step.name, "output": output_mp4.name, }) except Exception: pass result_payload = { "output_mp4": str(output_mp4), "frame_count": frame_count, "fps": fps, } if graph_authoritative_output_enabled and order_line_id: _finalize_graph_turntable_output( order_line_id, success=True, output_path=str(output_mp4), render_log=result_payload, workflow_run_id=workflow_run_id, output_node_ids=graph_output_node_ids, render_node_id=workflow_node_id, ) elif observer_output_enabled and order_line_id: _finalize_shadow_turntable_output( order_line_id, success=True, output_path=str(output_mp4), render_log=result_payload, workflow_run_id=workflow_run_id, output_node_ids=graph_output_node_ids, render_node_id=workflow_node_id, ) elif publish_asset_enabled and order_line_id: publish_asset.delay( order_line_id, "turntable", str(output_mp4), render_config=result_payload, workflow_run_id=workflow_run_id, ) if order_line_id: _update_workflow_run_status( order_line_id, "completed", workflow_run_id=workflow_run_id, workflow_node_id=workflow_node_id, ) if emit_legacy_notifications: _emit_graph_render_notifications( order_line_id, success=True, render_log=result_payload, ) _finalize_graph_notify_nodes( workflow_run_id=workflow_run_id, notify_node_ids=graph_notify_node_ids, success=True, render_node_id=workflow_node_id, ) return result_payload @celery_app.task( name="rendering.publish_asset", queue="step_processing", ) def publish_asset( order_line_id: str, asset_type: str, storage_key: str, render_config: dict | None = None, workflow_run_id: str | None = None, ) -> str | None: """Create a MediaAsset record after a successful render.""" from sqlalchemy import select from sqlalchemy.orm import joinedload from app.core.db_utils import get_sync_session from app.domains.media.models import MediaAsset, MediaAssetType from app.domains.orders.models import OrderLine from app.domains.products.models import Product from app.domains.rendering.workflow_runtime_services import ( persist_order_line_media_asset, persist_order_line_output, ) with get_sync_session() as db: line = db.execute( select(OrderLine) .where(OrderLine.id == order_line_id) .options( joinedload(OrderLine.product).joinedload(Product.cad_file), joinedload(OrderLine.output_type), ) ).scalar_one_or_none() if not line: return None resolved_asset_type = MediaAssetType(asset_type) if resolved_asset_type in {MediaAssetType.still, MediaAssetType.turntable}: persisted = persist_order_line_output( db, line, success=True, output_path=storage_key, render_log=render_config if isinstance(render_config, dict) else None, workflow_run_id=workflow_run_id, ) return persisted.asset_id persisted = persist_order_line_media_asset( db, line, success=True, output_path=storage_key, asset_type=resolved_asset_type, render_log=render_config if isinstance(render_config, dict) else None, workflow_run_id=workflow_run_id, ) return persisted.asset_id def _resolve_step_path_for_order_line(order_line_id: str) -> tuple[str | None, str | None]: """Sync helper: resolves (step_path, cad_file_id) from an OrderLine via DB.""" from sqlalchemy import select from sqlalchemy.orm import selectinload from app.core.db_utils import get_sync_session from app.domains.orders.models import OrderLine from app.models.cad_file import CadFile with get_sync_session() as db: line = db.execute( select(OrderLine) .options(selectinload(OrderLine.product)) .where(OrderLine.id == order_line_id) ).scalar_one_or_none() if not line or not line.product or not line.product.cad_file_id: return None, None cad = db.execute( select(CadFile).where(CadFile.id == line.product.cad_file_id) ).scalar_one_or_none() if not cad or not cad.stored_path: return None, None return cad.stored_path, str(line.product.cad_file_id) @celery_app.task( bind=True, name="app.domains.rendering.tasks.render_order_line_still_task", queue="asset_pipeline", max_retries=2, ) def render_order_line_still_task(self, order_line_id: str, **params) -> dict: """Render a still image for an order line, resolving STEP path from DB. Wraps render_still_task logic but accepts order_line_id instead of step_path. On success, creates a MediaAsset record via publish_asset. """ from app.domains.rendering.job_document import RenderJobDocument, JobState from app.core.process_steps import StepName workflow_run_id = params.pop("workflow_run_id", None) workflow_node_id = params.pop("workflow_node_id", None) publish_asset_enabled = bool(params.pop("publish_asset_enabled", True)) observer_output_enabled = bool(params.pop("observer_output_enabled", False)) graph_authoritative_output_enabled = bool(params.pop("graph_authoritative_output_enabled", False)) graph_output_node_ids = list(params.pop("graph_output_node_ids", []) or []) graph_notify_node_ids = list(params.pop("graph_notify_node_ids", []) or []) emit_events = bool(params.pop("emit_events", True)) job_document_enabled = bool(params.pop("job_document_enabled", True)) emit_legacy_notifications = bool(params.pop("emit_legacy_notifications", False)) output_name_suffix = params.pop("output_name_suffix", None) log_task_event(self.request.id, f"Starting render_order_line_still_task: order_line={order_line_id}", "info") _mark_workflow_node_running( order_line_id, workflow_run_id=workflow_run_id, workflow_node_id=workflow_node_id, task_id=self.request.id, ) # Initialise job document and store real Celery task ID job_doc = RenderJobDocument.new(order_line_id=order_line_id, celery_task_id=self.request.id) job_doc.set_state(JobState.RUNNING) def _save_job_doc(): if not job_document_enabled: return try: from sqlalchemy import update as _upd from app.core.db_utils import get_sync_session from app.domains.orders.models import OrderLine with get_sync_session() as db: db.execute( _upd(OrderLine) .where(OrderLine.id == order_line_id) .values(render_job_doc=job_doc.to_dict()) ) except Exception as _exc: logger.debug("_save_job_doc failed: %s", _exc) _save_job_doc() job_doc.begin_step(StepName.RESOLVE_STEP_PATH) step_path_str, cad_file_id = _resolve_step_path_for_order_line(order_line_id) if not step_path_str: job_doc.fail_step(StepName.RESOLVE_STEP_PATH, "product missing or has no linked CAD file") job_doc.set_state(JobState.FAILED, error="Cannot resolve STEP path") _save_job_doc() log_task_event(self.request.id, f"Failed: cannot resolve STEP path for order_line {order_line_id}", "error") raise RuntimeError( f"Cannot resolve STEP path for order_line {order_line_id}: " "product missing or has no linked CAD file" ) job_doc.finish_step(StepName.RESOLVE_STEP_PATH, output={"step_path": step_path_str}) step = Path(step_path_str) output_extension = _resolve_order_line_still_output_extension(order_line_id, params) output_filename = f"line_{order_line_id}.{output_extension}" if output_name_suffix: output_filename = f"line_{order_line_id}_{output_name_suffix}.{output_extension}" output_path = build_order_line_step_render_path( step, order_line_id, output_filename, ensure_exists=True, ) try: job_doc.begin_step(StepName.BLENDER_STILL) from app.services.step_processor import render_to_file render_params = _normalize_order_line_still_params(params) success, result = render_to_file( step_path=str(step), output_path=str(output_path), order_line_id=order_line_id, **render_params, ) if not success: raise RuntimeError( f"Failed to render still output for order_line {order_line_id}" ) result["output_path"] = str(output_path) job_doc.finish_step( StepName.BLENDER_STILL, output={"output_path": str(output_path), "duration_s": result.get("total_duration_s")}, ) job_doc.set_state(JobState.COMPLETED, result={ "output_path": str(output_path), "duration_s": result.get("total_duration_s"), "engine_used": result.get("engine_used"), }) _save_job_doc() if graph_authoritative_output_enabled: _finalize_graph_still_output( order_line_id, success=True, output_path=str(output_path), render_log=result, workflow_run_id=workflow_run_id, output_node_ids=graph_output_node_ids, render_node_id=workflow_node_id, ) elif observer_output_enabled: _finalize_shadow_still_output( order_line_id, success=True, output_path=str(output_path), render_log=result, workflow_run_id=workflow_run_id, output_node_ids=graph_output_node_ids, render_node_id=workflow_node_id, ) elif publish_asset_enabled: publish_asset.delay( order_line_id, "still", str(output_path), render_config=result, workflow_run_id=workflow_run_id, ) log_task_event(self.request.id, f"Completed successfully in {result.get('total_duration_s', 0):.1f}s", "done") logger.info( "render_order_line_still_task completed for line %s in %.1fs", order_line_id, result.get("total_duration_s", 0), ) try: from app.core.websocket import publish_event_sync if emit_events: publish_event_sync(None, { "type": "render.order_line.completed", "order_line_id": order_line_id, }) except Exception: pass if emit_legacy_notifications: _emit_graph_render_notifications( order_line_id, success=True, render_log=result, ) _finalize_graph_notify_nodes( workflow_run_id=workflow_run_id, notify_node_ids=graph_notify_node_ids, success=True, render_node_id=workflow_node_id, ) _update_workflow_run_status( order_line_id, "completed", workflow_run_id=workflow_run_id, workflow_node_id=workflow_node_id, ) return result except Exception as exc: job_doc.fail_step(StepName.BLENDER_STILL, str(exc)) job_doc.set_state(JobState.FAILED, error=str(exc)) _save_job_doc() log_task_event(self.request.id, f"Failed: {exc}", "error") logger.error("render_order_line_still_task failed for %s: %s", order_line_id, exc) try: from app.core.websocket import publish_event_sync if emit_events: publish_event_sync(None, { "type": "render.order_line.failed", "order_line_id": order_line_id, "error": str(exc), }) except Exception: pass if graph_authoritative_output_enabled: _finalize_graph_still_output( order_line_id, success=False, output_path=str(output_path), render_log={"error": str(exc)}, workflow_run_id=workflow_run_id, output_node_ids=graph_output_node_ids, render_node_id=workflow_node_id, error=str(exc), ) elif observer_output_enabled: _finalize_shadow_still_output( order_line_id, success=False, output_path=str(output_path), render_log={"error": str(exc)}, workflow_run_id=workflow_run_id, output_node_ids=graph_output_node_ids, render_node_id=workflow_node_id, error=str(exc), ) if emit_legacy_notifications: _emit_graph_render_notifications( order_line_id, success=False, render_log={"error": str(exc)}, ) _finalize_graph_notify_nodes( workflow_run_id=workflow_run_id, notify_node_ids=graph_notify_node_ids, success=False, render_node_id=workflow_node_id, error=str(exc), ) _update_workflow_run_status( order_line_id, "failed", str(exc), workflow_run_id=workflow_run_id, workflow_node_id=workflow_node_id, ) raise self.retry(exc=exc, countdown=30) @celery_app.task( bind=True, name="app.domains.rendering.tasks.export_blend_for_order_line_task", queue="asset_pipeline", max_retries=1, ) def export_blend_for_order_line_task( self, order_line_id: str, workflow_run_id: str | None = None, workflow_node_id: str | None = None, publish_asset_enabled: bool = True, observer_output_enabled: bool = False, graph_authoritative_output_enabled: bool = False, graph_output_node_ids: list[str] | None = None, graph_notify_node_ids: list[str] | None = None, emit_legacy_notifications: bool = False, output_name_suffix: str | None = None, **_kwargs, ) -> dict: """Export a production .blend file via Blender + asset library (export_blend.py). Publishes a MediaAsset with asset_type='blend_production'. Requires Blender + the render-scripts directory. """ import json import os import subprocess graph_output_node_ids = list(graph_output_node_ids or []) graph_notify_node_ids = list(graph_notify_node_ids or []) _mark_workflow_node_running( order_line_id, workflow_run_id=workflow_run_id, workflow_node_id=workflow_node_id, task_id=self.request.id, ) step_path_str, cad_file_id = _resolve_step_path_for_order_line(order_line_id) if not step_path_str: raise RuntimeError(f"Cannot resolve STEP path for order_line {order_line_id}") step = Path(step_path_str) # Blend export is production-facing, so use render-quality tessellation. from app.services.render_blender import build_tessellated_glb_path, resolve_tessellation_settings linear_deflection, angular_deflection, effective_tessellation_engine = resolve_tessellation_settings("render") glb_path = build_tessellated_glb_path( step, "render", effective_tessellation_engine, linear_deflection, angular_deflection, ) if not glb_path.exists(): import subprocess as _sp import sys as _sys scripts_dir_tmp = Path(os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts")) occ_cmd = [ _sys.executable, str(scripts_dir_tmp / "export_step_to_gltf.py"), "--step_path", str(step), "--output_path", str(glb_path), "--linear_deflection", str(linear_deflection), "--angular_deflection", str(angular_deflection), "--tessellation_engine", effective_tessellation_engine, ] occ_res = _sp.run(occ_cmd, capture_output=True, text=True, timeout=120) if occ_res.returncode != 0: raise RuntimeError(f"GLB generation failed:\n{occ_res.stderr[-500:]}") output_name = f"{step.stem}_production.blend" if output_name_suffix: output_name = f"{step.stem}_production_{output_name_suffix}.blend" output_path = build_order_line_export_path(order_line_id, output_name, ensure_exists=True) ensure_group_writable_dir(output_path.parent) scripts_dir = Path(os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts")) export_script = scripts_dir / "export_blend.py" from app.services.render_blender import find_blender blender_bin = find_blender() if not blender_bin: raise RuntimeError("Blender binary not found — cannot run export_blend task") # Resolve asset library path and material map from DB. asset_lib_path = "" mat_map: dict = {} blend_is_primary_output = False engine = None try: from sqlalchemy import create_engine, select as sql_select from sqlalchemy.orm import Session, joinedload from app.config import settings as app_settings from app.domains.orders.models import OrderLine from app.domains.products.models import Product engine = create_engine(app_settings.database_url_sync) with Session(engine) as s: line = s.execute( sql_select(OrderLine) .where(OrderLine.id == order_line_id) .options( joinedload(OrderLine.product).joinedload(Product.cad_file), joinedload(OrderLine.output_type), ) ).scalar_one_or_none() if line: product = line.product if product: mat_map = { m.get("part_name", ""): m.get("material", "") for m in (product.cad_part_materials or []) } output_type = getattr(line, "output_type", None) if output_type is not None: blend_is_primary_output = ( getattr(output_type, "artifact_kind", None) == "blend_asset" or getattr(output_type, "output_format", None) == "blend" ) except Exception as exc: logger.warning("export_blend_for_order_line_task: DB resolution error (non-fatal): %s", exc) try: ignored_output_overrides = { key: value for key, value in _kwargs.items() if key in {"output_path", "output_dir", "output_name"} } if ignored_output_overrides: logger.warning( "export_blend_for_order_line_task ignoring non-canonical output overrides for %s: %s", order_line_id, ignored_output_overrides, ) cmd = [ blender_bin, "--background", "--python", str(export_script), "--", "--glb_path", str(glb_path), "--output_path", str(output_path), "--asset_library_blend", asset_lib_path, "--material_map", json.dumps(mat_map), ] logger.info( "export_blend_for_order_line_task exporting order_line=%s glb_path=%s output_path=%s workflow_run_id=%s", order_line_id, glb_path, output_path, workflow_run_id, ) result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) if result.returncode != 0: raise RuntimeError( f"export_blend.py exited {result.returncode}:\n{result.stderr[-500:]}" ) result_payload = { "blend_path": str(output_path), "artifact_type": "blend_production", } if graph_authoritative_output_enabled: _finalize_graph_blend_output( order_line_id, success=True, output_path=str(output_path), render_log=result_payload, workflow_run_id=workflow_run_id, output_node_ids=graph_output_node_ids, render_node_id=workflow_node_id, ) elif observer_output_enabled: _finalize_shadow_blend_output( order_line_id, success=True, output_path=str(output_path), render_log=result_payload, workflow_run_id=workflow_run_id, output_node_ids=graph_output_node_ids, render_node_id=workflow_node_id, ) elif blend_is_primary_output: from sqlalchemy import select as sql_select from sqlalchemy.orm import Session, joinedload from app.domains.orders.models import OrderLine from app.domains.products.models import Product from app.domains.rendering.workflow_runtime_services import persist_order_line_output if engine is None: from sqlalchemy import create_engine from app.config import settings as app_settings engine = create_engine(app_settings.database_url_sync) with Session(engine) as s: line = s.execute( sql_select(OrderLine) .where(OrderLine.id == order_line_id) .options( joinedload(OrderLine.product).joinedload(Product.cad_file), joinedload(OrderLine.output_type), ) ).scalar_one_or_none() if line is None: raise RuntimeError(f"Order line {order_line_id} not found during blend persistence") persist_order_line_output( s, line, success=True, output_path=str(output_path), render_log=result_payload, workflow_run_id=workflow_run_id, ) elif publish_asset_enabled: publish_asset.delay( order_line_id, "blend_production", str(output_path), workflow_run_id=workflow_run_id, ) logger.info("export_blend_for_order_line_task completed: %s", output_path.name) _update_workflow_run_status( order_line_id, "completed", workflow_run_id=workflow_run_id, workflow_node_id=workflow_node_id, ) if emit_legacy_notifications: _emit_graph_render_notifications( order_line_id, success=True, render_log=result_payload, ) _finalize_graph_notify_nodes( workflow_run_id=workflow_run_id, notify_node_ids=graph_notify_node_ids, success=True, render_node_id=workflow_node_id, ) return result_payload except Exception as exc: logger.error("export_blend_for_order_line_task failed for %s: %s", order_line_id, exc) if graph_authoritative_output_enabled: _finalize_graph_blend_output( order_line_id, success=False, output_path=str(output_path), render_log={"error": str(exc)}, workflow_run_id=workflow_run_id, output_node_ids=graph_output_node_ids, render_node_id=workflow_node_id, error=str(exc), ) elif observer_output_enabled: _finalize_shadow_blend_output( order_line_id, success=False, output_path=str(output_path), render_log={"error": str(exc)}, workflow_run_id=workflow_run_id, output_node_ids=graph_output_node_ids, render_node_id=workflow_node_id, error=str(exc), ) elif blend_is_primary_output: try: from sqlalchemy import select as sql_select from sqlalchemy.orm import Session, joinedload from app.domains.orders.models import OrderLine from app.domains.products.models import Product from app.domains.rendering.workflow_runtime_services import persist_order_line_output if engine is None: from sqlalchemy import create_engine from app.config import settings as app_settings engine = create_engine(app_settings.database_url_sync) with Session(engine) as s: line = s.execute( sql_select(OrderLine) .where(OrderLine.id == order_line_id) .options( joinedload(OrderLine.product).joinedload(Product.cad_file), joinedload(OrderLine.output_type), ) ).scalar_one_or_none() if line is not None: persist_order_line_output( s, line, success=False, output_path=str(output_path), render_log={"error": str(exc)}, workflow_run_id=workflow_run_id, ) except Exception: logger.exception( "export_blend_for_order_line_task: failed to persist primary blend failure for %s", order_line_id, ) _update_workflow_run_status( order_line_id, "failed", str(exc), workflow_run_id=workflow_run_id, workflow_node_id=workflow_node_id, ) if emit_legacy_notifications: _emit_graph_render_notifications( order_line_id, success=False, render_log={"error": str(exc)}, ) _finalize_graph_notify_nodes( workflow_run_id=workflow_run_id, notify_node_ids=graph_notify_node_ids, success=False, render_node_id=workflow_node_id, error=str(exc), ) raise self.retry(exc=exc, countdown=30) finally: if engine is not None: engine.dispose() @celery_app.task( bind=True, name="app.domains.rendering.tasks.apply_asset_library_materials_task", queue="asset_pipeline", max_retries=1, ) def apply_asset_library_materials_task(self, order_line_id: str, asset_library_id: str) -> dict: """Apply Blender asset library materials to a render via the asset_library.py script.""" import json import os import subprocess from pathlib import Path from app.services.render_blender import find_blender blender_bin = find_blender() if not blender_bin: raise RuntimeError("Blender not available") # Resolve paths from DB def _inner(): from sqlalchemy import create_engine, select as sql_select from sqlalchemy.orm import Session from app.config import settings from app.domains.orders.models import OrderLine from app.domains.products.models import CadFile, Product engine = create_engine(settings.database_url_sync) with Session(engine) as s: line = s.execute(sql_select(OrderLine).where(OrderLine.id == order_line_id)).scalar_one_or_none() if not line: return None, None, None product = s.execute(sql_select(Product).where(Product.id == line.product_id)).scalar_one_or_none() if not product or not product.cad_file_id: return None, None, None cad = s.execute(sql_select(CadFile).where(CadFile.id == product.cad_file_id)).scalar_one_or_none() glb_path = str(Path(cad.stored_path).parent / f"{Path(cad.stored_path).stem}_geometry.glb") if cad else None # Resolve asset library blend path try: from app.domains.materials.models import AssetLibrary lib = s.execute(sql_select(AssetLibrary).where(AssetLibrary.id == asset_library_id)).scalar_one_or_none() blend_path = lib.blend_file_path if lib else None except Exception: blend_path = None mat_map = {m.get("part_name", ""): m.get("material", "") for m in (product.cad_part_materials or [])} return glb_path, blend_path, mat_map result = _inner() if result is None or result[0] is None: logger.warning("apply_asset_library_materials_task: could not resolve paths for %s", order_line_id) return {"status": "skipped"} glb_path, blend_path, mat_map = result if not glb_path or not Path(glb_path).exists(): logger.warning("Geometry GLB not found for %s", order_line_id) return {"status": "skipped", "reason": "glb_not_found"} scripts_dir = Path(os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts")) script = scripts_dir / "asset_library.py" cmd = [ blender_bin, "--background", "--python", str(script), "--", "--glb_path", glb_path, "--asset_library_blend", blend_path or "", "--material_map", json.dumps(mat_map), ] try: proc = subprocess.run(cmd, capture_output=True, text=True, timeout=120) if proc.returncode != 0: raise RuntimeError(f"asset_library.py failed: {proc.stderr[-500:]}") except Exception as exc: logger.error("apply_asset_library_materials_task failed for %s: %s", order_line_id, exc) raise self.retry(exc=exc, countdown=15) return {"status": "applied", "order_line_id": order_line_id} def _build_ffmpeg_cmd( frames_dir: Path, output_mp4: Path, fps: int = 30, bg_color: str = "", width: int = 1920, height: int = 1080, ) -> list: """Build FFmpeg command for compositing turntable frames to MP4.""" from app.services.render_blender import build_turntable_ffmpeg_cmd return build_turntable_ffmpeg_cmd( frames_dir, output_mp4, fps=fps, bg_color=bg_color, width=width, height=height, )