1321ef2bd4
The queue handles far more than thumbnails: OCC tessellation, USD master generation, GLB production, order line renders, and workflow renders. asset_pipeline better reflects its role as the render-worker's primary queue. Updated all references in: task decorators, celery_app.py, beat_tasks.py, docker-compose.yml worker command, worker.py MONITORED_QUEUES, admin.py, CLAUDE.md, LEARNINGS.md, Dockerfile, helpTexts.ts, test files, and all .claude/commands/*.md skill files. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
263 lines
11 KiB
Python
263 lines
11 KiB
Python
"""STEP metadata extraction tasks.
|
||
|
||
Covers:
|
||
- process_step_file — extract OCC objects + queue thumbnail
|
||
- reextract_cad_metadata — re-compute bounding-box for completed files
|
||
- _auto_populate_materials_for_cad — helper: fill cad_part_materials from Excel
|
||
- _bbox_from_glb / _bbox_from_step_cadquery — bbox helpers
|
||
"""
|
||
import logging
|
||
from pathlib import Path
|
||
|
||
from app.tasks.celery_app import celery_app
|
||
from app.core.task_logs import log_task_event
|
||
from app.core.pipeline_logger import PipelineLogger
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _bbox_from_glb(glb_path: str) -> dict | None:
    """Extract bounding box from a GLB file (meters → converted to mm).

    Returns {"dimensions_mm": {x,y,z}, "bbox_center_mm": {x,y,z}} or None on failure.
    OCC GLB output is in meters; multiply by 1000 to get mm.
    """
    try:
        import trimesh

        glb = Path(glb_path)
        if not glb.exists():
            return None
        bounds = getattr(trimesh.load(str(glb), force="scene"), "bounds", None)
        if bounds is None:
            return None
        lo, hi = bounds

        # GLB coordinates are meters; report millimetres rounded to 2 decimals.
        def _mm(value: float) -> float:
            return round(float(value) * 1000, 2)

        axes = ("x", "y", "z")
        return {
            "dimensions_mm": {
                axis: _mm(hi[i] - lo[i]) for i, axis in enumerate(axes)
            },
            "bbox_center_mm": {
                axis: _mm((lo[i] + hi[i]) / 2) for i, axis in enumerate(axes)
            },
        }
    except Exception as exc:
        logger.debug(f"_bbox_from_glb failed for {glb_path}: {exc}")
        return None
|
||
|
||
|
||
def _bbox_from_step_cadquery(step_path: str) -> dict | None:
    """Fallback: extract bounding box by re-parsing STEP via cadquery."""
    try:
        import cadquery as cq

        box = cq.importers.importStep(step_path).val().BoundingBox()
        # Extents per axis and midpoints, rounded to 2 decimals (already mm).
        extents = {"x": box.xlen, "y": box.ylen, "z": box.zlen}
        centers = {
            "x": (box.xmin + box.xmax) / 2,
            "y": (box.ymin + box.ymax) / 2,
            "z": (box.zmin + box.zmax) / 2,
        }
        return {
            "dimensions_mm": {k: round(v, 2) for k, v in extents.items()},
            "bbox_center_mm": {k: round(v, 2) for k, v in centers.items()},
        }
    except Exception as exc:
        logger.debug(f"_bbox_from_step_cadquery failed for {step_path}: {exc}")
        return None
|
||
|
||
|
||
@celery_app.task(bind=True, name="app.tasks.step_tasks.process_step_file", queue="step_processing")
def process_step_file(self, cad_file_id: str):
    """Process a STEP file: extract objects, generate thumbnail, convert to glTF.

    After processing completes, auto-populate cad_part_materials from Excel
    component data for any linked products that don't yet have materials assigned.

    A per-file Redis lock (TTL = 10 min) prevents duplicate tasks from processing
    the same file concurrently — e.g. when 'Process Unprocessed' is clicked while
    a file is already being processed.

    Args:
        cad_file_id: Primary key of the CadFile row to process.

    Retries up to 3 times with a 60 s countdown when metadata extraction raises.
    """
    import redis as redis_lib
    from app.config import settings as app_settings

    pl = PipelineLogger(task_id=self.request.id)
    pl.step_start("process_step_file", {"cad_file_id": cad_file_id})

    # Resolve and log tenant context at task start (required for RLS)
    from app.core.tenant_context import resolve_tenant_id_for_cad
    _tenant_id = resolve_tenant_id_for_cad(cad_file_id)

    lock_key = f"step_processing_lock:{cad_file_id}"
    r = redis_lib.from_url(app_settings.redis_url)
    # NX + 10-minute TTL: the TTL guarantees the lock self-expires if the
    # worker dies mid-task, so a crashed run can never block the file forever.
    acquired = r.set(lock_key, "1", nx=True, ex=600)
    if not acquired:
        logger.warning(f"STEP file {cad_file_id} is already being processed — skipping duplicate task")
        return

    try:
        pl.info("process_step_file", f"Processing STEP file (metadata only): {cad_file_id}")
        try:
            from app.services.step_processor import extract_cad_metadata
            extract_cad_metadata(cad_file_id, tenant_id=_tenant_id)
        except Exception as exc:
            pl.step_error("process_step_file", f"STEP metadata extraction failed: {exc}", exc)
            # self.retry() raises celery.exceptions.Retry; the finally below
            # releases the lock on the way out, so the queued retry can proceed.
            # (Previously the lock was also deleted here — redundant with finally.)
            raise self.retry(exc=exc, countdown=60, max_retries=3)
    finally:
        r.delete(lock_key)  # always release on completion, retry, or unhandled error

    pl.step_done("process_step_file")

    # Queue thumbnail rendering on the dedicated single-concurrency worker.
    # NOTE(review): this task routes to "step_processing" while reextract_cad_metadata
    # uses "asset_pipeline" — confirm the split is intentional after the queue rename.
    from app.domains.pipeline.tasks.render_thumbnail import render_step_thumbnail
    render_step_thumbnail.delay(cad_file_id)
|
||
|
||
|
||
def _auto_populate_materials_for_cad(cad_file_id: str, tenant_id: str | None = None) -> None:
    """Sync helper: auto-populate cad_part_materials from Excel for newly-processed CAD files.

    Only fills products where cad_part_materials is empty or all-blank,
    preventing overwrites of manually assigned materials.

    Args:
        cad_file_id: CadFile primary key whose linked products should be filled.
        tenant_id: Tenant for RLS context; applied to the sync session.
    """
    from sqlalchemy import create_engine, select as sql_select, update as sql_update
    from sqlalchemy.orm import Session
    from app.config import settings as app_settings
    from app.models.cad_file import CadFile
    from app.models.product import Product
    from app.api.routers.products import build_materials_from_excel
    from app.services.step_processor import build_part_colors
    from app.core.tenant_context import set_tenant_context_sync

    sync_url = app_settings.database_url.replace("+asyncpg", "")
    eng = create_engine(sync_url)
    final_part_colors = None
    try:
        with Session(eng) as session:
            set_tenant_context_sync(session, tenant_id)
            # Load the CAD file to get parsed objects
            cad_file = session.execute(
                sql_select(CadFile).where(CadFile.id == cad_file_id)
            ).scalar_one_or_none()
            if cad_file is None:
                return

            parsed_objects = cad_file.parsed_objects or {}
            cad_parts: list[str] = parsed_objects.get("objects", [])
            if not cad_parts:
                return

            # Find products linked to this CAD file that have Excel components
            products = session.execute(
                sql_select(Product).where(
                    Product.cad_file_id == cad_file.id,
                    Product.is_active.is_(True),
                )
            ).scalars().all()

            for product in products:
                excel_components: list[dict] = product.components or []
                if not excel_components:
                    continue

                # Only auto-fill when cad_part_materials is empty or all-blank.
                # `or ""` guards against an explicit None material value, which
                # would otherwise raise AttributeError on .strip().
                existing = product.cad_part_materials or []
                if existing and any((m.get("material") or "").strip() for m in existing):
                    continue  # has at least one real material — don't overwrite

                new_materials = build_materials_from_excel(cad_parts, excel_components)
                session.execute(
                    sql_update(Product)
                    .where(Product.id == product.id)
                    .values(cad_part_materials=new_materials)
                )
                session.flush()

                # Compute part colors; thumbnail queued once after the loop
                try:
                    final_part_colors = build_part_colors(cad_parts, new_materials)
                except Exception:
                    logger.exception(f"Part colors build failed for product {product.id}")

                logger.info(
                    f"Auto-populated {len(new_materials)} materials for product {product.id} "
                    f"from {len(excel_components)} Excel components"
                )

            session.commit()

        # Queue exactly ONE thumbnail regeneration per CAD file regardless of how many
        # products were auto-populated. Queuing once-per-product multiplies the task
        # count needlessly and causes the Redis queue depth to grow instead of shrink.
        if final_part_colors is not None:
            try:
                from app.domains.pipeline.tasks.render_thumbnail import regenerate_thumbnail
                regenerate_thumbnail.delay(str(cad_file_id), final_part_colors)
            except Exception:
                logger.exception(f"Thumbnail regen queue failed for cad_file {cad_file_id}")
    finally:
        # Dispose unconditionally: the previous version skipped dispose on the
        # early-return paths and on any exception, leaking pooled connections
        # across repeated task invocations.
        eng.dispose()
|
||
|
||
|
||
@celery_app.task(name="app.tasks.step_tasks.reextract_cad_metadata", queue="asset_pipeline")
def reextract_cad_metadata(cad_file_id: str):
    """Re-extract bounding-box dimensions for an already-completed CAD file.

    Uses cadquery (available in render-worker) to compute dimensions_mm.
    Updates mesh_attributes without changing processing_status or re-rendering.
    Safe to run on completed files.

    Args:
        cad_file_id: CadFile primary key to refresh.
    """
    from sqlalchemy import create_engine
    from sqlalchemy.orm import Session
    from app.config import settings as app_settings
    from app.models.cad_file import CadFile

    pl = PipelineLogger(task_id=None)
    pl.step_start("reextract_cad_metadata", {"cad_file_id": cad_file_id})

    # Resolve and log tenant context at task start (required for RLS)
    from app.core.tenant_context import resolve_tenant_id_for_cad, set_tenant_context_sync
    _tenant_id = resolve_tenant_id_for_cad(cad_file_id)

    sync_url = app_settings.database_url.replace("+asyncpg", "")
    eng = create_engine(sync_url)
    try:
        with Session(eng) as session:
            set_tenant_context_sync(session, _tenant_id)
            cad_file = session.get(CadFile, cad_file_id)
            if not cad_file or not cad_file.stored_path:
                logger.warning(f"reextract_cad_metadata: file not found {cad_file_id}")
                return
            step_path = cad_file.stored_path

        try:
            p = Path(step_path)
            # Prefer the GLB produced by thumbnail rendering (cheap to load);
            # fall back to re-parsing the STEP file with cadquery.
            glb_path = p.parent / f"{p.stem}_thumbnail.glb"
            patch = _bbox_from_glb(str(glb_path)) or _bbox_from_step_cadquery(step_path)
            if patch:
                with Session(eng) as session:
                    set_tenant_context_sync(session, _tenant_id)
                    cad_file = session.get(CadFile, cad_file_id)
                    if cad_file:
                        # Merge so unrelated mesh_attributes keys are preserved.
                        cad_file.mesh_attributes = {**(cad_file.mesh_attributes or {}), **patch}
                        session.commit()
                dims = patch["dimensions_mm"]
                pl.step_done("reextract_cad_metadata", result={
                    "dimensions_mm": f"{dims['x']}×{dims['y']}×{dims['z']} mm"
                })
                logger.info(
                    f"reextract_cad_metadata: {cad_file_id} → "
                    f"{dims['x']}×{dims['y']}×{dims['z']} mm"
                )
            else:
                logger.warning(f"reextract_cad_metadata: no bbox data for {cad_file_id}")
        except Exception as exc:
            pl.step_error("reextract_cad_metadata", str(exc), exc)
            logger.error(f"reextract_cad_metadata failed for {cad_file_id}: {exc}")
    finally:
        # Single dispose point covering early return, success, and failure —
        # previously the engine leaked if the lookup session itself raised.
        eng.dispose()
|