""" STEP file processor — Phase 3 implementation. Extracts object names from STEP files using pythonocc-core (OCC), generates thumbnails using trimesh + pyrender, and converts to glTF. This module is invoked from the Celery worker (step_tasks.py). """ import logging import uuid from pathlib import Path from typing import TYPE_CHECKING if TYPE_CHECKING: from app.models.cad_file import CadFile logger = logging.getLogger(__name__) MATERIAL_PALETTE = [ "#4C9BE8", "#E85B4C", "#4CBE72", "#E8A84C", "#A04CE8", "#4CD4E8", "#E84CA8", "#7EC850", "#E86B30", "#5088C8", ] def _material_to_color(material_name: str | None, index: int) -> str: """Return a deterministic hex color: hash material name, or use palette by index.""" if material_name and material_name.strip(): i = abs(hash(material_name.strip().lower())) % len(MATERIAL_PALETTE) return MATERIAL_PALETTE[i] return MATERIAL_PALETTE[index % len(MATERIAL_PALETTE)] def build_part_colors( cad_parsed_objects: list[str], cad_part_materials: list[dict], ) -> dict[str, str]: """ Build {part_name: hex_color} for thumbnail rendering. Args: cad_parsed_objects: List of part names from cad_file.parsed_objects["objects"]. cad_part_materials: List of {part_name, material} dicts from order_item.cad_part_materials. """ mat_map = { m["part_name"].lower(): m.get("material") for m in cad_part_materials if m.get("part_name") } return { name: _material_to_color(mat_map.get(name.lower()), i) for i, name in enumerate(cad_parsed_objects) } def _normalize_stem(name: str) -> str: """Normalize a filename stem for comparison: lowercase, strip .stp/.step extension.""" stem = name.strip() for ext in (".step", ".stp"): if stem.lower().endswith(ext): stem = stem[: -len(ext)] break return stem.lower() def match_cad_to_items( cad_file: "CadFile", item_names: list[str], ) -> list[str]: """ Match a CadFile to a list of OrderItem name_cad_modell values. Matching is case-insensitive and normalizes .stp/.step extensions so that a file named '81113-L_cut.stp' matches an item named '81113-l_cut' or '81113-L_cut.step'. Args: cad_file: A CadFile ORM object (needs .original_name). item_names: List of name_cad_modell strings from OrderItems. Returns: List of matched item names (subset of item_names). """ cad_stem = _normalize_stem(cad_file.original_name or "") matched = [] for name in item_names: if not name: continue if _normalize_stem(name) == cad_stem: matched.append(name) return matched def extract_cad_metadata(cad_file_id: str) -> None: """ Fast metadata extraction for a CAD file (no thumbnail generation). Does everything process_cad_file() does EXCEPT thumbnail rendering: - Sets status to processing - Extracts STEP object names - Converts to glTF - Leaves status as processing (render_step_thumbnail task will complete it) - On exception: sets status to failed """ from app.config import settings from sqlalchemy import create_engine from sqlalchemy.orm import Session from app.models.cad_file import CadFile, ProcessingStatus engine = create_engine(settings.database_url_sync) with Session(engine) as session: cad_file = session.get(CadFile, uuid.UUID(cad_file_id)) if not cad_file: logger.error(f"CAD file not found: {cad_file_id}") return cad_file.processing_status = ProcessingStatus.processing session.commit() try: step_path = Path(cad_file.stored_path) if not step_path.exists(): raise FileNotFoundError(f"STEP file not found: {step_path}") objects = _extract_step_objects(step_path) cad_file.parsed_objects = {"objects": objects} gltf_path = _convert_to_gltf(step_path, cad_file_id, settings.upload_dir) if gltf_path: cad_file.gltf_path = str(gltf_path) # Leave status as processing — render_step_thumbnail will complete it logger.info(f"CAD metadata extracted: {cad_file_id} ({len(objects)} objects)") except Exception as exc: logger.error(f"CAD metadata extraction failed for {cad_file_id}: {exc}") cad_file.processing_status = ProcessingStatus.failed cad_file.error_message = str(exc)[:2000] session.commit() def process_cad_file(cad_file_id: str) -> None: """ Full processing pipeline for a CAD file: 1. Load STEP file with pythonocc 2. Extract part/object names 3. Generate thumbnail PNG 4. Convert to glTF for browser viewer 5. Update DB record """ from app.config import settings # Synchronous DB access for Celery worker from sqlalchemy import create_engine from sqlalchemy.orm import Session from app.models.cad_file import CadFile, ProcessingStatus engine = create_engine(settings.database_url_sync) with Session(engine) as session: cad_file = session.get(CadFile, uuid.UUID(cad_file_id)) if not cad_file: logger.error(f"CAD file not found: {cad_file_id}") return cad_file.processing_status = ProcessingStatus.processing session.commit() try: step_path = Path(cad_file.stored_path) if not step_path.exists(): raise FileNotFoundError(f"STEP file not found: {step_path}") # Step 1: Extract object names objects = _extract_step_objects(step_path) cad_file.parsed_objects = {"objects": objects} # Step 2: Generate thumbnail — pass empty part_colors so the Three.js # renderer extracts named parts and auto-assigns palette colours. # Other renderers (Blender, Pillow) ignore the part_colors argument. thumb_path, render_log = _generate_thumbnail(step_path, cad_file_id, settings.upload_dir, part_colors={}) if thumb_path: cad_file.thumbnail_path = str(thumb_path) cad_file.render_log = render_log # Step 3: Convert to glTF gltf_path = _convert_to_gltf(step_path, cad_file_id, settings.upload_dir) if gltf_path: cad_file.gltf_path = str(gltf_path) cad_file.processing_status = ProcessingStatus.completed logger.info(f"CAD file processed successfully: {cad_file_id}") except Exception as exc: logger.error(f"CAD processing failed for {cad_file_id}: {exc}") cad_file.processing_status = ProcessingStatus.failed cad_file.error_message = str(exc)[:2000] session.commit() def _extract_step_objects(step_path: Path) -> list[str]: """Extract part names from STEP file using pythonocc.""" try: from OCC.Core.STEPCAFControl import STEPCAFControl_Reader from OCC.Core.XCAFDoc import XCAFDoc_DocumentTool from OCC.Core.TDocStd import TDocStd_Document from OCC.Core.TDataStd import TDataStd_Name from OCC.Core.TCollection import TCollection_ExtendedString doc = TDocStd_Document(TCollection_ExtendedString("MDTV-CAF")) reader = STEPCAFControl_Reader() reader.SetColorMode(True) reader.SetNameMode(True) status = reader.ReadFile(str(step_path)) if not reader.Transfer(doc): return [] shape_tool = XCAFDoc_DocumentTool.ShapeTool(doc.Main()) labels = [] shape_tool.GetFreeShapes(labels) names = [] for label in labels: name_attr = TDataStd_Name() if label.FindAttribute(TDataStd_Name.GetID(), name_attr): names.append(name_attr.Get().ToExtString()) return names except ImportError: logger.warning("pythonocc-core not available; skipping object extraction") return _extract_step_objects_fallback(step_path) except Exception as exc: logger.warning(f"OCC extraction failed: {exc}") return _extract_step_objects_fallback(step_path) def _extract_step_objects_fallback(step_path: Path) -> list[str]: """Simple text-based extraction of part names from STEP file.""" names = [] try: with open(step_path, "r", encoding="utf-8", errors="replace") as f: for line in f: # STEP format: PRODUCT('name','description',... if "PRODUCT(" in line: parts = line.split("PRODUCT(") for part in parts[1:]: if "'" in part: name = part.split("'")[1] if name and name not in names: names.append(name) except Exception: pass return names def _get_all_settings() -> dict[str, str]: """Read all system settings from the database.""" defaults = { "thumbnail_renderer": "pillow", "blender_engine": "cycles", "blender_cycles_samples": "256", "blender_eevee_samples": "64", "threejs_render_size": "1024", "thumbnail_format": "jpg", "stl_quality": "low", "blender_smooth_angle": "30", "cycles_device": "auto", } try: from app.config import settings as app_settings from sqlalchemy import create_engine, text from sqlalchemy.orm import Session engine = create_engine(app_settings.database_url_sync) with Session(engine) as session: result = session.execute(text("SELECT key, value FROM system_settings")) stored = {row[0]: row[1] for row in result.fetchall()} return {k: stored.get(k, v) for k, v in defaults.items()} except Exception as exc: logger.warning(f"Could not read settings: {exc}; using defaults") return defaults def _generate_thumbnail( step_path: Path, cad_file_id: str, upload_dir: str, part_colors: dict[str, str] | None = None, ) -> tuple[Path | None, dict]: """Generate thumbnail using the configured renderer. Returns (thumb_path, render_log_dict). render_log_dict contains all settings + timing + blender output. """ import time out_dir = Path(upload_dir) / "thumbnails" out_dir.mkdir(parents=True, exist_ok=True) settings = _get_all_settings() renderer = settings["thumbnail_renderer"] fmt = settings["thumbnail_format"] # "jpg" or "png" ext = "jpg" if fmt == "jpg" else "png" # Clean up any existing thumbnail for this cad_file_id (either extension) for old_ext in ("png", "jpg"): old = out_dir / f"{cad_file_id}.{old_ext}" if old.exists(): old.unlink(missing_ok=True) final_path = out_dir / f"{cad_file_id}.{ext}" # Intermediate PNG used when a service renderer produces PNG before conversion tmp_png = out_dir / f"{cad_file_id}_tmp.png" # Build the base render_log with the settings snapshot render_log: dict = { "renderer": renderer, "format": fmt, "started_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), } if renderer == "blender": engine = settings["blender_engine"] render_log.update({ "engine": engine, "samples": int(settings[f"blender_{engine}_samples"]), "stl_quality": settings["stl_quality"], "smooth_angle": int(settings["blender_smooth_angle"]), "cycles_device": settings["cycles_device"], "width": 512, "height": 512, }) elif renderer == "threejs": size = int(settings["threejs_render_size"]) render_log.update({"width": size, "height": size}) logger.info(f"Thumbnail renderer={renderer}, format={fmt}") rendered_png: Path | None = None service_data: dict = {} if renderer == "blender": engine = settings["blender_engine"] samples = int(settings[f"blender_{engine}_samples"]) extra = { "engine": engine, "samples": samples, "stl_quality": settings["stl_quality"], "smooth_angle": int(settings["blender_smooth_angle"]), "cycles_device": settings["cycles_device"], } rendered_png, service_data = _render_via_service( "http://blender-renderer:8100/render", step_path, tmp_png, extra ) if not rendered_png: logger.warning("Blender renderer failed; falling back to Pillow placeholder") elif renderer == "threejs": size = int(settings["threejs_render_size"]) extra2: dict = {"width": size, "height": size} if part_colors is not None: extra2["part_colors"] = part_colors rendered_png, service_data = _render_via_service( "http://threejs-renderer:8101/render", step_path, tmp_png, extra2 ) if not rendered_png: logger.warning("Three.js renderer failed; falling back to Pillow placeholder") # Merge rich service response data into render_log if service_data: for key in ("total_duration_s", "stl_duration_s", "render_duration_s", "stl_size_bytes", "output_size_bytes", "parts_count", "engine_used", "log_lines"): if key in service_data: render_log[key] = service_data[key] render_log["completed_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) if rendered_png: result = _finalise_image(rendered_png, final_path, fmt) tmp_png.unlink(missing_ok=True) render_log["fallback"] = False return result, render_log # Pillow placeholder render_log["fallback"] = True return _generate_thumbnail_placeholder(step_path, final_path, fmt), render_log def _finalise_image(src: Path, dst: Path, fmt: str) -> Path | None: """Convert src image to dst using the requested format (jpg or png).""" if fmt == "jpg": try: from PIL import Image img = Image.open(src).convert("RGB") img.save(str(dst), "JPEG", quality=92, optimize=True) return dst except Exception as exc: logger.warning(f"JPG conversion failed: {exc}; keeping PNG") src.rename(dst.with_suffix(".png")) return dst.with_suffix(".png") else: src.rename(dst) return dst def _render_via_service( url: str, step_path: Path, out_path: Path, extra: dict | None = None, job_id: str | None = None, ) -> tuple[Path | None, dict]: """Call an external renderer microservice to generate a thumbnail. Returns (path_or_None, response_data_dict). job_id, when provided, is forwarded to the renderer so the render process can be cancelled via the renderer's /cancel/{job_id} endpoint. """ try: import httpx payload = { "step_path": str(step_path), "output_path": str(out_path), "width": 512, "height": 512, **(extra or {}), } if job_id: payload["job_id"] = job_id resp = httpx.post(url, json=payload, timeout=300.0) data = {} try: data = resp.json() except Exception: pass if resp.status_code == 200 and out_path.exists(): return out_path, data logger.warning(f"Renderer service {url} returned {resp.status_code}: {resp.text[:500]}") except Exception as exc: logger.warning(f"Renderer service {url} unreachable: {exc}") return None, {} def _generate_thumbnail_placeholder(step_path: Path, out_path: Path, fmt: str = "png") -> Path | None: """Generate a simple placeholder thumbnail using Pillow.""" try: from PIL import Image, ImageDraw, ImageFont W, H = 512, 512 img = Image.new("RGB", (W, H), color=(245, 246, 248)) draw = ImageDraw.Draw(img) # Subtle grid for i in range(0, W, 32): draw.line([(i, 0), (i, H)], fill=(228, 230, 235), width=1) draw.line([(0, i), (W, i)], fill=(228, 230, 235), width=1) # Isometric box (front / top / right faces) cx, cy = 256, 260 s = 110 # half-size # Front face draw.polygon( [(cx - s, cy), (cx, cy + s // 2), (cx + s, cy), (cx, cy - s // 2)], fill=(195, 208, 220), outline=(90, 110, 130), width=2, ) # Top face draw.polygon( [(cx - s, cy - s), (cx, cy - s - s // 2), (cx + s, cy - s), (cx, cy - s + s // 2)], fill=(220, 230, 240), outline=(90, 110, 130), width=2, ) # Right pillar draw.polygon( [(cx + s, cy - s), (cx + s, cy), (cx, cy + s // 2), (cx, cy - s + s // 2)], fill=(160, 178, 196), outline=(90, 110, 130), width=2, ) # Schaeffler green top bar draw.rectangle([0, 0, W, 10], fill=(0, 137, 61)) # Model name strip at bottom name = step_path.stem draw.rectangle([0, H - 52, W, H], fill=(30, 50, 70)) try: font = ImageFont.load_default(size=15) draw.text((W // 2, H - 26), name, fill=(255, 255, 255), anchor="mm", font=font) except Exception: draw.text((10, H - 38), name, fill=(255, 255, 255)) if fmt == "jpg": img = img.convert("RGB") img.save(str(out_path), "JPEG", quality=92, optimize=True) else: img.save(str(out_path), "PNG") return out_path except Exception as exc: logger.warning(f"Pillow placeholder thumbnail failed: {exc}") return None def regenerate_cad_thumbnail(cad_file_id: str, part_colors: dict[str, str]) -> bool: """ Regenerate a thumbnail with per-part colours for an existing CAD file. Called from the `regenerate_thumbnail` Celery task. Returns True on success. """ from app.config import settings as app_settings from sqlalchemy import create_engine from sqlalchemy.orm import Session from app.models.cad_file import CadFile, ProcessingStatus db_engine = create_engine(app_settings.database_url_sync) with Session(db_engine) as session: cad_file = session.get(CadFile, uuid.UUID(cad_file_id)) if not cad_file: logger.error(f"CAD file not found: {cad_file_id}") return False step_path = Path(cad_file.stored_path) if not step_path.exists(): logger.error(f"STEP file not found: {step_path}") return False # Mark as processing so the activity page shows it as active cad_file.processing_status = ProcessingStatus.processing session.commit() try: thumb_path, render_log = _generate_thumbnail( step_path, cad_file_id, app_settings.upload_dir, part_colors=part_colors ) if thumb_path: cad_file.thumbnail_path = str(thumb_path) cad_file.render_log = render_log cad_file.processing_status = ProcessingStatus.completed session.commit() logger.info(f"Thumbnail regenerated for CAD file {cad_file_id}") return True except Exception as exc: logger.error(f"Thumbnail regeneration failed for {cad_file_id}: {exc}") cad_file.processing_status = ProcessingStatus.failed cad_file.error_message = str(exc)[:2000] session.commit() return False def render_to_file( step_path: str, output_path: str, part_colors: dict[str, str] | None = None, width: int | None = None, height: int | None = None, transparent_bg: bool = False, engine: str | None = None, samples: int | None = None, template_path: str | None = None, target_collection: str = "Product", material_library_path: str | None = None, material_map: dict | None = None, part_names_ordered: list | None = None, lighting_only: bool = False, shadow_catcher: bool = False, cycles_device: str | None = None, rotation_x: float = 0.0, rotation_y: float = 0.0, rotation_z: float = 0.0, job_id: str | None = None, noise_threshold: str = "", denoiser: str = "", denoising_input_passes: str = "", denoising_prefilter: str = "", denoising_quality: str = "", denoising_use_gpu: str = "", ) -> tuple[bool, dict]: """Render a STEP file to a specific output path using current system settings. Unlike regenerate_cad_thumbnail, this does NOT modify the shared CadFile record. Used by render_order_line_task for per-order-line render outputs. Args: step_path: Absolute path to the STEP file on disk. output_path: Absolute path for the rendered output file. part_colors: Optional {part_name: hex_color} map. width: Optional render width (overrides system default). height: Optional render height (overrides system default). transparent_bg: If True and renderer=blender+PNG, render with transparent background. engine: Optional per-OT engine override ("cycles" | "eevee"), or None for system default. samples: Optional per-OT samples override, or None for system default. template_path: Optional path to a .blend template file. target_collection: Blender collection name to import geometry into. material_library_path: Optional path to material library .blend file. material_map: Optional {part_name: material_name} for material replacement. Returns: (success: bool, render_log: dict) """ import time step = Path(step_path) out = Path(output_path) out.parent.mkdir(parents=True, exist_ok=True) settings = _get_all_settings() renderer = settings["thumbnail_renderer"] fmt = out.suffix.lstrip(".") or settings.get("thumbnail_format", "jpg") if fmt not in ("jpg", "png"): fmt = "jpg" # Temporary PNG for service renderers tmp_png = out.parent / f"_tmp_{out.stem}.png" render_log: dict = { "renderer": renderer, "format": fmt, "started_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), } rendered_png: Path | None = None service_data: dict = {} if renderer == "blender": actual_engine = engine or settings["blender_engine"] actual_samples = samples or int(settings[f"blender_{actual_engine}_samples"]) actual_cycles_device = cycles_device or settings["cycles_device"] w = width or 512 h = height or 512 render_log.update({ "engine": actual_engine, "samples": actual_samples, "stl_quality": settings["stl_quality"], "smooth_angle": int(settings["blender_smooth_angle"]), "cycles_device": actual_cycles_device, "width": w, "height": h, }) extra = { "engine": actual_engine, "samples": actual_samples, "stl_quality": settings["stl_quality"], "smooth_angle": int(settings["blender_smooth_angle"]), "cycles_device": actual_cycles_device, "width": w, "height": h, "transparent_bg": transparent_bg, } if part_colors is not None: extra["part_colors"] = part_colors if template_path: extra["template_path"] = template_path extra["target_collection"] = target_collection extra["lighting_only"] = lighting_only extra["shadow_catcher"] = shadow_catcher render_log["template"] = template_path render_log["target_collection"] = target_collection if lighting_only: render_log["lighting_only"] = True if shadow_catcher: render_log["shadow_catcher"] = True if material_library_path and material_map: extra["material_library_path"] = material_library_path extra["material_map"] = material_map render_log["material_replace"] = True if part_names_ordered: extra["part_names_ordered"] = part_names_ordered if rotation_x or rotation_y or rotation_z: extra["rotation_x"] = rotation_x extra["rotation_y"] = rotation_y extra["rotation_z"] = rotation_z if noise_threshold: extra["noise_threshold"] = noise_threshold if denoiser: extra["denoiser"] = denoiser if denoising_input_passes: extra["denoising_input_passes"] = denoising_input_passes if denoising_prefilter: extra["denoising_prefilter"] = denoising_prefilter if denoising_quality: extra["denoising_quality"] = denoising_quality if denoising_use_gpu: extra["denoising_use_gpu"] = denoising_use_gpu rendered_png, service_data = _render_via_service( "http://blender-renderer:8100/render", step, tmp_png, extra, job_id=job_id ) elif renderer == "threejs": default_size = int(settings["threejs_render_size"]) w = width or default_size h = height or default_size render_log.update({"width": w, "height": h}) extra2: dict = {"width": w, "height": h} if part_colors is not None: extra2["part_colors"] = part_colors rendered_png, service_data = _render_via_service( "http://threejs-renderer:8101/render", step, tmp_png, extra2 ) if service_data: for key in ("total_duration_s", "stl_duration_s", "render_duration_s", "stl_size_bytes", "output_size_bytes", "parts_count", "engine_used", "log_lines"): if key in service_data: render_log[key] = service_data[key] render_log["completed_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) if rendered_png: result = _finalise_image(rendered_png, out, fmt) tmp_png.unlink(missing_ok=True) render_log["fallback"] = False return result is not None, render_log # Pillow placeholder fallback render_log["fallback"] = True result = _generate_thumbnail_placeholder(step, out, fmt) return result is not None, render_log def _convert_to_gltf(step_path: Path, cad_file_id: str, upload_dir: str) -> Path | None: """Convert STEP to glTF for browser 3D viewer.""" out_dir = Path(upload_dir) / "gltf" out_dir.mkdir(parents=True, exist_ok=True) out_path = out_dir / f"{cad_file_id}.gltf" try: import trimesh mesh = trimesh.load(str(step_path)) if isinstance(mesh, trimesh.Scene): exported = mesh.export(str(out_path)) else: scene = trimesh.Scene(mesh) exported = scene.export(str(out_path)) return out_path if out_path.exists() else None except ImportError: logger.warning("trimesh not available; skipping glTF conversion") except Exception as exc: logger.warning(f"glTF conversion failed: {exc}") return None