"""GLB/GLTF and USD export tasks.

Covers:
- generate_gltf_geometry_task — OCC STEP → geometry GLB (fast preview)
- generate_usd_master_task — OCC STEP → USD canonical scene (pxr authoring)
"""
import logging

from app.tasks.celery_app import celery_app
from app.core.task_logs import log_task_event
from app.core.pipeline_logger import PipelineLogger

logger = logging.getLogger(__name__)


def _part_name_of(entry):
    """Return the part name of a cad_part_materials entry ("part_name", else "name")."""
    return entry.get("part_name") or entry.get("name", "")


def _collect_part_colors(entries) -> dict:
    """Map part name → hex color for every entry that provides both values."""
    colors: dict[str, str] = {}
    for entry in entries:
        part_name = _part_name_of(entry)
        hex_color = entry.get("hex_color") or entry.get("color", "")
        if part_name and hex_color:
            colors[part_name] = hex_color
    return colors


def _collect_raw_materials(entries) -> dict:
    """Map part name → raw material name for every entry that provides both values."""
    materials: dict[str, str] = {}
    for entry in entries:
        part_name = _part_name_of(entry)
        raw_material = entry.get("material", "")
        if part_name and raw_material:
            materials[part_name] = raw_material
    return materials


def _relative_storage_key(output_path, upload_dir) -> str:
    """Return output_path as a string, with the upload_dir prefix stripped when present."""
    prefix = str(upload_dir).rstrip("/") + "/"
    return str(output_path).removeprefix(prefix)


@celery_app.task(
    bind=True,
    name="app.tasks.step_tasks.generate_gltf_geometry_task",
    queue="asset_pipeline",
    max_retries=1,
)
def generate_gltf_geometry_task(self, cad_file_id: str):
    """Export a geometry GLB directly from STEP via OCC (no STL intermediary).

    Pipeline:
    1. Reads STEP file directly (no STL needed)
    2. Builds color_map from product.cad_part_materials (hex colors)
    3. Runs export_step_to_gltf.py (Python/OCP): STEP → GLB with per-part colors
    4. Stores result as gltf_geometry MediaAsset (replaces any existing one)

    Output is in meters, Y-up (glTF convention).

    A Redis dedup lock (TTL=30min) prevents concurrent duplicate tasks for the
    same file.

    Args:
        cad_file_id: Primary key of the CadFile row, as a string.

    Returns:
        ``{"skipped": True}`` when another run holds the dedup lock,
        ``{"cached": True, "asset_id": ...}`` on a cache hit,
        ``{"glb_path": ..., "asset_id": ...}`` after a fresh export, or
        ``None`` when the CadFile is missing or has no ``stored_path``.

    Raises:
        RuntimeError: The STEP file does not exist on disk.
        Exception: Whatever ``self.retry`` raises when the export subprocess
            fails, times out, or produces a missing/empty output file.
    """
    import json as _json
    import os as _os
    import subprocess as _subprocess
    import sys as _sys
    import uuid as _uuid
    import redis as _redis_lib
    from pathlib import Path as _Path
    from sqlalchemy import create_engine, select as _select
    from sqlalchemy.orm import Session
    from app.config import settings as app_settings
    from app.models.cad_file import CadFile
    from app.models.system_setting import SystemSetting as _SysSetting

    pl = PipelineLogger(task_id=self.request.id)
    pl.step_start("export_glb_geometry", {"cad_file_id": cad_file_id})

    # Redis dedup lock: prevent concurrent duplicate tasks for the same cad_file_id
    _lock_key = f"glb_geometry_lock:{cad_file_id}"
    _r = _redis_lib.from_url(app_settings.redis_url)
    _acquired = _r.set(_lock_key, "1", nx=True, ex=1800)  # 30-min TTL
    if not _acquired:
        logger.warning("generate_gltf_geometry_task: %s already in-flight — skipping duplicate", cad_file_id)
        pl.step_done("export_glb_geometry", result={"skipped": True, "reason": "duplicate"})
        return {"skipped": True}

    try:
        # Resolve tenant context at task start (required for RLS)
        from app.core.tenant_context import resolve_tenant_id_for_cad, set_tenant_context_sync
        _tenant_id = resolve_tenant_id_for_cad(cad_file_id)

        sync_url = app_settings.database_url.replace("+asyncpg", "")
        eng = create_engine(sync_url)
        # try/finally so the engine is disposed on the early return and on errors too.
        try:
            with Session(eng) as session:
                set_tenant_context_sync(session, _tenant_id)
                cad_file = session.get(CadFile, cad_file_id)
                if not cad_file or not cad_file.stored_path:
                    logger.error("generate_gltf_geometry_task: no stored_path for %s", cad_file_id)
                    return None
                step_path_str = cad_file.stored_path

                # Build hex color_map from product.cad_part_materials
                from app.domains.products.models import Product
                product = session.execute(
                    _select(Product).where(Product.cad_file_id == cad_file.id)
                ).scalar_one_or_none()
                color_map: dict[str, str] = {}
                product_id = str(product.id) if product else None
                if product and product.cad_part_materials:
                    color_map = _collect_part_colors(product.cad_part_materials)

                settings_rows = session.execute(_select(_SysSetting)).scalars().all()
                sys_settings = {s.key: s.value for s in settings_rows}
                linear_deflection = float(sys_settings.get("scene_linear_deflection", "0.1"))
                angular_deflection = float(sys_settings.get("scene_angular_deflection", "0.1"))
                tessellation_engine = sys_settings.get("tessellation_engine", "occ")

                # Hash-based cache check: skip tessellation if file and settings haven't changed
                from app.domains.products.cache_service import compute_step_hash as _compute_step_hash
                from app.domains.media.models import MediaAsset, MediaAssetType
                _current_hash = _compute_step_hash(str(step_path_str))
                _cache_hit_asset_id = None

                # Composite cache key includes deflection settings so changing them invalidates cache
                effective_cache_key = (
                    f"{_current_hash}:{linear_deflection}:{angular_deflection}:{tessellation_engine}"
                    if _current_hash else None
                )

                if effective_cache_key:
                    existing_geo = session.execute(
                        _select(MediaAsset).where(
                            MediaAsset.cad_file_id == _uuid.UUID(cad_file_id),
                            MediaAsset.asset_type == MediaAssetType.gltf_geometry,
                        )
                    ).scalars().first()
                    stored_key = (existing_geo.render_config or {}).get("cache_key", "") if existing_geo else ""
                    if stored_key == effective_cache_key:
                        _asset_disk_path = _Path(app_settings.upload_dir) / existing_geo.storage_key
                        if _asset_disk_path.exists():
                            logger.info("[CACHE] cache key match — skipping geometry GLB tessellation for %s", cad_file_id)
                            pl.step_done("export_glb_geometry", result={"cached": True, "asset_id": str(existing_geo.id)})
                            _cache_hit_asset_id = str(existing_geo.id)
                        else:
                            logger.info("[CACHE] cache key match but asset missing on disk — re-running tessellation for %s", cad_file_id)
                    else:
                        # Cache miss: update stored hash so next run can use it
                        cad_file.step_file_hash = _current_hash
                        session.commit()
                else:
                    # No hash available: update stored hash and proceed
                    cad_file.step_file_hash = _current_hash
                    session.commit()
        finally:
            eng.dispose()

        if _cache_hit_asset_id is not None:
            # Still chain USD master — it has its own hash-check
            try:
                generate_usd_master_task.delay(cad_file_id)
            except Exception:
                # Best-effort chaining; keep the traceback so broker problems stay diagnosable.
                logger.debug(
                    "Could not queue generate_usd_master_task from cache-hit path (non-fatal)",
                    exc_info=True,
                )
            return {"cached": True, "asset_id": _cache_hit_asset_id}

        step = _Path(step_path_str)
        if not step.exists():
            log_task_event(self.request.id, f"Failed: STEP file not found: {step}", "error")
            raise RuntimeError(f"STEP file not found: {step}")

        output_path = step.parent / f"{step.stem}_geometry.glb"

        log_task_event(
            self.request.id,
            f"Starting OCC GLB export: {len(color_map)} part colors",
            "info",
        )

        # Run export_step_to_gltf.py as a subprocess so OCP imports don't pollute worker state
        scripts_dir = _Path(_os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts"))
        script_path = scripts_dir / "export_step_to_gltf.py"
        python_bin = _sys.executable
        cmd = [
            python_bin, str(script_path),
            "--step_path", str(step),
            "--output_path", str(output_path),
            "--color_map", _json.dumps(color_map),
            "--linear_deflection", str(linear_deflection),
            "--angular_deflection", str(angular_deflection),
            "--tessellation_engine", tessellation_engine,
        ]

        log_task_event(
            self.request.id,
            f"Tessellation ({tessellation_engine}): linear={linear_deflection}mm, angular={angular_deflection}rad",
            "info",
        )

        try:
            result = _subprocess.run(cmd, capture_output=True, text=True, timeout=600)
            for line in result.stdout.splitlines():
                logger.info("[occ-gltf] %s", line)
            for line in result.stderr.splitlines():
                logger.warning("[occ-gltf stderr] %s", line)
            # A zero exit code is not enough: the script must also have written a non-empty file.
            if result.returncode != 0 or not output_path.exists() or output_path.stat().st_size == 0:
                raise RuntimeError(
                    f"export_step_to_gltf.py failed (exit {result.returncode}).\n"
                    f"STDERR: {result.stderr[-1000:]}"
                )
        except Exception as exc:
            log_task_event(self.request.id, f"Failed: {exc}", "error")
            pl.step_error("export_glb_geometry", str(exc), exc)
            logger.error("generate_gltf_geometry_task OCC export failed: %s", exc)
            raise self.retry(exc=exc, countdown=15)

        log_task_event(self.request.id, f"OCC GLB export completed: {output_path.name}", "done")

        # --- Store MediaAsset (upsert: update existing to keep stable ID/URL) ---
        _eng2 = create_engine(sync_url)
        try:
            with Session(_eng2) as _sess:
                set_tenant_context_sync(_sess, _tenant_id)
                _key = _relative_storage_key(output_path, app_settings.upload_dir)
                _file_size = output_path.stat().st_size if output_path.exists() else None

                existing = _sess.execute(
                    _select(MediaAsset).where(
                        MediaAsset.cad_file_id == _uuid.UUID(cad_file_id),
                        MediaAsset.asset_type == MediaAssetType.gltf_geometry,
                    )
                ).scalars().first()

                if existing:
                    existing.storage_key = _key
                    existing.mime_type = "model/gltf-binary"
                    existing.file_size_bytes = _file_size
                    existing.render_config = {"cache_key": effective_cache_key}
                    if product_id:
                        existing.product_id = _uuid.UUID(product_id)
                    _sess.commit()
                    asset_id = str(existing.id)
                else:
                    asset = MediaAsset(
                        cad_file_id=_uuid.UUID(cad_file_id),
                        product_id=_uuid.UUID(product_id) if product_id else None,
                        asset_type=MediaAssetType.gltf_geometry,
                        storage_key=_key,
                        mime_type="model/gltf-binary",
                        file_size_bytes=_file_size,
                        render_config={"cache_key": effective_cache_key},
                    )
                    _sess.add(asset)
                    _sess.commit()
                    asset_id = str(asset.id)
        finally:
            _eng2.dispose()

        pl.step_done("export_glb_geometry", result={"glb_path": str(output_path), "asset_id": asset_id})
        logger.info("generate_gltf_geometry_task: MediaAsset %s created for cad %s", asset_id, cad_file_id)

        # Auto-chain USD master export so the canonical scene is always up to date
        try:
            generate_usd_master_task.delay(cad_file_id)
            logger.info("generate_gltf_geometry_task: queued generate_usd_master_task for %s", cad_file_id)
        except Exception:
            # Best-effort chaining; keep the traceback so broker problems stay diagnosable.
            logger.debug("Could not queue generate_usd_master_task (non-fatal)", exc_info=True)

        return {"glb_path": str(output_path), "asset_id": asset_id}
    finally:
        # Release the dedup lock on every exit path past acquisition (success, error, retry).
        _r.delete(_lock_key)


@celery_app.task(
    bind=True,
    name="app.tasks.step_tasks.generate_usd_master_task",
    queue="asset_pipeline",  # needs pxr (usd-core) + OCC — both only in render-worker
    max_retries=1,
)
def generate_usd_master_task(self, cad_file_id: str) -> dict:
    """Export a USD master file from STEP via OCC + pxr authoring.

    Pipeline:
    1. Reads STEP file via export_step_to_usd.py (OCC XCAF + pxr)
    2. Writes .usd file alongside the STEP file
    3. Stores result as usd_master MediaAsset
    4. Parses MANIFEST_JSON from stdout → writes resolved_material_assignments to CadFile

    A Redis dedup lock (TTL=30min) prevents concurrent duplicate tasks for the
    same file.

    Args:
        cad_file_id: Primary key of the CadFile row, as a string.

    Returns:
        ``{"skipped": True}`` when another run holds the dedup lock,
        ``{"error": "no stored_path"}`` when the CadFile is missing or has no
        ``stored_path``, ``{"cached": True, "asset_id": ...}`` on a cache hit,
        or ``{"usd_path": ..., "asset_id": ..., "n_parts": ...}`` after a
        fresh export.

    Raises:
        RuntimeError: The STEP file or the export script does not exist.
        Exception: Whatever ``self.retry`` raises when the export subprocess
            fails, times out, or produces a missing/empty output file.
    """
    import hashlib as _hashlib_cache
    import json as _json
    import os as _os
    import subprocess as _subprocess
    import sys as _sys
    import uuid as _uuid
    import redis as _redis_lib
    from pathlib import Path as _Path
    from sqlalchemy import create_engine as _ce, select as _sel
    from sqlalchemy.orm import Session as _Session
    from app.config import settings as app_settings
    from app.domains.media.models import MediaAsset, MediaAssetType
    from app.models.cad_file import CadFile
    from app.models.system_setting import SystemSetting
    from app.domains.products.models import Product
    from app.services.material_service import resolve_material_map

    pl = PipelineLogger(task_id=self.request.id)
    pl.step_start("usd_master", {"cad_file_id": cad_file_id})

    # Redis dedup lock: prevent concurrent duplicate tasks for the same cad_file_id
    _lock_key = f"usd_master_lock:{cad_file_id}"
    _r = _redis_lib.from_url(app_settings.redis_url)
    _acquired = _r.set(_lock_key, "1", nx=True, ex=1800)  # 30-min TTL
    if not _acquired:
        logger.warning("generate_usd_master_task: %s already in-flight — skipping duplicate", cad_file_id)
        pl.step_done("usd_master", result={"skipped": True, "reason": "duplicate"})
        return {"skipped": True}

    try:
        from app.core.tenant_context import resolve_tenant_id_for_cad, set_tenant_context_sync
        _tenant_id = resolve_tenant_id_for_cad(cad_file_id)

        sync_url = app_settings.database_url.replace("+asyncpg", "")
        eng = _ce(sync_url)
        _cache_hit_asset_id: str | None = None
        # try/finally so the engine is disposed on the early return and on errors too.
        try:
            with _Session(eng) as sess:
                set_tenant_context_sync(sess, _tenant_id)
                cad_file = sess.get(CadFile, cad_file_id)
                if not cad_file or not cad_file.stored_path:
                    logger.error("generate_usd_master_task: no stored_path for %s", cad_file_id)
                    return {"error": "no stored_path"}
                step_path = _Path(cad_file.stored_path)

                product = sess.execute(
                    _sel(Product).where(Product.cad_file_id == cad_file.id)
                ).scalar_one_or_none()
                color_map: dict[str, str] = {}
                raw_mat_map: dict[str, str] = {}
                if product and product.cad_part_materials:
                    color_map = _collect_part_colors(product.cad_part_materials)
                    # Build raw material map for resolve_material_map
                    raw_mat_map = _collect_raw_materials(product.cad_part_materials)

                # Resolve raw material names to SCHAEFFLER library names via aliases
                material_map: dict[str, str] = {}
                if raw_mat_map:
                    material_map = resolve_material_map(raw_mat_map)
                    logger.info(
                        "generate_usd_master_task: resolved %d material(s) for material_map",
                        len(material_map),
                    )

                settings_rows = sess.execute(_sel(SystemSetting)).scalars().all()
                sys_settings = {s.key: s.value for s in settings_rows}
                linear_deflection = float(sys_settings.get("render_linear_deflection", "0.03"))
                angular_deflection = float(sys_settings.get("render_angular_deflection", "0.05"))
                sharp_threshold = float(sys_settings.get("sharp_edge_threshold", "20.0"))

                # Hash-based cache check: skip tessellation if file and settings haven't changed
                from app.domains.products.cache_service import compute_step_hash as _compute_step_hash_usd
                _current_hash_usd = _compute_step_hash_usd(str(step_path))

                # Composite cache key includes deflection settings and material_map
                # so changing either invalidates cache (material primvars are baked into USD)
                _mat_hash = _hashlib_cache.md5(
                    _json.dumps(material_map, sort_keys=True).encode()
                ).hexdigest()[:12] if material_map else "none"
                effective_cache_key = (
                    f"{_current_hash_usd}:{linear_deflection}:{angular_deflection}:{sharp_threshold}:{_mat_hash}"
                    if _current_hash_usd else None
                )

                if effective_cache_key:
                    existing_usd = sess.execute(
                        _sel(MediaAsset).where(
                            MediaAsset.cad_file_id == cad_file.id,
                            MediaAsset.asset_type == MediaAssetType.usd_master,
                        )
                    ).scalars().first()
                    stored_key = (existing_usd.render_config or {}).get("cache_key", "") if existing_usd else ""
                    if stored_key == effective_cache_key:
                        _usd_disk_path = _Path(app_settings.upload_dir) / existing_usd.storage_key
                        if _usd_disk_path.exists():
                            logger.info("[CACHE] cache key match — skipping USD master tessellation for %s", cad_file_id)
                            pl.step_done("usd_master", result={"cached": True, "asset_id": str(existing_usd.id)})
                            _cache_hit_asset_id = str(existing_usd.id)
                        else:
                            logger.info("[CACHE] cache key match but USD asset missing on disk — re-running tessellation for %s", cad_file_id)
                    else:
                        # Cache miss: update stored hash so next run can use it
                        cad_file.step_file_hash = _current_hash_usd
                        sess.commit()
                else:
                    # No hash available: update stored hash and proceed
                    cad_file.step_file_hash = _current_hash_usd
                    sess.commit()
        finally:
            eng.dispose()

        if _cache_hit_asset_id is not None:
            return {"cached": True, "asset_id": _cache_hit_asset_id}

        if not step_path.exists():
            err = f"STEP file not found: {step_path}"
            pl.step_error("usd_master", err, None)
            raise RuntimeError(err)

        output_path = step_path.parent / f"{step_path.stem}_master.usd"
        scripts_dir = _Path(_os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts"))
        script_path = scripts_dir / "export_step_to_usd.py"

        if not script_path.exists():
            err = f"export_step_to_usd.py not found at {script_path}"
            pl.step_error("usd_master", err, None)
            raise RuntimeError(err)

        cmd = [
            _sys.executable, str(script_path),
            "--step_path", str(step_path),
            "--output_path", str(output_path),
            "--color_map", _json.dumps(color_map),
            "--linear_deflection", str(linear_deflection),
            "--angular_deflection", str(angular_deflection),
            "--sharp_threshold", str(sharp_threshold),
            "--cad_file_id", cad_file_id,
        ]
        if material_map:
            cmd += ["--material_map", _json.dumps(material_map)]

        log_task_event(
            self.request.id,
            f"[USD_MASTER] exporting STEP → USD: {step_path.name}",
            "info",
        )

        try:
            result = _subprocess.run(cmd, capture_output=True, text=True, timeout=600)
            for line in result.stdout.splitlines():
                logger.info("[usd-master] %s", line)
            for line in result.stderr.splitlines():
                logger.warning("[usd-master stderr] %s", line)
            # A zero exit code is not enough: the script must also have written a non-empty file.
            if result.returncode != 0 or not output_path.exists() or output_path.stat().st_size == 0:
                raise RuntimeError(
                    f"export_step_to_usd.py failed (exit {result.returncode}).\n"
                    f"STDERR: {result.stderr[-1000:]}"
                )
        except Exception as exc:
            log_task_event(self.request.id, f"[USD_MASTER] failed: {exc}", "error")
            pl.step_error("usd_master", str(exc), exc)
            raise self.retry(exc=exc, countdown=15)

        # --- Store MediaAsset (upsert) ---
        eng2 = _ce(sync_url)
        asset_id: str = ""
        try:
            with _Session(eng2) as sess2:
                set_tenant_context_sync(sess2, _tenant_id)
                _key = _relative_storage_key(output_path, app_settings.upload_dir)
                _file_size = output_path.stat().st_size if output_path.exists() else None

                existing = sess2.execute(
                    _sel(MediaAsset).where(
                        MediaAsset.cad_file_id == _uuid.UUID(cad_file_id),
                        MediaAsset.asset_type == MediaAssetType.usd_master,
                    )
                ).scalars().first()

                if existing:
                    existing.storage_key = _key
                    existing.mime_type = "model/vnd.usd"
                    existing.file_size_bytes = _file_size
                    existing.render_config = {"cache_key": effective_cache_key}
                    sess2.commit()
                    asset_id = str(existing.id)
                else:
                    asset = MediaAsset(
                        cad_file_id=_uuid.UUID(cad_file_id),
                        asset_type=MediaAssetType.usd_master,
                        storage_key=_key,
                        mime_type="model/vnd.usd",
                        file_size_bytes=_file_size,
                        render_config={"cache_key": effective_cache_key},
                    )
                    sess2.add(asset)
                    sess2.commit()
                    asset_id = str(asset.id)
        finally:
            eng2.dispose()

        # --- Parse MANIFEST_JSON and write resolved_material_assignments ---
        manifest_parts: list = []
        for line in result.stdout.splitlines():
            if line.startswith("MANIFEST_JSON: "):
                try:
                    manifest_parts = _json.loads(line[len("MANIFEST_JSON: "):]).get("parts", [])
                except Exception as parse_exc:
                    logger.warning("[USD_MASTER] MANIFEST_JSON parse failed: %s", parse_exc)
                # Only the first manifest line is considered, whether or not it parsed.
                break

        if manifest_parts:
            # Best-effort: a failure here is logged but does not fail the task,
            # since the USD asset itself has already been stored.
            try:
                resolved = {
                    p["part_key"]: {
                        "source_name": p["source_name"],
                        "prim_path": p["prim_path"],
                        "canonical_material": p.get("canonical_material"),
                    }
                    for p in manifest_parts
                }
                eng3 = _ce(sync_url)
                try:
                    with _Session(eng3) as sess3:
                        set_tenant_context_sync(sess3, _tenant_id)
                        row = sess3.get(CadFile, cad_file_id)
                        if row:
                            row.resolved_material_assignments = resolved
                            sess3.commit()
                finally:
                    eng3.dispose()
                logger.info("[USD_MASTER] wrote resolved_material_assignments (%d parts)", len(resolved))
            except Exception as write_exc:
                logger.warning("[USD_MASTER] failed to write resolved_material_assignments: %s", write_exc)

        log_task_event(self.request.id, f"[USD_MASTER] done: {output_path.name}", "done")
        pl.step_done("usd_master", result={"usd_path": str(output_path), "asset_id": asset_id})
        return {"usd_path": str(output_path), "asset_id": asset_id, "n_parts": len(manifest_parts)}
    finally:
        # Release the dedup lock on every exit path past acquisition (success, error, retry).
        _r.delete(_lock_key)