""" STEP file processor — Phase 3 implementation. Extracts object names from STEP files using pythonocc-core (OCC), generates thumbnails using trimesh + pyrender, and converts to glTF. This module is invoked from the Celery worker (step_tasks.py). """ import logging import uuid from dataclasses import dataclass, field from pathlib import Path from typing import TYPE_CHECKING if TYPE_CHECKING: from app.models.cad_file import CadFile logger = logging.getLogger(__name__) def build_part_colors( cad_parsed_objects: list[str], cad_part_materials: list[dict], ) -> dict[str, str]: """ Build {part_name: material_name} for Blender rendering. Returns a mapping of part name → Schaeffler material name (e.g. SCHAEFFLER_010101_Steel-Bare). Parts with no material assignment are omitted; Blender will use the fallback material (SCHAEFFLER_059999_FailedMaterial) for unrecognised parts. Args: cad_parsed_objects: List of part names from cad_file.parsed_objects["objects"]. cad_part_materials: List of {part_name, material} dicts from order_item.cad_part_materials. """ result = {} for m in cad_part_materials: part = m.get("part_name", "").strip() material = m.get("material", "").strip() if part and material: result[part] = material return result def _normalize_stem(name: str) -> str: """Normalize a filename stem for comparison: lowercase, strip .stp/.step extension.""" stem = name.strip() for ext in (".step", ".stp"): if stem.lower().endswith(ext): stem = stem[: -len(ext)] break return stem.lower() def match_cad_to_items( cad_file: "CadFile", item_names: list[str], ) -> list[str]: """ Match a CadFile to a list of OrderItem name_cad_modell values. Matching is case-insensitive and normalizes .stp/.step extensions so that a file named '81113-L_cut.stp' matches an item named '81113-l_cut' or '81113-L_cut.step'. Args: cad_file: A CadFile ORM object (needs .original_name). item_names: List of name_cad_modell strings from OrderItems. Returns: List of matched item names (subset of item_names). """ cad_stem = _normalize_stem(cad_file.original_name or "") matched = [] for name in item_names: if not name: continue if _normalize_stem(name) == cad_stem: matched.append(name) return matched def extract_cad_metadata(cad_file_id: str, tenant_id: str | None = None) -> None: """ Fast metadata extraction for a CAD file (no thumbnail generation). Does everything process_cad_file() does EXCEPT thumbnail rendering: - Sets status to processing - Extracts STEP object names - Converts to glTF - Leaves status as processing (render_step_thumbnail task will complete it) - On exception: sets status to failed """ from app.config import settings from sqlalchemy import create_engine from sqlalchemy.orm import Session from app.models.cad_file import CadFile, ProcessingStatus from app.core.tenant_context import set_tenant_context_sync engine = create_engine(settings.database_url_sync) with Session(engine) as session: set_tenant_context_sync(session, tenant_id) cad_file = session.get(CadFile, uuid.UUID(cad_file_id)) if not cad_file: logger.error(f"CAD file not found: {cad_file_id}") return cad_file.processing_status = ProcessingStatus.processing session.commit() try: step_path = Path(cad_file.stored_path) if not step_path.exists(): raise FileNotFoundError(f"STEP file not found: {step_path}") # Try unified single-read first, fall back to separate reads metadata = extract_step_metadata(str(step_path)) if metadata.objects: objects = metadata.objects cad_file.parsed_objects = {"objects": objects} if metadata.edge_data: cad_file.mesh_attributes = {**(cad_file.mesh_attributes or {}), **metadata.edge_data} else: logger.info(f"[STEP] fallback: separate reads for {cad_file_id}") objects = _extract_step_objects(step_path) cad_file.parsed_objects = {"objects": objects} edge_data = extract_mesh_edge_data(str(step_path)) if edge_data: cad_file.mesh_attributes = {**(cad_file.mesh_attributes or {}), **edge_data} gltf_path = _convert_to_gltf(step_path, cad_file_id, settings.upload_dir) if gltf_path: cad_file.gltf_path = str(gltf_path) # Leave status as processing — render_step_thumbnail will complete it logger.info(f"CAD metadata extracted: {cad_file_id} ({len(objects)} objects)") except Exception as exc: logger.error(f"CAD metadata extraction failed for {cad_file_id}: {exc}") cad_file.processing_status = ProcessingStatus.failed cad_file.error_message = str(exc)[:2000] session.commit() def process_cad_file(cad_file_id: str) -> None: """ Full processing pipeline for a CAD file: 1. Load STEP file with pythonocc 2. Extract part/object names 3. Generate thumbnail PNG 4. Convert to glTF for browser viewer 5. Update DB record """ from app.config import settings # Synchronous DB access for Celery worker from sqlalchemy import create_engine from sqlalchemy.orm import Session from app.models.cad_file import CadFile, ProcessingStatus engine = create_engine(settings.database_url_sync) with Session(engine) as session: cad_file = session.get(CadFile, uuid.UUID(cad_file_id)) if not cad_file: logger.error(f"CAD file not found: {cad_file_id}") return cad_file.processing_status = ProcessingStatus.processing session.commit() try: step_path = Path(cad_file.stored_path) if not step_path.exists(): raise FileNotFoundError(f"STEP file not found: {step_path}") # Step 1: Extract object names + edge data (unified single-read) metadata = extract_step_metadata(str(step_path)) if metadata.objects: objects = metadata.objects cad_file.parsed_objects = {"objects": objects} if metadata.edge_data: cad_file.mesh_attributes = {**(cad_file.mesh_attributes or {}), **metadata.edge_data} else: logger.info(f"[STEP] fallback: separate reads for {cad_file_id}") objects = _extract_step_objects(step_path) cad_file.parsed_objects = {"objects": objects} edge_data = extract_mesh_edge_data(str(step_path)) if edge_data: cad_file.mesh_attributes = {**(cad_file.mesh_attributes or {}), **edge_data} # Step 2: Generate thumbnail — pass empty part_colors so the Three.js # renderer extracts named parts and auto-assigns palette colours. # Other renderers (Blender, Pillow) ignore the part_colors argument. thumb_path, render_log = _generate_thumbnail(step_path, cad_file_id, settings.upload_dir, part_colors={}) if thumb_path: cad_file.thumbnail_path = str(thumb_path) cad_file.render_log = render_log # Step 3: Convert to glTF gltf_path = _convert_to_gltf(step_path, cad_file_id, settings.upload_dir) if gltf_path: cad_file.gltf_path = str(gltf_path) cad_file.processing_status = ProcessingStatus.completed logger.info(f"CAD file processed successfully: {cad_file_id}") except Exception as exc: logger.error(f"CAD processing failed for {cad_file_id}: {exc}") cad_file.processing_status = ProcessingStatus.failed cad_file.error_message = str(exc)[:2000] session.commit() def extract_mesh_edge_data(step_path: str) -> dict: """Extract sharp edge data and suggested smooth angle from STEP topology. Uses PCurve-based normal evaluation: for each shared edge, the 2D curve of the edge on each adjacent face (BRep_Tool.CurveOnSurface) is evaluated at its midpoint to get the exact UV coordinates on that face. BRepLProp_SLProps then computes the surface normal at that precise location — far more accurate than sampling at the face's UV center. Returns dict with: - suggested_smooth_angle: float (degrees) — recommended auto-smooth angle - has_mechanical_edges: bool — True if part has distinct hard edges - sharp_edge_pairs: list of [[x0,y0,z0],[x1,y1,z1]] — vertex pairs of sharp edges in mm (no artificial cap) """ try: # Try OCP first (cadquery's fork, available in render-worker). # Fall back to OCC.Core (standard pythonocc, if installed elsewhere). _using_ocp = False try: from OCP.STEPControl import STEPControl_Reader from OCP.IFSelect import IFSelect_RetDone from OCP.TopAbs import TopAbs_EDGE, TopAbs_FACE, TopAbs_FORWARD from OCP.BRepAdaptor import BRepAdaptor_Surface, BRepAdaptor_Curve, BRepAdaptor_Curve2d from OCP.BRepLProp import BRepLProp_SLProps from OCP.BRepMesh import BRepMesh_IncrementalMesh from OCP.TopTools import TopTools_IndexedDataMapOfShapeListOfShape from OCP.TopExp import TopExp as _TopExp from OCP.TopoDS import TopoDS as _TopoDS _using_ocp = True except ImportError: from OCC.Core.STEPControl import STEPControl_Reader from OCC.Core.IFSelect import IFSelect_RetDone from OCC.Core.TopAbs import TopAbs_EDGE, TopAbs_FACE, TopAbs_FORWARD from OCC.Core.BRepAdaptor import BRepAdaptor_Surface, BRepAdaptor_Curve, BRepAdaptor_Curve2d from OCC.Core.BRepLProp import BRepLProp_SLProps from OCC.Core.BRepMesh import BRepMesh_IncrementalMesh from OCC.Core.TopTools import TopTools_IndexedDataMapOfShapeListOfShape from OCC.Core.TopExp import topexp as _TopExp from OCC.Core.TopoDS import TopoDS as _TopoDS import math # OCP uses _s suffix for static methods; OCC.Core uses module-level callables. def _map_shapes(shape, edge_type, face_type, out_map): if _using_ocp: _TopExp.MapShapesAndAncestors_s(shape, edge_type, face_type, out_map) else: _TopExp.MapShapesAndAncestors(shape, edge_type, face_type, out_map) def _to_edge(s): return _TopoDS.Edge_s(s) if _using_ocp else _TopoDS.Edge(s) def _to_face(s): return _TopoDS.Face_s(s) if _using_ocp else _TopoDS.Face(s) reader = STEPControl_Reader() status = reader.ReadFile(step_path) if status != IFSelect_RetDone: return {} reader.TransferRoots() shape = reader.OneShape() # Mesh at 0.5 mm deflection BRepMesh_IncrementalMesh(shape, 0.5, False, 0.5) # Build edge → adjacent faces map edge_face_map = TopTools_IndexedDataMapOfShapeListOfShape() _map_shapes(shape, TopAbs_EDGE, TopAbs_FACE, edge_face_map) dihedral_angles = [] sharp_pairs = [] SHARP_THRESHOLD_DEG = 20.0 for i in range(1, edge_face_map.Extent() + 1): edge_shape = edge_face_map.FindKey(i) faces = edge_face_map.FindFromIndex(i) if faces.Size() < 2: continue face_shapes = list(faces) if len(face_shapes) < 2: continue try: edge = _to_edge(edge_shape) face1 = _to_face(face_shapes[0]) face2 = _to_face(face_shapes[1]) # 3D edge endpoints in mm curve3d = BRepAdaptor_Curve(edge) pt_start = curve3d.Value(curve3d.FirstParameter()) pt_end = curve3d.Value(curve3d.LastParameter()) # PCurve-based normal evaluation: BRepAdaptor_Curve2d gives UV at the # edge's actual location on the face — far more accurate than UV center. c2d_1 = BRepAdaptor_Curve2d(edge, face1) uv1 = c2d_1.Value((c2d_1.FirstParameter() + c2d_1.LastParameter()) / 2) surf1 = BRepAdaptor_Surface(face1) props1 = BRepLProp_SLProps(surf1, uv1.X(), uv1.Y(), 1, 1e-6) if not props1.IsNormalDefined(): continue n1 = props1.Normal() if face1.Orientation() != TopAbs_FORWARD: n1.Reverse() c2d_2 = BRepAdaptor_Curve2d(edge, face2) uv2 = c2d_2.Value((c2d_2.FirstParameter() + c2d_2.LastParameter()) / 2) surf2 = BRepAdaptor_Surface(face2) props2 = BRepLProp_SLProps(surf2, uv2.X(), uv2.Y(), 1, 1e-6) if not props2.IsNormalDefined(): continue n2 = props2.Normal() if face2.Orientation() != TopAbs_FORWARD: n2.Reverse() cos_angle = max(-1.0, min(1.0, n1.Dot(n2))) angle_deg = math.degrees(math.acos(cos_angle)) # Use exterior angle (supplement when normals point same side) if angle_deg > 90: angle_deg = 180.0 - angle_deg dihedral_angles.append(angle_deg) if angle_deg > SHARP_THRESHOLD_DEG: sharp_pairs.append([ [round(pt_start.X(), 3), round(pt_start.Y(), 3), round(pt_start.Z(), 3)], [round(pt_end.X(), 3), round(pt_end.Y(), 3), round(pt_end.Z(), 3)], ]) except Exception: continue # Bounding box if _using_ocp: from OCP.Bnd import Bnd_Box from OCP.BRepBndLib import BRepBndLib as _brepbndlib_mod def _brepbndlib_add(shape, bbox): _brepbndlib_mod.Add_s(shape, bbox) else: from OCC.Core.Bnd import Bnd_Box from OCC.Core.BRepBndLib import brepbndlib as _brepbndlib_mod def _brepbndlib_add(shape, bbox): _brepbndlib_mod.Add(shape, bbox) try: bbox = Bnd_Box() _brepbndlib_add(shape, bbox) xmin, ymin, zmin, xmax, ymax, zmax = bbox.Get() dimensions_mm = { "x": round(xmax - xmin, 2), "y": round(ymax - ymin, 2), "z": round(zmax - zmin, 2), } bbox_center_mm = { "x": round((xmin + xmax) / 2, 2), "y": round((ymin + ymax) / 2, 2), "z": round((zmin + zmax) / 2, 2), } except Exception: dimensions_mm = None bbox_center_mm = None if not dihedral_angles: result: dict = {} if dimensions_mm: result["dimensions_mm"] = dimensions_mm result["bbox_center_mm"] = bbox_center_mm return result import statistics max_angle = max(dihedral_angles) hard_edges = [a for a in dihedral_angles if a > SHARP_THRESHOLD_DEG] if hard_edges: suggested = max(15.0, min(60.0, statistics.median(hard_edges) * 0.8)) else: suggested = 30.0 result = { "suggested_smooth_angle": round(suggested, 1), "has_mechanical_edges": max_angle > 45, "sharp_edge_pairs": sharp_pairs, } if dimensions_mm: result["dimensions_mm"] = dimensions_mm result["bbox_center_mm"] = bbox_center_mm return result except ImportError: # OCC not available (e.g. in backend container) return {} except Exception as exc: logger.warning("extract_mesh_edge_data failed (non-fatal): %s", exc) return {} @dataclass class StepMetadata: """Result of unified STEP file read — part names + edge data in one pass.""" objects: list[str] = field(default_factory=list) edge_data: dict = field(default_factory=dict) dimensions_mm: dict | None = None bbox_center_mm: dict | None = None def extract_step_metadata(step_path: str) -> StepMetadata: """Read a STEP file once via XCAF and extract both part names and edge topology. Replaces the two-pass pattern of _extract_step_objects() + extract_mesh_edge_data() with a single STEPCAFControl_Reader read. The XCAF reader gives us both the labeled hierarchy (part names) and the TopoDS_Shape (for tessellation and edge analysis). Falls back gracefully: returns StepMetadata with empty fields on ImportError. """ try: # Try OCC.Core first (pythonocc, available in worker container) _using_ocp = False try: from OCC.Core.STEPCAFControl import STEPCAFControl_Reader from OCC.Core.XCAFDoc import XCAFDoc_DocumentTool from OCC.Core.TDocStd import TDocStd_Document from OCC.Core.TDataStd import TDataStd_Name from OCC.Core.TCollection import TCollection_ExtendedString from OCC.Core.TopAbs import TopAbs_EDGE, TopAbs_FACE, TopAbs_FORWARD from OCC.Core.BRepAdaptor import BRepAdaptor_Surface, BRepAdaptor_Curve, BRepAdaptor_Curve2d from OCC.Core.BRepLProp import BRepLProp_SLProps from OCC.Core.BRepMesh import BRepMesh_IncrementalMesh from OCC.Core.TopTools import TopTools_IndexedDataMapOfShapeListOfShape from OCC.Core.TopExp import topexp as _TopExp from OCC.Core.TopoDS import TopoDS as _TopoDS from OCC.Core.Bnd import Bnd_Box from OCC.Core.BRepBndLib import brepbndlib as _brepbndlib_mod def _map_shapes(shape, edge_type, face_type, out_map): _TopExp.MapShapesAndAncestors(shape, edge_type, face_type, out_map) def _to_edge(s): return _TopoDS.Edge(s) def _to_face(s): return _TopoDS.Face(s) def _brepbndlib_add(shape, bbox): _brepbndlib_mod.Add(shape, bbox) except ImportError: # Fall back to OCP (cadquery's fork) from OCP.STEPCAFControl import STEPCAFControl_Reader # type: ignore[no-redef] from OCP.XCAFDoc import XCAFDoc_DocumentTool # type: ignore[no-redef] from OCP.TDocStd import TDocStd_Document # type: ignore[no-redef] from OCP.TDataStd import TDataStd_Name # type: ignore[no-redef] from OCP.TCollection import TCollection_ExtendedString # type: ignore[no-redef] from OCP.TopAbs import TopAbs_EDGE, TopAbs_FACE, TopAbs_FORWARD # type: ignore[no-redef] from OCP.BRepAdaptor import BRepAdaptor_Surface, BRepAdaptor_Curve, BRepAdaptor_Curve2d # type: ignore[no-redef] from OCP.BRepLProp import BRepLProp_SLProps # type: ignore[no-redef] from OCP.BRepMesh import BRepMesh_IncrementalMesh # type: ignore[no-redef] from OCP.TopTools import TopTools_IndexedDataMapOfShapeListOfShape # type: ignore[no-redef] from OCP.TopExp import TopExp as _TopExp # type: ignore[no-redef] from OCP.TopoDS import TopoDS as _TopoDS # type: ignore[no-redef] from OCP.Bnd import Bnd_Box # type: ignore[no-redef] from OCP.BRepBndLib import BRepBndLib as _brepbndlib_mod # type: ignore[no-redef] _using_ocp = True def _map_shapes(shape, edge_type, face_type, out_map): _TopExp.MapShapesAndAncestors_s(shape, edge_type, face_type, out_map) def _to_edge(s): return _TopoDS.Edge_s(s) def _to_face(s): return _TopoDS.Face_s(s) def _brepbndlib_add(shape, bbox): _brepbndlib_mod.Add_s(shape, bbox) import math # ── Step 1: Read STEP via XCAF (single read) ────────────────────── doc = TDocStd_Document(TCollection_ExtendedString("MDTV-CAF")) reader = STEPCAFControl_Reader() reader.SetColorMode(True) reader.SetNameMode(True) status = reader.ReadFile(str(step_path)) if not reader.Transfer(doc): logger.warning("extract_step_metadata: XCAF transfer failed for %s", step_path) return StepMetadata() shape_tool = XCAFDoc_DocumentTool.ShapeTool(doc.Main()) if not _using_ocp \ else XCAFDoc_DocumentTool.ShapeTool_s(doc.Main()) labels = [] shape_tool.GetFreeShapes(labels) # ── Step 2: Extract part names from XCAF labels ─────────────────── names: list[str] = [] for label in labels: name_attr = TDataStd_Name() find_id = TDataStd_Name.GetID() if not _using_ocp else TDataStd_Name.GetID_s() if label.FindAttribute(find_id, name_attr): names.append(name_attr.Get().ToExtString()) # ── Step 3: Get root shape and tessellate ───────────────────────── # Collect all free shapes — usually just one root compound root_shapes = [] for label in labels: s = shape_tool.GetShape(label) if not _using_ocp else shape_tool.GetShape_s(label) if not s.IsNull(): root_shapes.append(s) if not root_shapes: return StepMetadata(objects=names) # Tessellate and extract edges from each root shape SHARP_THRESHOLD_DEG = 20.0 dihedral_angles: list[float] = [] sharp_pairs: list = [] all_shapes_for_bbox = [] for shape in root_shapes: BRepMesh_IncrementalMesh(shape, 0.5, False, 0.5) all_shapes_for_bbox.append(shape) # Build edge → adjacent faces map edge_face_map = TopTools_IndexedDataMapOfShapeListOfShape() _map_shapes(shape, TopAbs_EDGE, TopAbs_FACE, edge_face_map) for i in range(1, edge_face_map.Extent() + 1): edge_shape = edge_face_map.FindKey(i) faces = edge_face_map.FindFromIndex(i) if faces.Size() < 2: continue face_shapes = list(faces) if len(face_shapes) < 2: continue try: edge = _to_edge(edge_shape) face1 = _to_face(face_shapes[0]) face2 = _to_face(face_shapes[1]) curve3d = BRepAdaptor_Curve(edge) pt_start = curve3d.Value(curve3d.FirstParameter()) pt_end = curve3d.Value(curve3d.LastParameter()) c2d_1 = BRepAdaptor_Curve2d(edge, face1) uv1 = c2d_1.Value((c2d_1.FirstParameter() + c2d_1.LastParameter()) / 2) surf1 = BRepAdaptor_Surface(face1) props1 = BRepLProp_SLProps(surf1, uv1.X(), uv1.Y(), 1, 1e-6) if not props1.IsNormalDefined(): continue n1 = props1.Normal() if face1.Orientation() != TopAbs_FORWARD: n1.Reverse() c2d_2 = BRepAdaptor_Curve2d(edge, face2) uv2 = c2d_2.Value((c2d_2.FirstParameter() + c2d_2.LastParameter()) / 2) surf2 = BRepAdaptor_Surface(face2) props2 = BRepLProp_SLProps(surf2, uv2.X(), uv2.Y(), 1, 1e-6) if not props2.IsNormalDefined(): continue n2 = props2.Normal() if face2.Orientation() != TopAbs_FORWARD: n2.Reverse() cos_angle = max(-1.0, min(1.0, n1.Dot(n2))) angle_deg = math.degrees(math.acos(cos_angle)) if angle_deg > 90: angle_deg = 180.0 - angle_deg dihedral_angles.append(angle_deg) if angle_deg > SHARP_THRESHOLD_DEG: sharp_pairs.append([ [round(pt_start.X(), 3), round(pt_start.Y(), 3), round(pt_start.Z(), 3)], [round(pt_end.X(), 3), round(pt_end.Y(), 3), round(pt_end.Z(), 3)], ]) except Exception: continue # ── Step 4: Bounding box ────────────────────────────────────────── dimensions_mm = None bbox_center_mm = None try: bbox = Bnd_Box() for shape in all_shapes_for_bbox: _brepbndlib_add(shape, bbox) xmin, ymin, zmin, xmax, ymax, zmax = bbox.Get() dimensions_mm = { "x": round(xmax - xmin, 2), "y": round(ymax - ymin, 2), "z": round(zmax - zmin, 2), } bbox_center_mm = { "x": round((xmin + xmax) / 2, 2), "y": round((ymin + ymax) / 2, 2), "z": round((zmin + zmax) / 2, 2), } except Exception: pass # ── Step 5: Build edge_data dict ────────────────────────────────── edge_data: dict = {} if dimensions_mm: edge_data["dimensions_mm"] = dimensions_mm edge_data["bbox_center_mm"] = bbox_center_mm if dihedral_angles: import statistics max_angle = max(dihedral_angles) hard_edges = [a for a in dihedral_angles if a > SHARP_THRESHOLD_DEG] if hard_edges: suggested = max(15.0, min(60.0, statistics.median(hard_edges) * 0.8)) else: suggested = 30.0 edge_data["suggested_smooth_angle"] = round(suggested, 1) edge_data["has_mechanical_edges"] = max_angle > 45 edge_data["sharp_edge_pairs"] = sharp_pairs logger.info(f"[STEP] unified read: {len(names)} objects, {len(sharp_pairs)} sharp pairs") return StepMetadata( objects=names, edge_data=edge_data, dimensions_mm=dimensions_mm, bbox_center_mm=bbox_center_mm, ) except ImportError: logger.warning("OCC not available for extract_step_metadata") return StepMetadata() except Exception as exc: logger.warning("extract_step_metadata failed: %s", exc) return StepMetadata() def _extract_step_objects(step_path: Path) -> list[str]: """Extract part names from STEP file using pythonocc.""" try: from OCC.Core.STEPCAFControl import STEPCAFControl_Reader from OCC.Core.XCAFDoc import XCAFDoc_DocumentTool from OCC.Core.TDocStd import TDocStd_Document from OCC.Core.TDataStd import TDataStd_Name from OCC.Core.TCollection import TCollection_ExtendedString doc = TDocStd_Document(TCollection_ExtendedString("MDTV-CAF")) reader = STEPCAFControl_Reader() reader.SetColorMode(True) reader.SetNameMode(True) status = reader.ReadFile(str(step_path)) if not reader.Transfer(doc): return [] shape_tool = XCAFDoc_DocumentTool.ShapeTool(doc.Main()) labels = [] shape_tool.GetFreeShapes(labels) names = [] for label in labels: name_attr = TDataStd_Name() if label.FindAttribute(TDataStd_Name.GetID(), name_attr): names.append(name_attr.Get().ToExtString()) return names except ImportError: logger.warning("pythonocc-core not available; skipping object extraction") return _extract_step_objects_fallback(step_path) except Exception as exc: logger.warning(f"OCC extraction failed: {exc}") return _extract_step_objects_fallback(step_path) def _extract_step_objects_fallback(step_path: Path) -> list[str]: """Simple text-based extraction of part names from STEP file.""" names = [] try: with open(step_path, "r", encoding="utf-8", errors="replace") as f: for line in f: # STEP format: PRODUCT('name','description',... if "PRODUCT(" in line: parts = line.split("PRODUCT(") for part in parts[1:]: if "'" in part: name = part.split("'")[1] if name and name not in names: names.append(name) except Exception: pass return names def _get_all_settings() -> dict[str, str]: """Read all system settings from the database.""" defaults = { "thumbnail_renderer": "pillow", "blender_engine": "cycles", "blender_cycles_samples": "256", "blender_eevee_samples": "64", "thumbnail_format": "jpg", "blender_smooth_angle": "30", "cycles_device": "auto", "tessellation_engine": "occ", } try: from app.config import settings as app_settings from sqlalchemy import create_engine, text from sqlalchemy.orm import Session engine = create_engine(app_settings.database_url_sync) with Session(engine) as session: result = session.execute(text("SELECT key, value FROM system_settings")) stored = {row[0]: row[1] for row in result.fetchall()} return {k: stored.get(k, v) for k, v in defaults.items()} except Exception as exc: logger.warning(f"Could not read settings: {exc}; using defaults") return defaults def _generate_thumbnail( step_path: Path, cad_file_id: str, upload_dir: str, part_colors: dict[str, str] | None = None, ) -> tuple[Path | None, dict]: """Generate thumbnail using the configured renderer. Returns (thumb_path, render_log_dict). render_log_dict contains all settings + timing + blender output. """ import time out_dir = Path(upload_dir) / "thumbnails" out_dir.mkdir(parents=True, exist_ok=True) settings = _get_all_settings() renderer = settings["thumbnail_renderer"] fmt = settings["thumbnail_format"] # "jpg" or "png" ext = "jpg" if fmt == "jpg" else "png" # Clean up any existing thumbnail for this cad_file_id (either extension) for old_ext in ("png", "jpg"): old = out_dir / f"{cad_file_id}.{old_ext}" if old.exists(): old.unlink(missing_ok=True) final_path = out_dir / f"{cad_file_id}.{ext}" # Intermediate PNG used when a service renderer produces PNG before conversion tmp_png = out_dir / f"{cad_file_id}_tmp.png" # Build the base render_log with the settings snapshot render_log: dict = { "renderer": renderer, "format": fmt, "started_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), } if renderer == "blender": engine = settings["blender_engine"] render_log.update({ "engine": engine, "samples": int(settings[f"blender_{engine}_samples"]), "smooth_angle": int(settings["blender_smooth_angle"]), "cycles_device": settings["cycles_device"], "width": 512, "height": 512, }) logger.info(f"Thumbnail renderer={renderer}, format={fmt}") rendered_png: Path | None = None service_data: dict = {} if renderer == "blender": engine = settings["blender_engine"] samples = int(settings[f"blender_{engine}_samples"]) from app.services.render_blender import is_blender_available, render_still if is_blender_available(): try: service_data = render_still( step_path=step_path, output_path=tmp_png, engine=engine, samples=samples, smooth_angle=int(settings["blender_smooth_angle"]), cycles_device=settings["cycles_device"], tessellation_engine=settings["tessellation_engine"], ) rendered_png = tmp_png if tmp_png.exists() else None except Exception as exc: logger.warning("Blender subprocess render failed: %s", exc) rendered_png = None else: logger.warning("Blender not available in this container") # Merge rich service response data into render_log if service_data: for key in ("total_duration_s", "stl_duration_s", "render_duration_s", "stl_size_bytes", "output_size_bytes", "parts_count", "engine_used", "log_lines"): if key in service_data: render_log[key] = service_data[key] render_log["completed_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) if rendered_png: result = _finalise_image(rendered_png, final_path) tmp_png.unlink(missing_ok=True) return result, render_log return None, render_log def _finalise_image(src: Path, dst: Path) -> Path | None: """Move src image to dst, always as PNG.""" out = dst.with_suffix(".png") src.rename(out) return out def regenerate_cad_thumbnail(cad_file_id: str, part_colors: dict[str, str]) -> bool: """ Regenerate a thumbnail with per-part colours for an existing CAD file. Called from the `regenerate_thumbnail` Celery task. Returns True on success. """ from app.config import settings as app_settings from sqlalchemy import create_engine from sqlalchemy.orm import Session from app.models.cad_file import CadFile, ProcessingStatus db_engine = create_engine(app_settings.database_url_sync) with Session(db_engine) as session: cad_file = session.get(CadFile, uuid.UUID(cad_file_id)) if not cad_file: logger.error(f"CAD file not found: {cad_file_id}") return False step_path = Path(cad_file.stored_path) if not step_path.exists(): logger.error(f"STEP file not found: {step_path}") return False # Mark as processing so the activity page shows it as active cad_file.processing_status = ProcessingStatus.processing session.commit() try: thumb_path, render_log = _generate_thumbnail( step_path, cad_file_id, app_settings.upload_dir, part_colors=part_colors ) if thumb_path: cad_file.thumbnail_path = str(thumb_path) cad_file.render_log = render_log cad_file.processing_status = ProcessingStatus.completed session.commit() logger.info(f"Thumbnail regenerated for CAD file {cad_file_id}") return True except Exception as exc: logger.error(f"Thumbnail regeneration failed for {cad_file_id}: {exc}") cad_file.processing_status = ProcessingStatus.failed cad_file.error_message = str(exc)[:2000] session.commit() return False def render_to_file( step_path: str, output_path: str, part_colors: dict[str, str] | None = None, width: int | None = None, height: int | None = None, transparent_bg: bool = False, engine: str | None = None, samples: int | None = None, template_path: str | None = None, target_collection: str = "Product", material_library_path: str | None = None, material_map: dict | None = None, part_names_ordered: list | None = None, lighting_only: bool = False, shadow_catcher: bool = False, cycles_device: str | None = None, rotation_x: float = 0.0, rotation_y: float = 0.0, rotation_z: float = 0.0, job_id: str | None = None, noise_threshold: str = "", denoiser: str = "", denoising_input_passes: str = "", denoising_prefilter: str = "", denoising_quality: str = "", denoising_use_gpu: str = "", order_line_id: str | None = None, usd_path: "Path | None" = None, tessellation_engine: str | None = None, focal_length_mm: float | None = None, sensor_width_mm: float | None = None, ) -> tuple[bool, dict]: """Render a STEP file to a specific output path using current system settings. Unlike regenerate_cad_thumbnail, this does NOT modify the shared CadFile record. Used by render_order_line_task for per-order-line render outputs. Args: step_path: Absolute path to the STEP file on disk. output_path: Absolute path for the rendered output file. part_colors: Optional {part_name: hex_color} map. width: Optional render width (overrides system default). height: Optional render height (overrides system default). transparent_bg: If True and renderer=blender+PNG, render with transparent background. engine: Optional per-OT engine override ("cycles" | "eevee"), or None for system default. samples: Optional per-OT samples override, or None for system default. template_path: Optional path to a .blend template file. target_collection: Blender collection name to import geometry into. material_library_path: Optional path to material library .blend file. material_map: Optional {part_name: material_name} for material replacement. order_line_id: Optional order line ID for live log streaming. Returns: (success: bool, render_log: dict) """ import time step = Path(step_path) out = Path(output_path) out.parent.mkdir(parents=True, exist_ok=True) settings = _get_all_settings() renderer = settings["thumbnail_renderer"] fmt = out.suffix.lstrip(".") or settings.get("thumbnail_format", "jpg") if fmt not in ("jpg", "png"): fmt = "jpg" # Temporary PNG for service renderers tmp_png = out.parent / f"_tmp_{out.stem}.png" render_log: dict = { "renderer": renderer, "format": fmt, "started_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), } rendered_png: Path | None = None service_data: dict = {} if renderer == "blender": actual_engine = engine or settings["blender_engine"] actual_samples = samples or int(settings[f"blender_{actual_engine}_samples"]) actual_cycles_device = cycles_device or settings["cycles_device"] w = width or 512 h = height or 512 render_log.update({ "engine": actual_engine, "samples": actual_samples, "smooth_angle": int(settings["blender_smooth_angle"]), "cycles_device": actual_cycles_device, "width": w, "height": h, }) extra = { "engine": actual_engine, "samples": actual_samples, "smooth_angle": int(settings["blender_smooth_angle"]), "cycles_device": actual_cycles_device, "width": w, "height": h, "transparent_bg": transparent_bg, } if part_colors is not None: extra["part_colors"] = part_colors if template_path: extra["template_path"] = template_path extra["target_collection"] = target_collection extra["lighting_only"] = lighting_only extra["shadow_catcher"] = shadow_catcher render_log["template"] = template_path render_log["target_collection"] = target_collection if lighting_only: render_log["lighting_only"] = True if shadow_catcher: render_log["shadow_catcher"] = True if material_library_path and material_map: extra["material_library_path"] = material_library_path extra["material_map"] = material_map render_log["material_replace"] = True if part_names_ordered: extra["part_names_ordered"] = part_names_ordered if rotation_x or rotation_y or rotation_z: extra["rotation_x"] = rotation_x extra["rotation_y"] = rotation_y extra["rotation_z"] = rotation_z if noise_threshold: extra["noise_threshold"] = noise_threshold if denoiser: extra["denoiser"] = denoiser if denoising_input_passes: extra["denoising_input_passes"] = denoising_input_passes if denoising_prefilter: extra["denoising_prefilter"] = denoising_prefilter if denoising_quality: extra["denoising_quality"] = denoising_quality if denoising_use_gpu: extra["denoising_use_gpu"] = denoising_use_gpu from app.services.render_blender import is_blender_available, render_still # Build live-log callback for streaming Blender output to Redis _log_cb = None if order_line_id: from app.services import render_log as _rl _log_cb = lambda line: _rl.emit(order_line_id, line) if is_blender_available(): try: service_data = render_still( step_path=step, output_path=tmp_png, engine=actual_engine, samples=actual_samples, smooth_angle=int(settings["blender_smooth_angle"]), cycles_device=actual_cycles_device, width=w, height=h, transparent_bg=transparent_bg, part_colors=part_colors, template_path=template_path, target_collection=target_collection, material_library_path=material_library_path, material_map=material_map, part_names_ordered=part_names_ordered, lighting_only=lighting_only, shadow_catcher=shadow_catcher, rotation_x=rotation_x, rotation_y=rotation_y, rotation_z=rotation_z, noise_threshold=noise_threshold, denoiser=denoiser, denoising_input_passes=denoising_input_passes, denoising_prefilter=denoising_prefilter, denoising_quality=denoising_quality, denoising_use_gpu=denoising_use_gpu, log_callback=_log_cb, usd_path=usd_path, tessellation_engine=tessellation_engine or settings["tessellation_engine"], focal_length_mm=focal_length_mm, sensor_width_mm=sensor_width_mm, ) rendered_png = tmp_png if tmp_png.exists() else None except Exception as exc: logger.warning("Blender subprocess render failed: %s", exc) rendered_png = None else: logger.warning("Blender not available in this container — render skipped") if service_data: for key in ("total_duration_s", "stl_duration_s", "render_duration_s", "stl_size_bytes", "output_size_bytes", "parts_count", "engine_used", "log_lines"): if key in service_data: render_log[key] = service_data[key] render_log["completed_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) if rendered_png: result = _finalise_image(rendered_png, out) tmp_png.unlink(missing_ok=True) return result is not None, render_log return False, render_log def _convert_to_gltf(step_path: Path, cad_file_id: str, upload_dir: str) -> Path | None: """Convert STEP to glTF for browser 3D viewer.""" out_dir = Path(upload_dir) / "gltf" out_dir.mkdir(parents=True, exist_ok=True) out_path = out_dir / f"{cad_file_id}.gltf" try: import trimesh mesh = trimesh.load(str(step_path)) if isinstance(mesh, trimesh.Scene): exported = mesh.export(str(out_path)) else: scene = trimesh.Scene(mesh) exported = scene.export(str(out_path)) return out_path if out_path.exists() else None except ImportError: logger.warning("trimesh not available; skipping glTF conversion") except Exception as exc: logger.warning(f"glTF conversion failed: {exc}") return None