221 lines
8.0 KiB
Python
Executable File
221 lines
8.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import requests
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
sys.path.insert(0, str(ROOT / "backend"))
|
|
|
|
from app.domains.rendering.template_input_audit import ( # noqa: E402
|
|
extract_template_input_marker,
|
|
suggest_workflow_input_schema,
|
|
)
|
|
|
|
DEFAULT_HOST = os.environ.get("TEST_HOST", "http://localhost:8888")
|
|
DEFAULT_EMAIL = os.environ.get("TEST_EMAIL", "admin@hartomat.com")
|
|
DEFAULT_PASSWORD = os.environ.get("TEST_PASSWORD", "Admin1234!")
|
|
DEFAULT_BACKEND_CONTAINER = os.environ.get("HARTOMAT_BACKEND_CONTAINER", "hartomat-backend-1")
|
|
|
|
AUDIT_SCRIPT = """
|
|
import bpy
|
|
import json
|
|
|
|
def props(target):
|
|
out = {}
|
|
for key in target.keys():
|
|
if key == "_RNA_UI":
|
|
continue
|
|
value = target[key]
|
|
if isinstance(value, bool):
|
|
out[key] = value
|
|
elif isinstance(value, (int, float, str)):
|
|
out[key] = value
|
|
else:
|
|
out[key] = str(value)
|
|
return out
|
|
|
|
payload = {
|
|
"file": bpy.data.filepath,
|
|
"scene": bpy.context.scene.name if bpy.context.scene else None,
|
|
"worlds": [{"name": world.name, "props": props(world)} for world in bpy.data.worlds],
|
|
"collections": [{"name": coll.name, "props": props(coll)} for coll in bpy.data.collections],
|
|
"objects": [{"name": obj.name, "type": obj.type, "props": props(obj)} for obj in bpy.data.objects],
|
|
}
|
|
print("===HARTOMAT_TEMPLATE_AUDIT_START===")
|
|
print(json.dumps(payload))
|
|
print("===HARTOMAT_TEMPLATE_AUDIT_END===")
|
|
"""
|
|
|
|
|
|
class APIClient:
|
|
def __init__(self, host: str, email: str, password: str) -> None:
|
|
self.host = host.rstrip("/")
|
|
self.session = requests.Session()
|
|
response = self.session.post(
|
|
f"{self.host}/api/auth/login",
|
|
json={"email": email, "password": password},
|
|
timeout=30,
|
|
)
|
|
response.raise_for_status()
|
|
access_token = response.json()["access_token"]
|
|
self.session.headers["Authorization"] = f"Bearer {access_token}"
|
|
|
|
def list_render_templates(self) -> list[dict[str, Any]]:
|
|
response = self.session.get(f"{self.host}/api/render-templates", timeout=30)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
|
|
def _docker_cp(container: str, source: str, destination: Path) -> None:
|
|
subprocess.run(
|
|
["docker", "cp", f"{container}:{source}", str(destination)],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
|
|
|
|
def _run_blender_audit(blend_file: Path) -> dict[str, Any]:
|
|
with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as handle:
|
|
handle.write(AUDIT_SCRIPT)
|
|
audit_script = Path(handle.name)
|
|
try:
|
|
completed = subprocess.run(
|
|
["blender", "-b", str(blend_file), "--python", str(audit_script)],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
finally:
|
|
audit_script.unlink(missing_ok=True)
|
|
|
|
match = re.search(
|
|
r"===HARTOMAT_TEMPLATE_AUDIT_START===\n(.*?)\n===HARTOMAT_TEMPLATE_AUDIT_END===",
|
|
completed.stdout,
|
|
re.DOTALL,
|
|
)
|
|
if not match:
|
|
raise RuntimeError(f"Could not parse Blender audit output for {blend_file}")
|
|
return json.loads(match.group(1))
|
|
|
|
|
|
def _collect_markers(audit_payload: dict[str, Any]) -> list[dict[str, str]]:
|
|
markers: list[dict[str, str]] = []
|
|
for kind in ("worlds", "collections", "objects"):
|
|
for target in audit_payload.get(kind, []):
|
|
marker = extract_template_input_marker(name=target.get("name"), props=target.get("props") or {})
|
|
if marker is None:
|
|
continue
|
|
key, value = marker
|
|
markers.append(
|
|
{
|
|
"target_kind": kind[:-1],
|
|
"target_name": target.get("name") or "",
|
|
"key": key,
|
|
"value": value,
|
|
}
|
|
)
|
|
return markers
|
|
|
|
|
|
def _render_markdown_report(report: list[dict[str, Any]]) -> str:
|
|
lines = [
|
|
"# Render Template Input Audit",
|
|
"",
|
|
"Generated from live `.blend` templates via `scripts/audit_render_templates.py`.",
|
|
"",
|
|
]
|
|
for template in report:
|
|
lines.append(f"## {template['name']}")
|
|
lines.append("")
|
|
lines.append(f"- Template ID: `{template['id']}`")
|
|
lines.append(f"- Blend path: `{template['blend_file_path']}`")
|
|
lines.append(f"- Existing workflow_input_schema entries: `{template['existing_schema_count']}`")
|
|
lines.append(f"- Collections: `{', '.join(template['collection_names']) or '-'}`")
|
|
lines.append(f"- Worlds: `{', '.join(template['world_names']) or '-'}`")
|
|
lines.append(f"- Objects: `{', '.join(template['object_names']) or '-'}`")
|
|
if template["markers"]:
|
|
lines.append("- Detected markers:")
|
|
for marker in template["markers"]:
|
|
lines.append(
|
|
f" - `{marker['target_kind']}:{marker['target_name']}` => "
|
|
f"`{marker['key']}={marker['value']}`"
|
|
)
|
|
else:
|
|
lines.append("- Detected markers: none")
|
|
if template["suggested_schema"]:
|
|
lines.append("- Suggested schema:")
|
|
lines.append("")
|
|
lines.append("```json")
|
|
lines.append(json.dumps(template["suggested_schema"], indent=2))
|
|
lines.append("```")
|
|
else:
|
|
lines.append("- Suggested schema: none")
|
|
lines.append("")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(description="Audit live render templates for template-input markers.")
|
|
parser.add_argument("--host", default=DEFAULT_HOST)
|
|
parser.add_argument("--email", default=DEFAULT_EMAIL)
|
|
parser.add_argument("--password", default=DEFAULT_PASSWORD)
|
|
parser.add_argument("--backend-container", default=DEFAULT_BACKEND_CONTAINER)
|
|
parser.add_argument("--write-markdown", help="Optional markdown report output path.")
|
|
parser.add_argument("--json", action="store_true", help="Emit full JSON report.")
|
|
args = parser.parse_args()
|
|
|
|
client = APIClient(args.host, args.email, args.password)
|
|
templates = client.list_render_templates()
|
|
|
|
report: list[dict[str, Any]] = []
|
|
with tempfile.TemporaryDirectory() as temp_dir:
|
|
temp_path = Path(temp_dir)
|
|
for template in templates:
|
|
blend_file_path = template["blend_file_path"]
|
|
local_blend = temp_path / f"{template['id']}.blend"
|
|
_docker_cp(args.backend_container, blend_file_path, local_blend)
|
|
audit_payload = _run_blender_audit(local_blend)
|
|
markers = _collect_markers(audit_payload)
|
|
suggested_schema = suggest_workflow_input_schema(
|
|
(marker["key"], marker["value"]) for marker in markers
|
|
)
|
|
report.append(
|
|
{
|
|
"id": template["id"],
|
|
"name": template["name"],
|
|
"blend_file_path": blend_file_path,
|
|
"existing_schema_count": len(template.get("workflow_input_schema") or []),
|
|
"world_names": [item["name"] for item in audit_payload.get("worlds", [])],
|
|
"collection_names": [item["name"] for item in audit_payload.get("collections", [])],
|
|
"object_names": [item["name"] for item in audit_payload.get("objects", [])],
|
|
"markers": markers,
|
|
"suggested_schema": suggested_schema,
|
|
}
|
|
)
|
|
|
|
if args.write_markdown:
|
|
output_path = Path(args.write_markdown)
|
|
output_path.write_text(_render_markdown_report(report) + "\n", encoding="utf-8")
|
|
|
|
if args.json:
|
|
print(json.dumps(report, indent=2))
|
|
else:
|
|
for template in report:
|
|
print(f"{template['name']}: markers={len(template['markers'])}, suggested_schema={len(template['suggested_schema'])}")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|