refactor: rename thumbnail_rendering queue to asset_pipeline

The queue handles far more than thumbnails: OCC tessellation, USD master
generation, GLB production, order line renders, and workflow renders.
asset_pipeline better reflects its role as the render-worker's primary queue.

Updated all references in: task decorators, celery_app.py, beat_tasks.py,
docker-compose.yml worker command, worker.py MONITORED_QUEUES, admin.py,
CLAUDE.md, LEARNINGS.md, Dockerfile, helpTexts.ts, test files,
and all .claude/commands/*.md skill files.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-12 22:28:38 +01:00
parent e7b70a35ea
commit 1321ef2bd4
39 changed files with 540 additions and 122 deletions
+1 -1
View File
@@ -18,7 +18,7 @@ CATALOG_SCRIPT = Path(os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts")) /
@celery_app.task(
name="app.domains.materials.tasks.refresh_asset_library_catalog",
queue="thumbnail_rendering",
queue="asset_pipeline",
bind=True,
max_retries=2,
default_retry_delay=30,
+2 -2
View File
@@ -14,8 +14,8 @@ class MediaAssetType(str, enum.Enum):
turntable = "turntable"
stl_low = "stl_low"
stl_high = "stl_high"
gltf_geometry = "gltf_geometry"
gltf_production = "gltf_production"
gltf_geometry = "gltf_geometry" # DEPRECATED: use usd_master — viewer GLB auto-generated as part of USD pipeline
gltf_production = "gltf_production" # DEPRECATED: use usd_master — high-quality production GLB superseded by USD master
blend_production = "blend_production"
usd_master = "usd_master"
@@ -13,7 +13,7 @@ from app.core.pipeline_logger import PipelineLogger
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, name="app.tasks.step_tasks.generate_gltf_geometry_task", queue="thumbnail_rendering", max_retries=1)
@celery_app.task(bind=True, name="app.tasks.step_tasks.generate_gltf_geometry_task", queue="asset_pipeline", max_retries=1)
def generate_gltf_geometry_task(self, cad_file_id: str):
"""Export a geometry GLB directly from STEP via OCC (no STL intermediary).
@@ -83,25 +83,47 @@ def generate_gltf_geometry_task(self, cad_file_id: str):
settings_rows = session.execute(_select(_SysSetting)).scalars().all()
sys_settings = {s.key: s.value for s in settings_rows}
# Hash-based cache check: skip tessellation if file hasn't changed
step_file_hash = cad_file.step_file_hash
if step_file_hash:
from app.domains.media.models import MediaAsset, MediaAssetType
import uuid as _uuid_check
linear_deflection = float(sys_settings.get("scene_linear_deflection", "0.1"))
angular_deflection = float(sys_settings.get("scene_angular_deflection", "0.1"))
tessellation_engine = sys_settings.get("tessellation_engine", "occ")
# Hash-based cache check: skip tessellation if file and settings haven't changed
from app.domains.products.cache_service import compute_step_hash as _compute_step_hash
from app.domains.media.models import MediaAsset, MediaAssetType
import uuid as _uuid_check
_current_hash = _compute_step_hash(str(step_path_str))
_cache_hit_asset_id = None
# Composite cache key includes deflection settings so changing them invalidates cache
effective_cache_key = (
f"{_current_hash}:{linear_deflection}:{angular_deflection}:{tessellation_engine}"
if _current_hash else None
)
if effective_cache_key:
existing_geo = session.execute(
_select(MediaAsset).where(
MediaAsset.cad_file_id == _uuid_check.UUID(cad_file_id),
MediaAsset.asset_type == MediaAssetType.gltf_geometry,
)
).scalars().first()
if existing_geo:
logger.info("[CACHE] hash match — skipping geometry GLB tessellation for %s", cad_file_id)
pl.step_done("export_glb_geometry", result={"cached": True, "asset_id": str(existing_geo.id)})
_cache_hit_asset_id = str(existing_geo.id)
stored_key = (existing_geo.render_config or {}).get("cache_key", "") if existing_geo else ""
if stored_key == effective_cache_key:
_asset_disk_path = _Path(app_settings.upload_dir) / existing_geo.storage_key
if _asset_disk_path.exists():
logger.info("[CACHE] cache key match — skipping geometry GLB tessellation for %s", cad_file_id)
pl.step_done("export_glb_geometry", result={"cached": True, "asset_id": str(existing_geo.id)})
_cache_hit_asset_id = str(existing_geo.id)
else:
logger.info("[CACHE] cache key match but asset missing on disk — re-running tessellation for %s", cad_file_id)
else:
_cache_hit_asset_id = None
# Cache miss: update stored hash so next run can use it
cad_file.step_file_hash = _current_hash
session.commit()
else:
_cache_hit_asset_id = None
# No hash available: update stored hash and proceed
cad_file.step_file_hash = _current_hash
session.commit()
eng.dispose()
if _cache_hit_asset_id is not None:
@@ -112,10 +134,6 @@ def generate_gltf_geometry_task(self, cad_file_id: str):
logger.debug("Could not queue generate_usd_master_task from cache-hit path (non-fatal)")
return {"cached": True, "asset_id": _cache_hit_asset_id}
linear_deflection = float(sys_settings.get("scene_linear_deflection", "0.1"))
angular_deflection = float(sys_settings.get("scene_angular_deflection", "0.1"))
tessellation_engine = sys_settings.get("tessellation_engine", "occ")
step = _Path(step_path_str)
if not step.exists():
@@ -197,6 +215,7 @@ def generate_gltf_geometry_task(self, cad_file_id: str):
existing.storage_key = _key
existing.mime_type = "model/gltf-binary"
existing.file_size_bytes = _file_size
existing.render_config = {"cache_key": effective_cache_key}
if product_id:
existing.product_id = _uuid.UUID(product_id)
_sess.commit()
@@ -209,6 +228,7 @@ def generate_gltf_geometry_task(self, cad_file_id: str):
storage_key=_key,
mime_type="model/gltf-binary",
file_size_bytes=_file_size,
render_config={"cache_key": effective_cache_key},
)
_sess.add(asset)
_sess.commit()
@@ -234,7 +254,7 @@ def generate_gltf_geometry_task(self, cad_file_id: str):
@celery_app.task(
bind=True,
name="app.tasks.step_tasks.generate_gltf_production_task",
queue="thumbnail_rendering",
queue="asset_pipeline",
max_retries=2,
)
def generate_gltf_production_task(self, cad_file_id: str, product_id: str | None = None) -> dict:
@@ -511,7 +531,7 @@ def generate_gltf_production_task(self, cad_file_id: str, product_id: str | None
@celery_app.task(
bind=True,
name="app.tasks.step_tasks.generate_usd_master_task",
queue="thumbnail_rendering",
queue="asset_pipeline", # needs pxr (usd-core) + OCC — both only in render-worker
max_retries=1,
)
def generate_usd_master_task(self, cad_file_id: str) -> dict:
@@ -583,19 +603,44 @@ def generate_usd_master_task(self, cad_file_id: str) -> dict:
settings_rows = sess.execute(_sel(SystemSetting)).scalars().all()
sys_settings = {s.key: s.value for s in settings_rows}
# Hash-based cache check: skip tessellation if file hasn't changed
step_file_hash = cad_file.step_file_hash
if step_file_hash:
linear_deflection = float(sys_settings.get("render_linear_deflection", "0.03"))
angular_deflection = float(sys_settings.get("render_angular_deflection", "0.05"))
sharp_threshold = float(sys_settings.get("sharp_edge_threshold", "20.0"))
# Hash-based cache check: skip tessellation if file and settings haven't changed
from app.domains.products.cache_service import compute_step_hash as _compute_step_hash_usd
_current_hash_usd = _compute_step_hash_usd(str(step_path))
# Composite cache key includes deflection settings so changing them invalidates cache
effective_cache_key = (
f"{_current_hash_usd}:{linear_deflection}:{angular_deflection}:{sharp_threshold}"
if _current_hash_usd else None
)
if effective_cache_key:
existing_usd = sess.execute(
_sel(MediaAsset).where(
MediaAsset.cad_file_id == cad_file.id,
MediaAsset.asset_type == MediaAssetType.usd_master,
)
).scalars().first()
if existing_usd:
logger.info("[CACHE] hash match — skipping USD master tessellation for %s", cad_file_id)
pl.step_done("usd_master", result={"cached": True, "asset_id": str(existing_usd.id)})
_cache_hit_asset_id = str(existing_usd.id)
stored_key = (existing_usd.render_config or {}).get("cache_key", "") if existing_usd else ""
if stored_key == effective_cache_key:
_usd_disk_path = _Path(app_settings.upload_dir) / existing_usd.storage_key
if _usd_disk_path.exists():
logger.info("[CACHE] cache key match — skipping USD master tessellation for %s", cad_file_id)
pl.step_done("usd_master", result={"cached": True, "asset_id": str(existing_usd.id)})
_cache_hit_asset_id = str(existing_usd.id)
else:
logger.info("[CACHE] cache key match but USD asset missing on disk — re-running tessellation for %s", cad_file_id)
else:
# Cache miss: update stored hash so next run can use it
cad_file.step_file_hash = _current_hash_usd
sess.commit()
else:
# No hash available: update stored hash and proceed
cad_file.step_file_hash = _current_hash_usd
sess.commit()
eng.dispose()
if _cache_hit_asset_id is not None:
@@ -606,10 +651,6 @@ def generate_usd_master_task(self, cad_file_id: str) -> dict:
pl.step_error("usd_master", err, None)
raise RuntimeError(err)
linear_deflection = float(sys_settings.get("render_linear_deflection", "0.03"))
angular_deflection = float(sys_settings.get("render_angular_deflection", "0.05"))
sharp_threshold = float(sys_settings.get("sharp_edge_threshold", "20.0"))
output_path = step_path.parent / f"{step_path.stem}_master.usd"
scripts_dir = _Path(_os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts"))
script_path = scripts_dir / "export_step_to_usd.py"
@@ -675,6 +716,7 @@ def generate_usd_master_task(self, cad_file_id: str) -> dict:
existing.storage_key = _key
existing.mime_type = "model/vnd.usd"
existing.file_size_bytes = _file_size
existing.render_config = {"cache_key": effective_cache_key}
sess2.commit()
asset_id = str(existing.id)
else:
@@ -684,6 +726,7 @@ def generate_usd_master_task(self, cad_file_id: str) -> dict:
storage_key=_key,
mime_type="model/vnd.usd",
file_size_bytes=_file_size,
render_config={"cache_key": effective_cache_key},
)
sess2.add(asset)
sess2.commit()
@@ -104,7 +104,7 @@ def process_step_file(self, cad_file_id: str):
pl.info("process_step_file", f"Processing STEP file (metadata only): {cad_file_id}")
try:
from app.services.step_processor import extract_cad_metadata
extract_cad_metadata(cad_file_id)
extract_cad_metadata(cad_file_id, tenant_id=_tenant_id)
except Exception as exc:
pl.step_error("process_step_file", f"STEP metadata extraction failed: {exc}", exc)
r.delete(lock_key) # release lock so a retry can proceed
@@ -119,7 +119,7 @@ def process_step_file(self, cad_file_id: str):
render_step_thumbnail.delay(cad_file_id)
def _auto_populate_materials_for_cad(cad_file_id: str) -> None:
def _auto_populate_materials_for_cad(cad_file_id: str, tenant_id: str | None = None) -> None:
"""Sync helper: auto-populate cad_part_materials from Excel for newly-processed CAD files.
Only fills products where cad_part_materials is empty or all-blank,
@@ -132,10 +132,12 @@ def _auto_populate_materials_for_cad(cad_file_id: str) -> None:
from app.models.product import Product
from app.api.routers.products import build_materials_from_excel
from app.services.step_processor import build_part_colors
from app.core.tenant_context import set_tenant_context_sync
sync_url = app_settings.database_url.replace("+asyncpg", "")
eng = create_engine(sync_url)
with Session(eng) as session:
set_tenant_context_sync(session, tenant_id)
# Load the CAD file to get parsed objects
cad_file = session.execute(
sql_select(CadFile).where(CadFile.id == cad_file_id)
@@ -201,7 +203,7 @@ def _auto_populate_materials_for_cad(cad_file_id: str) -> None:
eng.dispose()
@celery_app.task(name="app.tasks.step_tasks.reextract_cad_metadata", queue="thumbnail_rendering")
@celery_app.task(name="app.tasks.step_tasks.reextract_cad_metadata", queue="asset_pipeline")
def reextract_cad_metadata(cad_file_id: str):
"""Re-extract bounding-box dimensions for an already-completed CAD file.
@@ -20,7 +20,7 @@ def dispatch_order_line_render(order_line_id: str):
render_order_line_task.delay(order_line_id)
@celery_app.task(bind=True, name="app.tasks.step_tasks.render_order_line_task", queue="thumbnail_rendering", max_retries=3)
@celery_app.task(bind=True, name="app.tasks.step_tasks.render_order_line_task", queue="asset_pipeline", max_retries=3)
def render_order_line_task(self, order_line_id: str):
"""Render a specific output type for an order line.
@@ -14,11 +14,11 @@ from app.core.pipeline_logger import PipelineLogger
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, name="app.tasks.step_tasks.render_step_thumbnail", queue="thumbnail_rendering")
@celery_app.task(bind=True, name="app.tasks.step_tasks.render_step_thumbnail", queue="asset_pipeline")
def render_step_thumbnail(self, cad_file_id: str):
"""Render the thumbnail for a freshly-processed STEP file.
Runs on the dedicated thumbnail_rendering queue (concurrency=1) so the
Runs on the dedicated asset_pipeline queue (concurrency=1) so the
blender-renderer service is never overwhelmed by concurrent requests.
On success, also auto-populates materials and marks the CadFile as completed.
"""
@@ -139,7 +139,7 @@ def render_step_thumbnail(self, cad_file_id: str):
# Auto-populate materials now that parsed_objects are available
try:
from app.domains.pipeline.tasks.extract_metadata import _auto_populate_materials_for_cad
_auto_populate_materials_for_cad(cad_file_id)
_auto_populate_materials_for_cad(cad_file_id, tenant_id=_tenant_id)
except Exception:
logger.exception(
f"Auto material population failed for cad_file {cad_file_id} (non-fatal)"
@@ -180,7 +180,7 @@ def render_step_thumbnail(self, cad_file_id: str):
pl.step_done("render_step_thumbnail")
@celery_app.task(bind=True, name="app.tasks.step_tasks.regenerate_thumbnail", queue="thumbnail_rendering")
@celery_app.task(bind=True, name="app.tasks.step_tasks.regenerate_thumbnail", queue="asset_pipeline")
def regenerate_thumbnail(self, cad_file_id: str, part_colors: dict):
"""Regenerate thumbnail with per-part colours."""
pl = PipelineLogger(task_id=self.request.id)
+7 -7
View File
@@ -1,6 +1,6 @@
"""Rendering domain tasks — Celery tasks for Blender-based rendering.
These tasks run on the `thumbnail_rendering` queue in the render-worker
These tasks run on the `asset_pipeline` queue in the render-worker
container, which has Blender and cadquery available.
Phase A2: Initial implementation replacing the blender-renderer HTTP service.
@@ -48,7 +48,7 @@ def _update_workflow_run_status(order_line_id: str, status: str, error: str | No
@celery_app.task(
bind=True,
name="app.domains.rendering.tasks.render_still_task",
queue="thumbnail_rendering",
queue="asset_pipeline",
max_retries=2,
)
def render_still_task(
@@ -150,7 +150,7 @@ def render_still_task(
@celery_app.task(
bind=True,
name="app.domains.rendering.tasks.render_turntable_task",
queue="thumbnail_rendering",
queue="asset_pipeline",
max_retries=2,
)
def render_turntable_task(
@@ -391,7 +391,7 @@ def _resolve_step_path_for_order_line(order_line_id: str) -> tuple[str | None, s
@celery_app.task(
bind=True,
name="app.domains.rendering.tasks.render_order_line_still_task",
queue="thumbnail_rendering",
queue="asset_pipeline",
max_retries=2,
)
def render_order_line_still_task(self, order_line_id: str, **params) -> dict:
@@ -509,7 +509,7 @@ def render_order_line_still_task(self, order_line_id: str, **params) -> dict:
@celery_app.task(
bind=True,
name="app.domains.rendering.tasks.export_gltf_for_order_line_task",
queue="thumbnail_rendering",
queue="asset_pipeline",
max_retries=1,
)
def export_gltf_for_order_line_task(self, order_line_id: str) -> dict:
@@ -555,7 +555,7 @@ def export_gltf_for_order_line_task(self, order_line_id: str) -> dict:
@celery_app.task(
bind=True,
name="app.domains.rendering.tasks.export_blend_for_order_line_task",
queue="thumbnail_rendering",
queue="asset_pipeline",
max_retries=1,
)
def export_blend_for_order_line_task(self, order_line_id: str) -> dict:
@@ -646,7 +646,7 @@ def export_blend_for_order_line_task(self, order_line_id: str) -> dict:
@celery_app.task(
bind=True,
name="app.domains.rendering.tasks.apply_asset_library_materials_task",
queue="thumbnail_rendering",
queue="asset_pipeline",
max_retries=1,
)
def apply_asset_library_materials_task(self, order_line_id: str, asset_library_id: str) -> dict: