Files
HartOMat/scripts/test_render_pipeline.py
T

934 lines
33 KiB
Python

#!/usr/bin/env python3
"""Render pipeline integration test.
Tests the full pipeline: STEP upload → CAD processing → thumbnail rendering →
order creation → submit → dispatch renders → wait for completed.
Usage:
# Quick smoke test (1 STEP file, 1 output type)
python scripts/test_render_pipeline.py --sample
# Full test — all output types, waits for all renders
python scripts/test_render_pipeline.py --full
# Only check render health endpoint
python scripts/test_render_pipeline.py --health
# Custom credentials / host
python scripts/test_render_pipeline.py --sample --host http://localhost:8888 \
--email admin@hartomat.com --password Admin1234!
Environment variables (alternative to flags):
TEST_HOST, TEST_EMAIL, TEST_PASSWORD
"""
import argparse
import os
import sys
import time
import json
import requests
from pathlib import Path
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Connection defaults; overridable via environment variables or CLI flags.
DEFAULT_HOST = os.environ.get("TEST_HOST", "http://localhost:8888")
DEFAULT_EMAIL = os.environ.get("TEST_EMAIL", "admin@hartomat.com")
DEFAULT_PASSWORD = os.environ.get("TEST_PASSWORD", "Admin1234!")
# Bundled sample STEP file used when --step is not supplied.
SAMPLE_STEP = Path(__file__).parent.parent / "step-sample-file" / "81113-l_cut.stp"
RENDER_TIMEOUT_SECONDS = 300 # 5 minutes per render
POLL_INTERVAL_SECONDS = 5
CAD_PROCESSING_TIMEOUT = 120 # 2 minutes for STEP processing
# ANSI escape sequences for coloured terminal output.
GREEN = "\033[92m"
RED = "\033[91m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
RESET = "\033[0m"
# Global result accumulators, filled by ok()/fail()/warn() and reported by
# _print_summary().
passed = []
failed = []
warnings = []
# Rollout-gate thresholds used by evaluate_rollout_gate_from_comparison():
# a zero mean pixel delta passes, a small delta warns, anything above fails.
ROLLOUT_PASS_MAX_MEAN_PIXEL_DELTA = 0.0
ROLLOUT_WARN_MAX_MEAN_PIXEL_DELTA = 0.02
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def ok(msg: str):
    """Log *msg* as a passing check and record it for the final summary."""
    # NOTE(review): no glyph appears between the colour codes — possibly lost
    # in transit; output is currently colour-on/colour-off plus the message.
    print(" " + GREEN + RESET + " " + msg)
    passed.append(msg)
def fail(msg: str):
    """Log *msg* as a failed check and record it for the final summary."""
    print(" " + RED + RESET + " " + msg)
    failed.append(msg)
def warn(msg: str):
    """Log *msg* as a non-fatal warning and record it for the final summary."""
    print(" " + YELLOW + RESET + " " + msg)
    warnings.append(msg)
def info(msg: str):
    """Log an informational *msg*; not recorded in any result list."""
    print(" " + BLUE + RESET + " " + msg)
def section(title: str):
    """Print a coloured banner that separates major test phases."""
    bar = "=" * 60
    print(f"\n{BLUE}{bar}{RESET}")
    print(f"{BLUE} {title}{RESET}")
    print(f"{BLUE}{bar}{RESET}")
def evaluate_rollout_gate_from_comparison(comparison: dict) -> dict:
    """Derive a rollout verdict from a workflow-run comparison payload.

    Decision ladder: both the authoritative (legacy) and observer (workflow)
    outputs must exist; a byte-for-byte match passes outright; mismatched
    dimensions fail; otherwise the mean pixel delta is checked against the
    module-level pass/warn thresholds.

    Returns {"verdict": "pass"|"warn"|"fail", "ready": bool, "reasons": [...]}.
    """
    delta = comparison.get("mean_pixel_delta")
    exact = comparison.get("exact_match")
    dims_match = comparison.get("dimensions_match")
    status = comparison.get("status")
    has_authoritative = bool(comparison.get("authoritative_output", {}).get("exists"))
    has_observer = bool(comparison.get("observer_output", {}).get("exists"))
    reasons: list[str] = []
    if not has_authoritative:
        verdict = "fail"
        reasons.append("Authoritative legacy output is missing.")
    elif not has_observer:
        verdict = "fail"
        reasons.append("Observer workflow output is missing.")
    elif exact:
        verdict = "pass"
        reasons.append("Observer output matches the authoritative legacy output byte-for-byte.")
    elif dims_match is False:
        verdict = "fail"
        reasons.append("Observer output dimensions differ from the authoritative legacy output.")
    elif delta is None:
        verdict = "fail"
        reasons.append(f"Workflow comparison did not produce a pixel delta (status={status}).")
    elif delta <= ROLLOUT_PASS_MAX_MEAN_PIXEL_DELTA:
        verdict = "pass"
        reasons.append("Observer output is visually identical within the pass threshold.")
    elif delta <= ROLLOUT_WARN_MAX_MEAN_PIXEL_DELTA:
        verdict = "warn"
        reasons.append("Observer output differs slightly but remains within the warn threshold.")
    else:
        verdict = "fail"
        reasons.append("Observer output exceeds the rollout parity threshold.")
    # Attach the measured delta and thresholds whenever a non-exact
    # comparison produced one, regardless of the verdict.
    if delta is not None and not exact:
        reasons.append(
            f"Mean pixel delta {delta:.6f}; "
            f"pass<={ROLLOUT_PASS_MAX_MEAN_PIXEL_DELTA:.6f}, "
            f"warn<={ROLLOUT_WARN_MAX_MEAN_PIXEL_DELTA:.6f}."
        )
    return {"verdict": verdict, "ready": verdict == "pass", "reasons": reasons}
class APIClient:
    """Thin wrapper around a requests.Session with bearer-token auth.

    Logs in on construction against {host}/api/auth/login and installs the
    returned access token as a default Authorization header; every verb
    helper prefixes the path with /api.
    """

    def __init__(self, host: str, email: str, password: str):
        self.host = host.rstrip("/")
        self.session = requests.Session()
        self.token: str | None = None
        self._login(email, password)

    def _login(self, email: str, password: str):
        # Exchange credentials for an access token; raises on HTTP errors.
        response = self.session.post(
            f"{self.host}/api/auth/login",
            json={"email": email, "password": password},
        )
        response.raise_for_status()
        payload = response.json()
        self.token = payload["access_token"]
        self.session.headers["Authorization"] = f"Bearer {self.token}"

    def _url(self, path: str) -> str:
        # All API routes live under /api.
        return f"{self.host}/api{path}"

    def get(self, path: str, **kwargs) -> requests.Response:
        return self.session.get(self._url(path), **kwargs)

    def post(self, path: str, **kwargs) -> requests.Response:
        return self.session.post(self._url(path), **kwargs)

    def put(self, path: str, **kwargs) -> requests.Response:
        return self.session.put(self._url(path), **kwargs)

    def patch(self, path: str, **kwargs) -> requests.Response:
        return self.session.patch(self._url(path), **kwargs)

    def delete(self, path: str, **kwargs) -> requests.Response:
        return self.session.delete(self._url(path), **kwargs)
def build_graph_still_config(*, execution_mode: str = "graph") -> dict:
    """Build the canonical four-node still-render workflow graph config.

    The graph is a straight pipeline: order-line setup -> template
    resolution -> Blender still render -> output save, with nodes laid out
    left-to-right at 220px intervals.
    """
    node_specs = [
        ("setup", "order_line_setup", None, "Order Line Setup"),
        ("template", "resolve_template", None, "Resolve Template"),
        ("render", "blender_still", "renderNode", "Still Render"),
        ("output", "output_save", "outputNode", "Save Output"),
    ]
    nodes = []
    for index, (node_id, step, node_type, label) in enumerate(node_specs):
        ui: dict = {"label": label, "position": {"x": index * 220, "y": 100}}
        if node_type is not None:
            # Render/output nodes carry an explicit UI node type.
            ui = {"type": node_type, **ui}
        nodes.append({"id": node_id, "step": step, "params": {}, "ui": ui})
    ids = [spec[0] for spec in node_specs]
    edges = [{"from": src, "to": dst} for src, dst in zip(ids, ids[1:])]
    return {
        "version": 1,
        "ui": {"preset": "still_graph", "execution_mode": execution_mode},
        "nodes": nodes,
        "edges": edges,
    }
def get_workflows(client: APIClient) -> list[dict]:
    """Return all workflow definitions; [] on error or non-list payloads."""
    response = client.get("/workflows")
    if response.status_code != 200:
        return []
    payload = response.json()
    if isinstance(payload, list):
        return payload
    return []
def find_named(items: list[dict], name: str) -> dict | None:
return next((item for item in items if item.get("name") == name), None)
def smoke_output_type_name(execution_mode: str) -> str:
    """Deterministic display name for the smoke-test output type."""
    return "[Workflow Smoke] Still " + execution_mode.title()
def smoke_workflow_name(execution_mode: str) -> str:
    """Deterministic display name for the smoke-test workflow definition."""
    return "[Workflow Smoke] Canonical Still " + execution_mode.title()
def ensure_workflow_still_smoke_resources(
    client: APIClient,
    *,
    execution_mode: str,
) -> dict:
    """Create or refresh the output type (and, for non-legacy modes, the
    workflow definition) used by the canonical still-render smoke test.

    Returns {"output_type": ..., "workflow": ... (None in legacy mode),
    "execution_mode": ...}.

    Raises RuntimeError when any provisioning API call fails.
    """
    output_type_name = smoke_output_type_name(execution_mode)
    workflow_name = smoke_workflow_name(execution_mode)
    # include_inactive so a previously deactivated smoke profile is found and
    # re-activated below rather than duplicated.
    output_types = get_output_types(client, include_inactive=True)
    output_type = find_named(output_types, output_type_name)
    # Same settings serve as both render_settings and invocation_overrides.
    invocation_overrides = {
        "width": 1024,
        "height": 1024,
        "engine": "cycles",
        "samples": 64,
    }
    output_type_payload = {
        "name": output_type_name,
        "description": f"Canonical still workflow smoke profile ({execution_mode})",
        "renderer": "blender",
        "render_settings": invocation_overrides,
        "output_format": "png",
        "sort_order": 0,
        "is_active": True,
        "compatible_categories": [],
        "render_backend": "celery",
        "is_animation": False,
        "transparent_bg": False,
        "workflow_family": "order_line",
        "artifact_kind": "still_image",
        "invocation_overrides": invocation_overrides,
        "workflow_definition_id": None,
    }
    if output_type is None:
        resp = client.post("/output-types", json=output_type_payload)
        if resp.status_code not in (200, 201):
            raise RuntimeError(
                f"Workflow smoke output type create failed: {resp.status_code} {resp.text[:400]}"
            )
        output_type = resp.json()
        ok(f"Provisioned smoke output type: {output_type_name}")
    else:
        # Refresh the existing profile so stale settings never skew the test.
        resp = client.patch(f"/output-types/{output_type['id']}", json=output_type_payload)
        if resp.status_code != 200:
            raise RuntimeError(
                f"Workflow smoke output type update failed: {resp.status_code} {resp.text[:400]}"
            )
        output_type = resp.json()
        info(f"Reusing smoke output type: {output_type_name}")
    workflow = None
    if execution_mode != "legacy":
        workflows = get_workflows(client)
        workflow = find_named(workflows, workflow_name)
        workflow_payload = {
            "name": workflow_name,
            "output_type_id": output_type["id"],
            "config": build_graph_still_config(execution_mode=execution_mode),
            "is_active": True,
        }
        if workflow is None:
            resp = client.post("/workflows", json=workflow_payload)
            if resp.status_code not in (200, 201):
                raise RuntimeError(
                    f"Workflow smoke workflow create failed: {resp.status_code} {resp.text[:400]}"
                )
            workflow = resp.json()
            ok(f"Provisioned smoke workflow: {workflow_name}")
        else:
            resp = client.put(
                f"/workflows/{workflow['id']}",
                json={
                    "name": workflow_payload["name"],
                    "config": workflow_payload["config"],
                    "is_active": workflow_payload["is_active"],
                },
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"Workflow smoke workflow update failed: {resp.status_code} {resp.text[:400]}"
                )
            workflow = resp.json()
            info(f"Reusing smoke workflow: {workflow_name}")
        # Link the workflow to the output type so order dispatch routes
        # through it, and make sure the output type is active.
        resp = client.patch(
            f"/output-types/{output_type['id']}",
            json={"workflow_definition_id": workflow["id"], "is_active": True},
        )
        if resp.status_code != 200:
            raise RuntimeError(
                f"Workflow smoke output type link failed: {resp.status_code} {resp.text[:400]}"
            )
        output_type = resp.json()
    else:
        workflow = None
    return {
        "output_type": output_type,
        "workflow": workflow,
        "execution_mode": execution_mode,
    }
# ---------------------------------------------------------------------------
# Test: Render health endpoint
# ---------------------------------------------------------------------------
def test_health(client: APIClient) -> bool:
    """Report render-stack health from /worker/health/render.

    Logs worker connectivity, Blender availability, queue depth and the most
    recent render. Returns False only when the endpoint is unreachable or
    reports an overall status of "down".
    """
    section("1. Render Health Check")
    resp = client.get("/worker/health/render")
    if resp.status_code != 200:
        fail(f"GET /worker/health/render → {resp.status_code}: {resp.text[:200]}")
        return False
    health = resp.json()
    info(f"Overall status: {health['status']}")
    info(f"Render worker connected: {health['render_worker_connected']}")
    info(f"Blender available: {health['blender_available']}")
    info(f"asset_pipeline queue depth: {health['thumbnail_queue_depth']}")
    if health.get("last_render_at"):
        outcome = 'success' if health['last_render_success'] else 'FAILED'
        info(f"Last render: {health['last_render_at']} ({outcome}, {health['last_render_age_minutes']}m ago)")
    if health["render_worker_connected"]:
        ok("Render worker connected")
    else:
        fail("Render worker NOT connected — renders will fail")
    if health["blender_available"]:
        ok("Blender renderer reachable (port 8100)")
    else:
        fail("Blender renderer NOT reachable — thumbnail/order renders will fail")
    depth = health["thumbnail_queue_depth"]
    if health["thumbnail_queue_ok"]:
        ok(f"asset_pipeline queue healthy (depth={depth})")
    else:
        warn(f"asset_pipeline queue DEEP ({depth} tasks) — renders may be slow")
    return health["status"] != "down"
# ---------------------------------------------------------------------------
# Test: STEP upload + CAD processing
# ---------------------------------------------------------------------------
def test_step_upload(client: APIClient, step_file: Path) -> str | None:
    """Upload STEP file, wait for completed processing. Returns cad_file_id or None."""
    section("2. STEP Upload + CAD Processing")
    if not step_file.exists():
        fail(f"Sample STEP file not found: {step_file}")
        return None
    info(f"Uploading {step_file.name} ({step_file.stat().st_size // 1024} KB)")
    # Keep the file handle open for the duration of the multipart upload.
    with open(step_file, "rb") as f:
        resp = client.post(
            "/uploads/step",
            files={"file": (step_file.name, f, "application/octet-stream")},
        )
    if resp.status_code not in (200, 201):
        fail(f"STEP upload failed: {resp.status_code} {resp.text[:300]}")
        return None
    data = resp.json()
    cad_file_id = data["cad_file_id"]
    ok(f"STEP uploaded → cad_file_id={cad_file_id[:8]}... status={data.get('status')}")
    # Poll the existing CAD endpoints. There is no GET /api/cad/{id}; the most
    # reliable readiness signal is /objects returning 200 with processing_status.
    info(f"Waiting for CAD processing (timeout={CAD_PROCESSING_TIMEOUT}s)...")
    deadline = time.time() + CAD_PROCESSING_TIMEOUT
    last_status = None  # only log status when it changes
    while time.time() < deadline:
        resp_objects = client.get(f"/cad/{cad_file_id}/objects")
        if resp_objects.status_code == 200:
            cad = resp_objects.json()
            status = cad.get("processing_status")
            if status != last_status:
                info(f" CAD status: {status}")
                last_status = status
            if status == "completed":
                ok("CAD processing completed (parsed objects available)")
                return cad_file_id
            if status == "failed":
                fail(f"CAD processing FAILED: {cad.get('error_message', 'unknown error')}")
                return None
        # Fallback readiness signal: a 200 from /thumbnail implies processing
        # finished even when /objects does not report it.
        resp_thumb = client.get(f"/cad/{cad_file_id}/thumbnail")
        if resp_thumb.status_code == 200:
            if last_status != "completed":
                info(" CAD status: completed")
                last_status = "completed"
            ok("CAD processing completed (thumbnail available)")
            return cad_file_id
        time.sleep(POLL_INTERVAL_SECONDS)
    fail(f"CAD processing timed out after {CAD_PROCESSING_TIMEOUT}s (last status: {last_status})")
    return None
# ---------------------------------------------------------------------------
# Helpers: Product / Order / Workflow tracking
# ---------------------------------------------------------------------------
def get_or_create_test_product(client: APIClient, cad_file_id: str) -> str | None:
    """Find a product already linked to *cad_file_id*, or create one.

    Returns the product id as a string, or None when creation fails.
    """
    resp_products = client.get("/products/?limit=100")
    if resp_products.status_code == 200:
        products = resp_products.json()
        if isinstance(products, dict):
            products = products.get("items", [])
        for p in products:
            if str(p.get("cad_file_id")) == cad_file_id:
                product_id = str(p["id"])
                # Bug fix: "name" can be present but None, which previously
                # made the [:40] slice raise TypeError; fall back to the id.
                label = str(p.get("name") or p["id"])
                info(f"Using existing product: {label[:40]}")
                return product_id
    # No existing product references this CAD file — create a throwaway one.
    resp_create = client.post("/products/", json={
        "name": f"Test Product {cad_file_id[:8]}",
        "pim_id": f"TEST-{cad_file_id[:8]}",
        "is_active": True,
        "cad_file_id": cad_file_id,
    })
    if resp_create.status_code not in (200, 201):
        fail(f"Product creation failed: {resp_create.status_code} {resp_create.text[:200]}")
        return None
    product_id = resp_create.json()["id"]
    ok(f"Created test product: {product_id[:8]}...")
    return product_id
def create_test_order(
    client: APIClient,
    *,
    product_id: str,
    output_type_ids: list[str],
    test_label: str,
) -> dict | None:
    """Create a draft order with one line per output type.

    Returns the created order payload, or None when the API rejects it.
    """
    payload = {
        "notes": f"Render pipeline integration test: {test_label}",
        "items": [],
        "lines": [
            {"product_id": product_id, "output_type_id": ot_id}
            for ot_id in output_type_ids
        ],
    }
    response = client.post("/orders", json=payload)
    if response.status_code not in (200, 201):
        fail(f"Order creation failed: {response.status_code} {response.text[:300]}")
        return None
    created = response.json()
    created_id = created["id"]
    ok(f"Order created: {created.get('order_number')} (id={created_id[:8]}...)")
    return created
def wait_for_workflow_run(
    client: APIClient,
    *,
    workflow_id: str,
    line_id: str,
    timeout_seconds: int = 60,
) -> dict | None:
    """Poll the workflow's run list until a run for *line_id* appears.

    Returns the matching run dict, or None once *timeout_seconds* elapse.
    """
    stop_at = time.time() + timeout_seconds
    while time.time() < stop_at:
        response = client.get(f"/workflows/{workflow_id}/runs")
        if response.status_code == 200:
            match = next(
                (run for run in response.json() if run.get("order_line_id") == line_id),
                None,
            )
            if match is not None:
                return match
        time.sleep(2)
    return None
# ---------------------------------------------------------------------------
# Test: Order creation + submit + dispatch + wait
# ---------------------------------------------------------------------------
def test_order_render(
    client: APIClient,
    cad_file_id: str,
    output_type_ids: list[str],
    test_label: str,
    *,
    use_graph_dispatch: bool = False,
) -> bool:
    """Create a minimal order, submit, dispatch renders, wait for completion."""
    section(f"3. Order Render — {test_label}")
    info(f"Output types: {len(output_type_ids)}")
    product = get_or_create_test_product(client, cad_file_id)
    if not product:
        return False
    created_order = create_test_order(
        client,
        product_id=product,
        output_type_ids=output_type_ids,
        test_label=test_label,
    )
    if created_order is None:
        return False
    # Hand off to the shared submit/dispatch/poll loop.
    return _submit_and_wait(
        client,
        created_order,
        output_type_ids,
        use_graph_dispatch=use_graph_dispatch,
    )
def _submit_and_wait(
    client: APIClient,
    order: dict,
    output_type_ids: list[str],
    *,
    use_graph_dispatch: bool = False,
) -> bool:
    """Submit *order*, dispatch its renders, and poll until every line
    reaches a terminal state.

    With use_graph_dispatch=True the (single) order line is dispatched via
    /workflows/dispatch using the canonical still graph config, and on
    success the run comparison is fetched to evaluate the rollout gate.
    Otherwise the legacy /orders/{id}/dispatch-renders route is used.

    Returns True only when all lines complete successfully.
    """
    order_id = order["id"]
    # Submit
    resp_sub = client.post(f"/orders/{order_id}/submit")
    if resp_sub.status_code not in (200, 201, 204):
        # 409 means the order was already submitted — treat as benign.
        if resp_sub.status_code == 409:
            info("Order already submitted")
        else:
            fail(f"Order submit failed: {resp_sub.status_code} {resp_sub.text[:200]}")
            return False
    else:
        ok("Order submitted")
    dispatch_run_id = None
    if use_graph_dispatch:
        lines = order.get("lines", [])
        if len(lines) != 1:
            fail("Graph mode currently expects exactly one order line per test order")
            return False
        line_id = lines[0]["id"]
        resp_disp = client.post(
            "/workflows/dispatch",
            json={
                "context_id": line_id,
                "config": build_graph_still_config(),
            },
        )
        if resp_disp.status_code not in (200, 201):
            fail(f"Workflow draft dispatch failed: {resp_disp.status_code} {resp_disp.text[:300]}")
            return False
        dispatch_data = resp_disp.json()
        dispatch_run_id = dispatch_data["workflow_run"]["id"]
        ok(f"Graph workflow dispatched (run={dispatch_run_id[:8]}..., tasks={dispatch_data.get('dispatched', '?')})")
    else:
        resp_disp = client.post(f"/orders/{order_id}/dispatch-renders")
        if resp_disp.status_code not in (200, 201, 204):
            fail(f"Dispatch renders failed: {resp_disp.status_code} {resp_disp.text[:200]}")
            return False
        # A 204 response has no body to parse.
        dispatch_data = resp_disp.json() if resp_disp.content else {}
        dispatched = dispatch_data.get("dispatched", "?")
        ok(f"Renders dispatched ({dispatched} lines)")
    # Poll for order completion
    info(f"Waiting for renders to complete (timeout={RENDER_TIMEOUT_SECONDS}s per OT)...")
    # The overall budget scales with the number of output types requested.
    deadline = time.time() + RENDER_TIMEOUT_SECONDS * max(len(output_type_ids), 1)
    last_summary = ""  # only log the status line when it changes
    while time.time() < deadline:
        resp_ord = client.get(f"/orders/{order_id}")
        if resp_ord.status_code != 200:
            fail(f"Order poll failed: {resp_ord.status_code}")
            return False
        order = resp_ord.json()
        order_status = order.get("status")
        # Field name differs between API versions ("lines" vs "order_lines").
        lines = order.get("lines", order.get("order_lines", []))
        statuses = [l.get("render_status") for l in lines]
        summary = f"order={order_status} lines={statuses}"
        if summary != last_summary:
            info(f" {summary}")
            last_summary = summary
        terminal_states = {"completed", "failed", "cancelled"}
        line_states = [state for state in statuses if state]
        if line_states and all(state in terminal_states for state in line_states):
            all_success = all(state == "completed" for state in line_states)
            if order_status == "completed":
                ok(f"Order completed — all {len(lines)} render(s) done")
            elif all_success:
                # Lines finished but the order aggregate may lag behind.
                ok(
                    f"All {len(lines)} render line(s) completed "
                    f"(order status remains {order_status})"
                )
            else:
                fail(f"Order reached terminal line states with order={order_status}")
            # Per-line breakdown.
            for line in lines:
                rs = line.get("render_status")
                ot_name = line.get("output_type_name") or line.get("output_type", {}).get("name", "?")
                if rs == "completed":
                    ok(f" Line [{ot_name}]: completed")
                elif rs == "failed":
                    fail(f" Line [{ot_name}]: FAILED")
                else:
                    warn(f" Line [{ot_name}]: {rs}")
            # Graph dispatch path: fetch the run comparison and evaluate the
            # rollout gate against the legacy output.
            if all_success and dispatch_run_id:
                resp_cmp = client.get(f"/workflows/runs/{dispatch_run_id}/comparison")
                if resp_cmp.status_code == 200:
                    comparison = resp_cmp.json()
                    rollout_gate = evaluate_rollout_gate_from_comparison(comparison)
                    verdict = rollout_gate["verdict"]
                    if verdict == "pass":
                        ok(" Rollout gate PASS — graph output is ready for workflow-first rollout")
                    elif verdict == "warn":
                        warn(" Rollout gate WARN — keep legacy authoritative and review drift")
                    else:
                        warn(" Rollout gate FAIL — keep legacy authoritative")
                    info(f" Comparison status: {comparison.get('status')}, verdict={verdict}")
                    for reason in rollout_gate["reasons"]:
                        info(f" {reason}")
                else:
                    warn(f" Comparison lookup failed: {resp_cmp.status_code}")
            return all_success
        if order_status == "failed":
            fail("Order FAILED — check render logs")
            return False
        time.sleep(POLL_INTERVAL_SECONDS)
    # Timed out: reconstruct elapsed seconds from the deadline.
    fail(f"Render timed out after {(time.time() - deadline + RENDER_TIMEOUT_SECONDS * max(len(output_type_ids), 1)):.0f}s")
    return False
def test_workflow_still_smoke(
    client: APIClient,
    cad_file_id: str,
    *,
    execution_mode: str,
) -> bool:
    """End-to-end smoke test of the canonical still workflow.

    Provisions the smoke output type/workflow, creates a one-line order,
    runs a workflow preflight (non-legacy modes), submits and dispatches via
    the legacy route, tracks the resulting workflow run, and in shadow mode
    evaluates the rollout gate from the run comparison.

    Returns True when the render pipeline completed successfully.
    """
    section(f"3. Workflow Still Smoke — {execution_mode}")
    smoke_resources = ensure_workflow_still_smoke_resources(
        client,
        execution_mode=execution_mode,
    )
    output_type = smoke_resources["output_type"]
    workflow = smoke_resources["workflow"]
    info(
        f"Smoke contract: output_type={output_type['name']} "
        f"workflow={workflow['name'] if workflow else 'legacy-only'}"
    )
    product_id = get_or_create_test_product(client, cad_file_id)
    if not product_id:
        return False
    order = create_test_order(
        client,
        product_id=product_id,
        output_type_ids=[output_type["id"]],
        test_label=f"Workflow Still Smoke [{execution_mode}]",
    )
    if order is None:
        return False
    lines = order.get("lines", [])
    if len(lines) != 1:
        fail("Workflow still smoke expects exactly one order line")
        return False
    line_id = lines[0]["id"]
    if workflow is not None:
        # Preflight: ask the backend whether graph dispatch is permitted for
        # this order line before exercising the real dispatch path.
        resp_preflight = client.get(
            f"/workflows/{workflow['id']}/preflight",
            params={"context_id": line_id},
        )
        if resp_preflight.status_code != 200:
            fail(f"Workflow preflight failed: {resp_preflight.status_code} {resp_preflight.text[:300]}")
            return False
        preflight = resp_preflight.json()
        info(
            "Preflight: "
            f"execution_mode={preflight.get('execution_mode')} "
            f"context={preflight.get('context_kind')} "
            f"allowed={preflight.get('graph_dispatch_allowed')}"
        )
        if not preflight.get("graph_dispatch_allowed"):
            fail(f"Workflow preflight blocked dispatch: {preflight.get('summary')}")
            for issue in preflight.get("issues", []):
                info(f" {issue.get('code')}: {issue.get('message')}")
            return False
        ok(f"Workflow preflight passed for {execution_mode} mode")
    # Dispatch via the legacy route; the workflow engine observes the line
    # according to its configured execution mode.
    success = _submit_and_wait(
        client,
        order,
        [output_type["id"]],
        use_graph_dispatch=False,
    )
    workflow_run = None
    if workflow is not None:
        workflow_run = wait_for_workflow_run(
            client,
            workflow_id=workflow["id"],
            line_id=line_id,
        )
        if workflow_run is None:
            warn("Workflow run could not be resolved after dispatch")
        else:
            ok(
                f"Workflow run tracked: mode={workflow_run.get('execution_mode')} "
                f"run={workflow_run.get('id')[:8]}..."
            )
    # Shadow mode: compare the observer output against the legacy output and
    # evaluate the rollout gate.
    if success and execution_mode == "shadow" and workflow_run is not None:
        resp_cmp = client.get(f"/workflows/runs/{workflow_run['id']}/comparison")
        if resp_cmp.status_code != 200:
            warn(f"Shadow comparison lookup failed: {resp_cmp.status_code} {resp_cmp.text[:300]}")
            return success
        comparison = resp_cmp.json()
        rollout_gate = evaluate_rollout_gate_from_comparison(comparison)
        verdict = rollout_gate["verdict"]
        info(
            "Shadow comparison: "
            f"status={comparison.get('status')} "
            f"exact_match={comparison.get('exact_match')} "
            f"mean_pixel_delta={comparison.get('mean_pixel_delta')}"
        )
        if verdict == "pass":
            ok("Shadow rollout gate PASS — canonical still workflow is ready for workflow-first rollout")
        elif verdict == "warn":
            warn("Shadow rollout gate WARN — keep legacy authoritative and review drift")
        else:
            warn("Shadow rollout gate FAIL — keep legacy authoritative")
        for reason in rollout_gate["reasons"]:
            info(f" {reason}")
    return success
# ---------------------------------------------------------------------------
# Get output types
# ---------------------------------------------------------------------------
def get_output_types(client: "APIClient", *, include_inactive: bool = False) -> list[dict]:
    """Fetch output types from the API.

    Tries the trailing-slash route first and falls back to the bare route
    (router style differs between deployments). Returns [] when both fail.

    When ``include_inactive`` is False only active output types are returned.
    Bug fix: previously inactive entries were filtered out even when
    ``include_inactive=True``, so callers such as
    ensure_workflow_still_smoke_resources() could never find a deactivated
    smoke profile and would try to create a duplicate.
    """
    params = {"include_inactive": "true"} if include_inactive else None
    resp = client.get("/output-types/", params=params)
    if resp.status_code != 200:
        resp = client.get("/output-types", params=params)
        if resp.status_code != 200:
            return []
    data = resp.json()
    if isinstance(data, dict):
        data = data.get("items", [])
    if include_inactive:
        return data
    return [ot for ot in data if ot.get("is_active", True)]
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """CLI entry point: parse arguments, authenticate, then run the selected
    mode (health-only, sample, full, or workflow-still-smoke).

    Exits with status 1 when any check failed, 0 otherwise.
    """
    parser = argparse.ArgumentParser(description="Render pipeline integration tests")
    parser.add_argument("--host", default=DEFAULT_HOST)
    parser.add_argument("--email", default=DEFAULT_EMAIL)
    parser.add_argument("--password", default=DEFAULT_PASSWORD)
    parser.add_argument("--health", action="store_true", help="Only run health check")
    parser.add_argument("--sample", action="store_true", help="Quick sample test (1 STEP, 1 OT)")
    parser.add_argument("--full", action="store_true", help="Full test (all output types)")
    parser.add_argument("--graph", action="store_true", help="Dispatch sample/full renders via /api/workflows/dispatch")
    parser.add_argument(
        "--workflow-still-smoke",
        action="store_true",
        help="Run the canonical still workflow smoke path via real order dispatch",
    )
    parser.add_argument(
        "--execution-mode",
        choices=["legacy", "graph", "shadow"],
        default="shadow",
        help="Execution mode for --workflow-still-smoke (default: shadow)",
    )
    parser.add_argument("--step", default=str(SAMPLE_STEP), help="Path to STEP file")
    args = parser.parse_args()
    # No mode selected: print usage and exit cleanly.
    if not any([args.health, args.sample, args.full, args.workflow_still_smoke]):
        parser.print_help()
        sys.exit(0)
    print(f"\n{BLUE}Render Pipeline Test{RESET}")
    print(f"Host: {args.host}")
    mode_label = "health"
    if args.workflow_still_smoke:
        mode_label = f"workflow-still-smoke[{args.execution_mode}]"
    elif args.sample:
        mode_label = "sample"
    elif args.full:
        mode_label = "full"
    print(f"Mode: {mode_label}")
    # Login
    try:
        client = APIClient(args.host, args.email, args.password)
        ok(f"Authenticated as {args.email}")
    except Exception as exc:
        fail(f"Authentication failed: {exc}")
        sys.exit(1)
    # Health check
    health_ok = test_health(client)
    if args.health:
        _print_summary()
        sys.exit(0 if not failed else 1)
    if not health_ok:
        warn("Health check failed — render tests may not work. Continuing anyway...")
    # STEP upload
    step_path = Path(args.step)
    cad_file_id = test_step_upload(client, step_path)
    if not cad_file_id:
        fail("STEP processing failed — cannot proceed to render tests")
        _print_summary()
        sys.exit(1)
    if args.workflow_still_smoke:
        test_workflow_still_smoke(
            client,
            cad_file_id,
            execution_mode=args.execution_mode,
        )
    elif args.sample:
        output_types = get_output_types(client)
        if not output_types:
            fail("No active output types found")
            _print_summary()
            sys.exit(1)
        info(f"Found {len(output_types)} active output types: {[ot['name'] for ot in output_types]}")
        # Pick the first non-animation output type (fastest)
        ot = next(
            (ot for ot in output_types if not ot.get("is_animation") and "LQ" in ot["name"].upper()),
            output_types[0],
        )
        info(f"Sample test using output type: {ot['name']}")
        test_order_render(
            client,
            cad_file_id,
            [ot["id"]],
            f"Sample [{ot['name']}]",
            use_graph_dispatch=args.graph,
        )
    elif args.full:
        output_types = get_output_types(client)
        if not output_types:
            fail("No active output types found")
            _print_summary()
            sys.exit(1)
        info(f"Found {len(output_types)} active output types: {[ot['name'] for ot in output_types]}")
        # Test each output type individually
        for ot in output_types:
            if ot.get("is_animation"):
                warn(f"Skipping animation output type: {ot['name']} (too slow for full test)")
                continue
            test_order_render(
                client,
                cad_file_id,
                [ot["id"]],
                ot["name"],
                use_graph_dispatch=args.graph,
            )
    _print_summary()
    sys.exit(0 if not failed else 1)
def _print_summary():
    """Print pass/fail/warning tallies and list every recorded failure."""
    section("Test Summary")
    print(f" {GREEN}Passed:{RESET} {len(passed)}")
    print(f" {RED}Failed:{RESET} {len(failed)}")
    print(f" {YELLOW}Warnings:{RESET} {len(warnings)}")
    if failed:
        print(f"\n{RED}FAILURES:{RESET}")
        for message in failed:
            print(f" - {message}")
        print(f"\n{RED}Tests FAILED{RESET}")
    else:
        print(f"\n{GREEN}All tests passed!{RESET}")
# Run only when executed as a script; importing the module has no side effects.
if __name__ == "__main__":
    main()