"""Rendering domain tasks — Celery tasks for Blender-based rendering. These tasks run on the `thumbnail_rendering` queue in the render-worker container, which has Blender and cadquery available. Phase A2: Initial implementation replacing the blender-renderer HTTP service. Phase B: This module will be expanded as part of the Domain-Driven restructure. """ import logging from pathlib import Path from app.tasks.celery_app import celery_app logger = logging.getLogger(__name__) @celery_app.task( bind=True, name="app.domains.rendering.tasks.render_still_task", queue="thumbnail_rendering", max_retries=2, ) def render_still_task( self, step_path: str, output_path: str, engine: str = "cycles", samples: int = 256, stl_quality: str = "low", smooth_angle: int = 30, cycles_device: str = "auto", width: int = 512, height: int = 512, transparent_bg: bool = False, template_path: str | None = None, target_collection: str = "Product", material_library_path: str | None = None, material_map: dict | None = None, part_names_ordered: list | None = None, lighting_only: bool = False, shadow_catcher: bool = False, rotation_x: float = 0.0, rotation_y: float = 0.0, rotation_z: float = 0.0, noise_threshold: str = "", denoiser: str = "", denoising_input_passes: str = "", denoising_prefilter: str = "", denoising_quality: str = "", denoising_use_gpu: str = "", mesh_attributes: dict | None = None, ) -> dict: """Render a STEP file to a still PNG via Blender subprocess. Returns render metadata dict on success. Retries up to 2 times on failure (30s countdown). """ try: from app.services.render_blender import render_still result = render_still( step_path=Path(step_path), output_path=Path(output_path), engine=engine, samples=samples, stl_quality=stl_quality, smooth_angle=smooth_angle, cycles_device=cycles_device, width=width, height=height, transparent_bg=transparent_bg, template_path=template_path, target_collection=target_collection, material_library_path=material_library_path, material_map=material_map, part_names_ordered=part_names_ordered, lighting_only=lighting_only, shadow_catcher=shadow_catcher, rotation_x=rotation_x, rotation_y=rotation_y, rotation_z=rotation_z, noise_threshold=noise_threshold, denoiser=denoiser, denoising_input_passes=denoising_input_passes, denoising_prefilter=denoising_prefilter, denoising_quality=denoising_quality, denoising_use_gpu=denoising_use_gpu, mesh_attributes=mesh_attributes or {}, ) logger.info( "render_still_task completed: %s → %s in %.1fs", Path(step_path).name, Path(output_path).name, result.get("total_duration_s", 0), ) return result except Exception as exc: logger.error("render_still_task failed for %s: %s", step_path, exc) raise self.retry(exc=exc, countdown=30) @celery_app.task( bind=True, name="app.domains.rendering.tasks.render_turntable_task", queue="thumbnail_rendering", max_retries=2, ) def render_turntable_task( self, step_path: str, output_dir: str, output_name: str = "turntable", engine: str = "cycles", samples: int = 64, stl_quality: str = "low", smooth_angle: int = 30, cycles_device: str = "auto", width: int = 1920, height: int = 1080, frame_count: int = 120, fps: int = 30, turntable_degrees: float = 360.0, turntable_axis: str = "world_z", bg_color: str = "", template_path: str | None = None, target_collection: str = "Product", material_library_path: str | None = None, material_map: dict | None = None, part_names_ordered: list | None = None, lighting_only: bool = False, shadow_catcher: bool = False, camera_orbit: bool = True, rotation_x: float = 0.0, rotation_y: float = 0.0, rotation_z: float = 0.0, ) -> dict: """Render a STEP file as a turntable animation (frames + FFmpeg composite). Returns render metadata dict on success. """ import json import os import shutil import subprocess from app.services.render_blender import ( find_blender, convert_step_to_stl, export_per_part_stls ) blender_bin = find_blender() if not blender_bin: raise RuntimeError("Blender binary not found in render-worker container") step = Path(step_path) out_dir = Path(output_dir) out_dir.mkdir(parents=True, exist_ok=True) scripts_dir = Path(os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts")) turntable_script = scripts_dir / "turntable_render.py" # STL conversion — try MinIO cache first, then convert locally stl_path = step.parent / f"{step.stem}_{stl_quality}.stl" if not stl_path.exists() or stl_path.stat().st_size == 0: try: from app.domains.products.cache_service import compute_step_hash, check_stl_cache step_hash = compute_step_hash(str(step)) cached = check_stl_cache(step_hash, stl_quality) if cached: stl_path.write_bytes(cached) logger.info("STL restored from MinIO cache: %s", stl_path.name) else: convert_step_to_stl(step, stl_path, stl_quality) except Exception as exc: logger.warning("MinIO cache check failed (non-fatal): %s — falling back to conversion", exc) convert_step_to_stl(step, stl_path, stl_quality) parts_dir = step.parent / f"{step.stem}_{stl_quality}_parts" if not (parts_dir / "manifest.json").exists(): try: export_per_part_stls(step, parts_dir, stl_quality) except Exception as exc: logger.warning("per-part export non-fatal: %s", exc) # Build turntable render arguments frames_dir = out_dir / "frames" frames_dir.mkdir(exist_ok=True) cmd = [ blender_bin, "--background", "--python", str(turntable_script), "--", str(stl_path), str(frames_dir), output_name, str(width), str(height), engine, str(samples), str(smooth_angle), cycles_device, str(frame_count), str(fps), str(turntable_degrees), turntable_axis, template_path or "", target_collection, material_library_path or "", json.dumps(material_map) if material_map else "{}", json.dumps(part_names_ordered) if part_names_ordered else "[]", "1" if lighting_only else "0", "1" if shadow_catcher else "0", "1" if camera_orbit else "0", str(rotation_x), str(rotation_y), str(rotation_z), ] try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=3600 ) if result.returncode != 0: raise RuntimeError( f"Blender turntable exited {result.returncode}:\n{result.stdout[-2000:]}" ) except Exception as exc: logger.error("render_turntable_task failed: %s", exc) raise self.retry(exc=exc, countdown=60) # FFmpeg composite: frames → MP4 with optional background output_mp4 = out_dir / f"{output_name}.mp4" ffmpeg_cmd = _build_ffmpeg_cmd( frames_dir, output_mp4, fps=fps, bg_color=bg_color ) try: subprocess.run(ffmpeg_cmd, check=True, capture_output=True, text=True, timeout=300) except subprocess.CalledProcessError as exc: raise RuntimeError(f"FFmpeg composite failed: {exc.stderr[-500:]}") return { "output_mp4": str(output_mp4), "frame_count": frame_count, "fps": fps, } @celery_app.task( name="rendering.publish_asset", queue="step_processing", ) def publish_asset( order_line_id: str, asset_type: str, storage_key: str, render_config: dict | None = None, ) -> str | None: """Create a MediaAsset record after a successful render.""" import asyncio async def _run() -> str | None: from app.database import AsyncSessionLocal from app.domains.media.models import MediaAsset, MediaAssetType from app.domains.orders.models import OrderLine from sqlalchemy import select async with AsyncSessionLocal() as db: res = await db.execute(select(OrderLine).where(OrderLine.id == order_line_id)) line = res.scalar_one_or_none() if not line: return None asset = MediaAsset( tenant_id=getattr(line, "tenant_id", None), order_line_id=line.id, asset_type=MediaAssetType(asset_type), storage_key=storage_key, render_config=render_config, ) db.add(asset) await db.commit() return str(asset.id) return asyncio.get_event_loop().run_until_complete(_run()) def _resolve_step_path_for_order_line(order_line_id: str) -> tuple[str | None, str | None]: """Sync helper: resolves (step_path, cad_file_id) from an OrderLine via DB.""" import asyncio async def _inner() -> tuple[str | None, str | None]: from app.database import AsyncSessionLocal from app.domains.orders.models import OrderLine from app.domains.products.models import Product from app.models.cad_file import CadFile from sqlalchemy import select from sqlalchemy.orm import selectinload async with AsyncSessionLocal() as db: res = await db.execute( select(OrderLine) .options(selectinload(OrderLine.product)) .where(OrderLine.id == order_line_id) ) line = res.scalar_one_or_none() if not line or not line.product or not line.product.cad_file_id: return None, None cad_res = await db.execute( select(CadFile).where(CadFile.id == line.product.cad_file_id) ) cad = cad_res.scalar_one_or_none() if not cad or not cad.stored_path: return None, None return cad.stored_path, str(line.product.cad_file_id) return asyncio.get_event_loop().run_until_complete(_inner()) @celery_app.task( bind=True, name="app.domains.rendering.tasks.render_order_line_still_task", queue="thumbnail_rendering", max_retries=2, ) def render_order_line_still_task(self, order_line_id: str, **params) -> dict: """Render a still image for an order line, resolving STEP path from DB. Wraps render_still_task logic but accepts order_line_id instead of step_path. On success, creates a MediaAsset record via publish_asset. """ step_path_str, cad_file_id = _resolve_step_path_for_order_line(order_line_id) if not step_path_str: raise RuntimeError( f"Cannot resolve STEP path for order_line {order_line_id}: " "product missing or has no linked CAD file" ) step = Path(step_path_str) output_dir = step.parent / "renders" output_dir.mkdir(parents=True, exist_ok=True) output_path = output_dir / f"line_{order_line_id}.png" try: from app.services.render_blender import render_still result = render_still( step_path=step, output_path=output_path, **params, ) publish_asset.delay( order_line_id, "still", str(output_path), render_config=result, ) logger.info( "render_order_line_still_task completed for line %s in %.1fs", order_line_id, result.get("total_duration_s", 0), ) return result except Exception as exc: logger.error("render_order_line_still_task failed for %s: %s", order_line_id, exc) raise self.retry(exc=exc, countdown=30) @celery_app.task( bind=True, name="app.domains.rendering.tasks.export_gltf_for_order_line_task", queue="thumbnail_rendering", max_retries=1, ) def export_gltf_for_order_line_task(self, order_line_id: str) -> dict: """Export a geometry-only GLB from the STL cache using trimesh (no Blender). Publishes a MediaAsset with asset_type='gltf_geometry'. Requires the STL low-quality cache to exist. """ step_path_str, cad_file_id = _resolve_step_path_for_order_line(order_line_id) if not step_path_str: raise RuntimeError(f"Cannot resolve STEP path for order_line {order_line_id}") step = Path(step_path_str) stl_path = step.parent / f"{step.stem}_low.stl" if not stl_path.exists(): raise RuntimeError( f"STL cache not found: {stl_path}. Run thumbnail generation first." ) output_path = step.parent / f"{step.stem}_geometry.glb" try: import trimesh mesh = trimesh.load(str(stl_path)) mesh.export(str(output_path)) publish_asset.delay(order_line_id, "gltf_geometry", str(output_path)) logger.info("export_gltf_for_order_line_task completed: %s", output_path.name) return {"glb_path": str(output_path)} except Exception as exc: logger.error("export_gltf_for_order_line_task failed for %s: %s", order_line_id, exc) raise self.retry(exc=exc, countdown=15) @celery_app.task( bind=True, name="app.domains.rendering.tasks.export_blend_for_order_line_task", queue="thumbnail_rendering", max_retries=1, ) def export_blend_for_order_line_task(self, order_line_id: str) -> dict: """Export a production-quality GLB via Blender + asset library (export_gltf.py). Publishes a MediaAsset with asset_type='blend_production'. Requires Blender + the render-scripts directory. """ import os import subprocess step_path_str, cad_file_id = _resolve_step_path_for_order_line(order_line_id) if not step_path_str: raise RuntimeError(f"Cannot resolve STEP path for order_line {order_line_id}") step = Path(step_path_str) stl_path = step.parent / f"{step.stem}_low.stl" if not stl_path.exists(): raise RuntimeError(f"STL cache not found: {stl_path}") output_path = step.parent / f"{step.stem}_production.glb" scripts_dir = Path(os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts")) export_script = scripts_dir / "export_gltf.py" from app.services.render_blender import find_blender blender_bin = find_blender() if not blender_bin: raise RuntimeError("Blender binary not found — cannot run export_blend task") try: cmd = [ blender_bin, "--background", "--python", str(export_script), "--", "--stl_path", str(stl_path), "--output_path", str(output_path), ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) if result.returncode != 0: raise RuntimeError( f"export_gltf.py exited {result.returncode}:\n{result.stderr[-500:]}" ) publish_asset.delay(order_line_id, "blend_production", str(output_path)) logger.info("export_blend_for_order_line_task completed: %s", output_path.name) return {"glb_path": str(output_path)} except Exception as exc: logger.error("export_blend_for_order_line_task failed for %s: %s", order_line_id, exc) raise self.retry(exc=exc, countdown=30) def _build_ffmpeg_cmd( frames_dir: Path, output_mp4: Path, fps: int = 30, bg_color: str = "" ) -> list: """Build FFmpeg command for compositing turntable frames to MP4.""" import shutil as _shutil ffmpeg = _shutil.which("ffmpeg") or "ffmpeg" frame_pattern = str(frames_dir / "%04d.png") if bg_color: # Overlay transparent frames onto solid color background r = int(bg_color[1:3], 16) if bg_color.startswith("#") else 255 g = int(bg_color[3:5], 16) if bg_color.startswith("#") else 255 b = int(bg_color[5:7], 16) if bg_color.startswith("#") else 255 color_str = f"color=c=0x{r:02x}{g:02x}{b:02x}:s=1920x1080:r={fps}" return [ ffmpeg, "-y", "-f", "lavfi", "-i", color_str, "-framerate", str(fps), "-i", frame_pattern, "-filter_complex", "[0:v][1:v]overlay=0:0", "-c:v", "libx264", "-pix_fmt", "yuv420p", "-movflags", "+faststart", str(output_mp4), ] else: return [ ffmpeg, "-y", "-framerate", str(fps), "-i", frame_pattern, "-c:v", "libx264", "-pix_fmt", "yuv420p", "-movflags", "+faststart", str(output_mp4), ]