refactor(section-a+c): decompose step_tasks.py into pipeline domain

Section A — 1172-line step_tasks.py split into 4 focused modules under
backend/app/domains/pipeline/tasks/:
- extract_metadata.py: process_step_file, reextract_cad_metadata,
  _auto_populate_materials_for_cad, bbox helpers + PipelineLogger
- render_thumbnail.py: render_step_thumbnail, regenerate_thumbnail
  + PipelineLogger instrumentation
- render_order_line.py: dispatch_order_line_render, render_order_line_task
  (full still+turntable paths) + PipelineLogger step tracking
- export_glb.py: generate_gltf_geometry_task, generate_gltf_production_task

All task names preserved (app.tasks.step_tasks.*) for backward compat.
step_tasks.py is now a 23-line import-only shim.

Section C — celery_app.py task routing:
- app.domains.pipeline.tasks.* → step_processing (primary route)
- All 4 new modules added to include list for Celery discovery
- app.tasks.step_tasks.* kept as legacy fallback for in-flight tasks

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-08 20:43:56 +01:00
parent 07e3d1e026
commit c6556434d6
8 changed files with 1307 additions and 1175 deletions
@@ -0,0 +1,250 @@
"""STEP metadata extraction tasks.
Covers:
- process_step_file — extract OCC objects + queue thumbnail
- reextract_cad_metadata — re-compute bounding-box for completed files
- _auto_populate_materials_for_cad — helper: fill cad_part_materials from Excel
- _bbox_from_glb / _bbox_from_step_cadquery — bbox helpers
"""
import logging
from pathlib import Path
from app.tasks.celery_app import celery_app
from app.core.task_logs import log_task_event
from app.core.pipeline_logger import PipelineLogger
logger = logging.getLogger(__name__)
def _bbox_from_glb(glb_path: str) -> dict | None:
"""Extract bounding box from a GLB file (meters → converted to mm).
Returns {"dimensions_mm": {x,y,z}, "bbox_center_mm": {x,y,z}} or None on failure.
OCC GLB output is in meters; multiply by 1000 to get mm.
"""
try:
import trimesh
p = Path(glb_path)
if not p.exists():
return None
scene = trimesh.load(str(p), force="scene")
bounds = getattr(scene, "bounds", None)
if bounds is None:
return None
mins, maxs = bounds
dims = maxs - mins
return {
"dimensions_mm": {
"x": round(float(dims[0]) * 1000, 2),
"y": round(float(dims[1]) * 1000, 2),
"z": round(float(dims[2]) * 1000, 2),
},
"bbox_center_mm": {
"x": round(float((mins[0] + maxs[0]) / 2) * 1000, 2),
"y": round(float((mins[1] + maxs[1]) / 2) * 1000, 2),
"z": round(float((mins[2] + maxs[2]) / 2) * 1000, 2),
},
}
except Exception as exc:
logger.debug(f"_bbox_from_glb failed for {glb_path}: {exc}")
return None
def _bbox_from_step_cadquery(step_path: str) -> dict | None:
"""Fallback: extract bounding box by re-parsing STEP via cadquery."""
try:
import cadquery as cq
bb = cq.importers.importStep(step_path).val().BoundingBox()
return {
"dimensions_mm": {
"x": round(bb.xlen, 2),
"y": round(bb.ylen, 2),
"z": round(bb.zlen, 2),
},
"bbox_center_mm": {
"x": round((bb.xmin + bb.xmax) / 2, 2),
"y": round((bb.ymin + bb.ymax) / 2, 2),
"z": round((bb.zmin + bb.zmax) / 2, 2),
},
}
except Exception as exc:
logger.debug(f"_bbox_from_step_cadquery failed for {step_path}: {exc}")
return None
@celery_app.task(bind=True, name="app.tasks.step_tasks.process_step_file", queue="step_processing")
def process_step_file(self, cad_file_id: str):
"""Process a STEP file: extract objects, generate thumbnail, convert to glTF.
After processing completes, auto-populate cad_part_materials from Excel
component data for any linked products that don't yet have materials assigned.
A per-file Redis lock (TTL = 10 min) prevents duplicate tasks from processing
the same file concurrently — e.g. when 'Process Unprocessed' is clicked while
a file is already being processed.
"""
import redis as redis_lib
from app.config import settings as app_settings
pl = PipelineLogger(task_id=self.request.id)
pl.step_start("process_step_file", {"cad_file_id": cad_file_id})
lock_key = f"step_processing_lock:{cad_file_id}"
r = redis_lib.from_url(app_settings.redis_url)
acquired = r.set(lock_key, "1", nx=True, ex=600) # 10-minute TTL
if not acquired:
logger.warning(f"STEP file {cad_file_id} is already being processed — skipping duplicate task")
return
try:
pl.info("process_step_file", f"Processing STEP file (metadata only): {cad_file_id}")
try:
from app.services.step_processor import extract_cad_metadata
extract_cad_metadata(cad_file_id)
except Exception as exc:
pl.step_error("process_step_file", f"STEP metadata extraction failed: {exc}", exc)
r.delete(lock_key) # release lock so a retry can proceed
raise self.retry(exc=exc, countdown=60, max_retries=3)
finally:
r.delete(lock_key) # always release on completion or unhandled error
pl.step_done("process_step_file")
# Queue thumbnail rendering on the dedicated single-concurrency worker
from app.domains.pipeline.tasks.render_thumbnail import render_step_thumbnail
render_step_thumbnail.delay(cad_file_id)
def _auto_populate_materials_for_cad(cad_file_id: str) -> None:
"""Sync helper: auto-populate cad_part_materials from Excel for newly-processed CAD files.
Only fills products where cad_part_materials is empty or all-blank,
preventing overwrites of manually assigned materials.
"""
from sqlalchemy import create_engine, select as sql_select, update as sql_update
from sqlalchemy.orm import Session
from app.config import settings as app_settings
from app.models.cad_file import CadFile
from app.models.product import Product
from app.api.routers.products import build_materials_from_excel
from app.services.step_processor import build_part_colors
sync_url = app_settings.database_url.replace("+asyncpg", "")
eng = create_engine(sync_url)
with Session(eng) as session:
# Load the CAD file to get parsed objects
cad_file = session.execute(
sql_select(CadFile).where(CadFile.id == cad_file_id)
).scalar_one_or_none()
if cad_file is None:
return
parsed_objects = cad_file.parsed_objects or {}
cad_parts: list[str] = parsed_objects.get("objects", [])
if not cad_parts:
return
# Find products linked to this CAD file that have Excel components
products = session.execute(
sql_select(Product).where(
Product.cad_file_id == cad_file.id,
Product.is_active.is_(True),
)
).scalars().all()
final_part_colors = None
for product in products:
excel_components: list[dict] = product.components or []
if not excel_components:
continue
# Only auto-fill when cad_part_materials is empty or all-blank
existing = product.cad_part_materials or []
if existing and any(m.get("material", "").strip() for m in existing):
continue # has at least one real material — don't overwrite
new_materials = build_materials_from_excel(cad_parts, excel_components)
session.execute(
sql_update(Product)
.where(Product.id == product.id)
.values(cad_part_materials=new_materials)
)
session.flush()
# Compute part colors; thumbnail queued once after the loop
try:
final_part_colors = build_part_colors(cad_parts, new_materials)
except Exception:
logger.exception(f"Part colors build failed for product {product.id}")
logger.info(
f"Auto-populated {len(new_materials)} materials for product {product.id} "
f"from {len(excel_components)} Excel components"
)
session.commit()
# Queue exactly ONE thumbnail regeneration per CAD file regardless of how many
# products were auto-populated. Queuing once-per-product multiplies the task
# count needlessly and causes the Redis queue depth to grow instead of shrink.
if final_part_colors is not None:
try:
from app.domains.pipeline.tasks.render_thumbnail import regenerate_thumbnail
regenerate_thumbnail.delay(str(cad_file_id), final_part_colors)
except Exception:
logger.exception(f"Thumbnail regen queue failed for cad_file {cad_file_id}")
eng.dispose()
@celery_app.task(name="app.tasks.step_tasks.reextract_cad_metadata", queue="thumbnail_rendering")
def reextract_cad_metadata(cad_file_id: str):
"""Re-extract bounding-box dimensions for an already-completed CAD file.
Uses cadquery (available in render-worker) to compute dimensions_mm.
Updates mesh_attributes without changing processing_status or re-rendering.
Safe to run on completed files.
"""
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from app.config import settings as app_settings
from app.models.cad_file import CadFile
pl = PipelineLogger(task_id=None)
pl.step_start("reextract_cad_metadata", {"cad_file_id": cad_file_id})
sync_url = app_settings.database_url.replace("+asyncpg", "")
eng = create_engine(sync_url)
with Session(eng) as session:
cad_file = session.get(CadFile, cad_file_id)
if not cad_file or not cad_file.stored_path:
logger.warning(f"reextract_cad_metadata: file not found {cad_file_id}")
eng.dispose()
return
step_path = cad_file.stored_path
try:
p = Path(step_path)
glb_path = p.parent / f"{p.stem}_thumbnail.glb"
patch = _bbox_from_glb(str(glb_path)) or _bbox_from_step_cadquery(step_path)
if patch:
with Session(eng) as session:
cad_file = session.get(CadFile, cad_file_id)
if cad_file:
cad_file.mesh_attributes = {**(cad_file.mesh_attributes or {}), **patch}
session.commit()
dims = patch["dimensions_mm"]
pl.step_done("reextract_cad_metadata", result={
"dimensions_mm": f"{dims['x']}×{dims['y']}×{dims['z']} mm"
})
logger.info(
f"reextract_cad_metadata: {cad_file_id}"
f"{dims['x']}×{dims['y']}×{dims['z']} mm"
)
else:
logger.warning(f"reextract_cad_metadata: no bbox data for {cad_file_id}")
except Exception as exc:
pl.step_error("reextract_cad_metadata", str(exc), exc)
logger.error(f"reextract_cad_metadata failed for {cad_file_id}: {exc}")
finally:
eng.dispose()