# Source: HartOMat/scripts/compare_live_still_parity.py (1457 lines, 55 KiB, Python)
#!/usr/bin/env python3
"""Serial live parity runner for real Blender parity-sensitive output types.
Creates reversible shadow-workflow probes for active Blender output types,
dispatches real renders against the live stack, waits for workflow comparisons,
and prints a compact JSON summary per output type. Still images use the backend
comparison endpoint directly; turntable videos additionally perform a manual
frame-sampling comparison so template-backed Legacy and Graph outputs can be
validated with real Blender renders and identical authoritative settings.
"""
from __future__ import annotations
import argparse
import hashlib
import importlib.util
import json
import mimetypes
import os
import shutil
import subprocess
import sys
import tempfile
import time
from io import BytesIO
from pathlib import Path
from PIL import Image, ImageChops, ImageStat
# Repository root (one directory above this scripts/ folder).
ROOT = Path(__file__).resolve().parents[1]
# Sibling harness module that provides the shared live-render helpers.
HARNESS_PATH = ROOT / "scripts" / "test_render_pipeline.py"
def _load_harness():
    """Load the shared live-render harness module directly from its file path."""
    harness_spec = importlib.util.spec_from_file_location("live_render_harness", HARNESS_PATH)
    loader = None if harness_spec is None else harness_spec.loader
    if loader is None:
        raise RuntimeError(f"Could not load harness from {HARNESS_PATH}")
    harness_module = importlib.util.module_from_spec(harness_spec)
    loader.exec_module(harness_module)
    return harness_module
# Artifact kinds this parity runner knows how to probe and compare.
SUPPORTED_ARTIFACT_KINDS = {"still_image", "thumbnail_image", "turntable_video"}
def _is_real_blender_output_type(output_type: dict, *, include_generated: bool) -> bool:
if not output_type.get("is_active", True):
return False
if output_type.get("renderer") != "blender":
return False
if output_type.get("artifact_kind") not in SUPPORTED_ARTIFACT_KINDS:
return False
if include_generated:
return True
name = str(output_type.get("name") or "")
return not name.startswith("[")
def _normalize_thumbnail_render_params(output_type: dict) -> dict:
    """Extract the thumbnail-relevant render parameters from an output type."""
    params = harness_like_normalize(dict(output_type.get("render_settings") or {}))
    # Fall back to the output-type-level transparency flag when the render
    # settings themselves do not carry one.
    transparent = output_type.get("transparent_bg")
    if "transparent_bg" not in params and transparent is not None:
        params["transparent_bg"] = bool(transparent)
    allowed = {"render_engine", "samples", "width", "height", "transparent_bg"}
    return {key: params[key] for key in params if key in allowed}
def harness_like_normalize(params: dict | None = None) -> dict:
    """Mirror the harness's render-settings normalization.

    ``resolution: [w, h]`` is expanded into ``width``/``height`` (without
    overwriting explicit values) and the legacy ``engine`` key is renamed to
    ``render_engine`` when the canonical key is absent. The input dict is not
    mutated.
    """
    result = {} if params is None else dict(params)
    resolution = result.pop("resolution", None)
    if isinstance(resolution, (list, tuple)) and len(resolution) == 2:
        width, height = resolution
        result.setdefault("width", int(width))
        result.setdefault("height", int(height))
    if "render_engine" not in result and "engine" in result:
        result["render_engine"] = result.pop("engine")
    return result
def _build_graph_thumbnail_config(*, execution_mode: str, output_type: dict) -> dict:
    """Build the five-node graph workflow config for a CAD thumbnail render.

    Pipeline: resolve STEP path -> extract objects -> export GLB -> render ->
    save thumbnail. Render parameters come from the output type's normalized
    render settings; *execution_mode* (e.g. "shadow") is recorded in the UI
    metadata block.
    """
    render_params = _normalize_thumbnail_render_params(output_type)
    renderer = str(output_type.get("renderer") or "blender").lower().strip()
    # Any non-Blender renderer falls back to the three.js render step.
    render_step = "blender_render" if renderer == "blender" else "threejs_render"
    render_node_id = "thumbnail_render"
    return {
        "version": 1,
        "ui": {
            "preset": "custom",
            "execution_mode": execution_mode,
            "family": "cad_file",
            "blueprint": "cad_intake",
        },
        "nodes": [
            {
                "id": "resolve_step",
                "step": "resolve_step_path",
                "params": {},
                "ui": {"label": "Resolve STEP Path", "position": {"x": 0, "y": 180}},
            },
            {
                "id": "extract_objects",
                "step": "occ_object_extract",
                "params": {},
                "ui": {"label": "Extract STEP Objects", "position": {"x": 220, "y": 180}},
            },
            {
                "id": "export_glb",
                "step": "occ_glb_export",
                "params": {},
                "ui": {"label": "Export GLB", "position": {"x": 440, "y": 180}},
            },
            {
                "id": render_node_id,
                "step": render_step,
                "params": render_params,
                "ui": {"label": "Render Thumbnail", "position": {"x": 680, "y": 180}},
            },
            {
                "id": "save_thumbnail",
                "step": "thumbnail_save",
                "params": {},
                "ui": {"label": "Save Thumbnail", "position": {"x": 920, "y": 180}},
            },
        ],
        "edges": [
            {"from": "resolve_step", "to": "extract_objects"},
            {"from": "extract_objects", "to": "export_glb"},
            {"from": "export_glb", "to": render_node_id},
            {"from": render_node_id, "to": "save_thumbnail"},
        ],
    }
def _build_graph_turntable_config(*, execution_mode: str) -> dict:
    """Build the graph workflow config for an order-line turntable render.

    The graph fans out from setup into template resolution, material
    auto-population, and bounding-box computation, joins at the turntable
    render node, and ends at output save. The turntable node keeps
    ``use_custom_render_settings`` False so the authoritative output-type
    settings drive the render; *execution_mode* (e.g. "shadow") is recorded
    in the UI metadata block.
    """
    return {
        "version": 1,
        "ui": {
            "preset": "turntable",
            "execution_mode": execution_mode,
            "family": "order_line",
        },
        "nodes": [
            {
                "id": "setup",
                "step": "order_line_setup",
                "params": {},
                "ui": {"label": "Order Line Setup", "position": {"x": 0, "y": 100}},
            },
            {
                "id": "template",
                "step": "resolve_template",
                "params": {},
                "ui": {"label": "Resolve Template", "position": {"x": 220, "y": 100}},
            },
            {
                "id": "populate_materials",
                "step": "auto_populate_materials",
                "params": {},
                "ui": {"label": "Auto Populate Materials", "position": {"x": 220, "y": 260}},
            },
            {
                "id": "bbox",
                "step": "glb_bbox",
                "params": {},
                "ui": {"label": "Compute Bounding Box", "position": {"x": 220, "y": -20}},
            },
            {
                "id": "resolve_materials",
                "step": "material_map_resolve",
                "params": {},
                "ui": {"label": "Resolve Material Map", "position": {"x": 440, "y": 160}},
            },
            {
                "id": "turntable",
                "step": "blender_turntable",
                "params": {
                    # Parity requires rendering with the authoritative settings.
                    "use_custom_render_settings": False,
                },
                "ui": {"type": "renderFramesNode", "label": "Turntable Render", "position": {"x": 440, "y": 100}},
            },
            {
                "id": "output",
                "step": "output_save",
                "params": {},
                "ui": {"type": "outputNode", "label": "Save Output", "position": {"x": 660, "y": 100}},
            },
        ],
        "edges": [
            {"from": "setup", "to": "template"},
            {"from": "setup", "to": "populate_materials"},
            {"from": "setup", "to": "bbox"},
            {"from": "template", "to": "resolve_materials"},
            {"from": "populate_materials", "to": "resolve_materials"},
            {"from": "resolve_materials", "to": "turntable"},
            {"from": "bbox", "to": "turntable"},
            {"from": "template", "to": "turntable"},
            {"from": "turntable", "to": "output"},
        ],
    }
def _ensure_shadow_probe_workflow(harness, client, *, output_type: dict) -> dict:
    """Create or update the "[Parity Matrix] <name>" shadow workflow for a type.

    The graph config is chosen by artifact kind (thumbnail, turntable, or the
    harness's still-image config). An existing workflow with the probe name is
    updated in place (PUT) so the probe always reflects the current
    authoritative settings; otherwise a new one is created (POST).

    Returns:
        The created/updated workflow payload as returned by the API.

    Raises:
        RuntimeError: when the create or update request fails.
    """
    workflow_name = f"[Parity Matrix] {output_type['name']}"
    workflows = harness.get_workflows(client)
    workflow = harness.find_named(workflows, workflow_name)
    artifact_kind = output_type.get("artifact_kind")
    if artifact_kind == "thumbnail_image":
        workflow_config = _build_graph_thumbnail_config(
            execution_mode="shadow",
            output_type=output_type,
        )
    elif artifact_kind == "turntable_video":
        workflow_config = _build_graph_turntable_config(execution_mode="shadow")
    else:
        # Still images reuse the harness-provided graph config.
        workflow_config = harness.build_graph_still_config(
            execution_mode="shadow",
            use_custom_render_settings=False,
        )
    workflow_payload = {
        "name": workflow_name,
        "output_type_id": output_type["id"],
        "config": workflow_config,
        "is_active": True,
    }
    if workflow is None:
        resp = client.post("/workflows", json=workflow_payload)
        if resp.status_code not in (200, 201):
            raise RuntimeError(
                f"Shadow probe workflow create failed for {output_type['name']}: "
                f"{resp.status_code} {resp.text[:400]}"
            )
        workflow = resp.json()
    else:
        # Update only the mutable fields; output_type_id stays as created.
        resp = client.put(
            f"/workflows/{workflow['id']}",
            json={
                "name": workflow_payload["name"],
                "config": workflow_payload["config"],
                "is_active": workflow_payload["is_active"],
            },
        )
        if resp.status_code != 200:
            raise RuntimeError(
                f"Shadow probe workflow update failed for {output_type['name']}: "
                f"{resp.status_code} {resp.text[:400]}"
            )
        workflow = resp.json()
    return workflow
def _wait_for_workflow_run_id(
harness,
client,
*,
workflow_id: str,
workflow_run_id: str,
timeout_seconds: int,
) -> dict | None:
deadline = time.time() + timeout_seconds
terminal_statuses = {"completed", "failed", "cancelled"}
while time.time() < deadline:
resp = client.get(f"/workflows/{workflow_id}/runs")
if resp.status_code == 200:
for run in resp.json():
if str(run.get("id")) != workflow_run_id:
continue
if run.get("status") in terminal_statuses:
return run
time.sleep(2)
return None
def _download_bytes(client, path: str) -> tuple[bytes, str | None]:
response = client.session.get(f"{client.host}{path}", timeout=60)
response.raise_for_status()
return response.content, response.headers.get("content-type")
def _build_byte_artifact(payload: bytes, *, path: str | None, mime_type: str | None) -> dict:
image_width = None
image_height = None
if payload:
try:
with Image.open(BytesIO(payload)) as image:
image_width, image_height = image.size
except Exception:
image_width = None
image_height = None
guessed_mime_type = mime_type or mimetypes.guess_type(path or "")[0]
return {
"path": path,
"storage_key": None,
"exists": bool(payload),
"file_size_bytes": len(payload) if payload else None,
"sha256": hashlib.sha256(payload).hexdigest() if payload else None,
"mime_type": guessed_mime_type,
"image_width": image_width,
"image_height": image_height,
}
def _compute_image_delta(authoritative: bytes, observer: bytes) -> tuple[bool | None, float | None]:
if not authoritative or not observer:
return None, None
try:
with Image.open(BytesIO(authoritative)) as authoritative_image, Image.open(BytesIO(observer)) as observer_image:
authoritative_rgba = authoritative_image.convert("RGBA")
observer_rgba = observer_image.convert("RGBA")
if authoritative_rgba.size != observer_rgba.size:
return False, None
diff = ImageChops.difference(authoritative_rgba, observer_rgba)
mean_channels = ImageStat.Stat(diff).mean
return True, sum(mean_channels) / (len(mean_channels) * 255.0)
except Exception:
return None, None
def _build_manual_comparison(
    authoritative_bytes: bytes,
    observer_bytes: bytes,
    *,
    authoritative_path: str,
    authoritative_mime_type: str | None,
    observer_path: str,
    observer_mime_type: str | None,
) -> dict:
    """Build a comparison record for two thumbnail payloads.

    Mirrors the backend comparison payload shape; the workflow run/def/line
    ids are left None for the caller to fill in. The status degrades from
    "ready" when either side is missing, dimensions differ, or the payloads
    cannot be compared as images.
    """
    authoritative_output = _build_byte_artifact(
        authoritative_bytes, path=authoritative_path, mime_type=authoritative_mime_type
    )
    observer_output = _build_byte_artifact(
        observer_bytes, path=observer_path, mime_type=observer_mime_type
    )
    both_exist = authoritative_output["exists"] and observer_output["exists"]
    exact_match = (authoritative_output["sha256"] == observer_output["sha256"]) if both_exist else None
    if exact_match is True:
        # Byte-identical payloads need no pixel diff.
        dimensions_match, mean_pixel_delta = True, 0.0
    else:
        dimensions_match, mean_pixel_delta = _compute_image_delta(authoritative_bytes, observer_bytes)
    if not authoritative_output["exists"]:
        status = "missing_authoritative"
    elif not observer_output["exists"]:
        status = "missing_observer"
    elif dimensions_match is False:
        status = "dimension_mismatch"
    elif mean_pixel_delta is None and exact_match is not True:
        status = "non_image_or_uncomparable"
    else:
        status = "ready"
    if status == "ready":
        summary = "Manual thumbnail parity comparison completed."
    else:
        summary = f"Manual thumbnail parity comparison status={status}."
    return {
        "workflow_run_id": None,
        "workflow_def_id": None,
        "order_line_id": None,
        "execution_mode": "shadow",
        "status": status,
        "summary": summary,
        "authoritative_output": authoritative_output,
        "observer_output": observer_output,
        "exact_match": exact_match,
        "dimensions_match": dimensions_match,
        "mean_pixel_delta": mean_pixel_delta,
    }
def _parse_fraction(value: str | None) -> float | None:
if not value or value in {"0/0", "N/A"}:
return None
if "/" in value:
left, right = value.split("/", 1)
try:
numerator = float(left)
denominator = float(right)
except ValueError:
return None
if denominator == 0:
return None
return numerator / denominator
try:
return float(value)
except ValueError:
return None
def _ffprobe_video(video_path: Path) -> dict:
    """Probe the first video stream of *video_path* with ffprobe.

    Returns a dict with codec name, dimensions, average frame rate (raw
    string and parsed float), frame count, and duration in seconds. Numeric
    fields are None when ffprobe reports them missing or "N/A".

    Raises:
        subprocess.CalledProcessError: when ffprobe exits non-zero.
    """
    command = [
        "ffprobe",
        "-v",
        "error",
        "-select_streams",
        "v:0",
        "-show_entries",
        "stream=codec_name,width,height,avg_frame_rate,nb_frames,duration:format=duration",
        "-of",
        "json",
        str(video_path),
    ]
    result = subprocess.run(command, capture_output=True, text=True, check=True)
    payload = json.loads(result.stdout or "{}")
    # ffprobe may omit the stream list entirely for unreadable files.
    stream = (payload.get("streams") or [{}])[0]
    format_info = payload.get("format") or {}
    frame_count = stream.get("nb_frames")
    try:
        frame_count = int(frame_count) if frame_count not in (None, "", "N/A") else None
    except ValueError:
        frame_count = None
    # Prefer the stream duration; fall back to the container-level duration.
    duration = stream.get("duration") or format_info.get("duration")
    try:
        duration_value = float(duration) if duration not in (None, "", "N/A") else None
    except ValueError:
        duration_value = None
    width = stream.get("width")
    height = stream.get("height")
    try:
        width = int(width) if width is not None else None
    except ValueError:
        width = None
    try:
        height = int(height) if height is not None else None
    except ValueError:
        height = None
    return {
        "codec_name": stream.get("codec_name"),
        "width": width,
        "height": height,
        "avg_frame_rate_raw": stream.get("avg_frame_rate"),
        "avg_frame_rate": _parse_fraction(stream.get("avg_frame_rate")),
        "frame_count": frame_count,
        "duration_s": duration_value,
    }
def _extract_video_frame(video_path: Path, *, frame_index: int, target_path: Path) -> None:
    """Extract a single frame (zero-based *frame_index*) from a video via ffmpeg."""
    # select=eq(n, idx) picks exactly the requested frame number.
    select_filter = f"select=eq(n\\,{frame_index})"
    command = [
        "ffmpeg", "-v", "error", "-y",
        "-i", str(video_path),
        "-vf", select_filter,
        "-vsync", "vfr",
        "-frames:v", "1",
        str(target_path),
    ]
    # check=True surfaces ffmpeg failures to the caller.
    subprocess.run(command, capture_output=True, text=True, check=True)
def _sample_frame_indexes(frame_count: int | None) -> list[int]:
if frame_count is None or frame_count <= 1:
return [0]
last_index = max(frame_count - 1, 0)
indexes = {0, last_index}
indexes.add(last_index // 2)
if last_index >= 4:
indexes.add(last_index // 4)
indexes.add((3 * last_index) // 4)
return sorted(indexes)
def _mean_or_none(values: list[float | None]) -> float | None:
filtered = [value for value in values if value is not None]
if not filtered:
return None
return sum(filtered) / len(filtered)
def _build_manual_video_comparison(
    authoritative_bytes: bytes,
    observer_bytes: bytes,
    *,
    authoritative_path: str,
    authoritative_mime_type: str | None,
    observer_path: str,
    observer_mime_type: str | None,
) -> dict:
    """Compare two turntable MP4 payloads frame-by-frame.

    Byte-identical videos short-circuit to a "ready" result. Otherwise both
    payloads are written to a temp dir, probed with ffprobe, and a handful of
    sampled frames (ends, middle, quartiles) are extracted with ffmpeg and
    diffed as images. Run/def/line ids are left None for the caller to fill
    in. Requires ffprobe/ffmpeg on PATH.
    """
    authoritative_output = _build_byte_artifact(
        authoritative_bytes,
        path=authoritative_path,
        mime_type=authoritative_mime_type,
    )
    observer_output = _build_byte_artifact(
        observer_bytes,
        path=observer_path,
        mime_type=observer_mime_type,
    )
    exact_match = (
        authoritative_output["sha256"] == observer_output["sha256"]
        if authoritative_output["exists"] and observer_output["exists"]
        else None
    )
    if exact_match is True:
        # Identical bytes: no need to spawn ffprobe/ffmpeg at all.
        return {
            "workflow_run_id": None,
            "workflow_def_id": None,
            "order_line_id": None,
            "execution_mode": "shadow",
            "status": "ready",
            "summary": "Manual turntable parity comparison completed (byte-identical MP4).",
            "authoritative_output": authoritative_output,
            "observer_output": observer_output,
            "exact_match": True,
            "dimensions_match": True,
            "mean_pixel_delta": 0.0,
            "video_metadata": {},
            "frame_samples": [],
        }
    with tempfile.TemporaryDirectory(prefix="hartomat-turntable-parity-") as temp_dir_raw:
        temp_dir = Path(temp_dir_raw)
        authoritative_video_path = temp_dir / "authoritative.mp4"
        observer_video_path = temp_dir / "observer.mp4"
        authoritative_video_path.write_bytes(authoritative_bytes)
        observer_video_path.write_bytes(observer_bytes)
        authoritative_video = _ffprobe_video(authoritative_video_path)
        observer_video = _ffprobe_video(observer_video_path)
        # Dimensions only "match" when both are known and equal.
        dimensions_match = (
            authoritative_video.get("width") == observer_video.get("width")
            and authoritative_video.get("height") == observer_video.get("height")
            and authoritative_video.get("width") is not None
            and authoritative_video.get("height") is not None
        )
        frame_count_match = authoritative_video.get("frame_count") == observer_video.get("frame_count")
        # Sample from the authoritative frame count; fall back to the observer's.
        sample_count_source = authoritative_video.get("frame_count")
        if sample_count_source is None:
            sample_count_source = observer_video.get("frame_count")
        frame_indexes = _sample_frame_indexes(sample_count_source)
        frame_samples: list[dict] = []
        sample_deltas: list[float | None] = []
        sample_dimensions: list[bool | None] = []
        for frame_index in frame_indexes:
            authoritative_frame_path = temp_dir / f"authoritative-{frame_index:04d}.png"
            observer_frame_path = temp_dir / f"observer-{frame_index:04d}.png"
            _extract_video_frame(authoritative_video_path, frame_index=frame_index, target_path=authoritative_frame_path)
            _extract_video_frame(observer_video_path, frame_index=frame_index, target_path=observer_frame_path)
            authoritative_frame = authoritative_frame_path.read_bytes()
            observer_frame = observer_frame_path.read_bytes()
            frame_dimensions_match, frame_delta = _compute_image_delta(authoritative_frame, observer_frame)
            frame_exact_match = hashlib.sha256(authoritative_frame).hexdigest() == hashlib.sha256(observer_frame).hexdigest()
            if frame_exact_match:
                # Identical frame bytes imply a zero pixel delta.
                frame_dimensions_match = True
                frame_delta = 0.0
            sample_deltas.append(frame_delta)
            sample_dimensions.append(frame_dimensions_match)
            frame_samples.append(
                {
                    "frame_index": frame_index,
                    "exact_match": frame_exact_match,
                    "dimensions_match": frame_dimensions_match,
                    "mean_pixel_delta": frame_delta,
                }
            )
        mean_pixel_delta = _mean_or_none(sample_deltas)
        # None entries (uncomparable frames) do not count as mismatches here.
        sampled_dimensions_match = all(value is not False for value in sample_dimensions) if sample_dimensions else None
        status = "ready"
        if not authoritative_output["exists"]:
            status = "missing_authoritative"
        elif not observer_output["exists"]:
            status = "missing_observer"
        elif dimensions_match is False or sampled_dimensions_match is False:
            status = "dimension_mismatch"
        elif frame_count_match is False:
            status = "frame_count_mismatch"
        elif mean_pixel_delta is None:
            status = "non_image_or_uncomparable"
        summary = (
            "Manual turntable parity comparison completed."
            if status == "ready"
            else f"Manual turntable parity comparison status={status}."
        )
        return {
            "workflow_run_id": None,
            "workflow_def_id": None,
            "order_line_id": None,
            "execution_mode": "shadow",
            "status": status,
            "summary": summary,
            "authoritative_output": authoritative_output,
            "observer_output": observer_output,
            "exact_match": exact_match,
            "dimensions_match": dimensions_match if sampled_dimensions_match is not False else False,
            "mean_pixel_delta": mean_pixel_delta,
            "video_metadata": {
                "authoritative": authoritative_video,
                "observer": observer_video,
                "frame_count_match": frame_count_match,
            },
            "frame_samples": frame_samples,
        }
def _wait_for_turntable_asset(
harness,
client,
*,
order_line_id: str,
workflow_run_id: str,
timeout_seconds: int,
) -> tuple[dict, dict]:
deadline = time.time() + timeout_seconds
while time.time() < deadline:
assets = harness.list_media_assets(
client,
order_line_id=order_line_id,
asset_type="turntable",
)
observer_asset = None
authoritative_asset = None
for asset in assets:
if str(asset.get("workflow_run_id")) == workflow_run_id:
observer_asset = asset
elif authoritative_asset is None and asset.get("download_url"):
authoritative_asset = asset
if authoritative_asset is not None and observer_asset is not None:
return authoritative_asset, observer_asset
time.sleep(2)
raise RuntimeError(
f"Timed out waiting for turntable assets for order_line={order_line_id} workflow_run={workflow_run_id}"
)
def _unsupported_output_type_reason(output_type: dict, templates: list[dict]) -> str | None:
artifact_kind = output_type.get("artifact_kind")
if artifact_kind == "turntable_video" and (output_type.get("render_settings") or {}).get("cinematic"):
return "Cinematic legacy path has no graph-equivalent node today; turntable parity is not comparable."
if artifact_kind == "blend_asset":
return "No real legacy blend-export parity path exists today."
if artifact_kind == "turntable_video" and not templates:
return "Turntable parity requires a real render template link."
return None
def _find_thumbnail_asset_for_run(
client,
*,
cad_file_id: str,
workflow_run_id: str,
timeout_seconds: int,
) -> dict | None:
deadline = time.time() + timeout_seconds
while time.time() < deadline:
resp = client.get(
"/media",
params={
"cad_file_id": cad_file_id,
"asset_type": "thumbnail",
"limit": 50,
},
)
if resp.status_code == 200:
for asset in resp.json():
if str(asset.get("workflow_run_id")) == workflow_run_id:
return asset
time.sleep(2)
return None
def _run_single_output_type(harness, client, *, cad_file_id: str, output_type: dict) -> dict:
    """Run one still-image parity probe end-to-end for *output_type*.

    Binds a shadow workflow to the output type (template-backed types go
    through the harness's template parity resources; others get an ad-hoc
    probe workflow), creates a one-line test order, dispatches a real legacy
    render, waits for the shadow workflow run and its backend comparison, and
    returns a summary dict. The output type's original workflow link is
    restored in a finally block regardless of outcome.

    Raises:
        RuntimeError: on any failed API call, preflight block, dispatch
            failure, or timeout along the way.
    """
    templates = harness.render_template_candidates_for_output_type(
        harness.get_render_templates(client),
        output_type["id"],
    )
    template_backed = bool(templates)
    # Snapshot the current workflow link so it can be restored afterwards.
    snapshot = harness.build_output_type_workflow_snapshot(output_type)
    workflow = None
    product_id = harness.get_or_create_test_product(client, cad_file_id)
    if not product_id:
        raise RuntimeError(f"Could not resolve product for CAD file {cad_file_id}")
    try:
        if template_backed:
            resources = harness.ensure_template_parity_shadow_resources(
                client,
                output_type=output_type,
            )
            bound_output_type = resources["output_type"]
            workflow = resources["workflow"]
        else:
            workflow = _ensure_shadow_probe_workflow(
                harness,
                client,
                output_type=output_type,
            )
            # Link the probe workflow to the output type in shadow mode.
            resp = client.patch(
                f"/output-types/{output_type['id']}",
                json=harness.build_output_type_workflow_link_payload(
                    workflow_definition_id=workflow["id"],
                    execution_mode="shadow",
                ),
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"Output type shadow link failed for {output_type['name']}: "
                    f"{resp.status_code} {resp.text[:400]}"
                )
            bound_output_type = resp.json()
        order = harness.create_test_order(
            client,
            product_id=product_id,
            output_type_ids=[bound_output_type["id"]],
            test_label=f"Still Parity Matrix [{bound_output_type['name']}]",
        )
        if order is None:
            raise RuntimeError(f"Order creation failed for {bound_output_type['name']}")
        lines = order.get("lines", [])
        if len(lines) != 1:
            raise RuntimeError(
                f"Expected exactly one order line for {bound_output_type['name']}, got {len(lines)}"
            )
        line_id = lines[0]["id"]
        resp_preflight = client.get(
            f"/workflows/{workflow['id']}/preflight",
            params={"context_id": line_id},
        )
        if resp_preflight.status_code != 200:
            raise RuntimeError(
                f"Workflow preflight failed for {bound_output_type['name']}: "
                f"{resp_preflight.status_code} {resp_preflight.text[:400]}"
            )
        preflight = resp_preflight.json()
        if not preflight.get("graph_dispatch_allowed"):
            raise RuntimeError(
                f"Workflow preflight blocked dispatch for {bound_output_type['name']}: "
                f"{preflight.get('summary')}"
            )
        # Dispatch through the legacy path; the shadow workflow observes it.
        success = harness._submit_and_wait(
            client,
            order,
            [bound_output_type["id"]],
            use_graph_dispatch=False,
        )
        if not success:
            raise RuntimeError(f"Render dispatch did not complete successfully for {bound_output_type['name']}")
        workflow_run = harness.wait_for_workflow_run(
            client,
            workflow_id=workflow["id"],
            line_id=line_id,
            timeout_seconds=harness.WORKFLOW_RUN_TIMEOUT_SECONDS,
            terminal_only=True,
        )
        if workflow_run is None:
            raise RuntimeError(f"Workflow run not found for {bound_output_type['name']}")
        comparison = harness.wait_for_workflow_comparison(
            client,
            workflow_run_id=workflow_run["id"],
            timeout_seconds=harness.WORKFLOW_COMPARISON_TIMEOUT_SECONDS,
        )
        if comparison is None:
            raise RuntimeError(f"Workflow comparison did not stabilize for {bound_output_type['name']}")
        rollout_gate = harness.evaluate_rollout_gate_from_comparison(comparison)
        template_node = harness._node_result_by_name(workflow_run, "template")
        render_node = harness._node_result_by_name(workflow_run, "render")
        return {
            "output_type": {
                "id": bound_output_type["id"],
                "name": bound_output_type["name"],
                "format": bound_output_type.get("output_format"),
                "transparent_bg": bound_output_type.get("transparent_bg"),
                "artifact_kind": bound_output_type.get("artifact_kind"),
                "render_settings": bound_output_type.get("render_settings"),
                "invocation_overrides": bound_output_type.get("invocation_overrides"),
            },
            "template_backed": template_backed,
            "templates": [
                {
                    "name": template.get("name"),
                    "blend_file_path": template.get("blend_file_path"),
                    "lighting_only": template.get("lighting_only"),
                    "shadow_catcher_enabled": template.get("shadow_catcher_enabled"),
                    "target_collection": template.get("target_collection"),
                }
                for template in templates
            ],
            "workflow": {
                "id": workflow.get("id"),
                "name": workflow.get("name"),
            },
            "preflight": {
                "execution_mode": preflight.get("execution_mode"),
                "context_kind": preflight.get("context_kind"),
                "graph_dispatch_allowed": preflight.get("graph_dispatch_allowed"),
            },
            "order_line_id": line_id,
            "workflow_run": {
                "id": workflow_run.get("id"),
                "status": workflow_run.get("status"),
                "execution_mode": workflow_run.get("execution_mode"),
            },
            "template_resolution": template_node.get("output") if template_node else None,
            "render_output": render_node.get("output") if render_node else None,
            "comparison": comparison,
            "rollout_gate": rollout_gate,
        }
    finally:
        # Always restore the output type's original workflow binding.
        harness.restore_output_type_workflow_snapshot(
            client,
            output_type_id=output_type["id"],
            snapshot=snapshot,
        )
def _run_single_turntable_output_type_with_product(harness, client, *, product_id: str, output_type: dict) -> dict:
    """Run one turntable parity probe end-to-end against *product_id*.

    Unsupported parity paths (cinematic turntables, template-less turntables)
    return a skipped record immediately. Otherwise this binds an ad-hoc
    shadow probe workflow, creates a one-line test order, dispatches a real
    legacy render, waits for the shadow run plus the backend comparison seed,
    downloads both turntable MP4s, and builds a manual frame-sampling
    comparison. The output type's original workflow link is restored in a
    finally block regardless of outcome.

    Raises:
        RuntimeError: on any failed API call, preflight block, dispatch
            failure, missing download URL, or timeout along the way.
    """
    templates = harness.render_template_candidates_for_output_type(
        harness.get_render_templates(client),
        output_type["id"],
    )
    unsupported_reason = _unsupported_output_type_reason(output_type, templates)
    if unsupported_reason is not None:
        # Emit a comparable-shaped record so the summary table stays uniform.
        return {
            "output_type": {
                "id": output_type["id"],
                "name": output_type["name"],
                "format": output_type.get("output_format"),
                "transparent_bg": output_type.get("transparent_bg"),
                "artifact_kind": output_type.get("artifact_kind"),
                "render_settings": output_type.get("render_settings"),
                "invocation_overrides": output_type.get("invocation_overrides"),
            },
            "template_backed": bool(templates),
            "templates": [
                {
                    "name": template.get("name"),
                    "blend_file_path": template.get("blend_file_path"),
                    "lighting_only": template.get("lighting_only"),
                    "shadow_catcher_enabled": template.get("shadow_catcher_enabled"),
                    "target_collection": template.get("target_collection"),
                }
                for template in templates
            ],
            "workflow": None,
            "preflight": None,
            "workflow_run": None,
            "comparison": {
                "status": "unsupported_parity_path",
                "summary": unsupported_reason,
                "authoritative_output": {"exists": False},
                "observer_output": {"exists": False},
                "exact_match": None,
                "dimensions_match": None,
                "mean_pixel_delta": None,
            },
            "rollout_gate": {
                "verdict": "fail",
                "ready": False,
                "reasons": [unsupported_reason],
            },
            "skipped": True,
        }
    # Snapshot the current workflow link so it can be restored afterwards.
    snapshot = harness.build_output_type_workflow_snapshot(output_type)
    workflow = None
    try:
        workflow = _ensure_shadow_probe_workflow(
            harness,
            client,
            output_type=output_type,
        )
        # Link the probe workflow to the output type in shadow mode.
        resp = client.patch(
            f"/output-types/{output_type['id']}",
            json=harness.build_output_type_workflow_link_payload(
                workflow_definition_id=workflow["id"],
                execution_mode="shadow",
            ),
        )
        if resp.status_code != 200:
            raise RuntimeError(
                f"Output type shadow link failed for {output_type['name']}: "
                f"{resp.status_code} {resp.text[:400]}"
            )
        bound_output_type = resp.json()
        order = harness.create_test_order(
            client,
            product_id=product_id,
            output_type_ids=[bound_output_type["id"]],
            test_label=f"Turntable Parity Matrix [{bound_output_type['name']}]",
        )
        if order is None:
            raise RuntimeError(f"Order creation failed for {bound_output_type['name']}")
        lines = order.get("lines", [])
        if len(lines) != 1:
            raise RuntimeError(
                f"Expected exactly one order line for {bound_output_type['name']}, got {len(lines)}"
            )
        line_id = lines[0]["id"]
        resp_preflight = client.get(
            f"/workflows/{workflow['id']}/preflight",
            params={"context_id": line_id},
        )
        if resp_preflight.status_code != 200:
            raise RuntimeError(
                f"Workflow preflight failed for {bound_output_type['name']}: "
                f"{resp_preflight.status_code} {resp_preflight.text[:400]}"
            )
        preflight = resp_preflight.json()
        if not preflight.get("graph_dispatch_allowed"):
            raise RuntimeError(
                f"Workflow preflight blocked dispatch for {bound_output_type['name']}: "
                f"{preflight.get('summary')}"
            )
        # Dispatch through the legacy path; the shadow workflow observes it.
        success = harness._submit_and_wait(
            client,
            order,
            [bound_output_type["id"]],
            use_graph_dispatch=False,
        )
        if not success:
            raise RuntimeError(f"Render dispatch did not complete successfully for {bound_output_type['name']}")
        workflow_run = harness.wait_for_workflow_run(
            client,
            workflow_id=workflow["id"],
            line_id=line_id,
            timeout_seconds=harness.WORKFLOW_RUN_TIMEOUT_SECONDS,
            terminal_only=True,
        )
        if workflow_run is None:
            raise RuntimeError(f"Workflow run not found for {bound_output_type['name']}")
        # The backend comparison is kept as a seed; the authoritative verdict
        # below comes from the manual frame-sampling comparison.
        comparison_seed = harness.wait_for_workflow_comparison(
            client,
            workflow_run_id=workflow_run["id"],
            timeout_seconds=harness.WORKFLOW_COMPARISON_TIMEOUT_SECONDS,
        )
        if comparison_seed is None:
            raise RuntimeError(f"Workflow comparison did not stabilize for {bound_output_type['name']}")
        authoritative_asset, observer_asset = _wait_for_turntable_asset(
            harness,
            client,
            order_line_id=line_id,
            workflow_run_id=str(workflow_run["id"]),
            timeout_seconds=harness.WORKFLOW_COMPARISON_TIMEOUT_SECONDS,
        )
        authoritative_download_url = authoritative_asset.get("download_url")
        observer_download_url = observer_asset.get("download_url")
        if not authoritative_download_url or not observer_download_url:
            raise RuntimeError("Turntable media asset download URL missing")
        authoritative_bytes, authoritative_mime_type = _download_bytes(client, authoritative_download_url)
        observer_bytes, observer_mime_type = _download_bytes(client, observer_download_url)
        comparison = _build_manual_video_comparison(
            authoritative_bytes,
            observer_bytes,
            authoritative_path=authoritative_download_url,
            authoritative_mime_type=authoritative_mime_type,
            observer_path=observer_download_url,
            observer_mime_type=observer_mime_type,
        )
        # Back-fill the ids the manual comparison leaves as None.
        comparison["workflow_run_id"] = workflow_run["id"]
        comparison["workflow_def_id"] = workflow["id"]
        comparison["order_line_id"] = line_id
        comparison["comparison_seed"] = comparison_seed
        rollout_gate = harness.evaluate_rollout_gate_from_comparison(comparison)
        template_node = harness._node_result_by_name(workflow_run, "template")
        render_node = harness._node_result_by_name(workflow_run, "turntable")
        return {
            "output_type": {
                "id": bound_output_type["id"],
                "name": bound_output_type["name"],
                "format": bound_output_type.get("output_format"),
                "transparent_bg": bound_output_type.get("transparent_bg"),
                "artifact_kind": bound_output_type.get("artifact_kind"),
                "render_settings": bound_output_type.get("render_settings"),
                "invocation_overrides": bound_output_type.get("invocation_overrides"),
            },
            "template_backed": bool(templates),
            "templates": [
                {
                    "name": template.get("name"),
                    "blend_file_path": template.get("blend_file_path"),
                    "lighting_only": template.get("lighting_only"),
                    "shadow_catcher_enabled": template.get("shadow_catcher_enabled"),
                    "target_collection": template.get("target_collection"),
                    "camera_orbit": template.get("camera_orbit"),
                }
                for template in templates
            ],
            "workflow": {
                "id": workflow.get("id"),
                "name": workflow.get("name"),
            },
            "preflight": {
                "execution_mode": preflight.get("execution_mode"),
                "context_kind": preflight.get("context_kind"),
                "graph_dispatch_allowed": preflight.get("graph_dispatch_allowed"),
            },
            "order_line_id": line_id,
            "workflow_run": {
                "id": workflow_run.get("id"),
                "status": workflow_run.get("status"),
                "execution_mode": workflow_run.get("execution_mode"),
            },
            "template_resolution": template_node.get("output") if template_node else None,
            "render_output": render_node.get("output") if render_node else None,
            "comparison": comparison,
            "rollout_gate": rollout_gate,
        }
    finally:
        # Always restore the output type's original workflow binding.
        harness.restore_output_type_workflow_snapshot(
            client,
            output_type_id=output_type["id"],
            snapshot=snapshot,
        )
def _run_single_thumbnail_output_type(harness, client, *, cad_file_id: str, output_type: dict) -> dict:
    """Run a shadow-workflow parity probe for one thumbnail-image output type.

    Ensures a shadow probe workflow exists for *output_type*, preflights and
    dispatches it against *cad_file_id*, waits for the run to reach a terminal
    status, then compares the shadow-produced thumbnail asset against the
    authoritative ``/api/cad/{cad_file_id}/thumbnail`` bytes via a manual
    comparison (``_build_manual_comparison``). Returns a result dict shaped
    like the still/turntable runners (output_type, workflow, preflight,
    workflow_run, comparison, rollout_gate, ...).

    Raises:
        RuntimeError: when preflight fails or blocks dispatch, dispatch fails,
            the run never reaches a terminal status, the shadow asset cannot
            be found, or the asset is missing its download URL.
    """
    workflow = _ensure_shadow_probe_workflow(
        harness,
        client,
        output_type=output_type,
    )
    # Preflight must explicitly allow graph dispatch before firing a run.
    resp_preflight = client.get(
        f"/workflows/{workflow['id']}/preflight",
        params={"context_id": cad_file_id},
    )
    if resp_preflight.status_code != 200:
        raise RuntimeError(
            f"Workflow preflight failed for {output_type['name']}: "
            f"{resp_preflight.status_code} {resp_preflight.text[:400]}"
        )
    preflight = resp_preflight.json()
    if not preflight.get("graph_dispatch_allowed"):
        raise RuntimeError(
            f"Workflow preflight blocked dispatch for {output_type['name']}: "
            f"{preflight.get('summary')}"
        )
    resp_dispatch = client.post(
        f"/workflows/{workflow['id']}/dispatch",
        params={"context_id": cad_file_id},
    )
    if resp_dispatch.status_code != 200:
        raise RuntimeError(
            f"Workflow dispatch failed for {output_type['name']}: "
            f"{resp_dispatch.status_code} {resp_dispatch.text[:400]}"
        )
    dispatch_payload = resp_dispatch.json()
    workflow_run_id = str(dispatch_payload["workflow_run"]["id"])
    # Poll until the dispatched run reaches a terminal status (or times out).
    workflow_run = _wait_for_workflow_run_id(
        harness,
        client,
        workflow_id=str(workflow["id"]),
        workflow_run_id=workflow_run_id,
        timeout_seconds=harness.WORKFLOW_RUN_TIMEOUT_SECONDS,
    )
    if workflow_run is None:
        raise RuntimeError(f"Workflow run did not reach terminal status for {output_type['name']}")
    # Locate the thumbnail asset produced by this specific workflow run.
    asset = _find_thumbnail_asset_for_run(
        client,
        cad_file_id=cad_file_id,
        workflow_run_id=workflow_run_id,
        timeout_seconds=harness.WORKFLOW_COMPARISON_TIMEOUT_SECONDS,
    )
    if asset is None:
        raise RuntimeError(f"Shadow thumbnail asset not found for workflow run {workflow_run_id}")
    # Fetch both payloads: the authoritative (legacy) thumbnail and the
    # shadow (observer) asset bytes, then compare them locally.
    authoritative_bytes, authoritative_mime_type = _download_bytes(
        client,
        f"/api/cad/{cad_file_id}/thumbnail",
    )
    observer_path = asset.get("download_url")
    if not observer_path:
        raise RuntimeError(f"Thumbnail asset {asset.get('id')} is missing download_url")
    observer_bytes, observer_mime_type = _download_bytes(client, observer_path)
    comparison = _build_manual_comparison(
        authoritative_bytes,
        observer_bytes,
        authoritative_path=f"/api/cad/{cad_file_id}/thumbnail",
        authoritative_mime_type=authoritative_mime_type,
        observer_path=observer_path,
        observer_mime_type=observer_mime_type,
    )
    # Annotate the comparison so the report can tie it back to the run.
    comparison["workflow_run_id"] = workflow_run_id
    comparison["workflow_def_id"] = workflow.get("id")
    rollout_gate = harness.evaluate_rollout_gate_from_comparison(comparison)
    # Thumbnails are never template-backed in this runner, hence the empty
    # templates list below.
    return {
        "output_type": {
            "id": output_type["id"],
            "name": output_type["name"],
            "format": output_type.get("output_format"),
            "transparent_bg": output_type.get("transparent_bg"),
            "render_settings": output_type.get("render_settings"),
            "invocation_overrides": output_type.get("invocation_overrides"),
            "artifact_kind": output_type.get("artifact_kind"),
        },
        "template_backed": False,
        "templates": [],
        "workflow": {
            "id": workflow.get("id"),
            "name": workflow.get("name"),
        },
        "preflight": {
            "execution_mode": preflight.get("execution_mode"),
            "context_kind": preflight.get("context_kind"),
            "graph_dispatch_allowed": preflight.get("graph_dispatch_allowed"),
        },
        "cad_file_id": cad_file_id,
        "thumbnail_asset_id": asset.get("id"),
        "workflow_run": {
            "id": workflow_run.get("id"),
            "status": workflow_run.get("status"),
            "execution_mode": workflow_run.get("execution_mode"),
        },
        "comparison": comparison,
        "rollout_gate": rollout_gate,
    }
def main() -> int:
    """Serial live parity runner entry point.

    Loads the render harness, authenticates against the live stack, resolves a
    CAD file (either from ``--product-id`` or by uploading ``--step``), then
    runs one parity probe per eligible Blender output type — thumbnail images,
    turntable videos, and still images are each routed to their dedicated
    runner. Prints a one-line JSON summary per output type plus an overall
    tally, and optionally writes the full report to ``--output``.

    Returns:
        0 on completion.

    Raises:
        RuntimeError: when the stack health check fails, the product/CAD file
            cannot be resolved, an ``--artifact-kind`` value is unsupported,
            or no eligible output types remain after filtering.
    """
    harness = _load_harness()
    parser = argparse.ArgumentParser(description="Compare live legacy vs shadow renders for real Blender parity output types")
    parser.add_argument("--host", default=os.environ.get("TEST_HOST", "http://localhost:8888"))
    parser.add_argument("--email", default=os.environ.get("TEST_EMAIL", "admin@hartomat.com"))
    parser.add_argument("--password", default=os.environ.get("TEST_PASSWORD", "Admin1234!"))
    parser.add_argument("--step", default=str(harness.SAMPLE_STEP))
    parser.add_argument("--product-id", default=None, help="Existing product id to use instead of creating/reusing one from --step")
    parser.add_argument("--cad-file-id", default=None, help="CAD file id for --product-id; inferred from /api/products when omitted")
    parser.add_argument("--include-generated", action="store_true")
    parser.add_argument(
        "--artifact-kind",
        action="append",
        default=[],
        help="Restrict to artifact kinds like still_image, thumbnail_image, turntable_video; repeatable",
    )
    parser.add_argument("--output", default=None, help="Optional path for JSON report")
    parser.add_argument("--only", action="append", default=[], help="Only run the named output type; repeatable")
    args = parser.parse_args()
    client = harness.APIClient(args.host, args.email, args.password)
    health_ok = harness.test_health(client)
    if not health_ok:
        raise RuntimeError("Render stack health check failed")
    # Resolve the CAD file either from an existing product or a fresh STEP upload.
    if args.product_id:
        product_resp = client.get(f"/products/{args.product_id}")
        if product_resp.status_code != 200:
            raise RuntimeError(
                f"Could not load product {args.product_id}: {product_resp.status_code} {product_resp.text[:400]}"
            )
        product = product_resp.json()
        cad_file_id = args.cad_file_id or product.get("cad_file_id")
        if not cad_file_id:
            raise RuntimeError(f"Product {args.product_id} has no cad_file_id")
        fixed_product_id = args.product_id
        print(f"Using existing product {fixed_product_id} with cad_file_id {cad_file_id}", flush=True)
    else:
        cad_file_id = harness.test_step_upload(client, Path(args.step))
        if not cad_file_id:
            raise RuntimeError("STEP upload / CAD processing failed")
        fixed_product_id = None
    output_types = [
        ot
        for ot in harness.get_output_types(client, include_inactive=True)
        if _is_real_blender_output_type(ot, include_generated=args.include_generated)
    ]
    if args.artifact_kind:
        wanted_artifact_kinds = set(args.artifact_kind)
        # Fail fast on typos instead of silently filtering every type out.
        unknown_kinds = wanted_artifact_kinds - SUPPORTED_ARTIFACT_KINDS
        if unknown_kinds:
            raise RuntimeError(
                f"Unsupported --artifact-kind value(s): {', '.join(sorted(unknown_kinds))}; "
                f"supported: {', '.join(sorted(SUPPORTED_ARTIFACT_KINDS))}"
            )
        output_types = [ot for ot in output_types if ot.get("artifact_kind") in wanted_artifact_kinds]
    if args.only:
        wanted = set(args.only)
        output_types = [ot for ot in output_types if ot.get("name") in wanted]
    output_types.sort(key=lambda item: item.get("name") or "")
    if not output_types:
        # Fixed message: the old text omitted turntable videos, which this
        # script also supports (see SUPPORTED_ARTIFACT_KINDS).
        raise RuntimeError("No eligible still-image, thumbnail, or turntable output types found")
    results: list[dict] = []
    for output_type in output_types:
        print(f"\n=== {output_type['name']} ===", flush=True)
        # Route each output type to the runner matching its artifact kind.
        artifact_kind = output_type.get("artifact_kind")
        if artifact_kind == "thumbnail_image":
            result = _run_single_thumbnail_output_type(
                harness,
                client,
                cad_file_id=cad_file_id,
                output_type=output_type,
            )
        elif artifact_kind == "turntable_video":
            # Turntables always need a product context; create/reuse one when
            # the caller did not pin --product-id.
            if fixed_product_id is None:
                product_id = harness.get_or_create_test_product(client, cad_file_id)
                if not product_id:
                    raise RuntimeError(f"Could not resolve product for CAD file {cad_file_id}")
            else:
                product_id = fixed_product_id
            result = _run_single_turntable_output_type_with_product(
                harness,
                client,
                product_id=product_id,
                output_type=output_type,
            )
        else:
            if fixed_product_id is None:
                result = _run_single_output_type(
                    harness,
                    client,
                    cad_file_id=cad_file_id,
                    output_type=output_type,
                )
            else:
                result = _run_single_output_type_with_product(
                    harness,
                    client,
                    product_id=fixed_product_id,
                    output_type=output_type,
                )
        results.append(result)
        # Compact one-line summary per output type for live progress.
        summary = {
            "name": result["output_type"]["name"],
            "artifact_kind": result["output_type"].get("artifact_kind"),
            "template_backed": result["template_backed"],
            "skipped": bool(result.get("skipped")),
            "exact_match": result["comparison"].get("exact_match"),
            "status": result["comparison"].get("status"),
            "mean_pixel_delta": result["comparison"].get("mean_pixel_delta"),
            "rollout_verdict": result["rollout_gate"].get("verdict"),
            "workflow_run_id": result["workflow_run"]["id"] if result.get("workflow_run") else None,
            "order_line_id": result.get("order_line_id"),
            "cad_file_id": result.get("cad_file_id"),
        }
        print(json.dumps(summary, ensure_ascii=False), flush=True)
    report = {
        "host": args.host,
        "cad_file_id": cad_file_id,
        "product_id": fixed_product_id,
        "results": results,
    }
    if args.output:
        output_path = Path(args.output)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(json.dumps(report, indent=2, ensure_ascii=False) + "\n")
        print(f"\nWrote report to {output_path}", flush=True)
    overall = {
        "total": len(results),
        "exact_match": sum(1 for item in results if item["comparison"].get("exact_match") is True),
        "pass": sum(1 for item in results if item["rollout_gate"].get("verdict") == "pass"),
        "warn": sum(1 for item in results if item["rollout_gate"].get("verdict") == "warn"),
        "fail": sum(1 for item in results if item["rollout_gate"].get("verdict") == "fail"),
        "skipped": sum(1 for item in results if item.get("skipped")),
    }
    print("\n=== Overall ===", flush=True)
    print(json.dumps(overall, ensure_ascii=False), flush=True)
    return 0
def _run_single_output_type_with_product(harness, client, *, product_id: str, output_type: dict) -> dict:
    """Run a live legacy-vs-shadow parity probe for one output type against an
    existing product.

    Temporarily links *output_type* to a shadow workflow — the template-parity
    shadow resources when render templates are associated with the type,
    otherwise a generic shadow probe workflow — then creates a single-line
    test order for *product_id*, submits the render with graph dispatch
    disabled, waits for the shadow workflow run and its backend comparison,
    and evaluates the rollout gate. The output type's original workflow link
    is restored in the ``finally`` block regardless of outcome.

    Raises:
        RuntimeError: when shadow linking, order creation, preflight,
            dispatch, or the run/comparison wait fails.
    """
    templates = harness.render_template_candidates_for_output_type(
        harness.get_render_templates(client),
        output_type["id"],
    )
    template_backed = bool(templates)
    # Snapshot the current workflow link so it can be restored afterwards.
    snapshot = harness.build_output_type_workflow_snapshot(output_type)
    workflow = None
    try:
        if template_backed:
            # Template-backed types get dedicated parity shadow resources,
            # which already return an output type bound to the workflow.
            resources = harness.ensure_template_parity_shadow_resources(
                client,
                output_type=output_type,
            )
            bound_output_type = resources["output_type"]
            workflow = resources["workflow"]
        else:
            # Non-template types: create a generic shadow probe workflow and
            # PATCH the output type to link it in shadow execution mode.
            workflow = _ensure_shadow_probe_workflow(
                harness,
                client,
                output_type=output_type,
            )
            resp = client.patch(
                f"/output-types/{output_type['id']}",
                json=harness.build_output_type_workflow_link_payload(
                    workflow_definition_id=workflow["id"],
                    execution_mode="shadow",
                ),
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"Output type shadow link failed for {output_type['name']}: "
                    f"{resp.status_code} {resp.text[:400]}"
                )
            bound_output_type = resp.json()
        order = harness.create_test_order(
            client,
            product_id=product_id,
            output_type_ids=[bound_output_type["id"]],
            test_label=f"Still Parity Matrix [{bound_output_type['name']}]",
        )
        if order is None:
            raise RuntimeError(f"Order creation failed for {bound_output_type['name']}")
        # Exactly one line is expected because exactly one output type was ordered.
        lines = order.get("lines", [])
        if len(lines) != 1:
            raise RuntimeError(
                f"Expected exactly one order line for {bound_output_type['name']}, got {len(lines)}"
            )
        line_id = lines[0]["id"]
        # Preflight must explicitly allow graph dispatch before submitting.
        resp_preflight = client.get(
            f"/workflows/{workflow['id']}/preflight",
            params={"context_id": line_id},
        )
        if resp_preflight.status_code != 200:
            raise RuntimeError(
                f"Workflow preflight failed for {bound_output_type['name']}: "
                f"{resp_preflight.status_code} {resp_preflight.text[:400]}"
            )
        preflight = resp_preflight.json()
        if not preflight.get("graph_dispatch_allowed"):
            raise RuntimeError(
                f"Workflow preflight blocked dispatch for {bound_output_type['name']}: "
                f"{preflight.get('summary')}"
            )
        # Submit through the non-graph path; the shadow workflow runs alongside.
        success = harness._submit_and_wait(
            client,
            order,
            [bound_output_type["id"]],
            use_graph_dispatch=False,
        )
        if not success:
            raise RuntimeError(f"Render dispatch did not complete successfully for {bound_output_type['name']}")
        workflow_run = harness.wait_for_workflow_run(
            client,
            workflow_id=workflow["id"],
            line_id=line_id,
            timeout_seconds=harness.WORKFLOW_RUN_TIMEOUT_SECONDS,
            terminal_only=True,
        )
        if workflow_run is None:
            raise RuntimeError(f"Workflow run not found for {bound_output_type['name']}")
        # Still images use the backend comparison endpoint (no manual sampling).
        comparison = harness.wait_for_workflow_comparison(
            client,
            workflow_run_id=workflow_run["id"],
            timeout_seconds=harness.WORKFLOW_COMPARISON_TIMEOUT_SECONDS,
        )
        if comparison is None:
            raise RuntimeError(f"Workflow comparison did not stabilize for {bound_output_type['name']}")
        rollout_gate = harness.evaluate_rollout_gate_from_comparison(comparison)
        # Pull per-node outputs for the report (None when a node is absent).
        template_node = harness._node_result_by_name(workflow_run, "template")
        render_node = harness._node_result_by_name(workflow_run, "render")
        return {
            "output_type": {
                "id": bound_output_type["id"],
                "name": bound_output_type["name"],
                "format": bound_output_type.get("output_format"),
                "transparent_bg": bound_output_type.get("transparent_bg"),
                "artifact_kind": bound_output_type.get("artifact_kind"),
                "render_settings": bound_output_type.get("render_settings"),
                "invocation_overrides": bound_output_type.get("invocation_overrides"),
            },
            "template_backed": template_backed,
            "templates": [
                {
                    "name": template.get("name"),
                    "blend_file_path": template.get("blend_file_path"),
                    "lighting_only": template.get("lighting_only"),
                    "shadow_catcher_enabled": template.get("shadow_catcher_enabled"),
                    "target_collection": template.get("target_collection"),
                }
                for template in templates
            ],
            "workflow": {
                "id": workflow.get("id"),
                "name": workflow.get("name"),
            },
            "preflight": {
                "execution_mode": preflight.get("execution_mode"),
                "context_kind": preflight.get("context_kind"),
                "graph_dispatch_allowed": preflight.get("graph_dispatch_allowed"),
            },
            "order_line_id": line_id,
            "workflow_run": {
                "id": workflow_run.get("id"),
                "status": workflow_run.get("status"),
                "execution_mode": workflow_run.get("execution_mode"),
            },
            "template_resolution": template_node.get("output") if template_node else None,
            "render_output": render_node.get("output") if render_node else None,
            "comparison": comparison,
            "rollout_gate": rollout_gate,
        }
    finally:
        # Always undo the temporary shadow link, even when the probe failed.
        harness.restore_output_type_workflow_snapshot(
            client,
            output_type_id=output_type["id"],
            snapshot=snapshot,
        )
if __name__ == "__main__":
    # Run the parity matrix; a Ctrl-C maps to the conventional exit code 130.
    try:
        exit_code = main()
    except KeyboardInterrupt:
        exit_code = 130
    raise SystemExit(exit_code)