Files
HartOMat/backend/app/domains/pipeline/tasks/extract_metadata.py
T
Hartmut cfccdd5397 feat: rich product metadata extraction from STEP files
Extract volume, surface area, part count, assembly hierarchy, and
complexity from STEP files via OCC B-rep analysis.

Backend:
- extract_rich_metadata() in step_processor.py: computes per-part volume
  (BRepGProp), surface area, triangle/vertex count, assembly depth,
  instance count, complexity score, largest part identification
- cad_metadata JSONB column on Product model (DB migration)
- Auto-populated during STEP processing (non-fatal, 10s timeout)
- Also stored in cad_files.mesh_attributes["rich_metadata"]
- Batch re-extract endpoint: POST /admin/settings/reextract-rich-metadata

AI Agent:
- search_products returns part_count, volume_cm3, complexity, largest_part
- query_database tool description documents cad_metadata schema

Frontend:
- ProductDetail page: CAD Metadata section with stat cards
  (parts, volume, surface area, complexity, triangles, assembly depth)
- Admin System Tools: "Re-extract Rich Metadata" button for backfill

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-15 18:49:50 +01:00

378 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""STEP metadata extraction tasks.
Covers:
- process_step_file — extract OCC objects + queue thumbnail
- reextract_cad_metadata — re-compute bounding-box for completed files
- _auto_populate_materials_for_cad — helper: fill cad_part_materials from Excel
- _bbox_from_glb / _bbox_from_step_cadquery — bbox helpers
"""
import logging
from pathlib import Path
from app.tasks.celery_app import celery_app
from app.core.task_logs import log_task_event
from app.core.pipeline_logger import PipelineLogger
logger = logging.getLogger(__name__)
def _bbox_from_glb(glb_path: str) -> dict | None:
"""Extract bounding box from a GLB file (meters → converted to mm).
Returns {"dimensions_mm": {x,y,z}, "bbox_center_mm": {x,y,z}} or None on failure.
OCC GLB output is in meters; multiply by 1000 to get mm.
"""
try:
import trimesh
p = Path(glb_path)
if not p.exists():
return None
scene = trimesh.load(str(p), force="scene")
bounds = getattr(scene, "bounds", None)
if bounds is None:
return None
mins, maxs = bounds
dims = maxs - mins
return {
"dimensions_mm": {
"x": round(float(dims[0]) * 1000, 2),
"y": round(float(dims[1]) * 1000, 2),
"z": round(float(dims[2]) * 1000, 2),
},
"bbox_center_mm": {
"x": round(float((mins[0] + maxs[0]) / 2) * 1000, 2),
"y": round(float((mins[1] + maxs[1]) / 2) * 1000, 2),
"z": round(float((mins[2] + maxs[2]) / 2) * 1000, 2),
},
}
except Exception as exc:
logger.debug(f"_bbox_from_glb failed for {glb_path}: {exc}")
return None
def _bbox_from_step_cadquery(step_path: str) -> dict | None:
"""Fallback: extract bounding box by re-parsing STEP via cadquery."""
try:
import cadquery as cq
bb = cq.importers.importStep(step_path).val().BoundingBox()
return {
"dimensions_mm": {
"x": round(bb.xlen, 2),
"y": round(bb.ylen, 2),
"z": round(bb.zlen, 2),
},
"bbox_center_mm": {
"x": round((bb.xmin + bb.xmax) / 2, 2),
"y": round((bb.ymin + bb.ymax) / 2, 2),
"z": round((bb.zmin + bb.zmax) / 2, 2),
},
}
except Exception as exc:
logger.debug(f"_bbox_from_step_cadquery failed for {step_path}: {exc}")
return None
@celery_app.task(bind=True, name="app.tasks.step_tasks.process_step_file", queue="step_processing")
def process_step_file(self, cad_file_id: str):
"""Process a STEP file: extract objects, generate thumbnail, convert to glTF.
After processing completes, auto-populate cad_part_materials from Excel
component data for any linked products that don't yet have materials assigned.
A per-file Redis lock (TTL = 10 min) prevents duplicate tasks from processing
the same file concurrently — e.g. when 'Process Unprocessed' is clicked while
a file is already being processed.
"""
import redis as redis_lib
from app.config import settings as app_settings
pl = PipelineLogger(task_id=self.request.id)
pl.step_start("process_step_file", {"cad_file_id": cad_file_id})
# Resolve and log tenant context at task start (required for RLS)
from app.core.tenant_context import resolve_tenant_id_for_cad
_tenant_id = resolve_tenant_id_for_cad(cad_file_id)
lock_key = f"step_processing_lock:{cad_file_id}"
r = redis_lib.from_url(app_settings.redis_url)
acquired = r.set(lock_key, "1", nx=True, ex=600) # 10-minute TTL
if not acquired:
logger.warning(f"STEP file {cad_file_id} is already being processed — skipping duplicate task")
return
try:
pl.info("process_step_file", f"Processing STEP file (metadata only): {cad_file_id}")
try:
from app.services.step_processor import extract_cad_metadata
extract_cad_metadata(cad_file_id, tenant_id=_tenant_id)
except Exception as exc:
pl.step_error("process_step_file", f"STEP metadata extraction failed: {exc}", exc)
r.delete(lock_key) # release lock so a retry can proceed
raise self.retry(exc=exc, countdown=60, max_retries=3)
# Extract rich metadata (volume, surface area, complexity, etc.) — non-fatal
try:
from sqlalchemy import create_engine, update as sql_update
from sqlalchemy.orm import Session as SyncSession
from app.config import settings as cfg
from app.services.step_processor import extract_rich_metadata
from app.models.cad_file import CadFile
from app.models.product import Product
from app.core.tenant_context import set_tenant_context_sync
eng = create_engine(cfg.database_url_sync)
try:
# Load stored_path for the cad file
with SyncSession(eng) as session:
set_tenant_context_sync(session, _tenant_id)
cad_file = session.get(CadFile, cad_file_id)
step_path = cad_file.stored_path if cad_file else None
if step_path:
rich_meta = extract_rich_metadata(str(step_path))
if rich_meta and rich_meta.get("part_count", 0) > 0:
with SyncSession(eng) as session:
set_tenant_context_sync(session, _tenant_id)
# Merge into cad_files.mesh_attributes
cad_file = session.get(CadFile, cad_file_id)
if cad_file:
existing_attrs = cad_file.mesh_attributes or {}
existing_attrs["rich_metadata"] = rich_meta
session.execute(
sql_update(CadFile)
.where(CadFile.id == cad_file_id)
.values(mesh_attributes=existing_attrs)
)
# Update all active products linked to this CAD file
session.execute(
sql_update(Product)
.where(Product.cad_file_id == cad_file_id, Product.is_active.is_(True))
.values(cad_metadata=rich_meta)
)
session.commit()
logger.info(
f"Rich metadata extracted for cad_file {cad_file_id}: "
f"{rich_meta.get('part_count')} parts, "
f"{rich_meta.get('total_volume_cm3', 0):.1f} cm³"
)
finally:
eng.dispose()
except Exception:
logger.exception(f"Rich metadata extraction failed for cad_file {cad_file_id} (non-fatal)")
finally:
r.delete(lock_key) # always release on completion or unhandled error
pl.step_done("process_step_file")
# Queue thumbnail rendering on the dedicated single-concurrency worker
from app.domains.pipeline.tasks.render_thumbnail import render_step_thumbnail
render_step_thumbnail.delay(cad_file_id)
def _auto_populate_materials_for_cad(cad_file_id: str, tenant_id: str | None = None) -> None:
"""Sync helper: auto-populate cad_part_materials from Excel for newly-processed CAD files.
Only fills products where cad_part_materials is empty or all-blank,
preventing overwrites of manually assigned materials.
"""
from sqlalchemy import create_engine, select as sql_select, update as sql_update
from sqlalchemy.orm import Session
from app.config import settings as app_settings
from app.models.cad_file import CadFile
from app.models.product import Product
from app.api.routers.products import build_materials_from_excel
from app.services.step_processor import build_part_colors
from app.core.tenant_context import set_tenant_context_sync
sync_url = app_settings.database_url.replace("+asyncpg", "")
eng = create_engine(sync_url)
with Session(eng) as session:
set_tenant_context_sync(session, tenant_id)
# Load the CAD file to get parsed objects
cad_file = session.execute(
sql_select(CadFile).where(CadFile.id == cad_file_id)
).scalar_one_or_none()
if cad_file is None:
return
parsed_objects = cad_file.parsed_objects or {}
cad_parts: list[str] = parsed_objects.get("objects", [])
if not cad_parts:
return
# Find products linked to this CAD file that have Excel components
products = session.execute(
sql_select(Product).where(
Product.cad_file_id == cad_file.id,
Product.is_active.is_(True),
)
).scalars().all()
final_part_colors = None
for product in products:
excel_components: list[dict] = product.components or []
if not excel_components:
continue
# Only auto-fill when cad_part_materials is empty or all-blank
existing = product.cad_part_materials or []
if existing and any(m.get("material", "").strip() for m in existing):
continue # has at least one real material — don't overwrite
new_materials = build_materials_from_excel(cad_parts, excel_components)
session.execute(
sql_update(Product)
.where(Product.id == product.id)
.values(cad_part_materials=new_materials)
)
session.flush()
# Compute part colors; thumbnail queued once after the loop
try:
final_part_colors = build_part_colors(cad_parts, new_materials)
except Exception:
logger.exception(f"Part colors build failed for product {product.id}")
logger.info(
f"Auto-populated {len(new_materials)} materials for product {product.id} "
f"from {len(excel_components)} Excel components"
)
session.commit()
# Queue exactly ONE thumbnail regeneration per CAD file regardless of how many
# products were auto-populated. Queuing once-per-product multiplies the task
# count needlessly and causes the Redis queue depth to grow instead of shrink.
if final_part_colors is not None:
try:
from app.domains.pipeline.tasks.render_thumbnail import regenerate_thumbnail
regenerate_thumbnail.delay(str(cad_file_id), final_part_colors)
except Exception:
logger.exception(f"Thumbnail regen queue failed for cad_file {cad_file_id}")
eng.dispose()
@celery_app.task(name="app.tasks.step_tasks.reextract_rich_metadata_task", queue="step_processing")
def reextract_rich_metadata_task():
"""Batch re-extract rich metadata (volume, surface area, complexity) for all completed CAD files."""
from sqlalchemy import create_engine, select as sql_select, update as sql_update
from sqlalchemy.orm import Session as SyncSession
from app.config import settings as cfg
from app.models.cad_file import CadFile, ProcessingStatus
from app.models.product import Product
from app.core.tenant_context import set_tenant_context_sync
sync_url = cfg.database_url.replace("+asyncpg", "")
eng = create_engine(sync_url)
updated = 0
failed = 0
try:
with SyncSession(eng) as session:
cad_files = session.execute(
sql_select(CadFile).where(
CadFile.processing_status == ProcessingStatus.completed,
CadFile.stored_path.isnot(None),
)
).scalars().all()
cad_entries = [(str(cf.id), cf.stored_path, cf.tenant_id) for cf in cad_files]
for cad_file_id, step_path, tenant_id in cad_entries:
try:
from app.services.step_processor import extract_rich_metadata
rich_meta = extract_rich_metadata(str(step_path))
if rich_meta and rich_meta.get("part_count", 0) > 0:
with SyncSession(eng) as session:
set_tenant_context_sync(session, tenant_id)
# Update mesh_attributes on cad_file
cad_file = session.get(CadFile, cad_file_id)
if cad_file:
existing_attrs = cad_file.mesh_attributes or {}
existing_attrs["rich_metadata"] = rich_meta
session.execute(
sql_update(CadFile)
.where(CadFile.id == cad_file_id)
.values(mesh_attributes=existing_attrs)
)
# Update all active products linked to this CAD file
session.execute(
sql_update(Product)
.where(Product.cad_file_id == cad_file_id, Product.is_active.is_(True))
.values(cad_metadata=rich_meta)
)
session.commit()
updated += 1
logger.info(
f"reextract_rich_metadata: {cad_file_id} -> "
f"{rich_meta.get('part_count')} parts, "
f"{rich_meta.get('total_volume_cm3', 0):.1f} cm3"
)
except Exception:
failed += 1
logger.exception(f"reextract_rich_metadata failed for cad_file {cad_file_id}")
finally:
eng.dispose()
logger.info(f"reextract_rich_metadata_task complete: {updated} updated, {failed} failed")
@celery_app.task(name="app.tasks.step_tasks.reextract_cad_metadata", queue="asset_pipeline")
def reextract_cad_metadata(cad_file_id: str):
"""Re-extract bounding-box dimensions for an already-completed CAD file.
Uses cadquery (available in render-worker) to compute dimensions_mm.
Updates mesh_attributes without changing processing_status or re-rendering.
Safe to run on completed files.
"""
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from app.config import settings as app_settings
from app.models.cad_file import CadFile
pl = PipelineLogger(task_id=None)
pl.step_start("reextract_cad_metadata", {"cad_file_id": cad_file_id})
# Resolve and log tenant context at task start (required for RLS)
from app.core.tenant_context import resolve_tenant_id_for_cad, set_tenant_context_sync
_tenant_id = resolve_tenant_id_for_cad(cad_file_id)
sync_url = app_settings.database_url.replace("+asyncpg", "")
eng = create_engine(sync_url)
with Session(eng) as session:
set_tenant_context_sync(session, _tenant_id)
cad_file = session.get(CadFile, cad_file_id)
if not cad_file or not cad_file.stored_path:
logger.warning(f"reextract_cad_metadata: file not found {cad_file_id}")
eng.dispose()
return
step_path = cad_file.stored_path
try:
p = Path(step_path)
glb_path = p.parent / f"{p.stem}_thumbnail.glb"
patch = _bbox_from_glb(str(glb_path)) or _bbox_from_step_cadquery(step_path)
if patch:
with Session(eng) as session:
set_tenant_context_sync(session, _tenant_id)
cad_file = session.get(CadFile, cad_file_id)
if cad_file:
cad_file.mesh_attributes = {**(cad_file.mesh_attributes or {}), **patch}
session.commit()
dims = patch["dimensions_mm"]
pl.step_done("reextract_cad_metadata", result={
"dimensions_mm": f"{dims['x']}×{dims['y']}×{dims['z']} mm"
})
logger.info(
f"reextract_cad_metadata: {cad_file_id}"
f"{dims['x']}×{dims['y']}×{dims['z']} mm"
)
else:
logger.warning(f"reextract_cad_metadata: no bbox data for {cad_file_id}")
except Exception as exc:
pl.step_error("reextract_cad_metadata", str(exc), exc)
logger.error(f"reextract_cad_metadata failed for {cad_file_id}: {exc}")
finally:
eng.dispose()