Files
HartOMat/backend/app/services/step_processor.py
T
Hartmut 89c44b846f feat(phase5.1+6): fallback material cleanup + notification batch refactor
Phase 5.1 — MATERIAL_PALETTE removal:
- Remove MATERIAL_PALETTE + _material_to_color() from step_processor.py
- build_part_colors() now returns {part→material_name} for Blender resolver

Phase 6 — Notification Center Refactor:
- Migration 051: add channel (activity|notification|alert) to audit_log,
  add frequency (immediate|daily|never) to notification_configs
- Three notification channels: activity (per-render), notification (batch
  order summaries), alert (admin infrastructure)
- Per-render emit_notification_sync calls demoted to channel=activity
- New emit_batch_render_notification_sync(): single summary notification
  when all order lines reach terminal state ("47/50 succeeded, 3 failed")
- Beat task batch_render_notifications every 60s: safety-net for missed
  batch notifications after order completion
- GET /notifications: defaults to channel IN (notification, alert);
  accepts ?channel=activity for activity feed
- Unread count badge counts only notification+alert channels
- Notifications.tsx: three tabs (Notifications | Activity | Alerts)
- NotificationSettings.tsx: frequency dropdown per event type

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-08 20:20:07 +01:00

896 lines
34 KiB
Python

"""
STEP file processor — Phase 3 implementation.
Extracts object names from STEP files using pythonocc-core (OCC),
generates thumbnails (Blender render with a Pillow placeholder fallback), and converts to glTF via trimesh.
This module is invoked from the Celery worker (step_tasks.py).
"""
import logging
import uuid
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from app.models.cad_file import CadFile
logger = logging.getLogger(__name__)
def build_part_colors(
    cad_parsed_objects: list[str],
    cad_part_materials: list[dict],
) -> dict[str, str]:
    """
    Build {part_name: material_name} for Blender rendering.

    Returns a mapping of part name → Schaeffler material name
    (e.g. SCHAEFFLER_010101_Steel-Bare). Parts with no material assignment are
    omitted; Blender will use the fallback material
    (SCHAEFFLER_059999_FailedMaterial) for unrecognised parts.

    Entries whose ``part_name`` or ``material`` is missing, ``None`` or blank
    are skipped. When the same part appears more than once, the last entry wins.

    Args:
        cad_parsed_objects: List of part names from cad_file.parsed_objects["objects"].
            Currently not consulted; kept for interface compatibility.
        cad_part_materials: List of {part_name, material} dicts from
            order_item.cad_part_materials. ``None`` is treated as an empty list.

    Returns:
        Dict mapping stripped part names to stripped material names.
    """
    result: dict[str, str] = {}
    for entry in cad_part_materials or []:
        # `or ""` guards against keys that are present but explicitly None,
        # which would otherwise crash on `.strip()`.
        part = (entry.get("part_name") or "").strip()
        material = (entry.get("material") or "").strip()
        if part and material:
            result[part] = material
    return result
def _normalize_stem(name: str) -> str:
"""Normalize a filename stem for comparison: lowercase, strip .stp/.step extension."""
stem = name.strip()
for ext in (".step", ".stp"):
if stem.lower().endswith(ext):
stem = stem[: -len(ext)]
break
return stem.lower()
def match_cad_to_items(
    cad_file: "CadFile",
    item_names: list[str],
) -> list[str]:
    """
    Select the OrderItem name_cad_modell values that refer to the given CadFile.

    Both sides are compared after normalisation (trimmed, .stp/.step removed,
    lowercased), so a file named '81113-L_cut.stp' matches an item named
    '81113-l_cut' or '81113-L_cut.step'. Empty item names are ignored.

    Args:
        cad_file: A CadFile ORM object (only .original_name is read).
        item_names: List of name_cad_modell strings from OrderItems.

    Returns:
        The matching entries of item_names, in their original order and spelling.
    """
    target = _normalize_stem(cad_file.original_name or "")
    return [
        candidate
        for candidate in item_names
        if candidate and _normalize_stem(candidate) == target
    ]
def extract_cad_metadata(cad_file_id: str) -> None:
    """
    Fast metadata extraction for a CAD file (no thumbnail generation).

    Does everything process_cad_file() does EXCEPT thumbnail rendering:
    - Sets status to processing
    - Extracts STEP object names
    - Extracts edge/bounding-box data and merges it into mesh_attributes
    - Converts to glTF
    - Leaves status as processing (render_step_thumbnail task will complete it)
    - On exception: sets status to failed and stores the (truncated) error message

    Args:
        cad_file_id: UUID string of the CadFile row to process.
    """
    from app.config import settings
    from sqlalchemy import create_engine
    from sqlalchemy.orm import Session
    from app.models.cad_file import CadFile, ProcessingStatus

    engine = create_engine(settings.database_url_sync)
    try:
        with Session(engine) as session:
            cad_file = session.get(CadFile, uuid.UUID(cad_file_id))
            if not cad_file:
                logger.error(f"CAD file not found: {cad_file_id}")
                return

            cad_file.processing_status = ProcessingStatus.processing
            session.commit()

            try:
                step_path = Path(cad_file.stored_path)
                if not step_path.exists():
                    raise FileNotFoundError(f"STEP file not found: {step_path}")

                objects = _extract_step_objects(step_path)
                cad_file.parsed_objects = {"objects": objects}

                edge_data = extract_mesh_edge_data(str(step_path))
                if edge_data:
                    # Merge so attributes written by other steps are preserved.
                    cad_file.mesh_attributes = {**(cad_file.mesh_attributes or {}), **edge_data}

                gltf_path = _convert_to_gltf(step_path, cad_file_id, settings.upload_dir)
                if gltf_path:
                    cad_file.gltf_path = str(gltf_path)

                # Leave status as processing — render_step_thumbnail will complete it
                logger.info(f"CAD metadata extracted: {cad_file_id} ({len(objects)} objects)")
            except Exception as exc:
                # exc_info keeps the traceback; the message alone is rarely enough
                # to diagnose OCC/trimesh failures.
                logger.error(
                    f"CAD metadata extraction failed for {cad_file_id}: {exc}",
                    exc_info=True,
                )
                cad_file.processing_status = ProcessingStatus.failed
                cad_file.error_message = str(exc)[:2000]

            # Persists either the extracted metadata or the failure state.
            session.commit()
    finally:
        # The engine is created per call; dispose it so its connection pool
        # does not keep database connections open after the task ends.
        engine.dispose()
def process_cad_file(cad_file_id: str) -> None:
    """
    Full processing pipeline for a CAD file:
    1. Extract part/object names from the STEP file
    2. Extract edge/bounding-box data and merge it into mesh_attributes
    3. Generate a thumbnail
    4. Convert to glTF for the browser viewer
    5. Update the DB record (status completed, or failed with error message)

    Args:
        cad_file_id: UUID string of the CadFile row to process.
    """
    from app.config import settings
    # Synchronous DB access for Celery worker
    from sqlalchemy import create_engine
    from sqlalchemy.orm import Session
    from app.models.cad_file import CadFile, ProcessingStatus

    engine = create_engine(settings.database_url_sync)
    try:
        with Session(engine) as session:
            cad_file = session.get(CadFile, uuid.UUID(cad_file_id))
            if not cad_file:
                logger.error(f"CAD file not found: {cad_file_id}")
                return

            cad_file.processing_status = ProcessingStatus.processing
            session.commit()

            try:
                step_path = Path(cad_file.stored_path)
                if not step_path.exists():
                    raise FileNotFoundError(f"STEP file not found: {step_path}")

                # Step 1: Extract object names
                objects = _extract_step_objects(step_path)
                cad_file.parsed_objects = {"objects": objects}

                # Step 1b: Extract sharp-edge topology data and merge into mesh_attributes
                edge_data = extract_mesh_edge_data(str(step_path))
                if edge_data:
                    cad_file.mesh_attributes = {**(cad_file.mesh_attributes or {}), **edge_data}

                # Step 2: Generate thumbnail. No per-part colours are known at
                # upload time, so an empty mapping is passed.
                thumb_path, render_log = _generate_thumbnail(
                    step_path, cad_file_id, settings.upload_dir, part_colors={}
                )
                if thumb_path:
                    cad_file.thumbnail_path = str(thumb_path)
                # The render log is kept even without a thumbnail: it records
                # which renderer ran and whether the fallback was used.
                cad_file.render_log = render_log

                # Step 3: Convert to glTF
                gltf_path = _convert_to_gltf(step_path, cad_file_id, settings.upload_dir)
                if gltf_path:
                    cad_file.gltf_path = str(gltf_path)

                cad_file.processing_status = ProcessingStatus.completed
                logger.info(f"CAD file processed successfully: {cad_file_id}")
            except Exception as exc:
                logger.error(f"CAD processing failed for {cad_file_id}: {exc}", exc_info=True)
                cad_file.processing_status = ProcessingStatus.failed
                cad_file.error_message = str(exc)[:2000]

            # Persists either the completed record or the failure state.
            session.commit()
    finally:
        # The engine is created per call; dispose it so its connection pool
        # does not keep database connections open after the task ends.
        engine.dispose()
def extract_mesh_edge_data(step_path: str) -> dict:
    """Extract sharp-edge metrics, a suggested smooth angle and the bounding box from a STEP file.

    For every edge shared by at least two faces, the angle between the normals
    of the first two adjacent faces is estimated. The normals are sampled at
    the centre of each face's UV range (not at the edge itself), and the angle
    is folded into 0–90° because the absolute value of the cosine is used.

    Returns a dict that may contain:
    - suggested_smooth_angle: float (degrees) — 0.8 × the median of all angles
      above 20°, clamped to [15, 60]; 30.0 when no angle exceeds 20°
    - has_mechanical_edges: bool — True if the largest angle exceeds 45°
    - sharp_edge_midpoints: list of [x, y, z] — midpoints of edges whose angle
      exceeds 20° (max 500), in model units (mm assumed)
    - dimensions_mm: {"x", "y", "z"} — bounding-box extents
    - bbox_center_mm: {"x", "y", "z"} — bounding-box centre

    The three edge keys are present only when at least one angle could be
    computed; the two bounding-box keys only when the bounding box could be
    evaluated. Returns {} when OCC is not importable, the file cannot be read,
    or any unexpected error occurs (best-effort, never raises).
    """
    try:
        import math
        import statistics

        from OCC.Core.Bnd import Bnd_Box
        from OCC.Core.BRepAdaptor import BRepAdaptor_Curve, BRepAdaptor_Surface
        from OCC.Core.BRepBndLib import brepbndlib
        from OCC.Core.BRepMesh import BRepMesh_IncrementalMesh
        from OCC.Core.IFSelect import IFSelect_RetDone
        from OCC.Core.STEPControl import STEPControl_Reader
        from OCC.Core.TopAbs import TopAbs_EDGE, TopAbs_FACE
        from OCC.Core.TopExp import topexp
        from OCC.Core.TopTools import TopTools_IndexedDataMapOfShapeListOfShape

        reader = STEPControl_Reader()
        if reader.ReadFile(step_path) != IFSelect_RetDone:
            return {}
        reader.TransferRoots()
        shape = reader.OneShape()

        # Mesh the shape for geometry access
        BRepMesh_IncrementalMesh(shape, 0.5, False, 0.5)

        def _centre_normal(face):
            """Un-normalised surface normal sampled at the centre of the face's UV range."""
            surf = BRepAdaptor_Surface(face)
            u = (surf.FirstUParameter() + surf.LastUParameter()) / 2
            v = (surf.FirstVParameter() + surf.LastVParameter()) / 2
            return surf.DN(u, v, 0, 1).Crossed(surf.DN(u, v, 1, 0))

        # Map every edge to the faces that share it (for dihedral angle computation)
        edge_face_map = TopTools_IndexedDataMapOfShapeListOfShape()
        topexp.MapShapesAndAncestors(shape, TopAbs_EDGE, TopAbs_FACE, edge_face_map)

        dihedral_angles = []
        sharp_midpoints = []
        for i in range(1, edge_face_map.Extent() + 1):  # OCC maps are 1-indexed
            edge = edge_face_map.FindKey(i)
            faces = edge_face_map.FindFromIndex(i)
            if faces.Size() < 2:
                continue
            face_list = list(faces)
            if len(face_list) < 2:
                continue
            try:
                n1 = _centre_normal(face_list[0])
                n2 = _centre_normal(face_list[1])

                curve = BRepAdaptor_Curve(edge)
                mid_u = (curve.FirstParameter() + curve.LastParameter()) / 2
                mid_pt = curve.Value(mid_u)

                # Degenerate normals (e.g. at surface singularities) are skipped.
                if n1.Magnitude() > 1e-10 and n2.Magnitude() > 1e-10:
                    n1.Normalize()
                    n2.Normalize()
                    cos_angle = max(-1.0, min(1.0, n1.Dot(n2)))
                    # abs() makes the result independent of normal orientation.
                    angle_deg = math.degrees(math.acos(abs(cos_angle)))
                    dihedral_angles.append(angle_deg)
                    if angle_deg > 20 and len(sharp_midpoints) < 500:
                        sharp_midpoints.append([
                            round(mid_pt.X(), 3),
                            round(mid_pt.Y(), 3),
                            round(mid_pt.Z(), 3),
                        ])
            except Exception:
                # A single problematic edge must not abort the whole analysis.
                continue

        # Bounding box extraction (OCC Bnd_Box)
        try:
            bbox = Bnd_Box()
            brepbndlib.Add(shape, bbox)
            xmin, ymin, zmin, xmax, ymax, zmax = bbox.Get()
            dimensions_mm = {
                "x": round(xmax - xmin, 2),
                "y": round(ymax - ymin, 2),
                "z": round(zmax - zmin, 2),
            }
            bbox_center_mm = {
                "x": round((xmin + xmax) / 2, 2),
                "y": round((ymin + ymax) / 2, 2),
                "z": round((zmin + zmax) / 2, 2),
            }
        except Exception:
            dimensions_mm = None
            bbox_center_mm = None

        result: dict = {}
        if dihedral_angles:
            # Suggest smooth angle: slightly below the median of hard edges
            hard_edges = [a for a in dihedral_angles if a > 20]
            if hard_edges:
                suggested = max(15.0, min(60.0, statistics.median(hard_edges) * 0.8))
            else:
                suggested = 30.0
            result["suggested_smooth_angle"] = round(suggested, 1)
            result["has_mechanical_edges"] = max(dihedral_angles) > 45
            result["sharp_edge_midpoints"] = sharp_midpoints  # already capped at 500
        if dimensions_mm:
            result["dimensions_mm"] = dimensions_mm
            result["bbox_center_mm"] = bbox_center_mm
        return result
    except ImportError:
        # OCC not available (e.g. in backend container)
        return {}
    except Exception as exc:
        logger.warning("extract_mesh_edge_data failed (non-fatal): %s", exc)
        return {}
def _extract_step_objects(step_path: Path) -> list[str]:
    """Extract part names from a STEP file using pythonocc (XCAF document).

    Reads the file into an XCAF document and collects the name attribute of
    each free (top-level) shape label.

    Falls back to the text-based scanner when pythonocc is not installed, when
    the file cannot be read, or when any OCC call raises.

    Args:
        step_path: Path to the STEP file.

    Returns:
        List of names; [] when the OCC transfer step reports failure.
    """
    try:
        from OCC.Core.IFSelect import IFSelect_RetDone
        from OCC.Core.STEPCAFControl import STEPCAFControl_Reader
        from OCC.Core.XCAFDoc import XCAFDoc_DocumentTool
        from OCC.Core.TDocStd import TDocStd_Document
        from OCC.Core.TDataStd import TDataStd_Name
        from OCC.Core.TCollection import TCollection_ExtendedString

        doc = TDocStd_Document(TCollection_ExtendedString("MDTV-CAF"))
        reader = STEPCAFControl_Reader()
        reader.SetColorMode(True)
        reader.SetNameMode(True)

        # The read status used to be ignored; an unreadable file is now routed
        # to the same fallback as every other OCC failure in this function.
        status = reader.ReadFile(str(step_path))
        if status != IFSelect_RetDone:
            logger.warning(f"OCC could not read STEP file: {step_path}")
            return _extract_step_objects_fallback(step_path)

        if not reader.Transfer(doc):
            return []

        shape_tool = XCAFDoc_DocumentTool.ShapeTool(doc.Main())
        # NOTE(review): GetFreeShapes is handed a plain Python list here. If the
        # binding requires a TDF_LabelSequence this call raises and the function
        # silently ends up in the text fallback — confirm against pythonocc.
        labels = []
        shape_tool.GetFreeShapes(labels)

        names = []
        for label in labels:
            name_attr = TDataStd_Name()
            if label.FindAttribute(TDataStd_Name.GetID(), name_attr):
                names.append(name_attr.Get().ToExtString())
        return names
    except ImportError:
        logger.warning("pythonocc-core not available; skipping object extraction")
        return _extract_step_objects_fallback(step_path)
    except Exception as exc:
        logger.warning(f"OCC extraction failed: {exc}")
        return _extract_step_objects_fallback(step_path)
def _extract_step_objects_fallback(step_path: Path) -> list[str]:
"""Simple text-based extraction of part names from STEP file."""
names = []
try:
with open(step_path, "r", encoding="utf-8", errors="replace") as f:
for line in f:
# STEP format: PRODUCT('name','description',...
if "PRODUCT(" in line:
parts = line.split("PRODUCT(")
for part in parts[1:]:
if "'" in part:
name = part.split("'")[1]
if name and name not in names:
names.append(name)
except Exception:
pass
return names
def _get_all_settings() -> dict[str, str]:
    """Read the renderer-related system settings from the database.

    Only the keys listed in ``defaults`` are returned. A key that is missing
    from the ``system_settings`` table, or stored as NULL, keeps its default.
    If the database cannot be queried at all, the defaults are returned and a
    warning is logged.

    Returns:
        Dict with exactly the keys of ``defaults``.
    """
    defaults = {
        "thumbnail_renderer": "pillow",
        "blender_engine": "cycles",
        "blender_cycles_samples": "256",
        "blender_eevee_samples": "64",
        "threejs_render_size": "1024",
        "thumbnail_format": "jpg",
        "stl_quality": "low",
        "blender_smooth_angle": "30",
        "cycles_device": "auto",
    }
    try:
        from app.config import settings as app_settings
        from sqlalchemy import create_engine, text
        from sqlalchemy.orm import Session

        engine = create_engine(app_settings.database_url_sync)
        try:
            with Session(engine) as session:
                rows = session.execute(text("SELECT key, value FROM system_settings")).fetchall()
        finally:
            # The engine is created per call; release its pooled connections.
            engine.dispose()

        stored = {row[0]: row[1] for row in rows}
        # A NULL value would later break callers that do int(settings[...]),
        # so it is treated like a missing key.
        return {
            key: (stored[key] if stored.get(key) is not None else default)
            for key, default in defaults.items()
        }
    except Exception as exc:
        logger.warning(f"Could not read settings: {exc}; using defaults")
        return defaults
def _generate_thumbnail(
    step_path: Path,
    cad_file_id: str,
    upload_dir: str,
    part_colors: dict[str, str] | None = None,
) -> tuple[Path | None, dict]:
    """Generate a thumbnail using the configured renderer.

    The thumbnail is written to ``<upload_dir>/thumbnails/<cad_file_id>.<ext>``.
    Any existing thumbnail for the same id (png or jpg) is removed first. With
    renderer "blender" the STEP file is rendered to a temporary PNG and then
    converted to the configured format; in every other case, or when Blender is
    unavailable or fails, a Pillow placeholder image is produced.

    Args:
        step_path: Path to the STEP file.
        cad_file_id: Id used as the thumbnail file stem.
        upload_dir: Base upload directory.
        part_colors: Accepted for interface compatibility.
            NOTE(review): this value is currently not forwarded to any
            renderer — confirm whether the Blender path should receive it.

    Returns:
        (thumb_path, render_log_dict). thumb_path is None when no image could
        be produced. render_log_dict contains the settings snapshot, timing
        and, when available, data returned by the Blender render.
    """
    import time

    out_dir = Path(upload_dir) / "thumbnails"
    out_dir.mkdir(parents=True, exist_ok=True)

    settings = _get_all_settings()
    renderer = settings["thumbnail_renderer"]
    fmt = settings["thumbnail_format"]  # "jpg" or "png"
    ext = "jpg" if fmt == "jpg" else "png"

    # Clean up any existing thumbnail for this cad_file_id (either extension)
    for old_ext in ("png", "jpg"):
        (out_dir / f"{cad_file_id}.{old_ext}").unlink(missing_ok=True)

    final_path = out_dir / f"{cad_file_id}.{ext}"
    # Intermediate PNG written by the Blender render before conversion
    tmp_png = out_dir / f"{cad_file_id}_tmp.png"

    # Build the base render_log with the settings snapshot
    render_log: dict = {
        "renderer": renderer,
        "format": fmt,
        "started_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
    }

    # Parsed once and used both for the log and for the render call.
    blender_opts: dict | None = None
    if renderer == "blender":
        engine = settings["blender_engine"]
        blender_opts = {
            "engine": engine,
            "samples": int(settings[f"blender_{engine}_samples"]),
            "stl_quality": settings["stl_quality"],
            "smooth_angle": int(settings["blender_smooth_angle"]),
            "cycles_device": settings["cycles_device"],
        }
        render_log.update({**blender_opts, "width": 512, "height": 512})
    elif renderer == "threejs":
        # Three.js renderer removed in v2; treat as pillow fallback
        renderer = "pillow"
        render_log.update({"renderer": "pillow", "threejs_removed": True})

    logger.info(f"Thumbnail renderer={renderer}, format={fmt}")

    rendered_png: Path | None = None
    service_data: dict = {}

    if blender_opts is not None:
        from app.services.render_blender import is_blender_available, render_still

        if is_blender_available():
            try:
                service_data = render_still(
                    step_path=step_path,
                    output_path=tmp_png,
                    **blender_opts,
                )
                rendered_png = tmp_png if tmp_png.exists() else None
            except Exception as exc:
                logger.warning("Blender subprocess render failed: %s", exc)
                rendered_png = None
                # Do not leave a partially written temp file behind.
                tmp_png.unlink(missing_ok=True)
        else:
            logger.warning("Blender not available in this container — falling back to Pillow placeholder")

    # Merge rich render response data into render_log
    if service_data:
        for key in ("total_duration_s", "stl_duration_s", "render_duration_s",
                    "stl_size_bytes", "output_size_bytes", "parts_count",
                    "engine_used", "log_lines"):
            if key in service_data:
                render_log[key] = service_data[key]

    render_log["completed_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    if rendered_png:
        try:
            result = _finalise_image(rendered_png, final_path, fmt)
        finally:
            # Removed even if the conversion raised; a no-op when the temp
            # file was already moved to its final place.
            tmp_png.unlink(missing_ok=True)
        render_log["fallback"] = False
        return result, render_log

    # Pillow placeholder
    render_log["fallback"] = True
    return _generate_thumbnail_placeholder(step_path, final_path, fmt), render_log
def _finalise_image(src: Path, dst: Path, fmt: str) -> Path | None:
"""Convert src image to dst using the requested format (jpg or png)."""
if fmt == "jpg":
try:
from PIL import Image
img = Image.open(src).convert("RGB")
img.save(str(dst), "JPEG", quality=92, optimize=True)
return dst
except Exception as exc:
logger.warning(f"JPG conversion failed: {exc}; keeping PNG")
src.rename(dst.with_suffix(".png"))
return dst.with_suffix(".png")
else:
src.rename(dst)
return dst
def _render_via_service(
    url: str, step_path: Path, out_path: Path, extra: dict | None = None,
    job_id: str | None = None,
) -> tuple[Path | None, dict]:
    """POST a render request to an external renderer microservice.

    The request carries the STEP path, the output path, a 512×512 size and any
    entries from *extra* (which may override the size). The call counts as
    successful only when the service answers 200 AND *out_path* exists.

    job_id, when provided, is forwarded to the renderer so the render process
    can be cancelled via the renderer's /cancel/{job_id} endpoint.

    Returns:
        (out_path, parsed JSON response) on success — the second element is {}
        if the response body is not valid JSON; (None, {}) on any failure.
    """
    try:
        import httpx

        request_body = {
            "step_path": str(step_path),
            "output_path": str(out_path),
            "width": 512,
            "height": 512,
        }
        request_body.update(extra or {})
        if job_id:
            request_body["job_id"] = job_id

        response = httpx.post(url, json=request_body, timeout=300.0)

        # The body is optional diagnostics; an unparsable body is not an error.
        try:
            body = response.json()
        except Exception:
            body = {}

        succeeded = response.status_code == 200 and out_path.exists()
        if succeeded:
            return out_path, body
        logger.warning(f"Renderer service {url} returned {response.status_code}: {response.text[:500]}")
    except Exception as exc:
        logger.warning(f"Renderer service {url} unreachable: {exc}")
    return None, {}
def _generate_thumbnail_placeholder(step_path: Path, out_path: Path, fmt: str = "png") -> Path | None:
    """Draw a generic 512×512 placeholder thumbnail with Pillow.

    The image shows a light grid, a stylised box made of three polygons, a
    green bar along the top edge and the STEP file's stem in a dark strip at
    the bottom. It is saved as JPEG when ``fmt == "jpg"``, otherwise as PNG.

    Returns:
        out_path on success, None if anything (including importing Pillow) fails.
    """
    try:
        from PIL import Image, ImageDraw, ImageFont

        size = 512
        grid_colour = (228, 230, 235)
        outline_colour = (90, 110, 130)
        white = (255, 255, 255)

        img = Image.new("RGB", (size, size), color=(245, 246, 248))
        draw = ImageDraw.Draw(img)

        # Subtle grid: one vertical and one horizontal line every 32 px
        for offset in range(0, size, 32):
            draw.line([(offset, 0), (offset, size)], fill=grid_colour, width=1)
            draw.line([(0, offset), (size, offset)], fill=grid_colour, width=1)

        # Isometric box, drawn back-to-front in this exact order
        cx, cy = 256, 260
        s = 110  # half-size
        half = s // 2
        box_faces = (
            # Front face
            ([(cx - s, cy), (cx, cy + half), (cx + s, cy), (cx, cy - half)],
             (195, 208, 220)),
            # Top face
            ([(cx - s, cy - s), (cx, cy - s - half), (cx + s, cy - s), (cx, cy - s + half)],
             (220, 230, 240)),
            # Right pillar
            ([(cx + s, cy - s), (cx + s, cy), (cx, cy + half), (cx, cy - s + half)],
             (160, 178, 196)),
        )
        for corners, face_colour in box_faces:
            draw.polygon(corners, fill=face_colour, outline=outline_colour, width=2)

        # Schaeffler green top bar
        draw.rectangle([0, 0, size, 10], fill=(0, 137, 61))

        # Model name strip at bottom
        label = step_path.stem
        draw.rectangle([0, size - 52, size, size], fill=(30, 50, 70))
        try:
            # Centred text with a sized default font; if either the font call
            # or the anchored draw is unsupported, fall back to plain text.
            font = ImageFont.load_default(size=15)
            draw.text((size // 2, size - 26), label, fill=white, anchor="mm", font=font)
        except Exception:
            draw.text((10, size - 38), label, fill=white)

        if fmt == "jpg":
            img = img.convert("RGB")
            img.save(str(out_path), "JPEG", quality=92, optimize=True)
        else:
            img.save(str(out_path), "PNG")
        return out_path
    except Exception as exc:
        logger.warning(f"Pillow placeholder thumbnail failed: {exc}")
        return None
def regenerate_cad_thumbnail(cad_file_id: str, part_colors: dict[str, str]) -> bool:
    """
    Regenerate the thumbnail of an existing CAD file.

    Called from the `regenerate_thumbnail` Celery task. While the thumbnail is
    being generated the CadFile is marked as processing; afterwards it is
    marked completed, or failed (with the error message stored) when the
    generation raised or produced no image.

    Args:
        cad_file_id: UUID string of the CadFile row.
        part_colors: Per-part mapping handed on to the thumbnail generator.

    Returns:
        True on success, False if the record or STEP file is missing or the
        thumbnail could not be generated.
    """
    from app.config import settings as app_settings
    from sqlalchemy import create_engine
    from sqlalchemy.orm import Session
    from app.models.cad_file import CadFile, ProcessingStatus

    db_engine = create_engine(app_settings.database_url_sync)
    try:
        with Session(db_engine) as session:
            cad_file = session.get(CadFile, uuid.UUID(cad_file_id))
            if not cad_file:
                logger.error(f"CAD file not found: {cad_file_id}")
                return False

            step_path = Path(cad_file.stored_path)
            if not step_path.exists():
                logger.error(f"STEP file not found: {step_path}")
                return False

            # Mark as processing so the activity page shows it as active
            cad_file.processing_status = ProcessingStatus.processing
            session.commit()

            try:
                thumb_path, render_log = _generate_thumbnail(
                    step_path, cad_file_id, app_settings.upload_dir, part_colors=part_colors
                )
                cad_file.render_log = render_log
                if not thumb_path:
                    # Without this the record would either stay in "processing"
                    # or be reported as a success without any thumbnail.
                    raise RuntimeError("Thumbnail generation produced no output")
                cad_file.thumbnail_path = str(thumb_path)
                cad_file.processing_status = ProcessingStatus.completed
                session.commit()
                logger.info(f"Thumbnail regenerated for CAD file {cad_file_id}")
                return True
            except Exception as exc:
                logger.error(f"Thumbnail regeneration failed for {cad_file_id}: {exc}")
                cad_file.processing_status = ProcessingStatus.failed
                cad_file.error_message = str(exc)[:2000]
                session.commit()
                return False
    finally:
        # The engine is created per call; release its pooled connections.
        db_engine.dispose()
def render_to_file(
    step_path: str,
    output_path: str,
    part_colors: dict[str, str] | None = None,
    width: int | None = None,
    height: int | None = None,
    transparent_bg: bool = False,
    engine: str | None = None,
    samples: int | None = None,
    template_path: str | None = None,
    target_collection: str = "Product",
    material_library_path: str | None = None,
    material_map: dict | None = None,
    part_names_ordered: list | None = None,
    lighting_only: bool = False,
    shadow_catcher: bool = False,
    cycles_device: str | None = None,
    rotation_x: float = 0.0,
    rotation_y: float = 0.0,
    rotation_z: float = 0.0,
    job_id: str | None = None,
    noise_threshold: str = "",
    denoiser: str = "",
    denoising_input_passes: str = "",
    denoising_prefilter: str = "",
    denoising_quality: str = "",
    denoising_use_gpu: str = "",
    order_line_id: str | None = None,
) -> tuple[bool, dict]:
    """Render a STEP file to a specific output path using current system settings.

    Unlike regenerate_cad_thumbnail, this does NOT modify the shared CadFile record.
    Used by render_order_line_task for per-order-line render outputs.

    The output format is taken from the (case-insensitive) suffix of
    output_path; anything other than "jpg"/"png" is written as JPEG. With
    renderer "blender" the image is rendered to a temporary PNG next to the
    output and then converted; otherwise, or when Blender is unavailable or
    fails, a Pillow placeholder is written instead.

    Args:
        step_path: Absolute path to the STEP file on disk.
        output_path: Absolute path for the rendered output file.
        part_colors: Optional per-part mapping, forwarded unchanged to render_still.
        width: Optional render width (defaults to 512).
        height: Optional render height (defaults to 512).
        transparent_bg: Forwarded to render_still (transparent background request).
        engine: Optional per-OT engine override ("cycles" | "eevee"), or None for system default.
        samples: Optional per-OT samples override, or None for system default.
        template_path: Optional path to a .blend template file.
        target_collection: Blender collection name to import geometry into.
        material_library_path: Optional path to material library .blend file.
        material_map: Optional {part_name: material_name} for material replacement.
        part_names_ordered: Optional ordered part-name list, forwarded to render_still.
        lighting_only: Forwarded to render_still; logged when a template is used.
        shadow_catcher: Forwarded to render_still; logged when a template is used.
        cycles_device: Optional device override, or None for system default.
        rotation_x: Forwarded to render_still.
        rotation_y: Forwarded to render_still.
        rotation_z: Forwarded to render_still.
        job_id: Accepted for interface compatibility; not used by this function.
        noise_threshold: Forwarded to render_still.
        denoiser: Forwarded to render_still.
        denoising_input_passes: Forwarded to render_still.
        denoising_prefilter: Forwarded to render_still.
        denoising_quality: Forwarded to render_still.
        denoising_use_gpu: Forwarded to render_still.
        order_line_id: Optional order line ID for live log streaming.

    Returns:
        (success: bool, render_log: dict)
    """
    import time

    step = Path(step_path)
    out = Path(output_path)
    out.parent.mkdir(parents=True, exist_ok=True)

    settings = _get_all_settings()
    renderer = settings["thumbnail_renderer"]

    # Lower-cased so that e.g. "image.PNG" is not written with JPEG content.
    fmt = out.suffix.lstrip(".").lower() or settings.get("thumbnail_format", "jpg")
    if fmt not in ("jpg", "png"):
        fmt = "jpg"

    # Temporary PNG written by the Blender render
    tmp_png = out.parent / f"_tmp_{out.stem}.png"

    render_log: dict = {
        "renderer": renderer,
        "format": fmt,
        "started_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
    }

    rendered_png: Path | None = None
    service_data: dict = {}

    if renderer == "blender":
        actual_engine = engine or settings["blender_engine"]
        actual_samples = samples or int(settings[f"blender_{actual_engine}_samples"])
        actual_cycles_device = cycles_device or settings["cycles_device"]
        stl_quality = settings["stl_quality"]
        smooth_angle = int(settings["blender_smooth_angle"])
        w = width or 512
        h = height or 512

        render_log.update({
            "engine": actual_engine, "samples": actual_samples,
            "stl_quality": stl_quality,
            "smooth_angle": smooth_angle,
            "cycles_device": actual_cycles_device,
            "width": w, "height": h,
        })
        if template_path:
            render_log["template"] = template_path
            render_log["target_collection"] = target_collection
            if lighting_only:
                render_log["lighting_only"] = True
            if shadow_catcher:
                render_log["shadow_catcher"] = True
        if material_library_path and material_map:
            render_log["material_replace"] = True

        from app.services.render_blender import is_blender_available, render_still

        # Build live-log callback for streaming Blender output to Redis
        _log_cb = None
        if order_line_id:
            from app.services import render_log as _rl

            def _log_cb(line):
                return _rl.emit(order_line_id, line)

        if is_blender_available():
            try:
                service_data = render_still(
                    step_path=step,
                    output_path=tmp_png,
                    engine=actual_engine,
                    samples=actual_samples,
                    stl_quality=stl_quality,
                    smooth_angle=smooth_angle,
                    cycles_device=actual_cycles_device,
                    width=w, height=h,
                    transparent_bg=transparent_bg,
                    part_colors=part_colors,
                    template_path=template_path,
                    target_collection=target_collection,
                    material_library_path=material_library_path,
                    material_map=material_map,
                    part_names_ordered=part_names_ordered,
                    lighting_only=lighting_only,
                    shadow_catcher=shadow_catcher,
                    rotation_x=rotation_x, rotation_y=rotation_y, rotation_z=rotation_z,
                    noise_threshold=noise_threshold, denoiser=denoiser,
                    denoising_input_passes=denoising_input_passes,
                    denoising_prefilter=denoising_prefilter,
                    denoising_quality=denoising_quality,
                    denoising_use_gpu=denoising_use_gpu,
                    log_callback=_log_cb,
                )
                rendered_png = tmp_png if tmp_png.exists() else None
            except Exception as exc:
                logger.warning("Blender subprocess render failed: %s", exc)
                rendered_png = None
                # Do not leave a partially written temp file behind.
                tmp_png.unlink(missing_ok=True)
        else:
            logger.warning("Blender not available in this container — using Pillow fallback")
    elif renderer == "threejs":
        # Three.js renderer removed in v2 — fall through to Pillow placeholder
        logger.warning("Three.js renderer removed; using Pillow fallback")

    if service_data:
        for key in ("total_duration_s", "stl_duration_s", "render_duration_s",
                    "stl_size_bytes", "output_size_bytes", "parts_count",
                    "engine_used", "log_lines"):
            if key in service_data:
                render_log[key] = service_data[key]

    render_log["completed_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    if rendered_png:
        try:
            result = _finalise_image(rendered_png, out, fmt)
        finally:
            # Removed even if the conversion raised; a no-op when the temp
            # file was already moved to its final place.
            tmp_png.unlink(missing_ok=True)
        render_log["fallback"] = False
        return result is not None, render_log

    # Pillow placeholder fallback
    render_log["fallback"] = True
    result = _generate_thumbnail_placeholder(step, out, fmt)
    return result is not None, render_log
def _convert_to_gltf(step_path: Path, cad_file_id: str, upload_dir: str) -> Path | None:
    """Export the STEP file as ``<upload_dir>/gltf/<cad_file_id>.gltf`` via trimesh.

    Whatever trimesh loads is exported as a scene (a single mesh is wrapped in
    one first). The conversion is best-effort: a missing trimesh installation
    or any conversion error is logged and yields None.

    Returns:
        Path of the glTF file if it exists after the export, otherwise None.
    """
    gltf_dir = Path(upload_dir) / "gltf"
    gltf_dir.mkdir(parents=True, exist_ok=True)
    target = gltf_dir / f"{cad_file_id}.gltf"

    try:
        import trimesh

        loaded = trimesh.load(str(step_path))
        scene = loaded if isinstance(loaded, trimesh.Scene) else trimesh.Scene(loaded)
        scene.export(str(target))
        if target.exists():
            return target
        return None
    except ImportError:
        logger.warning("trimesh not available; skipping glTF conversion")
    except Exception as exc:
        logger.warning(f"glTF conversion failed: {exc}")
    return None