diff --git a/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md b/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md
index e08eb2d..64c3ffe 100644
--- a/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md
+++ b/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md
@@ -1,5 +1,7 @@
 # Workflow Delivery Checklist
 
+Parallel execution ownership and stage gates are defined in [`docs/workflows/WORKERS.md`](./WORKERS.md).
+
 ## Phase Checklist
 
 ### Phase 1
@@ -30,7 +32,7 @@
 - [x] Node outputs are persisted and reusable
 - [x] Graph runtime supports legacy fallback
 - [x] `legacy`, `graph`, and `shadow` modes exist
-- Progress: Workflow configs now normalize to an explicit execution mode, the editor exposes and persists `legacy`/`graph`/`shadow`, production order-line dispatch can opt into graph mode with hard fallback to legacy on graph failure, and workflow runs now persist their execution mode for safer status tracking and rollout inspection.
+- Progress: Workflow configs now normalize to an explicit execution mode, and the editor exposes and persists `legacy`/`graph`/`shadow`. Production order-line dispatch can opt into graph mode with a hard fallback to legacy on graph failure, and workflow runs persist their execution mode. The `notify` handoff is armed only for authoritative graph renders, and `output_save` is now graph-authoritative for still renders, turntable/video renders, and `.blend` exports, while shadow runs remain observer-only.
 
 ### Phase 5
 
@@ -39,11 +41,20 @@
 - [ ] All node settings are editable
 - [ ] Validate, dry-run, and dispatch are available
 - [ ] Runs are visible with node-level status and logs
+- [ ] Editor authoring follows family-safe module contracts instead of ad hoc node metadata
+
+### Phase 7
+
+- [x] Output-type create defaults match current backend constraints
+- [ ] Output types model workflow invocation profiles
+- [ ] Output types validate against workflow family and artifact contract
+- [ ] Admin create/edit flow is workflow-first instead of renderer-first
 
 ### Phase 6
 
 - [x] Shadow mode parity execution dispatches real graph observer runs alongside authoritative legacy dispatch
 - Progress: Workflow runs now expose a comparison endpoint that resolves authoritative legacy outputs and matching shadow artifacts, including file hashes, image dimensions, and mean pixel delta for parity inspection.
+- Progress: `scripts/test_render_pipeline.py --workflow-still-smoke --execution-mode shadow` now provisions the canonical still smoke contract, runs preflight, dispatches via the real order/output-type workflow linkage, resolves the resulting workflow run, and prints the shadow comparison verdict.
 - [ ] Golden cases pass against legacy outputs
 - [ ] Rollout can be enabled per workflow or output type
 - [ ] Rollback to legacy is immediate
@@ -62,6 +73,7 @@
   - backend node definition
   - validated settings schema
   - default params
+  - family and module contract metadata
   - executor coverage or explicit disabled status
 
 ### QG-3: Legacy Safety Gate
@@ -78,21 +90,40 @@
 - media asset creation
 - notifications
 - core render log fields
+- For graph still renders with downstream `output_save`, no duplicate self-published `MediaAsset` is created before the authoritative graph save step completes.
+- For graph turntable/video renders with downstream `output_save`, no duplicate self-published `MediaAsset` is created before the authoritative graph save step completes.
+- For graph `.blend` exports with downstream `output_save`, no duplicate self-published `MediaAsset` is created before the authoritative graph save step completes.
 
 ### QG-5: Editor Gate
 
 - Workflow configs survive save/load roundtrip without loss.
 - Invalid graphs are blocked before dispatch.
 - All node settings needed for parity are present in the editor.
+- Family-specific authoring prevents invalid `cad_file`/`order_line` graph composition.
+
+### QG-7: Invocation Gate
+
+- Output type creation and editing use valid backend defaults.
+- Output types bind to workflows through an explicit invocation contract.
+- Legacy output types remain renderable during migration.
 
 ### QG-6: Rollout Gate
 
 - Shadow mode has been exercised on representative workflows.
 - Graph runtime error rate is at or below legacy error rate.
 - Rollout and rollback are possible per workflow or output type.
+- Canonical still rollout smoke commands:
+  - `python scripts/test_render_pipeline.py --workflow-still-smoke --execution-mode legacy`
+  - `python scripts/test_render_pipeline.py --workflow-still-smoke --execution-mode graph`
+  - `python scripts/test_render_pipeline.py --workflow-still-smoke --execution-mode shadow`
+- Rollout approval rule for the canonical still workflow:
+  - `shadow` must finish with a successful order line and a comparison verdict of `pass`
+  - `warn` or `fail` means legacy remains authoritative
+  - `graph` may only be enabled on real output types after the shadow command passes cleanly
 
 ## Definition of Done
 
 - `/workflows` is production-capable for authoring and running workflows.
 - Legacy functionality is available in graph form with parity coverage.
 - Legacy execution still exists as a supported fallback.
+- Output types are modeled as workflow invocation profiles, not as loose legacy render presets.
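The script diff below calls a helper, `evaluate_rollout_gate_from_comparison`, whose definition is not included in this diff. A minimal sketch of what it could look like, mapping the QG-6 approval rule onto the comparison payload: the `status`, `exact_match`, and `mean_pixel_delta` fields come from the Phase 6 progress notes, while the `"completed"` status value and the warn tolerance of `1.0` are hypothetical placeholders, not values this diff defines.

```python
# Sketch only: models the QG-6 rollout approval rule. The "completed" status
# string and the 1.0 mean-pixel-delta warn tolerance are assumed placeholders.
def evaluate_rollout_gate_from_comparison(comparison: dict) -> dict:
    reasons: list[str] = []

    # A comparison that never finished cannot approve rollout.
    if comparison.get("status") != "completed":
        reasons.append(f"comparison status is {comparison.get('status')!r}, not 'completed'")
        return {"verdict": "fail", "reasons": reasons}

    # Byte-identical artifacts are an unconditional pass.
    if comparison.get("exact_match"):
        return {"verdict": "pass", "reasons": ["legacy and shadow artifacts match exactly"]}

    delta = comparison.get("mean_pixel_delta")
    if delta is None:
        reasons.append("artifacts differ and no mean_pixel_delta is available")
        return {"verdict": "fail", "reasons": reasons}

    # Small drift is surfaced as warn: per QG-6, legacy stays authoritative.
    if delta <= 1.0:
        reasons.append(f"mean_pixel_delta {delta} is within the warn tolerance")
        return {"verdict": "warn", "reasons": reasons}

    reasons.append(f"mean_pixel_delta {delta} exceeds the warn tolerance")
    return {"verdict": "fail", "reasons": reasons}
```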
diff --git a/scripts/test_render_pipeline.py b/scripts/test_render_pipeline.py
index 14f180e..7d6637d 100644
--- a/scripts/test_render_pipeline.py
+++ b/scripts/test_render_pipeline.py
@@ -158,14 +158,20 @@ class APIClient:
     def post(self, path: str, **kwargs) -> requests.Response:
         return self.session.post(f"{self.host}/api{path}", **kwargs)
 
+    def put(self, path: str, **kwargs) -> requests.Response:
+        return self.session.put(f"{self.host}/api{path}", **kwargs)
+
+    def patch(self, path: str, **kwargs) -> requests.Response:
+        return self.session.patch(f"{self.host}/api{path}", **kwargs)
+
     def delete(self, path: str, **kwargs) -> requests.Response:
         return self.session.delete(f"{self.host}/api{path}", **kwargs)
 
 
-def build_graph_still_config() -> dict:
+def build_graph_still_config(*, execution_mode: str = "graph") -> dict:
     return {
         "version": 1,
-        "ui": {"preset": "still_graph", "execution_mode": "graph"},
+        "ui": {"preset": "still_graph", "execution_mode": execution_mode},
         "nodes": [
             {
                 "id": "setup",
@@ -200,6 +206,130 @@
     }
 
 
+def get_workflows(client: APIClient) -> list[dict]:
+    resp = client.get("/workflows")
+    if resp.status_code != 200:
+        return []
+    data = resp.json()
+    return data if isinstance(data, list) else []
+
+
+def find_named(items: list[dict], name: str) -> dict | None:
+    return next((item for item in items if item.get("name") == name), None)
+
+
+def smoke_output_type_name(execution_mode: str) -> str:
+    return f"[Workflow Smoke] Still {execution_mode.title()}"
+
+
+def smoke_workflow_name(execution_mode: str) -> str:
+    return f"[Workflow Smoke] Canonical Still {execution_mode.title()}"
+
+
+def ensure_workflow_still_smoke_resources(
+    client: APIClient,
+    *,
+    execution_mode: str,
+) -> dict:
+    output_type_name = smoke_output_type_name(execution_mode)
+    workflow_name = smoke_workflow_name(execution_mode)
+    output_types = get_output_types(client, include_inactive=True)
+    output_type = find_named(output_types, output_type_name)
+
+    invocation_overrides = {
+        "width": 1024,
+        "height": 1024,
+        "engine": "cycles",
+        "samples": 64,
+    }
+    output_type_payload = {
+        "name": output_type_name,
+        "description": f"Canonical still workflow smoke profile ({execution_mode})",
+        "renderer": "blender",
+        "render_settings": invocation_overrides,
+        "output_format": "png",
+        "sort_order": 0,
+        "is_active": True,
+        "compatible_categories": [],
+        "render_backend": "celery",
+        "is_animation": False,
+        "transparent_bg": False,
+        "workflow_family": "order_line",
+        "artifact_kind": "still_image",
+        "invocation_overrides": invocation_overrides,
+        "workflow_definition_id": None,
+    }
+
+    if output_type is None:
+        resp = client.post("/output-types", json=output_type_payload)
+        if resp.status_code not in (200, 201):
+            raise RuntimeError(
+                f"Workflow smoke output type create failed: {resp.status_code} {resp.text[:400]}"
+            )
+        output_type = resp.json()
+        ok(f"Provisioned smoke output type: {output_type_name}")
+    else:
+        resp = client.patch(f"/output-types/{output_type['id']}", json=output_type_payload)
+        if resp.status_code != 200:
+            raise RuntimeError(
+                f"Workflow smoke output type update failed: {resp.status_code} {resp.text[:400]}"
+            )
+        output_type = resp.json()
+        info(f"Reusing smoke output type: {output_type_name}")
+
+    workflow = None
+    if execution_mode != "legacy":
+        workflows = get_workflows(client)
+        workflow = find_named(workflows, workflow_name)
+        workflow_payload = {
+            "name": workflow_name,
+            "output_type_id": output_type["id"],
+            "config": build_graph_still_config(execution_mode=execution_mode),
+            "is_active": True,
+        }
+        if workflow is None:
+            resp = client.post("/workflows", json=workflow_payload)
+            if resp.status_code not in (200, 201):
+                raise RuntimeError(
+                    f"Workflow smoke workflow create failed: {resp.status_code} {resp.text[:400]}"
+                )
+            workflow = resp.json()
+            ok(f"Provisioned smoke workflow: {workflow_name}")
+        else:
+            resp = client.put(
+                f"/workflows/{workflow['id']}",
+                json={
+                    "name": workflow_payload["name"],
+                    "config": workflow_payload["config"],
+                    "is_active": workflow_payload["is_active"],
+                },
+            )
+            if resp.status_code != 200:
+                raise RuntimeError(
+                    f"Workflow smoke workflow update failed: {resp.status_code} {resp.text[:400]}"
+                )
+            workflow = resp.json()
+            info(f"Reusing smoke workflow: {workflow_name}")
+
+        resp = client.patch(
+            f"/output-types/{output_type['id']}",
+            json={"workflow_definition_id": workflow["id"], "is_active": True},
+        )
+        if resp.status_code != 200:
+            raise RuntimeError(
+                f"Workflow smoke output type link failed: {resp.status_code} {resp.text[:400]}"
+            )
+        output_type = resp.json()
+    else:
+        workflow = None
+
+    return {
+        "output_type": output_type,
+        "workflow": workflow,
+        "execution_mode": execution_mode,
+    }
+
+
 # ---------------------------------------------------------------------------
 # Test: Render health endpoint
 # ---------------------------------------------------------------------------
@@ -298,6 +428,86 @@ def test_step_upload(client: APIClient, step_file: Path) -> str | None:
     return None
 
 
+# ---------------------------------------------------------------------------
+# Helpers: Product / Order / Workflow tracking
+# ---------------------------------------------------------------------------
+
+def get_or_create_test_product(client: APIClient, cad_file_id: str) -> str | None:
+    product_id = None
+    resp_products = client.get("/products/?limit=100")
+    if resp_products.status_code == 200:
+        products = resp_products.json()
+        if isinstance(products, dict):
+            products = products.get("items", [])
+        for p in products:
+            if str(p.get("cad_file_id")) == cad_file_id:
+                product_id = str(p["id"])
+                info(f"Using existing product: {p.get('name', p['id'])[:40]}")
+                break
+
+    if product_id:
+        return product_id
+
+    resp_create = client.post("/products/", json={
+        "name": f"Test Product {cad_file_id[:8]}",
+        "pim_id": f"TEST-{cad_file_id[:8]}",
+        "is_active": True,
+        "cad_file_id": cad_file_id,
+    })
+    if resp_create.status_code not in (200, 201):
+        fail(f"Product creation failed: {resp_create.status_code} {resp_create.text[:200]}")
+        return None
+    product_id = resp_create.json()["id"]
+    ok(f"Created test product: {product_id[:8]}...")
+    return product_id
+
+
+def create_test_order(
+    client: APIClient,
+    *,
+    product_id: str,
+    output_type_ids: list[str],
+    test_label: str,
+) -> dict | None:
+    resp_order = client.post(
+        "/orders",
+        json={
+            "notes": f"Render pipeline integration test: {test_label}",
+            "items": [],
+            "lines": [
+                {"product_id": product_id, "output_type_id": ot_id}
+                for ot_id in output_type_ids
+            ],
+        },
+    )
+    if resp_order.status_code not in (200, 201):
+        fail(f"Order creation failed: {resp_order.status_code} {resp_order.text[:300]}")
+        return None
+
+    order = resp_order.json()
+    order_id = order["id"]
+    ok(f"Order created: {order.get('order_number')} (id={order_id[:8]}...)")
+    return order
+
+
+def wait_for_workflow_run(
+    client: APIClient,
+    *,
+    workflow_id: str,
+    line_id: str,
+    timeout_seconds: int = 60,
+) -> dict | None:
+    deadline = time.time() + timeout_seconds
+    while time.time() < deadline:
+        resp = client.get(f"/workflows/{workflow_id}/runs")
+        if resp.status_code == 200:
+            for run in resp.json():
+                if run.get("order_line_id") == line_id:
+                    return run
+        time.sleep(2)
+    return None
+
+
 # ---------------------------------------------------------------------------
 # Test: Order creation + submit + dispatch + wait
 # ---------------------------------------------------------------------------
@@ -314,52 +524,18 @@ def test_order_render(
     section(f"3. Order Render — {test_label}")
     info(f"Output types: {len(output_type_ids)}")
 
-    # Get a product that uses this CAD file
-    # Find or create a product linked to this CAD file
-    product_id = None
-    resp_products = client.get("/products/?limit=100")
-    if resp_products.status_code == 200:
-        products = resp_products.json()
-        if isinstance(products, dict):
-            products = products.get("items", [])
-        for p in products:
-            if str(p.get("cad_file_id")) == cad_file_id:
-                product_id = str(p["id"])
-                info(f"Using existing product: {p.get('name', p['id'])[:40]}")
-                break
-
+    product_id = get_or_create_test_product(client, cad_file_id)
     if not product_id:
-        # Create a minimal test product
-        resp_create = client.post("/products/", json={
-            "name": f"Test Product {cad_file_id[:8]}",
-            "pim_id": f"TEST-{cad_file_id[:8]}",
-            "is_active": True,
-            "cad_file_id": cad_file_id,
-        })
-        if resp_create.status_code not in (200, 201):
-            fail(f"Product creation failed: {resp_create.status_code} {resp_create.text[:200]}")
-            return False
-        product_id = resp_create.json()["id"]
-        ok(f"Created test product: {product_id[:8]}...")
-
-    resp_order = client.post(
-        "/orders",
-        json={
-            "notes": f"Render pipeline integration test: {test_label}",
-            "items": [],
-            "lines": [
-                {"product_id": product_id, "output_type_id": ot_id}
-                for ot_id in output_type_ids
-            ],
-        },
-    )
-    if resp_order.status_code not in (200, 201):
-        fail(f"Order creation failed: {resp_order.status_code} {resp_order.text[:300]}")
         return False
 
-    order = resp_order.json()
-    order_id = order["id"]
-    ok(f"Order created: {order.get('order_number')} (id={order_id[:8]}...)")
+    order = create_test_order(
+        client,
+        product_id=product_id,
+        output_type_ids=output_type_ids,
+        test_label=test_label,
+    )
+    if order is None:
+        return False
 
     return _submit_and_wait(
         client,
@@ -488,14 +664,125 @@ def _submit_and_wait(
     return False
 
 
+def test_workflow_still_smoke(
+    client: APIClient,
+    cad_file_id: str,
+    *,
+    execution_mode: str,
+) -> bool:
+    section(f"3. Workflow Still Smoke — {execution_mode}")
+
+    smoke_resources = ensure_workflow_still_smoke_resources(
+        client,
+        execution_mode=execution_mode,
+    )
+    output_type = smoke_resources["output_type"]
+    workflow = smoke_resources["workflow"]
+
+    info(
+        f"Smoke contract: output_type={output_type['name']} "
+        f"workflow={workflow['name'] if workflow else 'legacy-only'}"
+    )
+
+    product_id = get_or_create_test_product(client, cad_file_id)
+    if not product_id:
+        return False
+
+    order = create_test_order(
+        client,
+        product_id=product_id,
+        output_type_ids=[output_type["id"]],
+        test_label=f"Workflow Still Smoke [{execution_mode}]",
+    )
+    if order is None:
+        return False
+
+    lines = order.get("lines", [])
+    if len(lines) != 1:
+        fail("Workflow still smoke expects exactly one order line")
+        return False
+    line_id = lines[0]["id"]
+
+    if workflow is not None:
+        resp_preflight = client.get(
+            f"/workflows/{workflow['id']}/preflight",
+            params={"context_id": line_id},
+        )
+        if resp_preflight.status_code != 200:
+            fail(f"Workflow preflight failed: {resp_preflight.status_code} {resp_preflight.text[:300]}")
+            return False
+        preflight = resp_preflight.json()
+        info(
+            "Preflight: "
+            f"execution_mode={preflight.get('execution_mode')} "
+            f"context={preflight.get('context_kind')} "
+            f"allowed={preflight.get('graph_dispatch_allowed')}"
+        )
+        if not preflight.get("graph_dispatch_allowed"):
+            fail(f"Workflow preflight blocked dispatch: {preflight.get('summary')}")
+            for issue in preflight.get("issues", []):
+                info(f"  {issue.get('code')}: {issue.get('message')}")
+            return False
+        ok(f"Workflow preflight passed for {execution_mode} mode")
+
+    success = _submit_and_wait(
+        client,
+        order,
+        [output_type["id"]],
+        use_graph_dispatch=False,
+    )
+
+    workflow_run = None
+    if workflow is not None:
+        workflow_run = wait_for_workflow_run(
+            client,
+            workflow_id=workflow["id"],
+            line_id=line_id,
+        )
+        if workflow_run is None:
+            warn("Workflow run could not be resolved after dispatch")
+        else:
+            ok(
+                f"Workflow run tracked: mode={workflow_run.get('execution_mode')} "
+                f"run={workflow_run.get('id')[:8]}..."
+            )
+
+    if success and execution_mode == "shadow" and workflow_run is not None:
+        resp_cmp = client.get(f"/workflows/runs/{workflow_run['id']}/comparison")
+        if resp_cmp.status_code != 200:
+            warn(f"Shadow comparison lookup failed: {resp_cmp.status_code} {resp_cmp.text[:300]}")
+            return success
+
+        comparison = resp_cmp.json()
+        rollout_gate = evaluate_rollout_gate_from_comparison(comparison)
+        verdict = rollout_gate["verdict"]
+        info(
+            "Shadow comparison: "
+            f"status={comparison.get('status')} "
+            f"exact_match={comparison.get('exact_match')} "
+            f"mean_pixel_delta={comparison.get('mean_pixel_delta')}"
+        )
+        if verdict == "pass":
+            ok("Shadow rollout gate PASS — canonical still workflow is ready for workflow-first rollout")
+        elif verdict == "warn":
+            warn("Shadow rollout gate WARN — keep legacy authoritative and review drift")
+        else:
+            warn("Shadow rollout gate FAIL — keep legacy authoritative")
+        for reason in rollout_gate["reasons"]:
+            info(f"  {reason}")
+
+    return success
+
+
 # ---------------------------------------------------------------------------
 # Get output types
 # ---------------------------------------------------------------------------
 
-def get_output_types(client: APIClient) -> list[dict]:
-    resp = client.get("/output-types/")
+def get_output_types(client: APIClient, *, include_inactive: bool = False) -> list[dict]:
+    params = {"include_inactive": "true"} if include_inactive else None
+    resp = client.get("/output-types/", params=params)
     if resp.status_code != 200:
-        resp = client.get("/output-types")
+        resp = client.get("/output-types", params=params)
     if resp.status_code != 200:
         return []
     data = resp.json()
@@ -517,16 +804,34 @@ def main():
     parser.add_argument("--sample", action="store_true", help="Quick sample test (1 STEP, 1 OT)")
     parser.add_argument("--full", action="store_true", help="Full test (all output types)")
    parser.add_argument("--graph", action="store_true", help="Dispatch sample/full renders via /api/workflows/dispatch")
+    parser.add_argument(
+        "--workflow-still-smoke",
+        action="store_true",
+        help="Run the canonical still workflow smoke path via real order dispatch",
+    )
+    parser.add_argument(
+        "--execution-mode",
+        choices=["legacy", "graph", "shadow"],
+        default="shadow",
+        help="Execution mode for --workflow-still-smoke (default: shadow)",
+    )
     parser.add_argument("--step", default=str(SAMPLE_STEP), help="Path to STEP file")
     args = parser.parse_args()
 
-    if not any([args.health, args.sample, args.full]):
+    if not any([args.health, args.sample, args.full, args.workflow_still_smoke]):
         parser.print_help()
         sys.exit(0)
 
     print(f"\n{BLUE}Render Pipeline Test{RESET}")
     print(f"Host: {args.host}")
-    print(f"Mode: {'health' if args.health else 'sample' if args.sample else 'full'}")
+    mode_label = "health"
+    if args.workflow_still_smoke:
+        mode_label = f"workflow-still-smoke[{args.execution_mode}]"
+    elif args.sample:
+        mode_label = "sample"
+    elif args.full:
+        mode_label = "full"
+    print(f"Mode: {mode_label}")
 
     # Login
     try:
@@ -555,16 +860,21 @@ def main():
         _print_summary()
         sys.exit(1)
 
-    # Get output types
-    output_types = get_output_types(client)
-    if not output_types:
-        fail("No active output types found")
-        _print_summary()
-        sys.exit(1)
+    if args.workflow_still_smoke:
+        test_workflow_still_smoke(
+            client,
+            cad_file_id,
+            execution_mode=args.execution_mode,
+        )
 
-    info(f"Found {len(output_types)} active output types: {[ot['name'] for ot in output_types]}")
+    elif args.sample:
+        output_types = get_output_types(client)
+        if not output_types:
+            fail("No active output types found")
+            _print_summary()
+            sys.exit(1)
 
-    if args.sample:
+        info(f"Found {len(output_types)} active output types: {[ot['name'] for ot in output_types]}")
         # Pick the first non-animation output type (fastest)
         ot = next(
             (ot for ot in output_types if not ot.get("is_animation") and "LQ" in ot["name"].upper()),
@@ -580,6 +890,13 @@ def main():
         )
 
     elif args.full:
+        output_types = get_output_types(client)
+        if not output_types:
+            fail("No active output types found")
+            _print_summary()
+            sys.exit(1)
+
+        info(f"Found {len(output_types)} active output types: {[ot['name'] for ot in output_types]}")
         # Test each output type individually
         for ot in output_types:
            if ot.get("is_animation"):
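The QG-6 gate runs the three canonical smoke commands and only allows enabling `graph` on real output types after the `shadow` run passes cleanly. A minimal driver sketch for that sequence, assuming the script exits nonzero on failure (the smoke path's exit-code handling is not shown in this diff):

```python
# Sketch only: runs the QG-6 canonical still smoke matrix in rollout order,
# assuming scripts/test_render_pipeline.py exits nonzero on failure.
import subprocess
import sys

MODES = ["legacy", "graph", "shadow"]

def run_smoke_matrix() -> bool:
    for mode in MODES:
        cmd = [
            sys.executable,
            "scripts/test_render_pipeline.py",
            "--workflow-still-smoke",
            "--execution-mode",
            mode,
        ]
        print(f"--> {' '.join(cmd)}")
        if subprocess.run(cmd).returncode != 0:
            # Per the rollout approval rule, anything short of a clean pass
            # means legacy stays authoritative.
            print(f"Smoke mode {mode!r} failed; keep legacy authoritative.")
            return False
    print("All smoke modes passed; graph rollout may be considered.")
    return True

if __name__ == "__main__":
    sys.exit(0 if run_smoke_matrix() else 1)
```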