#!/usr/bin/env python3
"""Serial live parity runner for real Blender turntable output types.

Creates reversible shadow-workflow probes for active template-backed turntable
output types, dispatches real renders against the live stack, waits for the
legacy + shadow outputs, and compares the resulting MP4s frame by frame.
"""

from __future__ import annotations

import argparse
import hashlib
import importlib.util
import json
import os
import subprocess
import tempfile
from pathlib import Path

from PIL import Image, ImageChops, ImageStat


ROOT = Path(__file__).resolve().parents[1]
HARNESS_PATH = ROOT / "scripts" / "test_render_pipeline.py"

TURNTABLE_RENDER_TIMEOUT_SECONDS = 3600
TURNTABLE_WORKFLOW_RUN_TIMEOUT_SECONDS = 3600
TURNTABLE_WORKFLOW_COMPARISON_TIMEOUT_SECONDS = 1200
TURNTABLE_ASSET_TIMEOUT_SECONDS = 600


def _load_harness():
    """Import the shared live-render harness module directly from its file path."""
    spec = importlib.util.spec_from_file_location("live_render_harness", HARNESS_PATH)
    if spec is None or spec.loader is None:
        raise RuntimeError(f"Could not load harness from {HARNESS_PATH}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def _is_real_turntable_output_type(output_type: dict, *, include_generated: bool) -> bool:
    """Select active Blender animation output types that produce turntable video.

    Names starting with "[" are treated as generated probe artifacts and are
    skipped unless include_generated is set.
    """
    if not output_type.get("is_active", True):
        return False
    if output_type.get("renderer") != "blender":
        return False
    if output_type.get("artifact_kind") != "turntable_video":
        return False
    if not output_type.get("is_animation"):
        return False
    if include_generated:
        return True
    name = str(output_type.get("name") or "")
    return not name.startswith("[")
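
# The probe config below clears the turntable node's params. The original code
# does not say why; a plausible reading is that an empty params dict makes the
# shadow run fall back to the output type's own template-driven settings
# instead of values pinned in the graph config.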


def _build_turntable_shadow_probe_config(harness) -> dict:
    config = harness.build_graph_turntable_config(execution_mode="shadow")
    for node in config.get("nodes", []):
        if node.get("id") == "turntable":
            node["params"] = {}
    return config


def _ensure_shadow_probe_workflow(harness, client, *, output_type: dict) -> dict:
    """Create or refresh the shadow-probe workflow for one output type."""
    workflow_name = f"[Turntable Parity] {output_type['name']}"
    workflows = harness.get_workflows(client)
    workflow = harness.find_named(workflows, workflow_name)
    workflow_payload = {
        "name": workflow_name,
        "output_type_id": output_type["id"],
        "config": _build_turntable_shadow_probe_config(harness),
        "is_active": True,
    }
    if workflow is None:
        resp = client.post("/workflows", json=workflow_payload)
        if resp.status_code not in (200, 201):
            raise RuntimeError(
                f"Shadow probe workflow create failed for {output_type['name']}: "
                f"{resp.status_code} {resp.text[:400]}"
            )
        workflow = resp.json()
    else:
        resp = client.put(
            f"/workflows/{workflow['id']}",
            json={
                "name": workflow_payload["name"],
                "config": workflow_payload["config"],
                "is_active": workflow_payload["is_active"],
            },
        )
        if resp.status_code != 200:
            raise RuntimeError(
                f"Shadow probe workflow update failed for {output_type['name']}: "
                f"{resp.status_code} {resp.text[:400]}"
            )
        workflow = resp.json()
    return workflow


def _download_bytes(client, path: str) -> bytes:
    response = client.session.get(f"{client.host}{path}", timeout=120)
    response.raise_for_status()
    return response.content


def _write_temp_file(tmpdir: Path, *, name: str, payload: bytes) -> Path:
    path = tmpdir / name
    path.write_bytes(payload)
    return path
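
# ffprobe with -print_format json returns a payload shaped roughly like
# {"streams": [{"codec_type": "video", "codec_name": "h264", "width": 1920,
#   "height": 1080, ...}], "format": {"duration": "8.0", "size": "123456"}}
# (an illustrative sketch, not captured output); _probe_video flattens the
# fields the comparison cares about.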


def _probe_video(video_path: Path) -> dict:
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-print_format",
        "json",
        "-show_streams",
        "-show_format",
        str(video_path),
    ]
    result = subprocess.run(cmd, check=True, capture_output=True, text=True)
    payload = json.loads(result.stdout or "{}")
    streams = payload.get("streams") or []
    video_stream = next((stream for stream in streams if stream.get("codec_type") == "video"), {})
    format_info = payload.get("format") or {}
    return {
        "codec_name": video_stream.get("codec_name"),
        "pix_fmt": video_stream.get("pix_fmt"),
        "width": video_stream.get("width"),
        "height": video_stream.get("height"),
        "avg_frame_rate": video_stream.get("avg_frame_rate"),
        "r_frame_rate": video_stream.get("r_frame_rate"),
        "nb_frames": video_stream.get("nb_frames"),
        "duration": format_info.get("duration") or video_stream.get("duration"),
        "size": format_info.get("size"),
    }
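
# Frame extraction uses "-vsync 0" (passthrough) so ffmpeg neither duplicates
# nor drops frames to hit a nominal rate, keeping the extracted frame counts
# comparable between the two videos. Newer ffmpeg builds spell this
# "-fps_mode passthrough"; the older flag still works.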


def _extract_frames(video_path: Path, output_dir: Path) -> list[Path]:
    output_dir.mkdir(parents=True, exist_ok=True)
    cmd = [
        "ffmpeg",
        "-y",
        "-i",
        str(video_path),
        "-vsync",
        "0",
        str(output_dir / "frame_%06d.png"),
    ]
    subprocess.run(cmd, check=True, capture_output=True, text=True)
    return sorted(output_dir.glob("frame_*.png"))
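
# The per-frame delta below is the mean absolute per-channel difference,
# normalised to [0, 1]: ImageStat.Stat(diff).mean yields one 0-255 mean per
# RGBA channel, so a pair of frames differing by a uniform 25.5 on every
# channel yields (4 * 25.5) / (4 * 255) = 0.1.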


def _compute_image_delta(authoritative_path: Path, observer_path: Path) -> tuple[bool | None, float | None]:
    try:
        with Image.open(authoritative_path) as authoritative_image, Image.open(observer_path) as observer_image:
            authoritative_rgba = authoritative_image.convert("RGBA")
            observer_rgba = observer_image.convert("RGBA")
            if authoritative_rgba.size != observer_rgba.size:
                return False, None
            diff = ImageChops.difference(authoritative_rgba, observer_rgba)
            mean_channels = ImageStat.Stat(diff).mean
            return True, sum(mean_channels) / (len(mean_channels) * 255.0)
    except Exception:
        return None, None


def _build_video_artifact(payload: bytes, *, download_url: str, metadata: dict) -> dict:
    return {
        "path": download_url,
        "storage_key": None,
        "exists": bool(payload),
        "file_size_bytes": len(payload) if payload else None,
        "sha256": hashlib.sha256(payload).hexdigest() if payload else None,
        "mime_type": "video/mp4",
        "image_width": metadata.get("width"),
        "image_height": metadata.get("height"),
    }
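
# _compare_videos applies a short decision ladder: identical SHA-256 digests
# short-circuit to "matched"; otherwise a changed frame count, changed frame
# dimensions, or an undecodable frame forces "different"; finally the mean
# per-frame pixel delta must stay at or below 1e-6 to count as a visual pass.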


def _compare_videos(authoritative_bytes: bytes, observer_bytes: bytes) -> dict:
    with tempfile.TemporaryDirectory(prefix="hartomat-turntable-parity-") as tmp:
        tmpdir = Path(tmp)
        authoritative_path = _write_temp_file(tmpdir, name="authoritative.mp4", payload=authoritative_bytes)
        observer_path = _write_temp_file(tmpdir, name="observer.mp4", payload=observer_bytes)

        authoritative_meta = _probe_video(authoritative_path)
        observer_meta = _probe_video(observer_path)

        authoritative_frames = _extract_frames(authoritative_path, tmpdir / "authoritative_frames")
        observer_frames = _extract_frames(observer_path, tmpdir / "observer_frames")

        exact_match = hashlib.sha256(authoritative_bytes).hexdigest() == hashlib.sha256(observer_bytes).hexdigest()
        dimensions_match = (
            authoritative_meta.get("width") == observer_meta.get("width")
            and authoritative_meta.get("height") == observer_meta.get("height")
            and authoritative_meta.get("width") is not None
            and observer_meta.get("width") is not None
        )

        frame_count_match = len(authoritative_frames) == len(observer_frames)
        mean_frame_deltas: list[float] = []
        frames_dimensions_match = dimensions_match
        if frame_count_match:
            for authoritative_frame, observer_frame in zip(authoritative_frames, observer_frames, strict=True):
                frame_dimensions_match, frame_delta = _compute_image_delta(authoritative_frame, observer_frame)
                if frame_dimensions_match is False:
                    frames_dimensions_match = False
                    mean_frame_deltas = []
                    break
                if frame_delta is None:
                    mean_frame_deltas = []
                    break
                mean_frame_deltas.append(frame_delta)

        if exact_match:
            status = "matched"
            summary = "Observer video matches the authoritative legacy output byte-for-byte."
            mean_pixel_delta = 0.0
            dimensions_match = True
        elif not frame_count_match:
            status = "different"
            summary = "Observer video differs from the authoritative output and the frame count changed."
            mean_pixel_delta = None
        elif frames_dimensions_match is False:
            status = "different"
            summary = "Observer video differs from the authoritative output and frame dimensions changed."
            mean_pixel_delta = None
        elif mean_frame_deltas:
            mean_pixel_delta = sum(mean_frame_deltas) / len(mean_frame_deltas)
            if mean_pixel_delta <= 1e-6:
                status = "matched"
                summary = "Observer video matches the authoritative output within the visual pass threshold."
            else:
                status = "different"
                summary = "Observer video differs from the authoritative output."
        else:
            mean_pixel_delta = None
            status = "different"
            summary = "Observer video differs from the authoritative output and could not be frame-compared."

        return {
            "status": status,
            "summary": summary,
            "exact_match": exact_match,
            "dimensions_match": dimensions_match if frame_count_match else False,
            "mean_pixel_delta": mean_pixel_delta,
            "authoritative_video": authoritative_meta,
            "observer_video": observer_meta,
            "authoritative_frame_count": len(authoritative_frames),
            "observer_frame_count": len(observer_frames),
        }
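
# Asset resolution: the observer asset is the turntable asset stamped with this
# shadow workflow run's id; the authoritative asset is any other turntable
# asset on the same order line (the legacy pipeline does not tag its output
# with the shadow run id). Both are polled every two seconds until the deadline.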


def _resolve_turntable_assets(
    harness,
    client,
    *,
    order_line_id: str,
    workflow_run_id: str,
    timeout_seconds: int,
) -> tuple[dict, dict]:
    deadline = harness.time.time() + timeout_seconds
    while harness.time.time() < deadline:
        assets = harness.list_media_assets(
            client,
            order_line_id=order_line_id,
            asset_type="turntable",
        )
        observer_asset = next(
            (asset for asset in assets if str(asset.get("workflow_run_id")) == workflow_run_id),
            None,
        )
        authoritative_asset = next(
            (
                asset
                for asset in assets
                if asset.get("workflow_run_id") is None
                or str(asset.get("workflow_run_id")) != workflow_run_id
            ),
            None,
        )
        if authoritative_asset is not None and observer_asset is not None:
            return authoritative_asset, observer_asset
        harness.time.sleep(2)
    raise RuntimeError(
        f"Could not resolve authoritative+observer turntable assets for line {order_line_id} run {workflow_run_id}"
    )
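
# One parity run, end to end: snapshot the output type's workflow binding,
# bind the shadow probe workflow, create a single-line test order, preflight
# the workflow against that line, dispatch the real render, wait for the
# shadow workflow run and the backend comparison, download both MP4s, compare
# them locally, evaluate the rollout gate, and always restore the original
# binding in the finally block.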


def _run_single_output_type_with_product(harness, client, *, product_id: str, output_type: dict) -> dict:
    templates = harness.render_template_candidates_for_output_type(
        harness.get_render_templates(client),
        output_type["id"],
    )
    if not templates:
        raise RuntimeError(f"Output type {output_type['name']} is not template-backed")

    snapshot = harness.build_output_type_workflow_snapshot(output_type)
    workflow = _ensure_shadow_probe_workflow(harness, client, output_type=output_type)

    try:
        resp = client.patch(
            f"/output-types/{output_type['id']}",
            json=harness.build_output_type_workflow_link_payload(
                workflow_definition_id=workflow["id"],
                execution_mode="shadow",
            ),
        )
        if resp.status_code != 200:
            raise RuntimeError(
                f"Output type shadow link failed for {output_type['name']}: "
                f"{resp.status_code} {resp.text[:400]}"
            )
        bound_output_type = resp.json()

        order = harness.create_test_order(
            client,
            product_id=product_id,
            output_type_ids=[bound_output_type["id"]],
            test_label=f"Turntable Parity [{bound_output_type['name']}]",
        )
        if order is None:
            raise RuntimeError(f"Order creation failed for {bound_output_type['name']}")

        lines = order.get("lines", [])
        if len(lines) != 1:
            raise RuntimeError(
                f"Expected exactly one order line for {bound_output_type['name']}, got {len(lines)}"
            )
        line_id = lines[0]["id"]

        resp_preflight = client.get(
            f"/workflows/{workflow['id']}/preflight",
            params={"context_id": line_id},
        )
        if resp_preflight.status_code != 200:
            raise RuntimeError(
                f"Workflow preflight failed for {bound_output_type['name']}: "
                f"{resp_preflight.status_code} {resp_preflight.text[:400]}"
            )
        preflight = resp_preflight.json()
        if not preflight.get("graph_dispatch_allowed"):
            raise RuntimeError(
                f"Workflow preflight blocked dispatch for {bound_output_type['name']}: "
                f"{preflight.get('summary')}"
            )

        success = harness._submit_and_wait(
            client,
            order,
            [bound_output_type["id"]],
            use_graph_dispatch=False,
            timeout_seconds=getattr(harness, "ANIMATION_RENDER_TIMEOUT_SECONDS", TURNTABLE_RENDER_TIMEOUT_SECONDS),
        )
        if not success:
            raise RuntimeError(f"Render dispatch did not complete successfully for {bound_output_type['name']}")

        workflow_run = harness.wait_for_workflow_run(
            client,
            workflow_id=workflow["id"],
            line_id=line_id,
            timeout_seconds=getattr(harness, "ANIMATION_WORKFLOW_RUN_TIMEOUT_SECONDS", TURNTABLE_WORKFLOW_RUN_TIMEOUT_SECONDS),
            terminal_only=True,
        )
        if workflow_run is None:
            raise RuntimeError(f"Workflow run not found for {bound_output_type['name']}")

        raw_comparison = harness.wait_for_workflow_comparison(
            client,
            workflow_run_id=workflow_run["id"],
            timeout_seconds=getattr(harness, "ANIMATION_WORKFLOW_COMPARISON_TIMEOUT_SECONDS", TURNTABLE_WORKFLOW_COMPARISON_TIMEOUT_SECONDS),
        )
        if raw_comparison is None:
            raise RuntimeError(f"Workflow comparison did not stabilize for {bound_output_type['name']}")

        authoritative_asset, observer_asset = _resolve_turntable_assets(
            harness,
            client,
            order_line_id=line_id,
            workflow_run_id=str(workflow_run["id"]),
            timeout_seconds=TURNTABLE_ASSET_TIMEOUT_SECONDS,
        )
        authoritative_bytes = _download_bytes(client, authoritative_asset["download_url"])
        observer_bytes = _download_bytes(client, observer_asset["download_url"])

        manual = _compare_videos(authoritative_bytes, observer_bytes)
        comparison = {
            "workflow_run_id": workflow_run.get("id"),
            "workflow_def_id": workflow.get("id"),
            "order_line_id": line_id,
            "execution_mode": workflow_run.get("execution_mode"),
            "status": manual["status"],
            "summary": manual["summary"],
            "authoritative_output": _build_video_artifact(
                authoritative_bytes,
                download_url=authoritative_asset["download_url"],
                metadata=manual["authoritative_video"],
            ),
            "observer_output": _build_video_artifact(
                observer_bytes,
                download_url=observer_asset["download_url"],
                metadata=manual["observer_video"],
            ),
            "exact_match": manual["exact_match"],
            "dimensions_match": manual["dimensions_match"],
            "mean_pixel_delta": manual["mean_pixel_delta"],
            "authoritative_frame_count": manual["authoritative_frame_count"],
            "observer_frame_count": manual["observer_frame_count"],
        }
        rollout_gate = harness.evaluate_rollout_gate_from_comparison(comparison)
        template_node = harness._node_result_by_name(workflow_run, "template")
        render_node = harness._node_result_by_name(workflow_run, "turntable")

        return {
            "output_type": {
                "id": bound_output_type["id"],
                "name": bound_output_type["name"],
                "format": bound_output_type.get("output_format"),
                "artifact_kind": bound_output_type.get("artifact_kind"),
                "render_settings": bound_output_type.get("render_settings"),
                "invocation_overrides": bound_output_type.get("invocation_overrides"),
            },
            "templates": [
                {
                    "name": template.get("name"),
                    "blend_file_path": template.get("blend_file_path"),
                    "lighting_only": template.get("lighting_only"),
                    "shadow_catcher_enabled": template.get("shadow_catcher_enabled"),
                    "target_collection": template.get("target_collection"),
                    "camera_orbit": template.get("camera_orbit"),
                }
                for template in templates
            ],
            "workflow": {
                "id": workflow.get("id"),
                "name": workflow.get("name"),
            },
            "preflight": {
                "execution_mode": preflight.get("execution_mode"),
                "context_kind": preflight.get("context_kind"),
                "graph_dispatch_allowed": preflight.get("graph_dispatch_allowed"),
            },
            "order_line_id": line_id,
            "workflow_run": {
                "id": workflow_run.get("id"),
                "status": workflow_run.get("status"),
                "execution_mode": workflow_run.get("execution_mode"),
            },
            "template_resolution": template_node.get("output") if template_node else None,
            "render_output": render_node.get("output") if render_node else None,
            "raw_backend_comparison": raw_comparison,
            "comparison": comparison,
            "rollout_gate": rollout_gate,
        }
    finally:
        harness.restore_output_type_workflow_snapshot(
            client,
            output_type_id=output_type["id"],
            snapshot=snapshot,
        )
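
# Output types run serially against a single shared product. The loop in
# main() has no per-item exception handling, so the first failing parity run
# aborts the whole sweep; for a rollout gate that is arguably the right bias
# (a broken probe should be loud), though it also means no partial report.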


def main() -> int:
    harness = _load_harness()

    parser = argparse.ArgumentParser(description="Compare live legacy vs shadow renders for real turntable output types")
    parser.add_argument("--host", default=os.environ.get("TEST_HOST", "http://localhost:8888"))
    parser.add_argument("--email", default=os.environ.get("TEST_EMAIL", "admin@hartomat.com"))
    parser.add_argument("--password", default=os.environ.get("TEST_PASSWORD", "Admin1234!"))
    parser.add_argument("--product-id", required=True, help="Existing product id to use for parity runs")
    parser.add_argument("--include-generated", action="store_true")
    parser.add_argument("--output", default=None, help="Optional path for JSON report")
    parser.add_argument("--only", action="append", default=[], help="Only run the named output type; repeatable")
    args = parser.parse_args()

    client = harness.APIClient(args.host, args.email, args.password)
    if not harness.test_health(client):
        raise RuntimeError("Render stack health check failed")

    output_types = [
        output_type
        for output_type in harness.get_output_types(client, include_inactive=True)
        if _is_real_turntable_output_type(output_type, include_generated=args.include_generated)
    ]
    if args.only:
        wanted = set(args.only)
        output_types = [output_type for output_type in output_types if output_type.get("name") in wanted]
    output_types.sort(key=lambda item: item.get("name") or "")
    if not output_types:
        raise RuntimeError("No eligible turntable output types found")

    results: list[dict] = []
    for output_type in output_types:
        print(f"\n=== {output_type['name']} ===", flush=True)
        result = _run_single_output_type_with_product(
            harness,
            client,
            product_id=args.product_id,
            output_type=output_type,
        )
        results.append(result)
        summary = {
            "name": result["output_type"]["name"],
            "artifact_kind": result["output_type"].get("artifact_kind"),
            "exact_match": result["comparison"].get("exact_match"),
            "status": result["comparison"].get("status"),
            "mean_pixel_delta": result["comparison"].get("mean_pixel_delta"),
            "authoritative_frame_count": result["comparison"].get("authoritative_frame_count"),
            "observer_frame_count": result["comparison"].get("observer_frame_count"),
            "rollout_verdict": result["rollout_gate"].get("verdict"),
            "workflow_run_id": result["workflow_run"]["id"],
            "order_line_id": result["order_line_id"],
        }
        print(json.dumps(summary, ensure_ascii=False), flush=True)

    report = {
        "host": args.host,
        "product_id": args.product_id,
        "results": results,
    }
    if args.output:
        output_path = Path(args.output)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(json.dumps(report, indent=2, ensure_ascii=False) + "\n")
        print(f"\nWrote report to {output_path}", flush=True)

    overall = {
        "total": len(results),
        "exact_match": sum(1 for item in results if item["comparison"].get("exact_match") is True),
        "pass": sum(1 for item in results if item["rollout_gate"].get("verdict") == "pass"),
        "warn": sum(1 for item in results if item["rollout_gate"].get("verdict") == "warn"),
        "fail": sum(1 for item in results if item["rollout_gate"].get("verdict") == "fail"),
    }
    print("\n=== Overall ===", flush=True)
    print(json.dumps(overall, ensure_ascii=False), flush=True)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())