"""Celery tasks for asset library management.""" from __future__ import annotations import json import logging import os import subprocess import uuid from pathlib import Path from app.tasks.celery_app import celery_app logger = logging.getLogger(__name__) # Scripts are copied to /render-scripts/ in the render-worker container (RENDER_SCRIPTS_DIR env var) CATALOG_SCRIPT = Path(os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts")) / "catalog_assets.py" @celery_app.task( name="app.domains.materials.tasks.refresh_asset_library_catalog", queue="asset_pipeline", bind=True, max_retries=2, default_retry_delay=30, ) def refresh_asset_library_catalog(self, asset_library_id: str) -> None: """Run Blender headless to extract catalog from a .blend asset library. Updates the `catalog` JSONB column of the AssetLibrary record. """ from sqlalchemy import create_engine from sqlalchemy.orm import Session import app.models # noqa: F401 — registers all SQLAlchemy models so relationships resolve from app.domains.materials.models import AssetLibrary from app.config import settings sync_url = settings.database_url.replace("postgresql+asyncpg://", "postgresql://") try: engine = create_engine(sync_url) with Session(engine) as db: lib = db.get(AssetLibrary, uuid.UUID(asset_library_id)) if not lib: logger.warning("AssetLibrary %s not found", asset_library_id) return blend_path = lib.blend_file_path engine.dispose() if not blend_path or not Path(blend_path).exists(): logger.warning("AssetLibrary %s: blend file not found at %s", asset_library_id, blend_path) return # Determine Blender binary blender_bin = os.environ.get("BLENDER_BIN", "blender") result = subprocess.run( [ blender_bin, "--background", "--python", str(CATALOG_SCRIPT), "--", blend_path, ], capture_output=True, text=True, timeout=120, ) if result.returncode != 0: logger.error("catalog_assets.py failed (exit %d):\n%s", result.returncode, result.stderr) return # Parse catalog JSON from stdout (last line that starts with '{') catalog = None for line in reversed(result.stdout.splitlines()): line = line.strip() if line.startswith("{"): try: catalog = json.loads(line) break except json.JSONDecodeError: continue if catalog is None: logger.error("catalog_assets.py: no JSON found in output:\n%s", result.stdout) return # Persist catalog engine2 = create_engine(sync_url) with Session(engine2) as db: lib = db.get(AssetLibrary, uuid.UUID(asset_library_id)) if lib: lib.catalog = catalog db.commit() logger.info( "AssetLibrary %s catalog updated: %d materials, %d node_groups", asset_library_id, len(catalog.get("materials", [])), len(catalog.get("node_groups", [])), ) engine2.dispose() except subprocess.TimeoutExpired: logger.error("catalog_assets.py timed out for library %s", asset_library_id) raise self.retry(countdown=60) except Exception as exc: logger.exception("refresh_asset_library_catalog failed: %s", exc) raise self.retry(exc=exc)