feat: extract workflow output save phase 3

This commit is contained in:
2026-04-07 09:50:58 +02:00
parent 9c93ecef49
commit 160c198bb3
5 changed files with 232 additions and 81 deletions
@@ -70,10 +70,11 @@ def render_order_line_task(self, order_line_id: str):
emit(order_line_id, "Celery render task started")
try:
from sqlalchemy import create_engine, select, update as sql_update
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session
from app.config import settings as app_settings
from app.domains.rendering.workflow_runtime_services import (
persist_order_line_output,
prepare_order_line_render_context,
resolve_order_line_template_context,
resolve_render_position_context,
@@ -85,7 +86,6 @@ def render_order_line_task(self, order_line_id: str):
with Session(engine) as session:
set_tenant_context_sync(session, _tenant_id)
from app.models.order_line import OrderLine
from pathlib import Path as _Path
setup = prepare_order_line_render_context(
@@ -458,82 +458,18 @@ def render_order_line_task(self, order_line_id: str):
new_status = "completed" if success else "failed"
render_end = datetime.utcnow()
elapsed = (render_end - render_start).total_seconds()
update_values = dict(
render_status=new_status,
render_completed_at=render_end,
render_log=render_log,
)
if success:
update_values["result_path"] = output_path
session.execute(
sql_update(OrderLine)
.where(OrderLine.id == line.id)
.values(**update_values)
)
session.commit()
if success:
# Create MediaAsset so the render appears in the Media Browser
try:
import os as _os
from app.domains.media.models import MediaAsset, MediaAssetType as MAT
from app.config import settings as _cfg2
_ext = str(output_path).rsplit(".", 1)[-1].lower() if "." in str(output_path) else "bin"
_mime = (
"video/mp4" if _ext in ("mp4", "webm")
else "image/webp" if _ext == "webp"
else "image/jpeg" if _ext in ("jpg", "jpeg")
else "image/png"
)
# Extension determines type — poster frames (.jpg/.png) from animations are still stills
_at = MAT.turntable if _ext in ("mp4", "webm") else MAT.still
_tenant_id = line.product.cad_file.tenant_id if (line.product and line.product.cad_file) else None
# Normalize storage_key to relative path
_raw_key = str(output_path)
_upload_prefix = str(_cfg2.upload_dir).rstrip("/") + "/"
_norm_key = _raw_key[len(_upload_prefix):] if _raw_key.startswith(_upload_prefix) else _raw_key
_existing = session.execute(
select(MediaAsset.id).where(MediaAsset.storage_key == _norm_key).limit(1)
).scalar_one_or_none()
if not _existing:
# Probe output file for metadata
_file_size = None
_width = None
_height = None
if _os.path.exists(output_path):
try:
_file_size = _os.path.getsize(output_path)
except OSError:
pass
# Snapshot key render settings into render_config
_render_config = None
if isinstance(render_log, dict):
_render_config = {
k: render_log[k]
for k in (
"renderer", "engine_used", "engine", "samples",
"device_used", "compute_type", "total_duration_s",
)
if k in render_log
}
_asset = MediaAsset(
tenant_id=_tenant_id,
order_line_id=line.id,
product_id=line.product_id,
asset_type=_at,
storage_key=_norm_key,
mime_type=_mime,
file_size_bytes=_file_size,
width=_width,
height=_height,
render_config=_render_config,
)
session.add(_asset)
session.commit()
except Exception:
logger.exception("Failed to create MediaAsset for order_line %s", order_line_id)
try:
persist_order_line_output(
session,
line,
success=success,
output_path=output_path,
render_log=render_log if isinstance(render_log, dict) else None,
render_completed_at=render_end,
)
except Exception:
logger.exception("Failed to persist render output for order_line %s", order_line_id)
raise
if success:
emit(order_line_id, f"Render completed in {elapsed:.1f}s", "success")
@@ -99,6 +99,15 @@ class BBoxResolutionResult:
return self.bbox_data is not None
@dataclass(slots=True)
class OutputSaveResult:
status: Literal["completed", "failed"]
result_path: str | None
asset_id: str | None = None
storage_key: str | None = None
asset_type: MediaAssetType | None = None
def _emit(emit: EmitFn, order_line_id: str, message: str, level: str | None = None) -> None:
if emit is None:
return
@@ -197,6 +206,100 @@ def resolve_cad_bbox(
)
def _normalize_storage_key(output_path: str) -> str:
upload_prefix = str(app_settings.upload_dir).rstrip("/") + "/"
return output_path[len(upload_prefix):] if output_path.startswith(upload_prefix) else output_path
def _resolve_output_asset_type(output_path: str) -> MediaAssetType:
extension = output_path.rsplit(".", 1)[-1].lower() if "." in output_path else "bin"
return MediaAssetType.turntable if extension in ("mp4", "webm") else MediaAssetType.still
def _resolve_output_mime_type(output_path: str) -> str:
extension = output_path.rsplit(".", 1)[-1].lower() if "." in output_path else "bin"
if extension in ("mp4", "webm"):
return "video/mp4"
if extension == "webp":
return "image/webp"
if extension in ("jpg", "jpeg"):
return "image/jpeg"
return "image/png"
def persist_order_line_output(
session: Session,
line: OrderLine,
*,
success: bool,
output_path: str,
render_log: dict[str, Any] | None,
render_completed_at: datetime | None = None,
) -> OutputSaveResult:
"""Persist the render result for an order line and publish the media asset if needed."""
status: Literal["completed", "failed"] = "completed" if success else "failed"
completed_at = render_completed_at or datetime.utcnow()
line.render_status = status
line.render_completed_at = completed_at
line.render_log = render_log
line.result_path = output_path if success else None
session.flush()
asset_id: str | None = None
storage_key: str | None = None
asset_type: MediaAssetType | None = None
if success:
storage_key = _normalize_storage_key(output_path)
asset_type = _resolve_output_asset_type(output_path)
existing_asset = session.execute(
select(MediaAsset).where(MediaAsset.storage_key == storage_key).limit(1)
).scalar_one_or_none()
if existing_asset is None:
output_file = Path(output_path)
render_config = None
if isinstance(render_log, dict):
render_config = {
key: render_log[key]
for key in (
"renderer",
"engine_used",
"engine",
"samples",
"device_used",
"compute_type",
"total_duration_s",
)
if key in render_log
}
asset = MediaAsset(
tenant_id=line.product.cad_file.tenant_id if (line.product and line.product.cad_file) else None,
order_line_id=line.id,
product_id=line.product_id,
asset_type=asset_type,
storage_key=storage_key,
mime_type=_resolve_output_mime_type(output_path),
file_size_bytes=output_file.stat().st_size if output_file.exists() else None,
width=None,
height=None,
render_config=render_config,
)
session.add(asset)
session.flush()
asset_id = str(asset.id)
else:
asset_id = str(existing_asset.id)
session.commit()
return OutputSaveResult(
status=status,
result_path=line.result_path,
asset_id=asset_id,
storage_key=storage_key,
asset_type=asset_type,
)
def prepare_order_line_render_context(
session: Session,
order_line_id: str,
@@ -5,7 +5,7 @@ import uuid
from pathlib import Path
import pytest
from sqlalchemy import create_engine, text
from sqlalchemy import create_engine, select, text
from sqlalchemy.orm import Session
from app.database import Base
@@ -17,6 +17,7 @@ from app.domains.products.models import CadFile, Product
from app.domains.rendering.models import OutputType, RenderTemplate
from app.domains.rendering.workflow_runtime_services import (
auto_populate_materials_for_cad,
persist_order_line_output,
resolve_cad_bbox,
prepare_order_line_render_context,
resolve_order_line_material_map,
@@ -431,3 +432,114 @@ def test_extract_metadata_bbox_wrappers_delegate_to_runtime_services(monkeypatch
assert _bbox_from_step_cadquery("/tmp/a.step") == {
"dimensions_mm": {"x": 4.0, "y": 5.0, "z": 6.0}
}
def test_persist_order_line_output_saves_success_and_creates_media_asset(sync_session, tmp_path, monkeypatch):
from app.config import settings
upload_dir = tmp_path / "uploads"
monkeypatch.setattr(settings, "upload_dir", str(upload_dir))
line = _seed_order_line_graph(sync_session, tmp_path)
output_path = upload_dir / "renders" / str(line.id) / "bearing.png"
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text("PNGDATA", encoding="utf-8")
result = persist_order_line_output(
sync_session,
line,
success=True,
output_path=str(output_path),
render_log={
"renderer": "blender",
"engine": "cycles",
"engine_used": "cycles",
"samples": 64,
"total_duration_s": 1.23,
},
)
sync_session.refresh(line)
asset = sync_session.execute(
select(MediaAsset).where(MediaAsset.order_line_id == line.id)
).scalar_one_or_none()
assert result.status == "completed"
assert result.result_path == str(output_path)
assert result.storage_key == f"renders/{line.id}/bearing.png"
assert result.asset_type == MediaAssetType.still
assert line.render_status == "completed"
assert line.result_path == str(output_path)
assert asset is not None
assert asset.storage_key == f"renders/{line.id}/bearing.png"
assert asset.asset_type == MediaAssetType.still
assert asset.file_size_bytes == output_path.stat().st_size
assert asset.mime_type == "image/png"
assert asset.render_config == {
"renderer": "blender",
"engine": "cycles",
"engine_used": "cycles",
"samples": 64,
"total_duration_s": 1.23,
}
def test_persist_order_line_output_reuses_existing_asset(sync_session, tmp_path, monkeypatch):
from app.config import settings
upload_dir = tmp_path / "uploads"
monkeypatch.setattr(settings, "upload_dir", str(upload_dir))
line = _seed_order_line_graph(sync_session, tmp_path)
output_path = upload_dir / "renders" / str(line.id) / "bearing.mp4"
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text("MP4DATA", encoding="utf-8")
existing = MediaAsset(
id=uuid.uuid4(),
order_line_id=line.id,
product_id=line.product_id,
asset_type=MediaAssetType.turntable,
storage_key=f"renders/{line.id}/bearing.mp4",
)
sync_session.add(existing)
sync_session.commit()
result = persist_order_line_output(
sync_session,
line,
success=True,
output_path=str(output_path),
render_log={"renderer": "blender"},
)
assets = sync_session.execute(
select(MediaAsset).where(MediaAsset.storage_key == f"renders/{line.id}/bearing.mp4")
).scalars().all()
assert result.asset_id == str(existing.id)
assert result.asset_type == MediaAssetType.turntable
assert len(assets) == 1
def test_persist_order_line_output_marks_failure_without_result_path(sync_session, tmp_path):
line = _seed_order_line_graph(sync_session, tmp_path)
result = persist_order_line_output(
sync_session,
line,
success=False,
output_path=str(tmp_path / "renders" / "failed.png"),
render_log={"error": "boom"},
)
sync_session.refresh(line)
assets = sync_session.execute(
select(MediaAsset).where(MediaAsset.order_line_id == line.id)
).scalars().all()
assert result.status == "failed"
assert result.result_path is None
assert result.asset_id is None
assert line.render_status == "failed"
assert line.result_path is None
assert line.render_log == {"error": "boom"}
assert assets == []
@@ -22,7 +22,7 @@
- [ ] Missing legacy steps extracted into reusable executors
- [ ] Extracted node behavior matches legacy services
- [ ] Node-level tests cover success and failure paths
- Progress: `order_line_setup`, `resolve_template`, `material_map_resolve`, `auto_populate_materials`, and `glb_bbox` are extracted and covered by targeted backend tests; remaining parity nodes are still open.
- Progress: `order_line_setup`, `resolve_template`, `material_map_resolve`, `auto_populate_materials`, `glb_bbox`, and `output_save` are extracted and covered by targeted backend tests; remaining parity nodes are still open.
### Phase 4
@@ -56,7 +56,7 @@
- `E3-T4` Extract `material_map_resolve`. `completed`
- `E3-T5` Extract `auto_populate_materials`. `completed`
- `E3-T6` Extract `glb_bbox`. `completed`
- `E3-T7` Extract `output_save`.
- `E3-T7` Extract `output_save`. `completed`
- `E3-T8` Extract `notify`.
- `E3-T9` Add executor tests for all extracted nodes.