Files
HartOMat/scripts/compare_live_turntable_parity.py

544 lines
22 KiB
Python

#!/usr/bin/env python3
"""Serial live parity runner for real Blender turntable output types.
Creates reversible shadow-workflow probes for active template-backed turntable
output types, dispatches real renders against the live stack, waits for the
legacy + shadow outputs, and compares the resulting MP4s frame-by-frame.
"""
from __future__ import annotations
import argparse
import hashlib
import importlib.util
import json
import os
import subprocess
import tempfile
from pathlib import Path
from PIL import Image, ImageChops, ImageStat
# Repository root: one directory above this scripts/ folder.
ROOT = Path(__file__).resolve().parents[1]
# Live render harness module that provides the API client and workflow helpers.
HARNESS_PATH = ROOT / "scripts" / "test_render_pipeline.py"
# Fallback timeouts (seconds) used when the harness does not define its own.
# NOTE(review): the "TURNABLE" prefix appears to be a typo for "TURNTABLE";
# kept as-is because other blocks in this file reference these exact names.
TURNABLE_RENDER_TIMEOUT_SECONDS = 3600
TURNABLE_WORKFLOW_RUN_TIMEOUT_SECONDS = 3600
TURNABLE_WORKFLOW_COMPARISON_TIMEOUT_SECONDS = 1200
TURNABLE_ASSET_TIMEOUT_SECONDS = 600
def _load_harness():
    """Import the live render harness module directly from its file path.

    Raises RuntimeError if an import spec cannot be built for HARNESS_PATH.
    """
    module_spec = importlib.util.spec_from_file_location("live_render_harness", HARNESS_PATH)
    if module_spec is None or module_spec.loader is None:
        raise RuntimeError(f"Could not load harness from {HARNESS_PATH}")
    harness_module = importlib.util.module_from_spec(module_spec)
    module_spec.loader.exec_module(harness_module)
    return harness_module
def _is_real_turntable_output_type(output_type: dict, *, include_generated: bool) -> bool:
if not output_type.get("is_active", True):
return False
if output_type.get("renderer") != "blender":
return False
if output_type.get("artifact_kind") != "turntable_video":
return False
if not output_type.get("is_animation"):
return False
if include_generated:
return True
name = str(output_type.get("name") or "")
return not name.startswith("[")
def _build_turntable_shadow_probe_config(harness) -> dict:
config = harness.build_graph_turntable_config(execution_mode="shadow")
for node in config.get("nodes", []):
if node.get("id") == "turntable":
node["params"] = {}
return config
def _ensure_shadow_probe_workflow(harness, client, *, output_type: dict) -> dict:
    """Create or refresh the parity shadow-probe workflow for *output_type*.

    Looks up the workflow by its conventional probe name; POSTs a new one if
    absent, otherwise PUTs an update. Returns the workflow payload from the
    API. Raises RuntimeError on any non-success HTTP status.
    """
    probe_name = f"[Turntable Parity] {output_type['name']}"
    existing = harness.find_named(harness.get_workflows(client), probe_name)
    desired = {
        "name": probe_name,
        "output_type_id": output_type["id"],
        "config": _build_turntable_shadow_probe_config(harness),
        "is_active": True,
    }
    if existing is None:
        response = client.post("/workflows", json=desired)
        ok_statuses = (200, 201)
        action = "create"
    else:
        # Updates deliberately omit output_type_id: the binding is set at creation.
        response = client.put(
            f"/workflows/{existing['id']}",
            json={key: desired[key] for key in ("name", "config", "is_active")},
        )
        ok_statuses = (200,)
        action = "update"
    if response.status_code not in ok_statuses:
        raise RuntimeError(
            f"Shadow probe workflow {action} failed for {output_type['name']}: "
            f"{response.status_code} {response.text[:400]}"
        )
    return response.json()
def _download_bytes(client, path: str) -> bytes:
response = client.session.get(f"{client.host}{path}", timeout=120)
response.raise_for_status()
return response.content
def _write_temp_file(tmpdir: Path, *, name: str, payload: bytes) -> Path:
path = tmpdir / name
path.write_bytes(payload)
return path
def _probe_video(video_path: Path) -> dict:
    """Run ffprobe on *video_path* and return a flat dict of key video fields.

    Picks the first stream with codec_type == "video"; duration prefers the
    container value, falling back to the stream's. Raises
    subprocess.CalledProcessError if ffprobe exits non-zero.
    """
    probe_args = [
        "ffprobe", "-v", "error",
        "-print_format", "json",
        "-show_streams",
        "-show_format",
        str(video_path),
    ]
    completed = subprocess.run(probe_args, check=True, capture_output=True, text=True)
    probe_payload = json.loads(completed.stdout or "{}")
    video_stream = {}
    for stream in probe_payload.get("streams") or []:
        if stream.get("codec_type") == "video":
            video_stream = stream
            break
    container = probe_payload.get("format") or {}
    summary = {
        key: video_stream.get(key)
        for key in (
            "codec_name", "pix_fmt", "width", "height",
            "avg_frame_rate", "r_frame_rate", "nb_frames",
        )
    }
    summary["duration"] = container.get("duration") or video_stream.get("duration")
    summary["size"] = container.get("size")
    return summary
def _extract_frames(video_path: Path, output_dir: Path) -> list[Path]:
    """Decode every frame of *video_path* to PNGs under *output_dir*, sorted by name.

    Raises subprocess.CalledProcessError if ffmpeg exits non-zero.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    ffmpeg_args = [
        "ffmpeg",
        "-y",
        "-i", str(video_path),
        # "-vsync 0": emit frames exactly as decoded (no duplication/dropping).
        "-vsync", "0",
        str(output_dir / "frame_%06d.png"),
    ]
    subprocess.run(ffmpeg_args, check=True, capture_output=True, text=True)
    return sorted(output_dir.glob("frame_*.png"))
def _compute_image_delta(authoritative_path: Path, observer_path: Path) -> tuple[bool | None, float | None]:
    """Compare two image files; return (dimensions_match, normalized mean delta).

    The delta is the per-channel mean of the RGBA difference, scaled into
    [0, 1]. Returns (False, None) on a size mismatch and (None, None) when
    either image cannot be opened or compared.
    """
    try:
        with Image.open(authoritative_path) as lhs, Image.open(observer_path) as rhs:
            lhs_rgba = lhs.convert("RGBA")
            rhs_rgba = rhs.convert("RGBA")
            if lhs_rgba.size != rhs_rgba.size:
                return False, None
            channel_means = ImageStat.Stat(ImageChops.difference(lhs_rgba, rhs_rgba)).mean
            # Normalize: divide by channel count and the 255 max channel value.
            return True, sum(channel_means) / (len(channel_means) * 255.0)
    except Exception:
        # Deliberate best-effort: an unreadable frame means "could not compare".
        return None, None
def _build_video_artifact(payload: bytes, *, download_url: str, metadata: dict) -> dict:
return {
"path": download_url,
"storage_key": None,
"exists": bool(payload),
"file_size_bytes": len(payload) if payload else None,
"sha256": hashlib.sha256(payload).hexdigest() if payload else None,
"mime_type": "video/mp4",
"image_width": metadata.get("width"),
"image_height": metadata.get("height"),
}
def _compare_videos(authoritative_bytes: bytes, observer_bytes: bytes) -> dict:
    """Compare two MP4 payloads byte-wise, by ffprobe metadata, and frame-by-frame.

    Writes both payloads to a temp dir, extracts all frames via ffmpeg, and
    classifies the pair as "matched" or "different" with a human-readable
    summary plus supporting metrics (frame counts, mean pixel delta, metadata).
    """
    with tempfile.TemporaryDirectory(prefix="hartomat-turntable-parity-") as tmp:
        tmpdir = Path(tmp)
        authoritative_path = _write_temp_file(tmpdir, name="authoritative.mp4", payload=authoritative_bytes)
        observer_path = _write_temp_file(tmpdir, name="observer.mp4", payload=observer_bytes)
        authoritative_meta = _probe_video(authoritative_path)
        observer_meta = _probe_video(observer_path)
        authoritative_frames = _extract_frames(authoritative_path, tmpdir / "authoritative_frames")
        observer_frames = _extract_frames(observer_path, tmpdir / "observer_frames")
        # Byte-identity short-circuits all per-frame analysis below.
        exact_match = hashlib.sha256(authoritative_bytes).hexdigest() == hashlib.sha256(observer_bytes).hexdigest()
        # Container-level dimension check; requires both widths to be present.
        dimensions_match = (
            authoritative_meta.get("width") == observer_meta.get("width")
            and authoritative_meta.get("height") == observer_meta.get("height")
            and authoritative_meta.get("width") is not None
            and observer_meta.get("width") is not None
        )
        frame_count_match = len(authoritative_frames) == len(observer_frames)
        mean_frame_deltas: list[float] = []
        frames_dimensions_match = dimensions_match
        # Per-frame deltas are only meaningful when the frame counts agree.
        if frame_count_match:
            for authoritative_frame, observer_frame in zip(authoritative_frames, observer_frames, strict=True):
                frame_dimensions_match, frame_delta = _compute_image_delta(authoritative_frame, observer_frame)
                if frame_dimensions_match is False:
                    # One mismatched frame size invalidates the entire delta series.
                    frames_dimensions_match = False
                    mean_frame_deltas = []
                    break
                if frame_delta is None:
                    # Unreadable/incomparable frame: abandon per-frame comparison.
                    mean_frame_deltas = []
                    break
                mean_frame_deltas.append(frame_delta)
        # Classify, from strongest signal (byte identity) to weakest.
        if exact_match:
            status = "matched"
            summary = "Observer video matches the authoritative legacy output byte-for-byte."
            mean_pixel_delta = 0.0
            dimensions_match = True
        elif not frame_count_match:
            status = "different"
            summary = "Observer video differs from the authoritative output and the frame count changed."
            mean_pixel_delta = None
        elif frames_dimensions_match is False:
            status = "different"
            summary = "Observer video differs from the authoritative output and frame dimensions changed."
            mean_pixel_delta = None
        elif mean_frame_deltas:
            mean_pixel_delta = sum(mean_frame_deltas) / len(mean_frame_deltas)
            # Visual pass threshold: mean normalized delta of at most 1e-6.
            if mean_pixel_delta <= 1e-6:
                status = "matched"
                summary = "Observer video matches the authoritative output within the visual pass threshold."
            else:
                status = "different"
                summary = "Observer video differs from the authoritative output."
        else:
            mean_pixel_delta = None
            status = "different"
            summary = "Observer video differs from the authoritative output and could not be frame-compared."
        return {
            "status": status,
            "summary": summary,
            "exact_match": exact_match,
            "dimensions_match": dimensions_match if frame_count_match else False,
            "mean_pixel_delta": mean_pixel_delta,
            "authoritative_video": authoritative_meta,
            "observer_video": observer_meta,
            "authoritative_frame_count": len(authoritative_frames),
            "observer_frame_count": len(observer_frames),
        }
def _resolve_turntable_assets(
harness,
client,
*,
order_line_id: str,
workflow_run_id: str,
timeout_seconds: int,
) -> tuple[dict, dict]:
deadline = harness.time.time() + timeout_seconds
while harness.time.time() < deadline:
assets = harness.list_media_assets(
client,
order_line_id=order_line_id,
asset_type="turntable",
)
observer_asset = next(
(asset for asset in assets if str(asset.get("workflow_run_id")) == workflow_run_id),
None,
)
authoritative_asset = next(
(
asset
for asset in assets
if asset.get("workflow_run_id") is None
or str(asset.get("workflow_run_id")) != workflow_run_id
),
None,
)
if authoritative_asset is not None and observer_asset is not None:
return authoritative_asset, observer_asset
harness.time.sleep(2)
raise RuntimeError(
f"Could not resolve authoritative+observer turntable assets for line {order_line_id} run {workflow_run_id}"
)
def _run_single_output_type_with_product(harness, client, *, product_id: str, output_type: dict) -> dict:
    """Run one full legacy-vs-shadow parity pass for *output_type* on *product_id*.

    Steps: verify the type is template-backed, snapshot its workflow binding,
    create/refresh the shadow probe workflow and link it in shadow mode,
    create a single-line test order, preflight the workflow, dispatch the
    real render, wait for the workflow run + backend comparison, download
    both MP4s, and compare them locally. The original workflow binding is
    always restored in the finally block, even on failure.

    Returns a report dict for this output type. Raises RuntimeError at the
    first step that fails or times out.
    """
    templates = harness.render_template_candidates_for_output_type(
        harness.get_render_templates(client),
        output_type["id"],
    )
    if not templates:
        raise RuntimeError(f"Output type {output_type['name']} is not template-backed")
    # Snapshot the current workflow binding so it can be restored afterwards.
    snapshot = harness.build_output_type_workflow_snapshot(output_type)
    workflow = _ensure_shadow_probe_workflow(harness, client, output_type=output_type)
    try:
        # Bind the probe workflow to the output type in shadow execution mode.
        resp = client.patch(
            f"/output-types/{output_type['id']}",
            json=harness.build_output_type_workflow_link_payload(
                workflow_definition_id=workflow["id"],
                execution_mode="shadow",
            ),
        )
        if resp.status_code != 200:
            raise RuntimeError(
                f"Output type shadow link failed for {output_type['name']}: "
                f"{resp.status_code} {resp.text[:400]}"
            )
        bound_output_type = resp.json()
        order = harness.create_test_order(
            client,
            product_id=product_id,
            output_type_ids=[bound_output_type["id"]],
            test_label=f"Turntable Parity [{bound_output_type['name']}]",
        )
        if order is None:
            raise RuntimeError(f"Order creation failed for {bound_output_type['name']}")
        # Exactly one line is required so assets can be attributed unambiguously.
        lines = order.get("lines", [])
        if len(lines) != 1:
            raise RuntimeError(
                f"Expected exactly one order line for {bound_output_type['name']}, got {len(lines)}"
            )
        line_id = lines[0]["id"]
        # Preflight must allow graph dispatch before we spend a real render.
        resp_preflight = client.get(
            f"/workflows/{workflow['id']}/preflight",
            params={"context_id": line_id},
        )
        if resp_preflight.status_code != 200:
            raise RuntimeError(
                f"Workflow preflight failed for {bound_output_type['name']}: "
                f"{resp_preflight.status_code} {resp_preflight.text[:400]}"
            )
        preflight = resp_preflight.json()
        if not preflight.get("graph_dispatch_allowed"):
            raise RuntimeError(
                f"Workflow preflight blocked dispatch for {bound_output_type['name']}: "
                f"{preflight.get('summary')}"
            )
        # Dispatch through the legacy path; the shadow workflow runs alongside it.
        success = harness._submit_and_wait(
            client,
            order,
            [bound_output_type["id"]],
            use_graph_dispatch=False,
            timeout_seconds=getattr(harness, "ANIMATION_RENDER_TIMEOUT_SECONDS", TURNABLE_RENDER_TIMEOUT_SECONDS),
        )
        if not success:
            raise RuntimeError(f"Render dispatch did not complete successfully for {bound_output_type['name']}")
        # Wait for the shadow workflow run to reach a terminal state.
        workflow_run = harness.wait_for_workflow_run(
            client,
            workflow_id=workflow["id"],
            line_id=line_id,
            timeout_seconds=getattr(harness, "ANIMATION_WORKFLOW_RUN_TIMEOUT_SECONDS", TURNABLE_WORKFLOW_RUN_TIMEOUT_SECONDS),
            terminal_only=True,
        )
        if workflow_run is None:
            raise RuntimeError(f"Workflow run not found for {bound_output_type['name']}")
        # The backend's own comparison record, kept for cross-checking.
        raw_comparison = harness.wait_for_workflow_comparison(
            client,
            workflow_run_id=workflow_run["id"],
            timeout_seconds=getattr(harness, "ANIMATION_WORKFLOW_COMPARISON_TIMEOUT_SECONDS", TURNABLE_WORKFLOW_COMPARISON_TIMEOUT_SECONDS),
        )
        if raw_comparison is None:
            raise RuntimeError(f"Workflow comparison did not stabilize for {bound_output_type['name']}")
        authoritative_asset, observer_asset = _resolve_turntable_assets(
            harness,
            client,
            order_line_id=line_id,
            workflow_run_id=str(workflow_run["id"]),
            timeout_seconds=TURNABLE_ASSET_TIMEOUT_SECONDS,
        )
        # Download both MP4s and run the local frame-by-frame comparison.
        authoritative_bytes = _download_bytes(client, authoritative_asset["download_url"])
        observer_bytes = _download_bytes(client, observer_asset["download_url"])
        manual = _compare_videos(authoritative_bytes, observer_bytes)
        comparison = {
            "workflow_run_id": workflow_run.get("id"),
            "workflow_def_id": workflow.get("id"),
            "order_line_id": line_id,
            "execution_mode": workflow_run.get("execution_mode"),
            "status": manual["status"],
            "summary": manual["summary"],
            "authoritative_output": _build_video_artifact(
                authoritative_bytes,
                download_url=authoritative_asset["download_url"],
                metadata=manual["authoritative_video"],
            ),
            "observer_output": _build_video_artifact(
                observer_bytes,
                download_url=observer_asset["download_url"],
                metadata=manual["observer_video"],
            ),
            "exact_match": manual["exact_match"],
            "dimensions_match": manual["dimensions_match"],
            "mean_pixel_delta": manual["mean_pixel_delta"],
            "authoritative_frame_count": manual["authoritative_frame_count"],
            "observer_frame_count": manual["observer_frame_count"],
        }
        rollout_gate = harness.evaluate_rollout_gate_from_comparison(comparison)
        # Per-node outputs from the run, for diagnostics in the report.
        template_node = harness._node_result_by_name(workflow_run, "template")
        render_node = harness._node_result_by_name(workflow_run, "turntable")
        return {
            "output_type": {
                "id": bound_output_type["id"],
                "name": bound_output_type["name"],
                "format": bound_output_type.get("output_format"),
                "artifact_kind": bound_output_type.get("artifact_kind"),
                "render_settings": bound_output_type.get("render_settings"),
                "invocation_overrides": bound_output_type.get("invocation_overrides"),
            },
            "templates": [
                {
                    "name": template.get("name"),
                    "blend_file_path": template.get("blend_file_path"),
                    "lighting_only": template.get("lighting_only"),
                    "shadow_catcher_enabled": template.get("shadow_catcher_enabled"),
                    "target_collection": template.get("target_collection"),
                    "camera_orbit": template.get("camera_orbit"),
                }
                for template in templates
            ],
            "workflow": {
                "id": workflow.get("id"),
                "name": workflow.get("name"),
            },
            "preflight": {
                "execution_mode": preflight.get("execution_mode"),
                "context_kind": preflight.get("context_kind"),
                "graph_dispatch_allowed": preflight.get("graph_dispatch_allowed"),
            },
            "order_line_id": line_id,
            "workflow_run": {
                "id": workflow_run.get("id"),
                "status": workflow_run.get("status"),
                "execution_mode": workflow_run.get("execution_mode"),
            },
            "template_resolution": template_node.get("output") if template_node else None,
            "render_output": render_node.get("output") if render_node else None,
            "raw_backend_comparison": raw_comparison,
            "comparison": comparison,
            "rollout_gate": rollout_gate,
        }
    finally:
        # Always restore the original workflow binding, even when a step failed.
        harness.restore_output_type_workflow_snapshot(
            client,
            output_type_id=output_type["id"],
            snapshot=snapshot,
        )
def main() -> int:
    """CLI entry point: run serial parity passes over eligible turntable types.

    Parses connection/selection flags, filters output types, runs each one
    through _run_single_output_type_with_product, prints per-type and overall
    summaries, and optionally writes a full JSON report. Returns 0 on
    completion; any per-type failure raises and aborts the run.
    """
    harness = _load_harness()
    parser = argparse.ArgumentParser(description="Compare live legacy vs shadow renders for real turntable output types")
    # Connection defaults can be overridden via TEST_* environment variables.
    parser.add_argument("--host", default=os.environ.get("TEST_HOST", "http://localhost:8888"))
    parser.add_argument("--email", default=os.environ.get("TEST_EMAIL", "admin@hartomat.com"))
    parser.add_argument("--password", default=os.environ.get("TEST_PASSWORD", "Admin1234!"))
    parser.add_argument("--product-id", required=True, help="Existing product id to use for parity runs")
    parser.add_argument("--include-generated", action="store_true")
    parser.add_argument("--output", default=None, help="Optional path for JSON report")
    parser.add_argument("--only", action="append", default=[], help="Only run the named output type; repeatable")
    args = parser.parse_args()
    client = harness.APIClient(args.host, args.email, args.password)
    if not harness.test_health(client):
        raise RuntimeError("Render stack health check failed")
    # Inactive types are fetched too; the eligibility filter rejects them.
    output_types = [
        output_type
        for output_type in harness.get_output_types(client, include_inactive=True)
        if _is_real_turntable_output_type(output_type, include_generated=args.include_generated)
    ]
    if args.only:
        wanted = set(args.only)
        output_types = [output_type for output_type in output_types if output_type.get("name") in wanted]
    output_types.sort(key=lambda item: item.get("name") or "")
    if not output_types:
        raise RuntimeError("No eligible turntable output types found")
    results: list[dict] = []
    # Serial execution: one render pipeline pass per output type.
    for output_type in output_types:
        print(f"\n=== {output_type['name']} ===", flush=True)
        result = _run_single_output_type_with_product(
            harness,
            client,
            product_id=args.product_id,
            output_type=output_type,
        )
        results.append(result)
        # Compact per-type summary printed as a single JSON line.
        summary = {
            "name": result["output_type"]["name"],
            "artifact_kind": result["output_type"].get("artifact_kind"),
            "exact_match": result["comparison"].get("exact_match"),
            "status": result["comparison"].get("status"),
            "mean_pixel_delta": result["comparison"].get("mean_pixel_delta"),
            "authoritative_frame_count": result["comparison"].get("authoritative_frame_count"),
            "observer_frame_count": result["comparison"].get("observer_frame_count"),
            "rollout_verdict": result["rollout_gate"].get("verdict"),
            "workflow_run_id": result["workflow_run"]["id"],
            "order_line_id": result["order_line_id"],
        }
        print(json.dumps(summary, ensure_ascii=False), flush=True)
    report = {
        "host": args.host,
        "product_id": args.product_id,
        "results": results,
    }
    if args.output:
        output_path = Path(args.output)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(json.dumps(report, indent=2, ensure_ascii=False) + "\n")
        print(f"\nWrote report to {output_path}", flush=True)
    # Aggregate counts across all runs, keyed by rollout-gate verdict.
    overall = {
        "total": len(results),
        "exact_match": sum(1 for item in results if item["comparison"].get("exact_match") is True),
        "pass": sum(1 for item in results if item["rollout_gate"].get("verdict") == "pass"),
        "warn": sum(1 for item in results if item["rollout_gate"].get("verdict") == "warn"),
        "fail": sum(1 for item in results if item["rollout_gate"].get("verdict") == "fail"),
    }
    print("\n=== Overall ===", flush=True)
    print(json.dumps(overall, ensure_ascii=False), flush=True)
    return 0
if __name__ == "__main__":
    # Propagate main()'s integer return value as the process exit code.
    raise SystemExit(main())