feat: performance optimizations + part-materials validation
- @timed_step decorator with wall-clock + RSS tracking (pipeline_logger)
- Blender timing laps for sharp edges and material assignment
- MeshRegistry pattern: eliminate 13 scene.traverse() calls across viewers
- Lazy material cloning (clone-on-first-write in both viewers)
- _pipeline_session context manager: 7 create_engine() → 2 in render_thumbnail
- KD-tree spatial pre-filter for sharp edge marking (bbox-based pruning)
- Batch material library append: N bpy.ops.wm.append → single bpy.data.libraries.load
- GMSH single-session batching: compound all solids into one tessellation call
- Validate part-materials save endpoints against parsed_objects (prevents bogus keys)
- ROADMAP updated with completion status

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -390,6 +390,28 @@ async def get_part_materials(
|
||||
)
|
||||
|
||||
|
||||
def _normalize_part_name(name: str) -> str:
|
||||
"""Strip OCC _AF\\d+ suffixes and lowercase for comparison."""
|
||||
import re
|
||||
n = name.strip().lower()
|
||||
prev = ""
|
||||
while prev != n:
|
||||
prev = n
|
||||
n = re.sub(r"_af\d+(_asm)?$", "", n)
|
||||
return n
|
||||
|
||||
|
||||
def _valid_part_names(cad) -> set[str] | None:
    """Return the normalized part names found in ``cad.parsed_objects``.

    Returns ``None`` when parsed_objects is absent or malformed, signalling
    callers to skip key validation rather than reject every key.
    """
    parsed = cad.parsed_objects
    if not isinstance(parsed, dict) or not parsed:
        return None
    names = parsed.get("objects")
    if not isinstance(names, list) or not names:
        return None
    return {_normalize_part_name(item) for item in names if isinstance(item, str)}
|
||||
|
||||
|
||||
@router.put("/{id}/part-materials", response_model=PartMaterialsResponse)
|
||||
async def save_part_materials(
|
||||
id: uuid.UUID,
|
||||
@@ -401,10 +423,26 @@ async def save_part_materials(
|
||||
|
||||
Accepts a full dict of part-name -> {type, value} and overwrites the existing
|
||||
assignment. Pass an empty dict to clear all assignments.
|
||||
|
||||
Keys are validated against parsed_objects — unknown part names are rejected.
|
||||
"""
|
||||
if not is_privileged(user):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions")
|
||||
cad = await _get_cad_file(id, db)
|
||||
|
||||
# Validate keys against known part names from STEP extraction
|
||||
valid_names = _valid_part_names(cad)
|
||||
if valid_names is not None and body:
|
||||
invalid_keys = [
|
||||
k for k in body
|
||||
if _normalize_part_name(k) not in valid_names
|
||||
]
|
||||
if invalid_keys:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
||||
detail=f"Unknown part names (not in parsed_objects): {invalid_keys[:10]}",
|
||||
)
|
||||
|
||||
# Serialise Pydantic models to plain dicts for JSONB storage
|
||||
cad.part_materials = {name: entry.model_dump() for name, entry in body.items()}
|
||||
cad.updated_at = datetime.utcnow()
|
||||
@@ -514,6 +552,20 @@ async def save_manual_material_overrides(
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions")
|
||||
|
||||
cad = await _get_cad_file(id, db)
|
||||
|
||||
# Validate keys against known part names (slugified form)
|
||||
valid_names = _valid_part_names(cad)
|
||||
if valid_names is not None and body.overrides:
|
||||
invalid_keys = [
|
||||
k for k in body.overrides
|
||||
if _normalize_part_name(k) not in valid_names
|
||||
]
|
||||
if invalid_keys:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
||||
detail=f"Unknown part keys (not in parsed_objects): {invalid_keys[:10]}",
|
||||
)
|
||||
|
||||
cad.manual_material_overrides = body.overrides
|
||||
cad.updated_at = datetime.utcnow()
|
||||
await db.commit()
|
||||
|
||||
@@ -5,7 +5,9 @@ from all Celery pipeline tasks. Every method:
|
||||
- emits a Python `logging` line with a [STEP_NAME] prefix
|
||||
- publishes to Redis via log_task_event for SSE streaming in the UI
|
||||
"""
|
||||
import functools
|
||||
import logging
|
||||
import resource
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
@@ -104,3 +106,78 @@ class _StepContext:
|
||||
else:
|
||||
self._pl.step_error(self._name, str(exc_val), exc_val)
|
||||
return False # do not suppress exceptions
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# @timed_step decorator
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def timed_step(step_name: str, pipeline_logger: PipelineLogger | None = None):
    """Decorator that auto-times a function and logs via PipelineLogger.

    Captures wall-clock duration and peak RSS delta. If a Redis connection
    is available, stores metrics to ``pipeline:metrics:{context_id}`` as a
    hash field ``{step_name}`` → JSON ``{duration_s, rss_delta_kb}``.

    Usage::

        pl = PipelineLogger(task_id=self.request.id)

        @timed_step("extract_objects", pl)
        def do_extraction(step_path):
            ...

    Or without a logger (metrics still stored to Redis if context_id given)::

        @timed_step("extract_objects")
        def do_extraction(step_path):
            ...
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            pl = pipeline_logger
            # ru_maxrss is *peak* RSS, so the delta only ever measures growth.
            # NOTE(review): units are kilobytes on Linux but bytes on macOS —
            # confirm workers only run on Linux before trusting the KB label.
            rss_before = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
            t0 = time.monotonic()
            if pl:
                pl.step_start(step_name)
            try:
                result = fn(*args, **kwargs)
                duration = round(time.monotonic() - t0, 3)
                rss_after = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
                rss_delta_kb = rss_after - rss_before
                metrics = {
                    "duration_s": duration,
                    "rss_delta_kb": rss_delta_kb,
                }
                if pl:
                    pl.step_done(step_name, duration_s=duration, result=metrics)
                else:
                    _log.info(f"[{step_name}] done | {duration:.1f}s | rss_delta={rss_delta_kb}KB")
                # Best-effort Redis persistence; only fires when the wrapped
                # function was called with a context_id keyword argument.
                _store_metrics(step_name, metrics, kwargs.get("context_id"))
                return result
            except Exception as exc:
                # Fix: `duration` was previously computed here and discarded —
                # include the elapsed time in the failure log.
                duration = round(time.monotonic() - t0, 3)
                if pl:
                    pl.step_error(step_name, str(exc), exc)
                else:
                    _log.exception(f"[{step_name}] ERROR after {duration}s — {exc}")
                raise
        return wrapper
    return decorator
|
||||
|
||||
|
||||
def _store_metrics(step_name: str, metrics: dict, context_id: str | None = None) -> None:
|
||||
"""Store step metrics to Redis hash (best-effort, never raises)."""
|
||||
if not context_id:
|
||||
return
|
||||
try:
|
||||
import json
|
||||
from app.config import settings
|
||||
import redis
|
||||
r = redis.from_url(settings.redis_url)
|
||||
key = f"pipeline:metrics:{context_id}"
|
||||
r.hset(key, step_name, json.dumps(metrics))
|
||||
r.expire(key, 86400) # 24h TTL
|
||||
except Exception:
|
||||
pass # metrics storage is non-critical
|
||||
|
||||
@@ -5,6 +5,7 @@ Covers:
|
||||
- regenerate_thumbnail — re-render thumbnail with updated per-part colours
|
||||
"""
|
||||
import logging
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
|
||||
from app.tasks.celery_app import celery_app
|
||||
@@ -14,6 +15,29 @@ from app.core.pipeline_logger import PipelineLogger
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _pipeline_session(tenant_id: str | None = None):
|
||||
"""Single DB engine + session for the entire task lifetime.
|
||||
|
||||
Replaces the previous pattern of creating 3-7 separate create_engine()
|
||||
+ Session() pairs per task invocation. Each create_engine() spins up a
|
||||
new connection pool, wasting ~50ms + one PG connection per call.
|
||||
"""
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import Session
|
||||
from app.config import settings as app_settings
|
||||
from app.core.tenant_context import set_tenant_context_sync
|
||||
|
||||
sync_url = app_settings.database_url.replace("+asyncpg", "")
|
||||
engine = create_engine(sync_url)
|
||||
try:
|
||||
with Session(engine) as session:
|
||||
set_tenant_context_sync(session, tenant_id)
|
||||
yield session
|
||||
finally:
|
||||
engine.dispose()
|
||||
|
||||
|
||||
@celery_app.task(bind=True, name="app.tasks.step_tasks.render_step_thumbnail", queue="asset_pipeline")
|
||||
def render_step_thumbnail(self, cad_file_id: str):
|
||||
"""Render the thumbnail for a freshly-processed STEP file.
|
||||
@@ -26,32 +50,24 @@ def render_step_thumbnail(self, cad_file_id: str):
|
||||
pl.step_start("render_step_thumbnail", {"cad_file_id": cad_file_id})
|
||||
logger.info(f"Rendering thumbnail for CAD file: {cad_file_id}")
|
||||
|
||||
# Resolve and log tenant context at task start (required for RLS)
|
||||
from app.core.tenant_context import resolve_tenant_id_for_cad, set_tenant_context_sync
|
||||
from app.core.tenant_context import resolve_tenant_id_for_cad
|
||||
_tenant_id = resolve_tenant_id_for_cad(cad_file_id)
|
||||
|
||||
# Compute and persist STEP file hash for STL cache lookups
|
||||
# ── Pre-render: compute hash ──────────────────────────────────────────
|
||||
try:
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import Session
|
||||
from app.config import settings as app_settings
|
||||
from app.models.cad_file import CadFile
|
||||
from app.domains.products.cache_service import compute_step_hash
|
||||
|
||||
sync_url = app_settings.database_url.replace("+asyncpg", "")
|
||||
_eng = create_engine(sync_url)
|
||||
with Session(_eng) as _sess:
|
||||
set_tenant_context_sync(_sess, _tenant_id)
|
||||
_cad = _sess.get(CadFile, cad_file_id)
|
||||
if _cad and _cad.stored_path and not _cad.step_file_hash:
|
||||
_hash = compute_step_hash(_cad.stored_path)
|
||||
_cad.step_file_hash = _hash
|
||||
_sess.commit()
|
||||
logger.info(f"Saved step_file_hash for {cad_file_id}: {_hash[:12]}…")
|
||||
_eng.dispose()
|
||||
with _pipeline_session(_tenant_id) as session:
|
||||
cad = session.get(CadFile, cad_file_id)
|
||||
if cad and cad.stored_path and not cad.step_file_hash:
|
||||
cad.step_file_hash = compute_step_hash(cad.stored_path)
|
||||
session.commit()
|
||||
logger.info(f"Saved step_file_hash for {cad_file_id}: {cad.step_file_hash[:12]}…")
|
||||
except Exception:
|
||||
logger.warning(f"step_file_hash computation failed for {cad_file_id} (non-fatal)")
|
||||
|
||||
# ── Render thumbnail ──────────────────────────────────────────────────
|
||||
try:
|
||||
from app.services.step_processor import regenerate_cad_thumbnail
|
||||
pl.info("render_step_thumbnail", "Calling regenerate_cad_thumbnail")
|
||||
@@ -63,101 +79,59 @@ def render_step_thumbnail(self, cad_file_id: str):
|
||||
logger.error(f"Thumbnail render failed for {cad_file_id}: {exc}")
|
||||
raise self.retry(exc=exc, countdown=30, max_retries=2)
|
||||
|
||||
# Extract bounding box from the thumbnail GLB generated by the renderer.
|
||||
# GLB bbox via trimesh is fast and avoids re-parsing the STEP file.
|
||||
# Falls back to cadquery STEP re-parse if GLB is not found.
|
||||
# ── Post-render: bbox + sharp edges + materials (single session) ──────
|
||||
try:
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import Session
|
||||
from app.config import settings as _cfg2
|
||||
from app.models.cad_file import CadFile as _CadFile2
|
||||
from app.models.cad_file import CadFile
|
||||
from app.domains.pipeline.tasks.extract_metadata import _bbox_from_glb, _bbox_from_step_cadquery
|
||||
|
||||
_sync_url2 = _cfg2.database_url.replace("+asyncpg", "")
|
||||
_eng2 = create_engine(_sync_url2)
|
||||
with Session(_eng2) as _sess2:
|
||||
set_tenant_context_sync(_sess2, _tenant_id)
|
||||
_cad2 = _sess2.get(_CadFile2, cad_file_id)
|
||||
_step_path = _cad2.stored_path if _cad2 else None
|
||||
_eng2.dispose()
|
||||
with _pipeline_session(_tenant_id) as session:
|
||||
cad = session.get(CadFile, cad_file_id)
|
||||
if not cad:
|
||||
logger.warning(f"CadFile {cad_file_id} not found in post-render phase")
|
||||
else:
|
||||
step_path = cad.stored_path
|
||||
attrs = cad.mesh_attributes or {}
|
||||
|
||||
if _step_path and not (_cad2.mesh_attributes or {}).get("dimensions_mm"):
|
||||
_step = Path(_step_path)
|
||||
_glb = _step.parent / f"{_step.stem}_thumbnail.glb"
|
||||
bbox_data = _bbox_from_glb(str(_glb)) or _bbox_from_step_cadquery(_step_path)
|
||||
if bbox_data:
|
||||
_eng2 = create_engine(_sync_url2)
|
||||
with Session(_eng2) as _sess2:
|
||||
set_tenant_context_sync(_sess2, _tenant_id)
|
||||
_cad2 = _sess2.get(_CadFile2, cad_file_id)
|
||||
if _cad2:
|
||||
_cad2.mesh_attributes = {**( _cad2.mesh_attributes or {}), **bbox_data}
|
||||
_sess2.commit()
|
||||
# Bounding box extraction
|
||||
if step_path and not attrs.get("dimensions_mm"):
|
||||
_step = Path(step_path)
|
||||
_glb = _step.parent / f"{_step.stem}_thumbnail.glb"
|
||||
bbox_data = _bbox_from_glb(str(_glb)) or _bbox_from_step_cadquery(step_path)
|
||||
if bbox_data:
|
||||
cad.mesh_attributes = {**attrs, **bbox_data}
|
||||
attrs = cad.mesh_attributes
|
||||
dims = bbox_data["dimensions_mm"]
|
||||
logger.info(
|
||||
f"bbox for {cad_file_id}: "
|
||||
f"{dims['x']}×{dims['y']}×{dims['z']} mm"
|
||||
)
|
||||
_eng2.dispose()
|
||||
logger.info(f"bbox for {cad_file_id}: {dims['x']}×{dims['y']}×{dims['z']} mm")
|
||||
|
||||
# Sharp edge extraction (PCurve-based, runs on render-worker with OCP)
|
||||
if step_path and "sharp_edge_pairs" not in attrs:
|
||||
try:
|
||||
from app.services.step_processor import extract_mesh_edge_data
|
||||
edge_data = extract_mesh_edge_data(step_path)
|
||||
if edge_data:
|
||||
cad.mesh_attributes = {**attrs, **edge_data}
|
||||
n_pairs = len(edge_data.get("sharp_edge_pairs", []))
|
||||
logger.info(f"Sharp edge data extracted for {cad_file_id}: {n_pairs} sharp edges")
|
||||
except Exception:
|
||||
logger.exception(f"Sharp edge extraction failed for {cad_file_id} (non-fatal)")
|
||||
|
||||
session.commit()
|
||||
|
||||
# WebSocket broadcast
|
||||
_tid = str(cad.tenant_id) if cad.tenant_id else None
|
||||
except Exception:
|
||||
logger.exception(f"bbox extraction failed for {cad_file_id} (non-fatal)")
|
||||
logger.exception(f"Post-render processing failed for {cad_file_id} (non-fatal)")
|
||||
_tid = None
|
||||
|
||||
# Extract sharp edge topology (PCurve-based) if not already present.
|
||||
# This runs on render-worker which has OCP (cadquery's OCC fork).
|
||||
try:
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import Session
|
||||
from app.config import settings as _cfg3
|
||||
from app.models.cad_file import CadFile as _CadFile3
|
||||
from app.services.step_processor import extract_mesh_edge_data
|
||||
|
||||
_sync_url3 = _cfg3.database_url.replace("+asyncpg", "")
|
||||
_eng3 = create_engine(_sync_url3)
|
||||
with Session(_eng3) as _sess3:
|
||||
set_tenant_context_sync(_sess3, _tenant_id)
|
||||
_cad3 = _sess3.get(_CadFile3, cad_file_id)
|
||||
_attrs = _cad3.mesh_attributes or {} if _cad3 else {}
|
||||
_step_path3 = _cad3.stored_path if _cad3 else None
|
||||
_eng3.dispose()
|
||||
|
||||
if _step_path3 and "sharp_edge_pairs" not in _attrs:
|
||||
edge_data = extract_mesh_edge_data(_step_path3)
|
||||
if edge_data:
|
||||
_eng3 = create_engine(_sync_url3)
|
||||
with Session(_eng3) as _sess3:
|
||||
set_tenant_context_sync(_sess3, _tenant_id)
|
||||
_cad3 = _sess3.get(_CadFile3, cad_file_id)
|
||||
if _cad3:
|
||||
_cad3.mesh_attributes = {**(_cad3.mesh_attributes or {}), **edge_data}
|
||||
_sess3.commit()
|
||||
n_pairs = len(edge_data.get("sharp_edge_pairs", []))
|
||||
logger.info(f"Sharp edge data extracted for {cad_file_id}: {n_pairs} sharp edges")
|
||||
_eng3.dispose()
|
||||
except Exception:
|
||||
logger.exception(f"Sharp edge extraction failed for {cad_file_id} (non-fatal)")
|
||||
|
||||
# Auto-populate materials now that parsed_objects are available
|
||||
# Auto-populate materials
|
||||
try:
|
||||
from app.domains.pipeline.tasks.extract_metadata import _auto_populate_materials_for_cad
|
||||
_auto_populate_materials_for_cad(cad_file_id, tenant_id=_tenant_id)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
f"Auto material population failed for cad_file {cad_file_id} (non-fatal)"
|
||||
)
|
||||
logger.exception(f"Auto material population failed for cad_file {cad_file_id} (non-fatal)")
|
||||
|
||||
# Broadcast WebSocket event for live UI updates
|
||||
# Broadcast WebSocket event
|
||||
try:
|
||||
from sqlalchemy import create_engine, select as sql_select2
|
||||
from sqlalchemy.orm import Session as _Session
|
||||
from app.config import settings as _cfg
|
||||
from app.models.cad_file import CadFile as _CadFile
|
||||
_sync_url = _cfg.database_url.replace("+asyncpg", "")
|
||||
_eng = create_engine(_sync_url)
|
||||
with _Session(_eng) as _s:
|
||||
set_tenant_context_sync(_s, _tenant_id)
|
||||
_cad = _s.get(_CadFile, cad_file_id)
|
||||
_tid = str(_cad.tenant_id) if _cad and _cad.tenant_id else None
|
||||
_eng.dispose()
|
||||
if _tid:
|
||||
from app.core.websocket import publish_event_sync
|
||||
publish_event_sync(_tid, {
|
||||
@@ -168,12 +142,11 @@ def render_step_thumbnail(self, cad_file_id: str):
|
||||
except Exception:
|
||||
logger.debug("WebSocket publish for CAD complete skipped (non-fatal)")
|
||||
|
||||
# Auto-generate geometry GLB so the 3D viewer is ready without manual trigger
|
||||
# Auto-generate geometry GLB
|
||||
try:
|
||||
from app.domains.pipeline.tasks.export_glb import generate_gltf_geometry_task
|
||||
generate_gltf_geometry_task.delay(cad_file_id)
|
||||
pl.info("render_step_thumbnail", f"Queued generate_gltf_geometry_task for {cad_file_id}")
|
||||
logger.info("render_step_thumbnail: queued generate_gltf_geometry_task for %s", cad_file_id)
|
||||
except Exception:
|
||||
logger.debug("Could not queue generate_gltf_geometry_task (non-fatal)")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user