fix: deduplicate GLB/USD generation with Redis locks + review fixes

- Add per-file Redis SET NX EX 1800 locks to generate_gltf_geometry_task
  and generate_usd_master_task — concurrent duplicates (e.g. a double-click
  of the bulk action buttons) now log a warning and return immediately
  instead of running two expensive OCC tessellation subprocesses on the
  same file
- Fix eng.dispose() being called inside the with Session() block in the
  cache-hit path of both tasks — it now runs after the with block exits
  (Tasks 3+4 from plan)
- Add cad.updated_at = datetime.utcnow() in save_manual_material_overrides
  (it was missing, unlike the parallel save_part_materials endpoint)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 13:50:05 +01:00
parent 409fb92899
commit 71e099305c
4 changed files with 545 additions and 410 deletions
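
Before the diffs, a minimal sketch of the per-file dedup lock described in the first bullet, assuming a redis-py client and a Celery-style worker; the run_once_per_file helper and its arguments are illustrative, not the project's actual code.

import logging

import redis  # redis-py

logger = logging.getLogger(__name__)

LOCK_TTL_SECONDS = 1800  # 30 min, matching the EX value in the commit


def run_once_per_file(redis_url: str, cad_file_id: str, work) -> dict:
    """Run work() unless another worker already holds the per-file lock."""
    r = redis.Redis.from_url(redis_url)
    lock_key = f"glb_geometry_lock:{cad_file_id}"
    # SET NX EX only succeeds if the key does not exist yet, and the key
    # expires on its own so a crashed worker cannot hold the lock forever.
    acquired = r.set(lock_key, "1", nx=True, ex=LOCK_TTL_SECONDS)
    if not acquired:
        logger.warning("%s already in-flight, skipping duplicate", cad_file_id)
        return {"skipped": True}
    try:
        return work()
    finally:
        # Release eagerly so a retry does not have to wait out the TTL.
        r.delete(lock_key)

The eager delete in the finally block mirrors the tasks below; the 30-minute TTL only acts as a safety net if a worker dies before releasing the lock.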
@@ -535,6 +535,7 @@ async def save_manual_material_overrides(
     cad = await _get_cad_file(id, db)
     cad.manual_material_overrides = body.overrides
+    cad.updated_at = datetime.utcnow()
     await db.commit()
     await db.refresh(cad)
     return ManualMaterialOverridesOut(
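
The second bullet of the commit message (and the tasks diff below) moves eng.dispose() so it runs only after the with Session() block has exited. A minimal sketch of that ordering, assuming a synchronous SQLAlchemy engine; the media_assets query and helper name are illustrative, not the project's actual schema.

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session


def cached_asset_id(sync_url: str, cad_file_id: str) -> str | None:
    eng = create_engine(sync_url)
    cache_hit_asset_id: str | None = None
    with Session(eng) as session:
        # Read-only cache check; keep the session open only while querying.
        row = session.execute(
            text("SELECT id FROM media_assets WHERE cad_file_id = :cid LIMIT 1"),
            {"cid": cad_file_id},
        ).first()
        if row:
            cache_hit_asset_id = str(row[0])
    # Dispose only after the with block has closed the session, so the engine
    # is not torn down underneath a connection that is still checked out.
    eng.dispose()
    return cache_hit_asset_id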
@@ -24,191 +24,211 @@ def generate_gltf_geometry_task(self, cad_file_id: str):
4. Stores result as gltf_geometry MediaAsset (replaces any existing one)
Output is in meters, Y-up (glTF convention).
A Redis dedup lock (TTL=30min) prevents concurrent duplicate tasks for the same file.
"""
import json as _json
import os as _os
import subprocess as _subprocess
import sys as _sys
import redis as _redis_lib
from pathlib import Path as _Path
from sqlalchemy import create_engine, select as _select
from sqlalchemy.orm import Session
from app.config import settings as app_settings
from app.models.cad_file import CadFile
from app.models.system_setting import SystemSetting as _SysSetting
pl = PipelineLogger(task_id=self.request.id)
pl.step_start("export_glb_geometry", {"cad_file_id": cad_file_id})
# Resolve and log tenant context at task start (required for RLS)
from app.core.tenant_context import resolve_tenant_id_for_cad, set_tenant_context_sync
_tenant_id = resolve_tenant_id_for_cad(cad_file_id)
# Redis dedup lock: prevent concurrent duplicate tasks for the same cad_file_id
_lock_key = f"glb_geometry_lock:{cad_file_id}"
_r = _redis_lib.from_url(app_settings.redis_url)
_acquired = _r.set(_lock_key, "1", nx=True, ex=1800) # 30-min TTL
if not _acquired:
logger.warning("generate_gltf_geometry_task: %s already in-flight — skipping duplicate", cad_file_id)
pl.step_done("export_glb_geometry", result={"skipped": True, "reason": "duplicate"})
return {"skipped": True}
sync_url = app_settings.database_url.replace("+asyncpg", "")
eng = create_engine(sync_url)
with Session(eng) as session:
set_tenant_context_sync(session, _tenant_id)
cad_file = session.get(CadFile, cad_file_id)
if not cad_file or not cad_file.stored_path:
logger.error("generate_gltf_geometry_task: no stored_path for %s", cad_file_id)
return
step_path_str = cad_file.stored_path
try:
# Resolve tenant context at task start (required for RLS)
from app.core.tenant_context import resolve_tenant_id_for_cad, set_tenant_context_sync
_tenant_id = resolve_tenant_id_for_cad(cad_file_id)
# Build hex color_map from product.cad_part_materials
from app.domains.products.models import Product
product = session.execute(
_select(Product).where(Product.cad_file_id == cad_file.id)
).scalar_one_or_none()
sync_url = app_settings.database_url.replace("+asyncpg", "")
eng = create_engine(sync_url)
with Session(eng) as session:
set_tenant_context_sync(session, _tenant_id)
cad_file = session.get(CadFile, cad_file_id)
if not cad_file or not cad_file.stored_path:
logger.error("generate_gltf_geometry_task: no stored_path for %s", cad_file_id)
return
step_path_str = cad_file.stored_path
color_map: dict[str, str] = {}
product_id = str(product.id) if product else None
if product and product.cad_part_materials:
for entry in product.cad_part_materials:
part_name = entry.get("part_name") or entry.get("name", "")
hex_color = entry.get("hex_color") or entry.get("color", "")
if part_name and hex_color:
color_map[part_name] = hex_color
# Build hex color_map from product.cad_part_materials
from app.domains.products.models import Product
product = session.execute(
_select(Product).where(Product.cad_file_id == cad_file.id)
).scalar_one_or_none()
settings_rows = session.execute(_select(_SysSetting)).scalars().all()
sys_settings = {s.key: s.value for s in settings_rows}
color_map: dict[str, str] = {}
product_id = str(product.id) if product else None
if product and product.cad_part_materials:
for entry in product.cad_part_materials:
part_name = entry.get("part_name") or entry.get("name", "")
hex_color = entry.get("hex_color") or entry.get("color", "")
if part_name and hex_color:
color_map[part_name] = hex_color
# Hash-based cache check: skip tessellation if file hasn't changed
step_file_hash = cad_file.step_file_hash
if step_file_hash:
from app.domains.media.models import MediaAsset, MediaAssetType
import uuid as _uuid_check
existing_geo = session.execute(
_select(MediaAsset).where(
MediaAsset.cad_file_id == _uuid_check.UUID(cad_file_id),
settings_rows = session.execute(_select(_SysSetting)).scalars().all()
sys_settings = {s.key: s.value for s in settings_rows}
# Hash-based cache check: skip tessellation if file hasn't changed
step_file_hash = cad_file.step_file_hash
if step_file_hash:
from app.domains.media.models import MediaAsset, MediaAssetType
import uuid as _uuid_check
existing_geo = session.execute(
_select(MediaAsset).where(
MediaAsset.cad_file_id == _uuid_check.UUID(cad_file_id),
MediaAsset.asset_type == MediaAssetType.gltf_geometry,
)
).scalars().first()
if existing_geo:
logger.info("[CACHE] hash match — skipping geometry GLB tessellation for %s", cad_file_id)
pl.step_done("export_glb_geometry", result={"cached": True, "asset_id": str(existing_geo.id)})
_cache_hit_asset_id = str(existing_geo.id)
else:
_cache_hit_asset_id = None
else:
_cache_hit_asset_id = None
eng.dispose()
if _cache_hit_asset_id is not None:
# Still chain USD master — it has its own hash-check
try:
generate_usd_master_task.delay(cad_file_id)
except Exception:
logger.debug("Could not queue generate_usd_master_task from cache-hit path (non-fatal)")
return {"cached": True, "asset_id": _cache_hit_asset_id}
linear_deflection = float(sys_settings.get("scene_linear_deflection", "0.1"))
angular_deflection = float(sys_settings.get("scene_angular_deflection", "0.1"))
tessellation_engine = sys_settings.get("tessellation_engine", "occ")
step = _Path(step_path_str)
if not step.exists():
log_task_event(self.request.id, f"Failed: STEP file not found: {step}", "error")
raise RuntimeError(f"STEP file not found: {step}")
output_path = step.parent / f"{step.stem}_geometry.glb"
log_task_event(
self.request.id,
f"Starting OCC GLB export: {len(color_map)} part colors",
"info",
)
# Run export_step_to_gltf.py as a subprocess so OCP imports don't pollute worker state
scripts_dir = _Path(_os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts"))
script_path = scripts_dir / "export_step_to_gltf.py"
python_bin = _sys.executable
cmd = [
python_bin, str(script_path),
"--step_path", str(step),
"--output_path", str(output_path),
"--color_map", _json.dumps(color_map),
"--linear_deflection", str(linear_deflection),
"--angular_deflection", str(angular_deflection),
"--tessellation_engine", tessellation_engine,
]
log_task_event(
self.request.id,
f"Tessellation ({tessellation_engine}): linear={linear_deflection}mm, angular={angular_deflection}rad",
"info",
)
try:
result = _subprocess.run(cmd, capture_output=True, text=True, timeout=600)
for line in result.stdout.splitlines():
logger.info("[occ-gltf] %s", line)
for line in result.stderr.splitlines():
logger.warning("[occ-gltf stderr] %s", line)
if result.returncode != 0 or not output_path.exists() or output_path.stat().st_size == 0:
raise RuntimeError(
f"export_step_to_gltf.py failed (exit {result.returncode}).\n"
f"STDERR: {result.stderr[-1000:]}"
)
except Exception as exc:
log_task_event(self.request.id, f"Failed: {exc}", "error")
pl.step_error("export_glb_geometry", str(exc), exc)
logger.error("generate_gltf_geometry_task OCC export failed: %s", exc)
raise self.retry(exc=exc, countdown=15)
log_task_event(self.request.id, f"OCC GLB export completed: {output_path.name}", "done")
# --- Store MediaAsset (upsert: update existing to keep stable ID/URL) ---
import uuid as _uuid
from sqlalchemy import create_engine as _ce, select as _sel2
from sqlalchemy.orm import Session as _Session
from app.domains.media.models import MediaAsset, MediaAssetType
_sync_url = app_settings.database_url.replace("+asyncpg", "")
_eng2 = _ce(_sync_url)
with _Session(_eng2) as _sess:
set_tenant_context_sync(_sess, _tenant_id)
_key = str(output_path)
_prefix = str(app_settings.upload_dir).rstrip("/") + "/"
if _key.startswith(_prefix):
_key = _key[len(_prefix):]
_file_size = output_path.stat().st_size if output_path.exists() else None
existing = _sess.execute(
_sel2(MediaAsset).where(
MediaAsset.cad_file_id == _uuid.UUID(cad_file_id),
MediaAsset.asset_type == MediaAssetType.gltf_geometry,
)
).scalars().first()
if existing_geo:
logger.info("[CACHE] hash match — skipping geometry GLB tessellation for %s", cad_file_id)
pl.step_done("export_glb_geometry", result={"cached": True, "asset_id": str(existing_geo.id)})
eng.dispose()
# Still chain USD master — it has its own hash-check (C2)
try:
generate_usd_master_task.delay(cad_file_id)
except Exception:
logger.debug("Could not queue generate_usd_master_task from cache-hit path (non-fatal)")
return {"cached": True, "asset_id": str(existing_geo.id)}
eng.dispose()
linear_deflection = float(sys_settings.get("scene_linear_deflection", "0.1"))
angular_deflection = float(sys_settings.get("scene_angular_deflection", "0.1"))
tessellation_engine = sys_settings.get("tessellation_engine", "occ")
if existing:
existing.storage_key = _key
existing.mime_type = "model/gltf-binary"
existing.file_size_bytes = _file_size
if product_id:
existing.product_id = _uuid.UUID(product_id)
_sess.commit()
asset_id = str(existing.id)
else:
asset = MediaAsset(
cad_file_id=_uuid.UUID(cad_file_id),
product_id=_uuid.UUID(product_id) if product_id else None,
asset_type=MediaAssetType.gltf_geometry,
storage_key=_key,
mime_type="model/gltf-binary",
file_size_bytes=_file_size,
)
_sess.add(asset)
_sess.commit()
asset_id = str(asset.id)
_eng2.dispose()
step = _Path(step_path_str)
pl.step_done("export_glb_geometry", result={"glb_path": str(output_path), "asset_id": asset_id})
logger.info("generate_gltf_geometry_task: MediaAsset %s created for cad %s", asset_id, cad_file_id)
if not step.exists():
log_task_event(self.request.id, f"Failed: STEP file not found: {step}", "error")
raise RuntimeError(f"STEP file not found: {step}")
# Auto-chain USD master export so the canonical scene is always up to date
try:
generate_usd_master_task.delay(cad_file_id)
logger.info("generate_gltf_geometry_task: queued generate_usd_master_task for %s", cad_file_id)
except Exception:
logger.debug("Could not queue generate_usd_master_task (non-fatal)")
output_path = step.parent / f"{step.stem}_geometry.glb"
return {"glb_path": str(output_path), "asset_id": asset_id}
log_task_event(
self.request.id,
f"Starting OCC GLB export: {len(color_map)} part colors",
"info",
)
# Run export_step_to_gltf.py as a subprocess so OCP imports don't pollute worker state
scripts_dir = _Path(_os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts"))
script_path = scripts_dir / "export_step_to_gltf.py"
python_bin = _sys.executable
cmd = [
python_bin, str(script_path),
"--step_path", str(step),
"--output_path", str(output_path),
"--color_map", _json.dumps(color_map),
"--linear_deflection", str(linear_deflection),
"--angular_deflection", str(angular_deflection),
"--tessellation_engine", tessellation_engine,
]
log_task_event(
self.request.id,
f"Tessellation ({tessellation_engine}): linear={linear_deflection}mm, angular={angular_deflection}rad",
"info",
)
try:
result = _subprocess.run(cmd, capture_output=True, text=True, timeout=600)
for line in result.stdout.splitlines():
logger.info("[occ-gltf] %s", line)
for line in result.stderr.splitlines():
logger.warning("[occ-gltf stderr] %s", line)
if result.returncode != 0 or not output_path.exists() or output_path.stat().st_size == 0:
raise RuntimeError(
f"export_step_to_gltf.py failed (exit {result.returncode}).\n"
f"STDERR: {result.stderr[-1000:]}"
)
except Exception as exc:
log_task_event(self.request.id, f"Failed: {exc}", "error")
pl.step_error("export_glb_geometry", str(exc), exc)
logger.error("generate_gltf_geometry_task OCC export failed: %s", exc)
raise self.retry(exc=exc, countdown=15)
log_task_event(self.request.id, f"OCC GLB export completed: {output_path.name}", "done")
# --- Store MediaAsset (upsert: update existing to keep stable ID/URL) ---
import uuid as _uuid
from sqlalchemy import create_engine as _ce, select as _sel2
from sqlalchemy.orm import Session as _Session
from app.domains.media.models import MediaAsset, MediaAssetType
_sync_url = app_settings.database_url.replace("+asyncpg", "")
_eng2 = _ce(_sync_url)
with _Session(_eng2) as _sess:
set_tenant_context_sync(_sess, _tenant_id)
_key = str(output_path)
_prefix = str(app_settings.upload_dir).rstrip("/") + "/"
if _key.startswith(_prefix):
_key = _key[len(_prefix):]
_file_size = output_path.stat().st_size if output_path.exists() else None
existing = _sess.execute(
_sel2(MediaAsset).where(
MediaAsset.cad_file_id == _uuid.UUID(cad_file_id),
MediaAsset.asset_type == MediaAssetType.gltf_geometry,
)
).scalars().first()
if existing:
existing.storage_key = _key
existing.mime_type = "model/gltf-binary"
existing.file_size_bytes = _file_size
if product_id:
existing.product_id = _uuid.UUID(product_id)
_sess.commit()
asset_id = str(existing.id)
else:
asset = MediaAsset(
cad_file_id=_uuid.UUID(cad_file_id),
product_id=_uuid.UUID(product_id) if product_id else None,
asset_type=MediaAssetType.gltf_geometry,
storage_key=_key,
mime_type="model/gltf-binary",
file_size_bytes=_file_size,
)
_sess.add(asset)
_sess.commit()
asset_id = str(asset.id)
_eng2.dispose()
pl.step_done("export_glb_geometry", result={"glb_path": str(output_path), "asset_id": asset_id})
logger.info("generate_gltf_geometry_task: MediaAsset %s created for cad %s", asset_id, cad_file_id)
# Auto-chain USD master export so the canonical scene is always up to date
try:
generate_usd_master_task.delay(cad_file_id)
logger.info("generate_gltf_geometry_task: queued generate_usd_master_task for %s", cad_file_id)
except Exception:
logger.debug("Could not queue generate_usd_master_task (non-fatal)")
return {"glb_path": str(output_path), "asset_id": asset_id}
finally:
_r.delete(_lock_key)
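
The GLB task above shells out to export_step_to_gltf.py so OCP imports do not pollute the worker process. A minimal sketch of that subprocess call, reusing the CLI flags visible in the diff; the helper name and hard-coded script path are illustrative assumptions.

import json
import subprocess
import sys
from pathlib import Path


def export_step_to_glb(step: Path, output: Path, color_map: dict[str, str],
                       linear: float, angular: float, engine: str = "occ") -> None:
    cmd = [
        sys.executable, "/render-scripts/export_step_to_gltf.py",
        "--step_path", str(step),
        "--output_path", str(output),
        "--color_map", json.dumps(color_map),
        "--linear_deflection", str(linear),
        "--angular_deflection", str(angular),
        "--tessellation_engine", engine,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
    # Treat an empty or missing GLB as a failure even if the exit code is 0.
    if result.returncode != 0 or not output.exists() or output.stat().st_size == 0:
        raise RuntimeError(
            f"export_step_to_gltf.py failed (exit {result.returncode}): "
            f"{result.stderr[-1000:]}"
        )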
@celery_app.task(
@@ -521,169 +541,186 @@ def generate_usd_master_task(self, cad_file_id: str) -> dict:
pl = PipelineLogger(task_id=self.request.id)
pl.step_start("usd_master", {"cad_file_id": cad_file_id})
from app.core.tenant_context import resolve_tenant_id_for_cad, set_tenant_context_sync
_tenant_id = resolve_tenant_id_for_cad(cad_file_id)
# Redis dedup lock: prevent concurrent duplicate tasks for the same cad_file_id
import redis as _redis_lib
_lock_key = f"usd_master_lock:{cad_file_id}"
_r = _redis_lib.from_url(app_settings.redis_url)
_acquired = _r.set(_lock_key, "1", nx=True, ex=1800) # 30-min TTL
if not _acquired:
logger.warning("generate_usd_master_task: %s already in-flight — skipping duplicate", cad_file_id)
pl.step_done("usd_master", result={"skipped": True, "reason": "duplicate"})
return {"skipped": True}
sync_url = app_settings.database_url.replace("+asyncpg", "")
eng = _ce(sync_url)
try:
from app.core.tenant_context import resolve_tenant_id_for_cad, set_tenant_context_sync
_tenant_id = resolve_tenant_id_for_cad(cad_file_id)
with _Session(eng) as sess:
set_tenant_context_sync(sess, _tenant_id)
cad_file = sess.get(CadFile, cad_file_id)
if not cad_file or not cad_file.stored_path:
logger.error("generate_usd_master_task: no stored_path for %s", cad_file_id)
return {"error": "no stored_path"}
sync_url = app_settings.database_url.replace("+asyncpg", "")
eng = _ce(sync_url)
_cache_hit_asset_id: str | None = None
step_path = _Path(cad_file.stored_path)
with _Session(eng) as sess:
set_tenant_context_sync(sess, _tenant_id)
cad_file = sess.get(CadFile, cad_file_id)
if not cad_file or not cad_file.stored_path:
logger.error("generate_usd_master_task: no stored_path for %s", cad_file_id)
return {"error": "no stored_path"}
product = sess.execute(
_sel(Product).where(Product.cad_file_id == cad_file.id)
).scalar_one_or_none()
step_path = _Path(cad_file.stored_path)
color_map: dict[str, str] = {}
if product and product.cad_part_materials:
for entry in product.cad_part_materials:
part_name = entry.get("part_name") or entry.get("name", "")
hex_color = entry.get("hex_color") or entry.get("color", "")
if part_name and hex_color:
color_map[part_name] = hex_color
product = sess.execute(
_sel(Product).where(Product.cad_file_id == cad_file.id)
).scalar_one_or_none()
settings_rows = sess.execute(_sel(SystemSetting)).scalars().all()
sys_settings = {s.key: s.value for s in settings_rows}
color_map: dict[str, str] = {}
if product and product.cad_part_materials:
for entry in product.cad_part_materials:
part_name = entry.get("part_name") or entry.get("name", "")
hex_color = entry.get("hex_color") or entry.get("color", "")
if part_name and hex_color:
color_map[part_name] = hex_color
# Hash-based cache check: skip tessellation if file hasn't changed
step_file_hash = cad_file.step_file_hash
if step_file_hash:
existing_usd = sess.execute(
settings_rows = sess.execute(_sel(SystemSetting)).scalars().all()
sys_settings = {s.key: s.value for s in settings_rows}
# Hash-based cache check: skip tessellation if file hasn't changed
step_file_hash = cad_file.step_file_hash
if step_file_hash:
existing_usd = sess.execute(
_sel(MediaAsset).where(
MediaAsset.cad_file_id == cad_file.id,
MediaAsset.asset_type == MediaAssetType.usd_master,
)
).scalars().first()
if existing_usd:
logger.info("[CACHE] hash match — skipping USD master tessellation for %s", cad_file_id)
pl.step_done("usd_master", result={"cached": True, "asset_id": str(existing_usd.id)})
_cache_hit_asset_id = str(existing_usd.id)
eng.dispose()
if _cache_hit_asset_id is not None:
return {"cached": True, "asset_id": _cache_hit_asset_id}
if not step_path.exists():
err = f"STEP file not found: {step_path}"
pl.step_error("usd_master", err, None)
raise RuntimeError(err)
linear_deflection = float(sys_settings.get("render_linear_deflection", "0.03"))
angular_deflection = float(sys_settings.get("render_angular_deflection", "0.05"))
sharp_threshold = float(sys_settings.get("sharp_edge_threshold", "20.0"))
output_path = step_path.parent / f"{step_path.stem}_master.usd"
scripts_dir = _Path(_os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts"))
script_path = scripts_dir / "export_step_to_usd.py"
if not script_path.exists():
err = f"export_step_to_usd.py not found at {script_path}"
pl.step_error("usd_master", err, None)
raise RuntimeError(err)
cmd = [
_sys.executable, str(script_path),
"--step_path", str(step_path),
"--output_path", str(output_path),
"--color_map", _json.dumps(color_map),
"--linear_deflection", str(linear_deflection),
"--angular_deflection", str(angular_deflection),
"--sharp_threshold", str(sharp_threshold),
"--cad_file_id", cad_file_id,
]
log_task_event(
self.request.id,
f"[USD_MASTER] exporting STEP → USD: {step_path.name}",
"info",
)
try:
result = _subprocess.run(cmd, capture_output=True, text=True, timeout=600)
for line in result.stdout.splitlines():
logger.info("[usd-master] %s", line)
for line in result.stderr.splitlines():
logger.warning("[usd-master stderr] %s", line)
if result.returncode != 0 or not output_path.exists() or output_path.stat().st_size == 0:
raise RuntimeError(
f"export_step_to_usd.py failed (exit {result.returncode}).\n"
f"STDERR: {result.stderr[-1000:]}"
)
except Exception as exc:
log_task_event(self.request.id, f"[USD_MASTER] failed: {exc}", "error")
pl.step_error("usd_master", str(exc), exc)
raise self.retry(exc=exc, countdown=15)
# --- Store MediaAsset (upsert) ---
eng2 = _ce(sync_url)
asset_id: str = ""
with _Session(eng2) as sess2:
set_tenant_context_sync(sess2, _tenant_id)
_key = str(output_path)
_prefix = str(app_settings.upload_dir).rstrip("/") + "/"
if _key.startswith(_prefix):
_key = _key[len(_prefix):]
_file_size = output_path.stat().st_size if output_path.exists() else None
existing = sess2.execute(
_sel(MediaAsset).where(
MediaAsset.cad_file_id == cad_file.id,
MediaAsset.cad_file_id == _uuid.UUID(cad_file_id),
MediaAsset.asset_type == MediaAssetType.usd_master,
)
).scalars().first()
if existing_usd:
logger.info("[CACHE] hash match — skipping USD master tessellation for %s", cad_file_id)
pl.step_done("usd_master", result={"cached": True, "asset_id": str(existing_usd.id)})
eng.dispose()
return {"cached": True, "asset_id": str(existing_usd.id)}
eng.dispose()
if not step_path.exists():
err = f"STEP file not found: {step_path}"
pl.step_error("usd_master", err, None)
raise RuntimeError(err)
if existing:
existing.storage_key = _key
existing.mime_type = "model/vnd.usd"
existing.file_size_bytes = _file_size
sess2.commit()
asset_id = str(existing.id)
else:
asset = MediaAsset(
cad_file_id=_uuid.UUID(cad_file_id),
asset_type=MediaAssetType.usd_master,
storage_key=_key,
mime_type="model/vnd.usd",
file_size_bytes=_file_size,
)
sess2.add(asset)
sess2.commit()
asset_id = str(asset.id)
eng2.dispose()
linear_deflection = float(sys_settings.get("render_linear_deflection", "0.03"))
angular_deflection = float(sys_settings.get("render_angular_deflection", "0.05"))
sharp_threshold = float(sys_settings.get("sharp_edge_threshold", "20.0"))
output_path = step_path.parent / f"{step_path.stem}_master.usd"
scripts_dir = _Path(_os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts"))
script_path = scripts_dir / "export_step_to_usd.py"
if not script_path.exists():
err = f"export_step_to_usd.py not found at {script_path}"
pl.step_error("usd_master", err, None)
raise RuntimeError(err)
cmd = [
_sys.executable, str(script_path),
"--step_path", str(step_path),
"--output_path", str(output_path),
"--color_map", _json.dumps(color_map),
"--linear_deflection", str(linear_deflection),
"--angular_deflection", str(angular_deflection),
"--sharp_threshold", str(sharp_threshold),
"--cad_file_id", cad_file_id,
]
log_task_event(
self.request.id,
f"[USD_MASTER] exporting STEP → USD: {step_path.name}",
"info",
)
try:
result = _subprocess.run(cmd, capture_output=True, text=True, timeout=600)
# --- Parse MANIFEST_JSON and write resolved_material_assignments ---
manifest_parts: list = []
for line in result.stdout.splitlines():
logger.info("[usd-master] %s", line)
for line in result.stderr.splitlines():
logger.warning("[usd-master stderr] %s", line)
if line.startswith("MANIFEST_JSON: "):
try:
manifest_parts = _json.loads(line[len("MANIFEST_JSON: "):]).get("parts", [])
except Exception as parse_exc:
logger.warning("[USD_MASTER] MANIFEST_JSON parse failed: %s", parse_exc)
break
if result.returncode != 0 or not output_path.exists() or output_path.stat().st_size == 0:
raise RuntimeError(
f"export_step_to_usd.py failed (exit {result.returncode}).\n"
f"STDERR: {result.stderr[-1000:]}"
)
except Exception as exc:
log_task_event(self.request.id, f"[USD_MASTER] failed: {exc}", "error")
pl.step_error("usd_master", str(exc), exc)
raise self.retry(exc=exc, countdown=15)
# --- Store MediaAsset (upsert) ---
eng2 = _ce(sync_url)
asset_id: str = ""
with _Session(eng2) as sess2:
set_tenant_context_sync(sess2, _tenant_id)
_key = str(output_path)
_prefix = str(app_settings.upload_dir).rstrip("/") + "/"
if _key.startswith(_prefix):
_key = _key[len(_prefix):]
_file_size = output_path.stat().st_size if output_path.exists() else None
existing = sess2.execute(
_sel(MediaAsset).where(
MediaAsset.cad_file_id == _uuid.UUID(cad_file_id),
MediaAsset.asset_type == MediaAssetType.usd_master,
)
).scalars().first()
if existing:
existing.storage_key = _key
existing.mime_type = "model/vnd.usd"
existing.file_size_bytes = _file_size
sess2.commit()
asset_id = str(existing.id)
else:
asset = MediaAsset(
cad_file_id=_uuid.UUID(cad_file_id),
asset_type=MediaAssetType.usd_master,
storage_key=_key,
mime_type="model/vnd.usd",
file_size_bytes=_file_size,
)
sess2.add(asset)
sess2.commit()
asset_id = str(asset.id)
eng2.dispose()
# --- Parse MANIFEST_JSON and write resolved_material_assignments ---
manifest_parts: list = []
for line in result.stdout.splitlines():
if line.startswith("MANIFEST_JSON: "):
if manifest_parts:
try:
manifest_parts = _json.loads(line[len("MANIFEST_JSON: "):]).get("parts", [])
except Exception as parse_exc:
logger.warning("[USD_MASTER] MANIFEST_JSON parse failed: %s", parse_exc)
break
resolved = {
p["part_key"]: {"source_name": p["source_name"], "prim_path": p["prim_path"]}
for p in manifest_parts
}
eng3 = _ce(sync_url)
with _Session(eng3) as sess3:
set_tenant_context_sync(sess3, _tenant_id)
row = sess3.get(CadFile, cad_file_id)
if row:
row.resolved_material_assignments = resolved
sess3.commit()
eng3.dispose()
logger.info("[USD_MASTER] wrote resolved_material_assignments (%d parts)", len(resolved))
except Exception as write_exc:
logger.warning("[USD_MASTER] failed to write resolved_material_assignments: %s", write_exc)
if manifest_parts:
try:
resolved = {
p["part_key"]: {"source_name": p["source_name"], "prim_path": p["prim_path"]}
for p in manifest_parts
}
eng3 = _ce(sync_url)
with _Session(eng3) as sess3:
set_tenant_context_sync(sess3, _tenant_id)
row = sess3.get(CadFile, cad_file_id)
if row:
row.resolved_material_assignments = resolved
sess3.commit()
eng3.dispose()
logger.info("[USD_MASTER] wrote resolved_material_assignments (%d parts)", len(resolved))
except Exception as write_exc:
logger.warning("[USD_MASTER] failed to write resolved_material_assignments: %s", write_exc)
log_task_event(self.request.id, f"[USD_MASTER] done: {output_path.name}", "done")
pl.step_done("usd_master", result={"usd_path": str(output_path), "asset_id": asset_id})
return {"usd_path": str(output_path), "asset_id": asset_id, "n_parts": len(manifest_parts)}
log_task_event(self.request.id, f"[USD_MASTER] done: {output_path.name}", "done")
pl.step_done("usd_master", result={"usd_path": str(output_path), "asset_id": asset_id})
return {"usd_path": str(output_path), "asset_id": asset_id, "n_parts": len(manifest_parts)}
finally:
_r.delete(_lock_key)
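
The USD task reads a MANIFEST_JSON marker line from the export script's stdout and persists the resolved part mapping on the CAD row. A minimal sketch of that parsing, assuming the "MANIFEST_JSON: " prefix and part fields shown in the diff; helper names are illustrative.

import json
import logging

logger = logging.getLogger(__name__)


def parse_manifest_parts(stdout: str) -> list[dict]:
    """Return the 'parts' list from the first MANIFEST_JSON line, or []."""
    prefix = "MANIFEST_JSON: "
    for line in stdout.splitlines():
        if line.startswith(prefix):
            try:
                return json.loads(line[len(prefix):]).get("parts", [])
            except Exception as exc:  # a malformed manifest stays non-fatal
                logger.warning("MANIFEST_JSON parse failed: %s", exc)
            break
    return []


def build_resolved(parts: list[dict]) -> dict[str, dict[str, str]]:
    # Keyed by part_key, as in the resolved_material_assignments write above.
    return {
        p["part_key"]: {"source_name": p["source_name"], "prim_path": p["prim_path"]}
        for p in parts
    }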