Files
HartOMat/backend/app/domains/rendering/tasks.py
T
Hartmut bfd58e3419 fix: media thumbnails, product dimensions, inline 3D viewer, GLB export
Bug A: Media Library thumbnails were gray because <img src> cannot send
JWT auth headers. Added useAuthBlob() hook (fetch + createObjectURL) in
MediaBrowser.tsx. Also fixed publish_asset Celery task to populate
product_id + cad_file_id on MediaAsset for thumbnail fallback resolution.

Bug B: Product dimensions now shown in Product Details card with Ruler
icon and "from CAD" label when cad_mesh_attributes.dimensions_mm exists.

Bug C: Replaced 128×128 CAD thumbnail with InlineCadViewer component.
Queries gltf_geometry MediaAssets, fetches GLB via auth fetch → blob URL
→ Three.js Canvas with OrbitControls. Falls back to thumbnail + "Load 3D
Model" button. Polling when GLB generation is in progress.

Bug D: trimesh was in [cad] optional extra but Dockerfile only installed
[dev]. Changed to pip install -e ".[dev,cad]" — trimesh now available in
backend container, GLB + Colors export works.

Also added bbox extraction (STL-first numpy parsing) in render_step_thumbnail
and admin "Re-extract CAD Metadata" bulk endpoint.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-07 13:27:46 +01:00

748 lines
28 KiB
Python

"""Rendering domain tasks — Celery tasks for Blender-based rendering.
These tasks run on the `thumbnail_rendering` queue in the render-worker
container, which has Blender and cadquery available.
Phase A2: Initial implementation replacing the blender-renderer HTTP service.
Phase B: This module will be expanded as part of the Domain-Driven restructure.
"""
import logging
from pathlib import Path
from app.tasks.celery_app import celery_app
from app.core.task_logs import log_task_event
logger = logging.getLogger(__name__)
def _update_workflow_run_status(order_line_id: str, status: str, error: str | None = None) -> None:
    """Record a task outcome on the newest WorkflowRun of an order line.

    The most recently created run for ``order_line_id`` is looked up; it is
    only modified while its status is still ``"pending"``.  In that case it
    receives ``status``, a completion timestamp and, if ``error`` is given,
    the message cut to 2000 characters.

    This is best-effort: any exception is logged as a warning and swallowed.
    """
    try:
        import asyncio
        from datetime import datetime as _dt

        async def _apply_status():
            from app.database import AsyncSessionLocal
            from app.domains.rendering.models import WorkflowRun
            from sqlalchemy import select as _sel

            async with AsyncSessionLocal() as session:
                newest_first = (
                    _sel(WorkflowRun)
                    .where(WorkflowRun.order_line_id == order_line_id)
                    .order_by(WorkflowRun.created_at.desc())
                    .limit(1)
                )
                lookup = await session.execute(newest_first)
                latest = lookup.scalar_one_or_none()
                if latest and latest.status == "pending":
                    latest.status = status
                    latest.completed_at = _dt.utcnow()
                    if error:
                        # Keep the stored message bounded.
                        latest.error_message = error[:2000]
                    await session.commit()

        loop = asyncio.get_event_loop()
        loop.run_until_complete(_apply_status())
    except Exception as _exc:
        logger.warning("Failed to update WorkflowRun status for line %s: %s", order_line_id, _exc)
@celery_app.task(
    bind=True,
    name="app.domains.rendering.tasks.render_still_task",
    queue="thumbnail_rendering",
    max_retries=2,
)
def render_still_task(
    self,
    step_path: str,
    output_path: str,
    engine: str = "cycles",
    samples: int = 256,
    stl_quality: str = "low",
    smooth_angle: int = 30,
    cycles_device: str = "auto",
    width: int = 512,
    height: int = 512,
    transparent_bg: bool = False,
    template_path: str | None = None,
    target_collection: str = "Product",
    material_library_path: str | None = None,
    material_map: dict | None = None,
    part_names_ordered: list | None = None,
    lighting_only: bool = False,
    shadow_catcher: bool = False,
    rotation_x: float = 0.0,
    rotation_y: float = 0.0,
    rotation_z: float = 0.0,
    noise_threshold: str = "",
    denoiser: str = "",
    denoising_input_passes: str = "",
    denoising_prefilter: str = "",
    denoising_quality: str = "",
    denoising_use_gpu: str = "",
    mesh_attributes: dict | None = None,
) -> dict:
    """Render a STEP file to a still PNG via ``render_blender.render_still``.

    All rendering options are forwarded unchanged to ``render_still``;
    ``mesh_attributes`` defaults to an empty dict when omitted.

    Returns:
        The metadata dict returned by ``render_still``.

    Raises:
        celery.exceptions.Retry: on any failure while retries remain
            (``max_retries=2``, 30 s countdown); afterwards the original
            exception propagates.

    A ``render.still.completed`` / ``render.still.failed`` websocket event is
    published on a best-effort basis; publishing problems never affect the
    task result.
    """
    log_task_event(self.request.id, f"Starting render_still_task: {Path(step_path).name}", "info")
    try:
        from app.services.render_blender import render_still
        result = render_still(
            step_path=Path(step_path),
            output_path=Path(output_path),
            engine=engine,
            samples=samples,
            stl_quality=stl_quality,
            smooth_angle=smooth_angle,
            cycles_device=cycles_device,
            width=width,
            height=height,
            transparent_bg=transparent_bg,
            template_path=template_path,
            target_collection=target_collection,
            material_library_path=material_library_path,
            material_map=material_map,
            part_names_ordered=part_names_ordered,
            lighting_only=lighting_only,
            shadow_catcher=shadow_catcher,
            rotation_x=rotation_x,
            rotation_y=rotation_y,
            rotation_z=rotation_z,
            noise_threshold=noise_threshold,
            denoiser=denoiser,
            denoising_input_passes=denoising_input_passes,
            denoising_prefilter=denoising_prefilter,
            denoising_quality=denoising_quality,
            denoising_use_gpu=denoising_use_gpu,
            mesh_attributes=mesh_attributes or {},
        )
        log_task_event(self.request.id, f"Completed successfully in {result.get('total_duration_s', 0):.1f}s", "done")
        # Separate input and output names; the previous "%s%s" format ran
        # them together into one unreadable token.
        logger.info(
            "render_still_task completed: %s -> %s in %.1fs",
            Path(step_path).name, Path(output_path).name,
            result.get("total_duration_s", 0),
        )
        try:
            from app.core.websocket import publish_event_sync
            publish_event_sync(None, {
                "type": "render.still.completed",
                "step_path": Path(step_path).name,
                "output": Path(output_path).name,
            })
        except Exception:
            # Notification is best-effort, but leave a trace for debugging.
            logger.debug("Could not publish render.still.completed event", exc_info=True)
        return result
    except Exception as exc:
        log_task_event(self.request.id, f"Failed: {exc}", "error")
        logger.error("render_still_task failed for %s: %s", step_path, exc)
        try:
            from app.core.websocket import publish_event_sync
            publish_event_sync(None, {
                "type": "render.still.failed",
                "step_path": Path(step_path).name,
                "error": str(exc),
            })
        except Exception:
            logger.debug("Could not publish render.still.failed event", exc_info=True)
        raise self.retry(exc=exc, countdown=30)
@celery_app.task(
    bind=True,
    name="app.domains.rendering.tasks.render_turntable_task",
    queue="thumbnail_rendering",
    max_retries=2,
)
def render_turntable_task(
    self,
    step_path: str,
    output_dir: str,
    output_name: str = "turntable",
    engine: str = "cycles",
    samples: int = 64,
    stl_quality: str = "low",
    smooth_angle: int = 30,
    cycles_device: str = "auto",
    width: int = 1920,
    height: int = 1080,
    frame_count: int = 120,
    fps: int = 30,
    turntable_degrees: float = 360.0,
    turntable_axis: str = "world_z",
    bg_color: str = "",
    template_path: str | None = None,
    target_collection: str = "Product",
    material_library_path: str | None = None,
    material_map: dict | None = None,
    part_names_ordered: list | None = None,
    lighting_only: bool = False,
    shadow_catcher: bool = False,
    camera_orbit: bool = True,
    rotation_x: float = 0.0,
    rotation_y: float = 0.0,
    rotation_z: float = 0.0,
) -> dict:
    """Render a STEP file as a turntable animation (frames + FFmpeg composite).

    Steps:
      1. Ensure ``<stem>_<stl_quality>.stl`` exists next to the STEP file,
         restoring it from the STL cache when possible, converting otherwise.
      2. Export per-part STLs if no ``manifest.json`` is present (non-fatal).
      3. Run ``turntable_render.py`` in a background Blender process, writing
         frames to ``<output_dir>/frames``.
      4. Composite the frames to ``<output_dir>/<output_name>.mp4`` via FFmpeg.

    Returns:
        ``{"output_mp4": str, "frame_count": int, "fps": int}``.

    Raises:
        RuntimeError: if Blender cannot be found, or FFmpeg exits non-zero.
        subprocess.TimeoutExpired: if FFmpeg exceeds its 300 s budget.
        celery.exceptions.Retry: if the Blender step fails while retries
            remain (60 s countdown).
    """
    log_task_event(self.request.id, f"Starting render_turntable_task: {Path(step_path).name}", "info")
    import json
    import os
    import subprocess
    from app.services.render_blender import (
        find_blender, convert_step_to_stl, export_per_part_stls
    )
    blender_bin = find_blender()
    if not blender_bin:
        raise RuntimeError("Blender binary not found in render-worker container")
    step = Path(step_path)
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    scripts_dir = Path(os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts"))
    turntable_script = scripts_dir / "turntable_render.py"

    # STL conversion — try MinIO cache first, then convert locally
    stl_path = step.parent / f"{step.stem}_{stl_quality}.stl"
    if not stl_path.exists() or stl_path.stat().st_size == 0:
        restored_from_cache = False
        try:
            from app.domains.products.cache_service import compute_step_hash, check_stl_cache
            step_hash = compute_step_hash(str(step))
            cached = check_stl_cache(step_hash, stl_quality)
            if cached:
                stl_path.write_bytes(cached)
                restored_from_cache = True
                logger.info("STL restored from MinIO cache: %s", stl_path.name)
        except Exception as exc:
            logger.warning("MinIO cache check failed (non-fatal): %s — falling back to conversion", exc)
        if not restored_from_cache:
            # Kept outside the cache try-block so a genuine conversion error
            # is neither mislabelled as a cache failure nor attempted twice.
            convert_step_to_stl(step, stl_path, stl_quality)

    parts_dir = step.parent / f"{step.stem}_{stl_quality}_parts"
    if not (parts_dir / "manifest.json").exists():
        try:
            export_per_part_stls(step, parts_dir, stl_quality)
        except Exception as exc:
            logger.warning("per-part export non-fatal: %s", exc)

    # Build turntable render arguments (positional; order is significant)
    frames_dir = out_dir / "frames"
    frames_dir.mkdir(exist_ok=True)
    cmd = [
        blender_bin, "--background",
        "--python", str(turntable_script),
        "--",
        str(stl_path),
        str(frames_dir),
        output_name,
        str(width), str(height),
        engine, str(samples), str(smooth_angle), cycles_device,
        str(frame_count), str(fps), str(turntable_degrees), turntable_axis,
        template_path or "",
        target_collection,
        material_library_path or "",
        json.dumps(material_map) if material_map else "{}",
        json.dumps(part_names_ordered) if part_names_ordered else "[]",
        "1" if lighting_only else "0",
        "1" if shadow_catcher else "0",
        "1" if camera_orbit else "0",
        str(rotation_x), str(rotation_y), str(rotation_z),
    ]
    try:
        result = subprocess.run(
            cmd, capture_output=True, text=True, timeout=3600
        )
        if result.returncode != 0:
            raise RuntimeError(
                f"Blender turntable exited {result.returncode}:\n{result.stdout[-2000:]}"
            )
    except Exception as exc:
        log_task_event(self.request.id, f"Failed: {exc}", "error")
        logger.error("render_turntable_task failed: %s", exc)
        try:
            from app.core.websocket import publish_event_sync
            publish_event_sync(None, {
                "type": "render.turntable.failed",
                "step_path": Path(step_path).name,
                "error": str(exc),
            })
        except Exception:
            # Notification is best-effort, but leave a trace for debugging.
            logger.debug("Could not publish render.turntable.failed event", exc_info=True)
        raise self.retry(exc=exc, countdown=60)

    # FFmpeg composite: frames → MP4 with optional background
    output_mp4 = out_dir / f"{output_name}.mp4"
    ffmpeg_cmd = _build_ffmpeg_cmd(
        frames_dir, output_mp4, fps=fps, bg_color=bg_color
    )
    try:
        subprocess.run(ffmpeg_cmd, check=True, capture_output=True, text=True, timeout=300)
    except subprocess.CalledProcessError as exc:
        message = f"FFmpeg composite failed: {(exc.stderr or '')[-500:]}"
        log_task_event(self.request.id, f"Failed: {message}", "error")
        logger.error("render_turntable_task: %s", message)
        raise RuntimeError(message) from exc
    except subprocess.TimeoutExpired as exc:
        log_task_event(self.request.id, f"Failed: FFmpeg composite timed out: {exc}", "error")
        logger.error("render_turntable_task: FFmpeg composite timed out: %s", exc)
        raise

    log_task_event(self.request.id, "Completed successfully", "done")
    try:
        from app.core.websocket import publish_event_sync
        publish_event_sync(None, {
            "type": "render.turntable.completed",
            "step_path": Path(step_path).name,
            "output": Path(output_mp4).name,
        })
    except Exception:
        logger.debug("Could not publish render.turntable.completed event", exc_info=True)
    return {
        "output_mp4": str(output_mp4),
        "frame_count": frame_count,
        "fps": fps,
    }
@celery_app.task(
    name="rendering.publish_asset",
    queue="step_processing",
)
def publish_asset(
    order_line_id: str,
    asset_type: str,
    storage_key: str,
    render_config: dict | None = None,
) -> str | None:
    """Persist a MediaAsset row for a finished render or export.

    The asset is linked to the order line, to the line's product and, when
    that product can be loaded, to the product's CAD file.

    Returns:
        The new asset id as a string, or ``None`` when the order line does
        not exist.
    """
    import asyncio

    async def _create_asset() -> str | None:
        from app.database import AsyncSessionLocal
        from app.domains.media.models import MediaAsset, MediaAssetType
        from app.domains.orders.models import OrderLine
        from app.domains.products.models import Product
        from sqlalchemy import select

        async with AsyncSessionLocal() as session:
            line_lookup = await session.execute(
                select(OrderLine).where(OrderLine.id == order_line_id)
            )
            order_line = line_lookup.scalar_one_or_none()
            if not order_line:
                return None

            # The CAD file reference lives on the product, not on the line.
            linked_cad_file_id = None
            if order_line.product_id:
                product_lookup = await session.execute(
                    select(Product).where(Product.id == order_line.product_id)
                )
                linked_product = product_lookup.scalar_one_or_none()
                if linked_product:
                    linked_cad_file_id = linked_product.cad_file_id

            new_asset = MediaAsset(
                tenant_id=getattr(order_line, "tenant_id", None),
                order_line_id=order_line.id,
                product_id=order_line.product_id,
                cad_file_id=linked_cad_file_id,
                asset_type=MediaAssetType(asset_type),
                storage_key=storage_key,
                render_config=render_config,
            )
            session.add(new_asset)
            await session.commit()
            return str(new_asset.id)

    loop = asyncio.get_event_loop()
    return loop.run_until_complete(_create_asset())
def _resolve_step_path_for_order_line(order_line_id: str) -> tuple[str | None, str | None]:
    """Look up the stored CAD path behind an order line (blocking wrapper).

    Follows OrderLine -> product -> CadFile.

    Returns:
        ``(stored_path, cad_file_id)`` with the id rendered as a string, or
        ``(None, None)`` when the line, its product, the product's CAD file
        reference, the CadFile row or its ``stored_path`` is missing.
    """
    import asyncio

    async def _lookup() -> tuple[str | None, str | None]:
        from app.database import AsyncSessionLocal
        from app.domains.orders.models import OrderLine
        from app.domains.products.models import Product
        from app.models.cad_file import CadFile
        from sqlalchemy import select
        from sqlalchemy.orm import selectinload

        async with AsyncSessionLocal() as session:
            # Load the product together with the line so it can be read below.
            line_query = (
                select(OrderLine)
                .options(selectinload(OrderLine.product))
                .where(OrderLine.id == order_line_id)
            )
            order_line = (await session.execute(line_query)).scalar_one_or_none()
            if not (order_line and order_line.product and order_line.product.cad_file_id):
                return None, None

            cad_query = select(CadFile).where(CadFile.id == order_line.product.cad_file_id)
            cad_file = (await session.execute(cad_query)).scalar_one_or_none()
            if not (cad_file and cad_file.stored_path):
                return None, None

            return cad_file.stored_path, str(order_line.product.cad_file_id)

    loop = asyncio.get_event_loop()
    return loop.run_until_complete(_lookup())
@celery_app.task(
    bind=True,
    name="app.domains.rendering.tasks.render_order_line_still_task",
    queue="thumbnail_rendering",
    max_retries=2,
)
def render_order_line_still_task(self, order_line_id: str, **params) -> dict:
    """Render a still image for an order line, resolving STEP path from DB.

    The image is written to ``<step dir>/renders/line_<order_line_id>.png``.
    ``params`` are forwarded unchanged to ``render_blender.render_still``.
    On success a MediaAsset is queued via ``publish_asset`` and the order
    line's WorkflowRun is marked ``completed``.

    Returns:
        The metadata dict returned by ``render_still``.

    Raises:
        RuntimeError: if no STEP path can be resolved (not retried).
        celery.exceptions.Retry: on render failure while retries remain
            (30 s countdown); afterwards the original exception propagates.
    """
    log_task_event(self.request.id, f"Starting render_order_line_still_task: order_line={order_line_id}", "info")
    step_path_str, _cad_file_id = _resolve_step_path_for_order_line(order_line_id)
    if not step_path_str:
        log_task_event(self.request.id, f"Failed: cannot resolve STEP path for order_line {order_line_id}", "error")
        message = (
            f"Cannot resolve STEP path for order_line {order_line_id}: "
            "product missing or has no linked CAD file"
        )
        # This failure is permanent (no retry), so close the workflow run
        # instead of leaving it pending forever.
        _update_workflow_run_status(order_line_id, "failed", message)
        raise RuntimeError(message)
    step = Path(step_path_str)
    output_dir = step.parent / "renders"
    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / f"line_{order_line_id}.png"
    try:
        from app.services.render_blender import render_still
        result = render_still(
            step_path=step,
            output_path=output_path,
            **params,
        )
        publish_asset.delay(
            order_line_id,
            "still",
            str(output_path),
            render_config=result,
        )
        log_task_event(self.request.id, f"Completed successfully in {result.get('total_duration_s', 0):.1f}s", "done")
        logger.info(
            "render_order_line_still_task completed for line %s in %.1fs",
            order_line_id, result.get("total_duration_s", 0),
        )
        try:
            from app.core.websocket import publish_event_sync
            publish_event_sync(None, {
                "type": "render.order_line.completed",
                "order_line_id": order_line_id,
            })
        except Exception:
            # Notification is best-effort, but leave a trace for debugging.
            logger.debug("Could not publish render.order_line.completed event", exc_info=True)
        _update_workflow_run_status(order_line_id, "completed")
        return result
    except Exception as exc:
        log_task_event(self.request.id, f"Failed: {exc}", "error")
        logger.error("render_order_line_still_task failed for %s: %s", order_line_id, exc)
        try:
            from app.core.websocket import publish_event_sync
            publish_event_sync(None, {
                "type": "render.order_line.failed",
                "order_line_id": order_line_id,
                "error": str(exc),
            })
        except Exception:
            logger.debug("Could not publish render.order_line.failed event", exc_info=True)
        # _update_workflow_run_status only touches runs that are still
        # "pending".  Marking the run failed before a retry would therefore
        # prevent a later successful attempt from ever setting "completed",
        # so the run is only closed once no retries remain.
        retries_exhausted = (
            self.max_retries is not None
            and self.request.retries >= self.max_retries
        )
        if retries_exhausted:
            _update_workflow_run_status(order_line_id, "failed", str(exc))
        raise self.retry(exc=exc, countdown=30)
@celery_app.task(
    bind=True,
    name="app.domains.rendering.tasks.export_gltf_for_order_line_task",
    queue="thumbnail_rendering",
    max_retries=1,
)
def export_gltf_for_order_line_task(self, order_line_id: str) -> dict:
    """Export a GLB from the STL cache via Blender subprocess (with trimesh fallback).

    Reads ``<stem>_low.stl`` next to the STEP file and writes
    ``<stem>_geometry.glb`` beside it.  Blender (``export_gltf.py``) is tried
    first when it is available and the script exists; on any Blender failure
    the STL is converted with trimesh instead.  A MediaAsset with
    asset_type ``gltf_geometry`` is queued via ``publish_asset``.

    Returns:
        ``{"glb_path": str, "method": "blender" | "trimesh"}``.

    Raises:
        RuntimeError: if the STEP path cannot be resolved or the low-quality
            STL cache does not exist (not retried).
        celery.exceptions.Retry: if the trimesh fallback fails while a retry
            remains (15 s countdown).
    """
    import json
    import os
    import subprocess

    step_path_str, cad_file_id = _resolve_step_path_for_order_line(order_line_id)
    if not step_path_str:
        raise RuntimeError(f"Cannot resolve STEP path for order_line {order_line_id}")
    step = Path(step_path_str)
    stl_path = step.parent / f"{step.stem}_low.stl"
    if not stl_path.exists():
        raise RuntimeError(
            f"STL cache not found: {stl_path}. Run thumbnail generation first."
        )
    output_path = step.parent / f"{step.stem}_geometry.glb"
    scripts_dir = Path(os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts"))
    export_script = scripts_dir / "export_gltf.py"
    from app.services.render_blender import find_blender, is_blender_available
    asset_type = "gltf_geometry"

    # Load sharp edge hints from mesh_attributes for UV seam marking
    sharp_edges_json = "[]"
    if cad_file_id:
        try:
            import asyncio as _asyncio

            async def _load_mesh_attrs() -> list:
                from app.database import AsyncSessionLocal
                from app.models.cad_file import CadFile as _CF
                from sqlalchemy import select as _sel
                async with AsyncSessionLocal() as _db:
                    _res = await _db.execute(_sel(_CF).where(_CF.id == cad_file_id))
                    _cad = _res.scalar_one_or_none()
                    if _cad and _cad.mesh_attributes:
                        return _cad.mesh_attributes.get("sharp_edge_midpoints") or []
                    return []

            _midpoints = _asyncio.get_event_loop().run_until_complete(_load_mesh_attrs())
            if _midpoints:
                sharp_edges_json = json.dumps(_midpoints)
        except Exception as _exc:
            # The hints are optional; export proceeds without them.
            logger.warning("Could not load sharp_edge_midpoints for %s: %s", cad_file_id, _exc)

    if is_blender_available() and export_script.exists():
        blender_bin = find_blender()
        cmd = [
            blender_bin, "--background",
            "--python", str(export_script),
            "--",
            "--stl_path", str(stl_path),
            "--output_path", str(output_path),
            "--asset_library_blend", "",
            "--material_map", json.dumps({}),
            "--sharp_edges_json", sharp_edges_json,
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
            if result.returncode != 0:
                raise RuntimeError(
                    f"export_gltf.py exited {result.returncode}:\n{result.stderr[-500:]}"
                )
            # A zero exit status alone does not prove the GLB was written;
            # never publish an asset that points at a missing or empty file.
            if not output_path.is_file() or output_path.stat().st_size == 0:
                raise RuntimeError(
                    f"export_gltf.py exited 0 but wrote no output to {output_path}"
                )
            publish_asset.delay(order_line_id, asset_type, str(output_path))
            logger.info("export_gltf_for_order_line_task completed via Blender: %s", output_path.name)
            return {"glb_path": str(output_path), "method": "blender"}
        except Exception as exc:
            logger.warning(
                "Blender GLB export failed for %s, falling back to trimesh: %s",
                order_line_id, exc,
            )

    # Trimesh fallback
    try:
        import trimesh
        mesh = trimesh.load(str(stl_path))
        mesh.export(str(output_path))
        publish_asset.delay(order_line_id, asset_type, str(output_path))
        logger.info("export_gltf_for_order_line_task completed via trimesh: %s", output_path.name)
        return {"glb_path": str(output_path), "method": "trimesh"}
    except Exception as exc:
        logger.error("export_gltf_for_order_line_task failed for %s: %s", order_line_id, exc)
        raise self.retry(exc=exc, countdown=15)
@celery_app.task(
    bind=True,
    name="app.domains.rendering.tasks.export_blend_for_order_line_task",
    queue="thumbnail_rendering",
    max_retries=1,
)
def export_blend_for_order_line_task(self, order_line_id: str) -> dict:
    """Export a production .blend file via Blender (export_blend.py).

    Reads ``<stem>_low.stl`` next to the STEP file and writes
    ``<stem>_production.blend`` beside it.  The part -> material map is
    taken from the product's ``cad_part_materials``; a failure to load it is
    non-fatal and results in an empty map.  A MediaAsset with asset_type
    ``blend_production`` is queued via ``publish_asset``.

    Returns:
        ``{"blend_path": str}``.

    Raises:
        RuntimeError: if the STEP path cannot be resolved, the STL cache is
            missing, or Blender cannot be found (not retried).
        celery.exceptions.Retry: if the Blender export fails while a retry
            remains (30 s countdown).
    """
    import json
    import os
    import subprocess

    step_path_str, _cad_file_id = _resolve_step_path_for_order_line(order_line_id)
    if not step_path_str:
        raise RuntimeError(f"Cannot resolve STEP path for order_line {order_line_id}")
    step = Path(step_path_str)
    stl_path = step.parent / f"{step.stem}_low.stl"
    if not stl_path.exists():
        raise RuntimeError(f"STL cache not found: {stl_path}")
    output_path = step.parent / f"{step.stem}_production.blend"
    scripts_dir = Path(os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts"))
    export_script = scripts_dir / "export_blend.py"
    from app.services.render_blender import find_blender
    blender_bin = find_blender()
    if not blender_bin:
        raise RuntimeError("Blender binary not found — cannot run export_blend task")

    # Resolve material map from DB.
    # NOTE(review): asset_lib_path is never populated here, so the script is
    # always called with an empty --asset_library_blend — confirm intended.
    asset_lib_path = ""
    mat_map: dict = {}
    try:
        from sqlalchemy import create_engine, select as sql_select
        from sqlalchemy.orm import Session
        from app.config import settings as app_settings
        from app.domains.orders.models import OrderLine
        from app.domains.products.models import Product
        engine = create_engine(app_settings.database_url_sync)
        try:
            with Session(engine) as s:
                line = s.execute(sql_select(OrderLine).where(OrderLine.id == order_line_id)).scalar_one_or_none()
                if line:
                    product = s.execute(sql_select(Product).where(Product.id == line.product_id)).scalar_one_or_none()
                    if product:
                        mat_map = {
                            m.get("part_name", ""): m.get("material", "")
                            for m in (product.cad_part_materials or [])
                        }
        finally:
            # The engine is created per invocation; release its connection
            # pool so repeated task runs do not accumulate DB connections.
            engine.dispose()
    except Exception as exc:
        logger.warning("export_blend_for_order_line_task: DB resolution error (non-fatal): %s", exc)

    try:
        cmd = [
            blender_bin, "--background",
            "--python", str(export_script),
            "--",
            "--stl_path", str(stl_path),
            "--output_path", str(output_path),
            "--asset_library_blend", asset_lib_path,
            "--material_map", json.dumps(mat_map),
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
        if result.returncode != 0:
            raise RuntimeError(
                f"export_blend.py exited {result.returncode}:\n{result.stderr[-500:]}"
            )
        # A zero exit status alone does not prove the file was written;
        # never publish an asset that points at a missing or empty file.
        if not output_path.is_file() or output_path.stat().st_size == 0:
            raise RuntimeError(
                f"export_blend.py exited 0 but wrote no output to {output_path}"
            )
        publish_asset.delay(order_line_id, "blend_production", str(output_path))
        logger.info("export_blend_for_order_line_task completed: %s", output_path.name)
        return {"blend_path": str(output_path)}
    except Exception as exc:
        logger.error("export_blend_for_order_line_task failed for %s: %s", order_line_id, exc)
        raise self.retry(exc=exc, countdown=30)
@celery_app.task(
    bind=True,
    name="app.domains.rendering.tasks.apply_asset_library_materials_task",
    queue="thumbnail_rendering",
    max_retries=1,
)
def apply_asset_library_materials_task(self, order_line_id: str, asset_library_id: str) -> dict:
    """Apply Blender asset library materials to a render via the asset_library.py script.

    Resolves the low-quality STL path, the asset library's blend file and the
    product's part -> material map from the database, then runs
    ``asset_library.py`` in a background Blender process.

    Returns:
        ``{"status": "applied", "order_line_id": ...}`` on success,
        ``{"status": "skipped"}`` when the order line, product or CAD file
        cannot be resolved, or
        ``{"status": "skipped", "reason": "stl_not_found"}`` when the STL
        file does not exist.

    Raises:
        RuntimeError: if Blender cannot be found (not retried).
        celery.exceptions.Retry: if the script fails while a retry remains
            (15 s countdown).
    """
    import json
    import os
    import subprocess
    from app.services.render_blender import find_blender
    blender_bin = find_blender()
    if not blender_bin:
        raise RuntimeError("Blender not available")

    # Resolve paths from DB
    def _inner():
        from sqlalchemy import create_engine, select as sql_select
        from sqlalchemy.orm import Session
        from app.config import settings
        from app.domains.orders.models import OrderLine
        from app.domains.products.models import CadFile, Product
        engine = create_engine(settings.database_url_sync)
        try:
            with Session(engine) as s:
                line = s.execute(sql_select(OrderLine).where(OrderLine.id == order_line_id)).scalar_one_or_none()
                if not line:
                    return None, None, None
                product = s.execute(sql_select(Product).where(Product.id == line.product_id)).scalar_one_or_none()
                if not product or not product.cad_file_id:
                    return None, None, None
                cad = s.execute(sql_select(CadFile).where(CadFile.id == product.cad_file_id)).scalar_one_or_none()
                # A CadFile row without a stored_path cannot yield an STL
                # path; treat it like a missing row instead of crashing.
                if cad and cad.stored_path:
                    stored = Path(cad.stored_path)
                    stl_path = str(stored.parent / f"{stored.stem}_low.stl")
                else:
                    stl_path = None
                # Resolve asset library blend path
                try:
                    from app.domains.materials.models import AssetLibrary
                    lib = s.execute(sql_select(AssetLibrary).where(AssetLibrary.id == asset_library_id)).scalar_one_or_none()
                    blend_path = lib.blend_file_path if lib else None
                except Exception as lib_exc:
                    logger.warning(
                        "apply_asset_library_materials_task: could not load asset library %s: %s",
                        asset_library_id, lib_exc,
                    )
                    blend_path = None
                mat_map = {m.get("part_name", ""): m.get("material", "") for m in (product.cad_part_materials or [])}
                return stl_path, blend_path, mat_map
        finally:
            # The engine is created per invocation; release its connection
            # pool so repeated task runs do not accumulate DB connections.
            engine.dispose()

    stl_path, blend_path, mat_map = _inner()
    if stl_path is None:
        logger.warning("apply_asset_library_materials_task: could not resolve paths for %s", order_line_id)
        return {"status": "skipped"}
    if not stl_path or not Path(stl_path).exists():
        logger.warning("STL not found for %s", order_line_id)
        return {"status": "skipped", "reason": "stl_not_found"}

    scripts_dir = Path(os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts"))
    script = scripts_dir / "asset_library.py"
    cmd = [
        blender_bin, "--background", "--python", str(script), "--",
        "--stl_path", stl_path,
        "--asset_library_blend", blend_path or "",
        "--material_map", json.dumps(mat_map),
    ]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        if proc.returncode != 0:
            raise RuntimeError(f"asset_library.py failed: {proc.stderr[-500:]}")
    except Exception as exc:
        logger.error("apply_asset_library_materials_task failed for %s: %s", order_line_id, exc)
        raise self.retry(exc=exc, countdown=15)
    return {"status": "applied", "order_line_id": order_line_id}
def _build_ffmpeg_cmd(
    frames_dir: Path,
    output_mp4: Path,
    fps: int = 30,
    bg_color: str = "",
    width: int = 1920,
    height: int = 1080,
) -> list:
    """Build FFmpeg command for compositing turntable frames to MP4.

    Args:
        frames_dir: Directory holding the frames, named ``%04d.png``.
        output_mp4: Destination file (overwritten, ``-y``).
        fps: Input frame rate, also used for the background source.
        bg_color: Empty for a plain encode.  Otherwise the frames are
            overlaid on a solid colour: ``#rrggbb`` is parsed as hex, any
            value not starting with ``#`` yields white.
        width: Width of the solid background canvas (only with ``bg_color``).
        height: Height of the solid background canvas (only with ``bg_color``).

    Returns:
        The argument list for ``subprocess.run``.

    Raises:
        ValueError: if ``bg_color`` starts with ``#`` but is not valid hex.
    """
    import shutil as _shutil

    ffmpeg = _shutil.which("ffmpeg") or "ffmpeg"
    frame_pattern = str(frames_dir / "%04d.png")
    encode_args = [
        "-c:v", "libx264", "-pix_fmt", "yuv420p",
        "-movflags", "+faststart",
        str(output_mp4),
    ]

    if not bg_color:
        return [
            ffmpeg, "-y",
            "-framerate", str(fps), "-i", frame_pattern,
            *encode_args,
        ]

    # Overlay transparent frames onto solid color background
    if bg_color.startswith("#"):
        try:
            r, g, b = (int(bg_color[i:i + 2], 16) for i in (1, 3, 5))
        except ValueError as exc:
            raise ValueError(
                f"bg_color must look like '#rrggbb', got {bg_color!r}"
            ) from exc
    else:
        r = g = b = 255
    color_str = f"color=c=0x{r:02x}{g:02x}{b:02x}:s={width}x{height}:r={fps}"
    return [
        ffmpeg, "-y",
        "-f", "lavfi", "-i", color_str,
        "-framerate", str(fps), "-i", frame_pattern,
        # The lavfi colour source never ends; shortest=1 stops the output
        # when the frame sequence ends instead of encoding until timeout.
        "-filter_complex", "[0:v][1:v]overlay=0:0:shortest=1",
        *encode_args,
    ]