""" STEP file processor — Phase 3 implementation. Extracts object names from STEP files using pythonocc-core (OCC), generates thumbnails using trimesh + pyrender, and converts to glTF. This module is invoked from the Celery worker (step_tasks.py). """ import logging import uuid from pathlib import Path from typing import TYPE_CHECKING if TYPE_CHECKING: from app.models.cad_file import CadFile logger = logging.getLogger(__name__) MATERIAL_PALETTE = [ "#4C9BE8", "#E85B4C", "#4CBE72", "#E8A84C", "#A04CE8", "#4CD4E8", "#E84CA8", "#7EC850", "#E86B30", "#5088C8", ] def _material_to_color(material_name: str | None, index: int) -> str: """Return a deterministic hex color: hash material name, or use palette by index.""" if material_name and material_name.strip(): i = abs(hash(material_name.strip().lower())) % len(MATERIAL_PALETTE) return MATERIAL_PALETTE[i] return MATERIAL_PALETTE[index % len(MATERIAL_PALETTE)] def build_part_colors( cad_parsed_objects: list[str], cad_part_materials: list[dict], ) -> dict[str, str]: """ Build {part_name: hex_color} for thumbnail rendering. Args: cad_parsed_objects: List of part names from cad_file.parsed_objects["objects"]. cad_part_materials: List of {part_name, material} dicts from order_item.cad_part_materials. """ mat_map = { m["part_name"].lower(): m.get("material") for m in cad_part_materials if m.get("part_name") } return { name: _material_to_color(mat_map.get(name.lower()), i) for i, name in enumerate(cad_parsed_objects) } def _normalize_stem(name: str) -> str: """Normalize a filename stem for comparison: lowercase, strip .stp/.step extension.""" stem = name.strip() for ext in (".step", ".stp"): if stem.lower().endswith(ext): stem = stem[: -len(ext)] break return stem.lower() def match_cad_to_items( cad_file: "CadFile", item_names: list[str], ) -> list[str]: """ Match a CadFile to a list of OrderItem name_cad_modell values. Matching is case-insensitive and normalizes .stp/.step extensions so that a file named '81113-L_cut.stp' matches an item named '81113-l_cut' or '81113-L_cut.step'. Args: cad_file: A CadFile ORM object (needs .original_name). item_names: List of name_cad_modell strings from OrderItems. Returns: List of matched item names (subset of item_names). """ cad_stem = _normalize_stem(cad_file.original_name or "") matched = [] for name in item_names: if not name: continue if _normalize_stem(name) == cad_stem: matched.append(name) return matched def extract_cad_metadata(cad_file_id: str) -> None: """ Fast metadata extraction for a CAD file (no thumbnail generation). Does everything process_cad_file() does EXCEPT thumbnail rendering: - Sets status to processing - Extracts STEP object names - Converts to glTF - Leaves status as processing (render_step_thumbnail task will complete it) - On exception: sets status to failed """ from app.config import settings from sqlalchemy import create_engine from sqlalchemy.orm import Session from app.models.cad_file import CadFile, ProcessingStatus engine = create_engine(settings.database_url_sync) with Session(engine) as session: cad_file = session.get(CadFile, uuid.UUID(cad_file_id)) if not cad_file: logger.error(f"CAD file not found: {cad_file_id}") return cad_file.processing_status = ProcessingStatus.processing session.commit() try: step_path = Path(cad_file.stored_path) if not step_path.exists(): raise FileNotFoundError(f"STEP file not found: {step_path}") objects = _extract_step_objects(step_path) cad_file.parsed_objects = {"objects": objects} edge_data = extract_mesh_edge_data(str(step_path)) if edge_data: cad_file.mesh_attributes = {**(cad_file.mesh_attributes or {}), **edge_data} gltf_path = _convert_to_gltf(step_path, cad_file_id, settings.upload_dir) if gltf_path: cad_file.gltf_path = str(gltf_path) # Leave status as processing — render_step_thumbnail will complete it logger.info(f"CAD metadata extracted: {cad_file_id} ({len(objects)} objects)") except Exception as exc: logger.error(f"CAD metadata extraction failed for {cad_file_id}: {exc}") cad_file.processing_status = ProcessingStatus.failed cad_file.error_message = str(exc)[:2000] session.commit() def process_cad_file(cad_file_id: str) -> None: """ Full processing pipeline for a CAD file: 1. Load STEP file with pythonocc 2. Extract part/object names 3. Generate thumbnail PNG 4. Convert to glTF for browser viewer 5. Update DB record """ from app.config import settings # Synchronous DB access for Celery worker from sqlalchemy import create_engine from sqlalchemy.orm import Session from app.models.cad_file import CadFile, ProcessingStatus engine = create_engine(settings.database_url_sync) with Session(engine) as session: cad_file = session.get(CadFile, uuid.UUID(cad_file_id)) if not cad_file: logger.error(f"CAD file not found: {cad_file_id}") return cad_file.processing_status = ProcessingStatus.processing session.commit() try: step_path = Path(cad_file.stored_path) if not step_path.exists(): raise FileNotFoundError(f"STEP file not found: {step_path}") # Step 1: Extract object names objects = _extract_step_objects(step_path) cad_file.parsed_objects = {"objects": objects} # Step 1b: Extract sharp-edge topology data and merge into mesh_attributes edge_data = extract_mesh_edge_data(str(step_path)) if edge_data: cad_file.mesh_attributes = {**(cad_file.mesh_attributes or {}), **edge_data} # Step 2: Generate thumbnail — pass empty part_colors so the Three.js # renderer extracts named parts and auto-assigns palette colours. # Other renderers (Blender, Pillow) ignore the part_colors argument. thumb_path, render_log = _generate_thumbnail(step_path, cad_file_id, settings.upload_dir, part_colors={}) if thumb_path: cad_file.thumbnail_path = str(thumb_path) cad_file.render_log = render_log # Step 3: Convert to glTF gltf_path = _convert_to_gltf(step_path, cad_file_id, settings.upload_dir) if gltf_path: cad_file.gltf_path = str(gltf_path) cad_file.processing_status = ProcessingStatus.completed logger.info(f"CAD file processed successfully: {cad_file_id}") except Exception as exc: logger.error(f"CAD processing failed for {cad_file_id}: {exc}") cad_file.processing_status = ProcessingStatus.failed cad_file.error_message = str(exc)[:2000] session.commit() def extract_mesh_edge_data(step_path: str) -> dict: """Extract sharp edge metrics and suggested smooth angle from STEP topology. Returns dict with: - suggested_smooth_angle: float (degrees) — recommended auto-smooth angle - has_mechanical_edges: bool — True if part has distinct hard edges (bearings etc.) - sharp_edge_midpoints: list of [x, y, z] — midpoints of sharp edges in mm (max 500) """ try: from OCC.Core.STEPControl import STEPControl_Reader from OCC.Core.IFSelect import IFSelect_RetDone from OCC.Core.TopExp import TopExp_Explorer from OCC.Core.TopAbs import TopAbs_EDGE, TopAbs_FACE from OCC.Core.BRepAdaptor import BRepAdaptor_Surface from OCC.Core.BRep import BRep_Tool from OCC.Core.BRepGProp import brepgprop from OCC.Core.GProp import GProp_GProps from OCC.Core.BRepMesh import BRepMesh_IncrementalMesh from OCC.Core.gp import gp_Pnt import math reader = STEPControl_Reader() status = reader.ReadFile(step_path) if status != IFSelect_RetDone: return {} reader.TransferRoots() shape = reader.OneShape() # Mesh the shape for geometry access BRepMesh_IncrementalMesh(shape, 0.5, False, 0.5) # Collect face normals per edge (for dihedral angle computation) from OCC.Core.TopTools import TopTools_IndexedDataMapOfShapeListOfShape from OCC.Core.TopExp import topexp edge_face_map = TopTools_IndexedDataMapOfShapeListOfShape() topexp.MapShapesAndAncestors(shape, TopAbs_EDGE, TopAbs_FACE, edge_face_map) dihedral_angles = [] sharp_midpoints = [] for i in range(1, edge_face_map.Extent() + 1): edge = edge_face_map.FindKey(i) faces = edge_face_map.FindFromIndex(i) if faces.Size() < 2: continue # Get the two adjacent faces face_list = list(faces) if len(face_list) < 2: continue try: surf1 = BRepAdaptor_Surface(face_list[0]) surf2 = BRepAdaptor_Surface(face_list[1]) # Get normals at midpoint of edge from OCC.Core.BRepAdaptor import BRepAdaptor_Curve curve = BRepAdaptor_Curve(edge) mid_u = (curve.FirstParameter() + curve.LastParameter()) / 2 mid_pt = curve.Value(mid_u) # Sample face normals at UV center u1 = (surf1.FirstUParameter() + surf1.LastUParameter()) / 2 v1 = (surf1.FirstVParameter() + surf1.LastVParameter()) / 2 n1 = surf1.DN(u1, v1, 0, 1).Crossed(surf1.DN(u1, v1, 1, 0)) u2 = (surf2.FirstUParameter() + surf2.LastUParameter()) / 2 v2 = (surf2.FirstVParameter() + surf2.LastVParameter()) / 2 n2 = surf2.DN(u2, v2, 0, 1).Crossed(surf2.DN(u2, v2, 1, 0)) if n1.Magnitude() > 1e-10 and n2.Magnitude() > 1e-10: n1.Normalize() n2.Normalize() cos_angle = max(-1.0, min(1.0, n1.Dot(n2))) angle_deg = math.degrees(math.acos(abs(cos_angle))) dihedral_angles.append(angle_deg) if angle_deg > 20 and len(sharp_midpoints) < 500: sharp_midpoints.append([ round(mid_pt.X(), 3), round(mid_pt.Y(), 3), round(mid_pt.Z(), 3), ]) except Exception: continue # Bounding box extraction (OCC Bnd_Box) from OCC.Core.Bnd import Bnd_Box from OCC.Core.BRepBndLib import brepbndlib try: bbox = Bnd_Box() brepbndlib.Add(shape, bbox) xmin, ymin, zmin, xmax, ymax, zmax = bbox.Get() dimensions_mm = { "x": round(xmax - xmin, 2), "y": round(ymax - ymin, 2), "z": round(zmax - zmin, 2), } bbox_center_mm = { "x": round((xmin + xmax) / 2, 2), "y": round((ymin + ymax) / 2, 2), "z": round((zmin + zmax) / 2, 2), } except Exception: dimensions_mm = None bbox_center_mm = None if not dihedral_angles: result: dict = {} if dimensions_mm: result["dimensions_mm"] = dimensions_mm result["bbox_center_mm"] = bbox_center_mm return result import statistics median_angle = statistics.median(dihedral_angles) max_angle = max(dihedral_angles) # Suggest smooth angle: slightly below the median of hard edges hard_edges = [a for a in dihedral_angles if a > 20] if hard_edges: suggested = max(15.0, min(60.0, statistics.median(hard_edges) * 0.8)) else: suggested = 30.0 result = { "suggested_smooth_angle": round(suggested, 1), "has_mechanical_edges": max_angle > 45, "sharp_edge_midpoints": sharp_midpoints[:500], } if dimensions_mm: result["dimensions_mm"] = dimensions_mm result["bbox_center_mm"] = bbox_center_mm return result except ImportError: # OCC not available (e.g. in backend container) return {} except Exception as exc: logger.warning("extract_mesh_edge_data failed (non-fatal): %s", exc) return {} def _extract_step_objects(step_path: Path) -> list[str]: """Extract part names from STEP file using pythonocc.""" try: from OCC.Core.STEPCAFControl import STEPCAFControl_Reader from OCC.Core.XCAFDoc import XCAFDoc_DocumentTool from OCC.Core.TDocStd import TDocStd_Document from OCC.Core.TDataStd import TDataStd_Name from OCC.Core.TCollection import TCollection_ExtendedString doc = TDocStd_Document(TCollection_ExtendedString("MDTV-CAF")) reader = STEPCAFControl_Reader() reader.SetColorMode(True) reader.SetNameMode(True) status = reader.ReadFile(str(step_path)) if not reader.Transfer(doc): return [] shape_tool = XCAFDoc_DocumentTool.ShapeTool(doc.Main()) labels = [] shape_tool.GetFreeShapes(labels) names = [] for label in labels: name_attr = TDataStd_Name() if label.FindAttribute(TDataStd_Name.GetID(), name_attr): names.append(name_attr.Get().ToExtString()) return names except ImportError: logger.warning("pythonocc-core not available; skipping object extraction") return _extract_step_objects_fallback(step_path) except Exception as exc: logger.warning(f"OCC extraction failed: {exc}") return _extract_step_objects_fallback(step_path) def _extract_step_objects_fallback(step_path: Path) -> list[str]: """Simple text-based extraction of part names from STEP file.""" names = [] try: with open(step_path, "r", encoding="utf-8", errors="replace") as f: for line in f: # STEP format: PRODUCT('name','description',... if "PRODUCT(" in line: parts = line.split("PRODUCT(") for part in parts[1:]: if "'" in part: name = part.split("'")[1] if name and name not in names: names.append(name) except Exception: pass return names def _get_all_settings() -> dict[str, str]: """Read all system settings from the database.""" defaults = { "thumbnail_renderer": "pillow", "blender_engine": "cycles", "blender_cycles_samples": "256", "blender_eevee_samples": "64", "threejs_render_size": "1024", "thumbnail_format": "jpg", "stl_quality": "low", "blender_smooth_angle": "30", "cycles_device": "auto", } try: from app.config import settings as app_settings from sqlalchemy import create_engine, text from sqlalchemy.orm import Session engine = create_engine(app_settings.database_url_sync) with Session(engine) as session: result = session.execute(text("SELECT key, value FROM system_settings")) stored = {row[0]: row[1] for row in result.fetchall()} return {k: stored.get(k, v) for k, v in defaults.items()} except Exception as exc: logger.warning(f"Could not read settings: {exc}; using defaults") return defaults def _generate_thumbnail( step_path: Path, cad_file_id: str, upload_dir: str, part_colors: dict[str, str] | None = None, ) -> tuple[Path | None, dict]: """Generate thumbnail using the configured renderer. Returns (thumb_path, render_log_dict). render_log_dict contains all settings + timing + blender output. """ import time out_dir = Path(upload_dir) / "thumbnails" out_dir.mkdir(parents=True, exist_ok=True) settings = _get_all_settings() renderer = settings["thumbnail_renderer"] fmt = settings["thumbnail_format"] # "jpg" or "png" ext = "jpg" if fmt == "jpg" else "png" # Clean up any existing thumbnail for this cad_file_id (either extension) for old_ext in ("png", "jpg"): old = out_dir / f"{cad_file_id}.{old_ext}" if old.exists(): old.unlink(missing_ok=True) final_path = out_dir / f"{cad_file_id}.{ext}" # Intermediate PNG used when a service renderer produces PNG before conversion tmp_png = out_dir / f"{cad_file_id}_tmp.png" # Build the base render_log with the settings snapshot render_log: dict = { "renderer": renderer, "format": fmt, "started_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), } if renderer == "blender": engine = settings["blender_engine"] render_log.update({ "engine": engine, "samples": int(settings[f"blender_{engine}_samples"]), "stl_quality": settings["stl_quality"], "smooth_angle": int(settings["blender_smooth_angle"]), "cycles_device": settings["cycles_device"], "width": 512, "height": 512, }) elif renderer == "threejs": # Three.js renderer removed in v2; treat as pillow fallback renderer = "pillow" render_log.update({"renderer": "pillow", "threejs_removed": True}) logger.info(f"Thumbnail renderer={renderer}, format={fmt}") rendered_png: Path | None = None service_data: dict = {} if renderer == "blender": engine = settings["blender_engine"] samples = int(settings[f"blender_{engine}_samples"]) from app.services.render_blender import is_blender_available, render_still if is_blender_available(): try: service_data = render_still( step_path=step_path, output_path=tmp_png, engine=engine, samples=samples, stl_quality=settings["stl_quality"], smooth_angle=int(settings["blender_smooth_angle"]), cycles_device=settings["cycles_device"], ) rendered_png = tmp_png if tmp_png.exists() else None except Exception as exc: logger.warning("Blender subprocess render failed: %s", exc) rendered_png = None else: logger.warning("Blender not available in this container — falling back to Pillow placeholder") # Merge rich service response data into render_log if service_data: for key in ("total_duration_s", "stl_duration_s", "render_duration_s", "stl_size_bytes", "output_size_bytes", "parts_count", "engine_used", "log_lines"): if key in service_data: render_log[key] = service_data[key] render_log["completed_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) if rendered_png: result = _finalise_image(rendered_png, final_path, fmt) tmp_png.unlink(missing_ok=True) render_log["fallback"] = False return result, render_log # Pillow placeholder render_log["fallback"] = True return _generate_thumbnail_placeholder(step_path, final_path, fmt), render_log def _finalise_image(src: Path, dst: Path, fmt: str) -> Path | None: """Convert src image to dst using the requested format (jpg or png).""" if fmt == "jpg": try: from PIL import Image img = Image.open(src).convert("RGB") img.save(str(dst), "JPEG", quality=92, optimize=True) return dst except Exception as exc: logger.warning(f"JPG conversion failed: {exc}; keeping PNG") src.rename(dst.with_suffix(".png")) return dst.with_suffix(".png") else: src.rename(dst) return dst def _render_via_service( url: str, step_path: Path, out_path: Path, extra: dict | None = None, job_id: str | None = None, ) -> tuple[Path | None, dict]: """Call an external renderer microservice to generate a thumbnail. Returns (path_or_None, response_data_dict). job_id, when provided, is forwarded to the renderer so the render process can be cancelled via the renderer's /cancel/{job_id} endpoint. """ try: import httpx payload = { "step_path": str(step_path), "output_path": str(out_path), "width": 512, "height": 512, **(extra or {}), } if job_id: payload["job_id"] = job_id resp = httpx.post(url, json=payload, timeout=300.0) data = {} try: data = resp.json() except Exception: pass if resp.status_code == 200 and out_path.exists(): return out_path, data logger.warning(f"Renderer service {url} returned {resp.status_code}: {resp.text[:500]}") except Exception as exc: logger.warning(f"Renderer service {url} unreachable: {exc}") return None, {} def _generate_thumbnail_placeholder(step_path: Path, out_path: Path, fmt: str = "png") -> Path | None: """Generate a simple placeholder thumbnail using Pillow.""" try: from PIL import Image, ImageDraw, ImageFont W, H = 512, 512 img = Image.new("RGB", (W, H), color=(245, 246, 248)) draw = ImageDraw.Draw(img) # Subtle grid for i in range(0, W, 32): draw.line([(i, 0), (i, H)], fill=(228, 230, 235), width=1) draw.line([(0, i), (W, i)], fill=(228, 230, 235), width=1) # Isometric box (front / top / right faces) cx, cy = 256, 260 s = 110 # half-size # Front face draw.polygon( [(cx - s, cy), (cx, cy + s // 2), (cx + s, cy), (cx, cy - s // 2)], fill=(195, 208, 220), outline=(90, 110, 130), width=2, ) # Top face draw.polygon( [(cx - s, cy - s), (cx, cy - s - s // 2), (cx + s, cy - s), (cx, cy - s + s // 2)], fill=(220, 230, 240), outline=(90, 110, 130), width=2, ) # Right pillar draw.polygon( [(cx + s, cy - s), (cx + s, cy), (cx, cy + s // 2), (cx, cy - s + s // 2)], fill=(160, 178, 196), outline=(90, 110, 130), width=2, ) # Schaeffler green top bar draw.rectangle([0, 0, W, 10], fill=(0, 137, 61)) # Model name strip at bottom name = step_path.stem draw.rectangle([0, H - 52, W, H], fill=(30, 50, 70)) try: font = ImageFont.load_default(size=15) draw.text((W // 2, H - 26), name, fill=(255, 255, 255), anchor="mm", font=font) except Exception: draw.text((10, H - 38), name, fill=(255, 255, 255)) if fmt == "jpg": img = img.convert("RGB") img.save(str(out_path), "JPEG", quality=92, optimize=True) else: img.save(str(out_path), "PNG") return out_path except Exception as exc: logger.warning(f"Pillow placeholder thumbnail failed: {exc}") return None def regenerate_cad_thumbnail(cad_file_id: str, part_colors: dict[str, str]) -> bool: """ Regenerate a thumbnail with per-part colours for an existing CAD file. Called from the `regenerate_thumbnail` Celery task. Returns True on success. """ from app.config import settings as app_settings from sqlalchemy import create_engine from sqlalchemy.orm import Session from app.models.cad_file import CadFile, ProcessingStatus db_engine = create_engine(app_settings.database_url_sync) with Session(db_engine) as session: cad_file = session.get(CadFile, uuid.UUID(cad_file_id)) if not cad_file: logger.error(f"CAD file not found: {cad_file_id}") return False step_path = Path(cad_file.stored_path) if not step_path.exists(): logger.error(f"STEP file not found: {step_path}") return False # Mark as processing so the activity page shows it as active cad_file.processing_status = ProcessingStatus.processing session.commit() try: thumb_path, render_log = _generate_thumbnail( step_path, cad_file_id, app_settings.upload_dir, part_colors=part_colors ) if thumb_path: cad_file.thumbnail_path = str(thumb_path) cad_file.render_log = render_log cad_file.processing_status = ProcessingStatus.completed session.commit() logger.info(f"Thumbnail regenerated for CAD file {cad_file_id}") return True except Exception as exc: logger.error(f"Thumbnail regeneration failed for {cad_file_id}: {exc}") cad_file.processing_status = ProcessingStatus.failed cad_file.error_message = str(exc)[:2000] session.commit() return False def render_to_file( step_path: str, output_path: str, part_colors: dict[str, str] | None = None, width: int | None = None, height: int | None = None, transparent_bg: bool = False, engine: str | None = None, samples: int | None = None, template_path: str | None = None, target_collection: str = "Product", material_library_path: str | None = None, material_map: dict | None = None, part_names_ordered: list | None = None, lighting_only: bool = False, shadow_catcher: bool = False, cycles_device: str | None = None, rotation_x: float = 0.0, rotation_y: float = 0.0, rotation_z: float = 0.0, job_id: str | None = None, noise_threshold: str = "", denoiser: str = "", denoising_input_passes: str = "", denoising_prefilter: str = "", denoising_quality: str = "", denoising_use_gpu: str = "", order_line_id: str | None = None, ) -> tuple[bool, dict]: """Render a STEP file to a specific output path using current system settings. Unlike regenerate_cad_thumbnail, this does NOT modify the shared CadFile record. Used by render_order_line_task for per-order-line render outputs. Args: step_path: Absolute path to the STEP file on disk. output_path: Absolute path for the rendered output file. part_colors: Optional {part_name: hex_color} map. width: Optional render width (overrides system default). height: Optional render height (overrides system default). transparent_bg: If True and renderer=blender+PNG, render with transparent background. engine: Optional per-OT engine override ("cycles" | "eevee"), or None for system default. samples: Optional per-OT samples override, or None for system default. template_path: Optional path to a .blend template file. target_collection: Blender collection name to import geometry into. material_library_path: Optional path to material library .blend file. material_map: Optional {part_name: material_name} for material replacement. order_line_id: Optional order line ID for live log streaming. Returns: (success: bool, render_log: dict) """ import time step = Path(step_path) out = Path(output_path) out.parent.mkdir(parents=True, exist_ok=True) settings = _get_all_settings() renderer = settings["thumbnail_renderer"] fmt = out.suffix.lstrip(".") or settings.get("thumbnail_format", "jpg") if fmt not in ("jpg", "png"): fmt = "jpg" # Temporary PNG for service renderers tmp_png = out.parent / f"_tmp_{out.stem}.png" render_log: dict = { "renderer": renderer, "format": fmt, "started_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), } rendered_png: Path | None = None service_data: dict = {} if renderer == "blender": actual_engine = engine or settings["blender_engine"] actual_samples = samples or int(settings[f"blender_{actual_engine}_samples"]) actual_cycles_device = cycles_device or settings["cycles_device"] w = width or 512 h = height or 512 render_log.update({ "engine": actual_engine, "samples": actual_samples, "stl_quality": settings["stl_quality"], "smooth_angle": int(settings["blender_smooth_angle"]), "cycles_device": actual_cycles_device, "width": w, "height": h, }) extra = { "engine": actual_engine, "samples": actual_samples, "stl_quality": settings["stl_quality"], "smooth_angle": int(settings["blender_smooth_angle"]), "cycles_device": actual_cycles_device, "width": w, "height": h, "transparent_bg": transparent_bg, } if part_colors is not None: extra["part_colors"] = part_colors if template_path: extra["template_path"] = template_path extra["target_collection"] = target_collection extra["lighting_only"] = lighting_only extra["shadow_catcher"] = shadow_catcher render_log["template"] = template_path render_log["target_collection"] = target_collection if lighting_only: render_log["lighting_only"] = True if shadow_catcher: render_log["shadow_catcher"] = True if material_library_path and material_map: extra["material_library_path"] = material_library_path extra["material_map"] = material_map render_log["material_replace"] = True if part_names_ordered: extra["part_names_ordered"] = part_names_ordered if rotation_x or rotation_y or rotation_z: extra["rotation_x"] = rotation_x extra["rotation_y"] = rotation_y extra["rotation_z"] = rotation_z if noise_threshold: extra["noise_threshold"] = noise_threshold if denoiser: extra["denoiser"] = denoiser if denoising_input_passes: extra["denoising_input_passes"] = denoising_input_passes if denoising_prefilter: extra["denoising_prefilter"] = denoising_prefilter if denoising_quality: extra["denoising_quality"] = denoising_quality if denoising_use_gpu: extra["denoising_use_gpu"] = denoising_use_gpu from app.services.render_blender import is_blender_available, render_still # Build live-log callback for streaming Blender output to Redis _log_cb = None if order_line_id: from app.services import render_log as _rl _log_cb = lambda line: _rl.emit(order_line_id, line) if is_blender_available(): try: service_data = render_still( step_path=step, output_path=tmp_png, engine=actual_engine, samples=actual_samples, stl_quality=settings["stl_quality"], smooth_angle=int(settings["blender_smooth_angle"]), cycles_device=actual_cycles_device, width=w, height=h, transparent_bg=transparent_bg, part_colors=part_colors, template_path=template_path, target_collection=target_collection, material_library_path=material_library_path, material_map=material_map, part_names_ordered=part_names_ordered, lighting_only=lighting_only, shadow_catcher=shadow_catcher, rotation_x=rotation_x, rotation_y=rotation_y, rotation_z=rotation_z, noise_threshold=noise_threshold, denoiser=denoiser, denoising_input_passes=denoising_input_passes, denoising_prefilter=denoising_prefilter, denoising_quality=denoising_quality, denoising_use_gpu=denoising_use_gpu, log_callback=_log_cb, ) rendered_png = tmp_png if tmp_png.exists() else None except Exception as exc: logger.warning("Blender subprocess render failed: %s", exc) rendered_png = None else: logger.warning("Blender not available in this container — using Pillow fallback") elif renderer == "threejs": # Three.js renderer removed in v2 — fall through to Pillow placeholder logger.warning("Three.js renderer removed; using Pillow fallback") if service_data: for key in ("total_duration_s", "stl_duration_s", "render_duration_s", "stl_size_bytes", "output_size_bytes", "parts_count", "engine_used", "log_lines"): if key in service_data: render_log[key] = service_data[key] render_log["completed_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) if rendered_png: result = _finalise_image(rendered_png, out, fmt) tmp_png.unlink(missing_ok=True) render_log["fallback"] = False return result is not None, render_log # Pillow placeholder fallback render_log["fallback"] = True result = _generate_thumbnail_placeholder(step, out, fmt) return result is not None, render_log def _convert_to_gltf(step_path: Path, cad_file_id: str, upload_dir: str) -> Path | None: """Convert STEP to glTF for browser 3D viewer.""" out_dir = Path(upload_dir) / "gltf" out_dir.mkdir(parents=True, exist_ok=True) out_path = out_dir / f"{cad_file_id}.gltf" try: import trimesh mesh = trimesh.load(str(step_path)) if isinstance(mesh, trimesh.Scene): exported = mesh.export(str(out_path)) else: scene = trimesh.Scene(mesh) exported = scene.export(str(out_path)) return out_path if out_path.exists() else None except ImportError: logger.warning("trimesh not available; skipping glTF conversion") except Exception as exc: logger.warning(f"glTF conversion failed: {exc}") return None