diff --git a/backend/app/domains/rendering/schemas.py b/backend/app/domains/rendering/schemas.py index cbae437..2db6c2d 100644 --- a/backend/app/domains/rendering/schemas.py +++ b/backend/app/domains/rendering/schemas.py @@ -188,3 +188,28 @@ class WorkflowRunOut(BaseModel): created_at: datetime node_results: list[WorkflowNodeResultOut] = [] model_config = {"from_attributes": True} + + +class WorkflowComparisonArtifactOut(BaseModel): + path: str | None + storage_key: str | None + exists: bool + file_size_bytes: int | None + sha256: str | None + mime_type: str | None + image_width: int | None + image_height: int | None + + +class WorkflowRunComparisonOut(BaseModel): + workflow_run_id: uuid.UUID + workflow_def_id: uuid.UUID | None + order_line_id: uuid.UUID | None + execution_mode: str + status: str + summary: str + authoritative_output: WorkflowComparisonArtifactOut + observer_output: WorkflowComparisonArtifactOut + exact_match: bool | None + dimensions_match: bool | None + mean_pixel_delta: float | None diff --git a/backend/app/domains/rendering/workflow_comparison_service.py b/backend/app/domains/rendering/workflow_comparison_service.py new file mode 100644 index 0000000..8b46995 --- /dev/null +++ b/backend/app/domains/rendering/workflow_comparison_service.py @@ -0,0 +1,239 @@ +from __future__ import annotations + +import hashlib +import mimetypes +import uuid +from dataclasses import dataclass +from pathlib import Path + +from PIL import Image, ImageChops, ImageStat +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.domains.media.models import MediaAsset +from app.domains.orders.models import OrderLine +from app.domains.rendering.models import WorkflowRun +from app.domains.rendering.schemas import WorkflowComparisonArtifactOut, WorkflowRunComparisonOut + + +@dataclass(slots=True) +class _ArtifactComparison: + path: str | None + storage_key: str | None + exists: bool + file_size_bytes: int | None + sha256: str | None + mime_type: str | None + image_width: int | None + image_height: int | None + + def to_schema(self) -> WorkflowComparisonArtifactOut: + return WorkflowComparisonArtifactOut( + path=self.path, + storage_key=self.storage_key, + exists=self.exists, + file_size_bytes=self.file_size_bytes, + sha256=self.sha256, + mime_type=self.mime_type, + image_width=self.image_width, + image_height=self.image_height, + ) + + +def _normalize_storage_key(path: str | None) -> str | None: + if not path: + return None + normalized = path.replace("\\", "/") + marker = "/uploads/" + if marker in normalized: + return normalized.split(marker, 1)[1] + return normalized.lstrip("/") + + +def _build_artifact(path: str | None) -> _ArtifactComparison: + if not path: + return _ArtifactComparison( + path=None, + storage_key=None, + exists=False, + file_size_bytes=None, + sha256=None, + mime_type=None, + image_width=None, + image_height=None, + ) + + file_path = Path(path) + exists = file_path.exists() + mime_type, _ = mimetypes.guess_type(str(file_path)) + + sha256 = None + file_size_bytes = None + image_width = None + image_height = None + if exists and file_path.is_file(): + file_size_bytes = file_path.stat().st_size + digest = hashlib.sha256() + with file_path.open("rb") as handle: + for chunk in iter(lambda: handle.read(1024 * 1024), b""): + digest.update(chunk) + sha256 = digest.hexdigest() + try: + with Image.open(file_path) as image: + image_width, image_height = image.size + except Exception: + image_width = None + image_height = None + + return _ArtifactComparison( + path=str(file_path), + storage_key=_normalize_storage_key(str(file_path)), + exists=exists, + file_size_bytes=file_size_bytes, + sha256=sha256, + mime_type=mime_type, + image_width=image_width, + image_height=image_height, + ) + + +def _compute_mean_pixel_delta( + authoritative_path: str | None, + observer_path: str | None, +) -> float | None: + if not authoritative_path or not observer_path: + return None + + authoritative_file = Path(authoritative_path) + observer_file = Path(observer_path) + if not authoritative_file.exists() or not observer_file.exists(): + return None + + try: + with Image.open(authoritative_file) as authoritative_image, Image.open(observer_file) as observer_image: + authoritative_rgba = authoritative_image.convert("RGBA") + observer_rgba = observer_image.convert("RGBA") + if authoritative_rgba.size != observer_rgba.size: + return None + diff = ImageChops.difference(authoritative_rgba, observer_rgba) + mean_channels = ImageStat.Stat(diff).mean + return sum(mean_channels) / (len(mean_channels) * 255.0) + except Exception: + return None + + +async def _load_shadow_asset_by_workflow_run( + db: AsyncSession, + workflow_run_id: uuid.UUID, +) -> str | None: + asset_result = await db.execute( + select(MediaAsset) + .where(MediaAsset.workflow_run_id == workflow_run_id) + .order_by(MediaAsset.created_at.desc()) + .limit(1) + ) + asset = asset_result.scalar_one_or_none() + if asset is None: + return None + + storage_key = asset.storage_key.lstrip("/") + if storage_key.startswith("app/uploads/"): + return f"/{storage_key}" + return f"/app/uploads/{storage_key}" + + +def _find_shadow_file(order_line: OrderLine, workflow_run: WorkflowRun) -> str | None: + shadow_suffix = f"shadow-{str(workflow_run.id)[:8]}" + candidate_roots: list[Path] = [] + + if order_line.result_path: + candidate_roots.append(Path(order_line.result_path).parent) + + candidate_roots.append(Path("/app/uploads/renders") / str(order_line.id)) + + seen_roots: set[Path] = set() + candidates: list[Path] = [] + for root in candidate_roots: + if root in seen_roots: + continue + seen_roots.add(root) + if not root.exists(): + continue + matches = [path for path in root.iterdir() if path.is_file() and shadow_suffix in path.name] + candidates.extend(matches) + + if not candidates: + return None + + candidates.sort(key=lambda path: (path.stat().st_mtime, path.name), reverse=True) + return str(candidates[0]) + + +async def build_workflow_run_comparison( + db: AsyncSession, + workflow_run_id: uuid.UUID, +) -> WorkflowRunComparisonOut | None: + run_result = await db.execute(select(WorkflowRun).where(WorkflowRun.id == workflow_run_id)) + workflow_run = run_result.scalar_one_or_none() + if workflow_run is None: + return None + + order_line = None + if workflow_run.order_line_id is not None: + order_line = await db.get(OrderLine, workflow_run.order_line_id) + + authoritative_path = order_line.result_path if order_line is not None else None + + observer_path = await _load_shadow_asset_by_workflow_run(db, workflow_run.id) + if observer_path is None and order_line is not None: + observer_path = _find_shadow_file(order_line, workflow_run) + + authoritative_output = _build_artifact(authoritative_path) + observer_output = _build_artifact(observer_path) + + if not authoritative_output.exists: + status = "missing_authoritative" + summary = "Authoritative legacy output is missing." + exact_match = None + dimensions_match = None + mean_pixel_delta = None + elif not observer_output.exists: + status = "missing_observer" + summary = "Observer workflow output is missing." + exact_match = None + dimensions_match = None + mean_pixel_delta = None + else: + exact_match = authoritative_output.sha256 == observer_output.sha256 + dimensions_match = ( + authoritative_output.image_width == observer_output.image_width + and authoritative_output.image_height == observer_output.image_height + and authoritative_output.image_width is not None + and observer_output.image_width is not None + ) + mean_pixel_delta = _compute_mean_pixel_delta(authoritative_output.path, observer_output.path) + if exact_match: + status = "matched" + summary = "Observer output matches the authoritative legacy output byte-for-byte." + else: + status = "different" + if dimensions_match is False: + summary = "Observer output differs from the authoritative output and the image dimensions changed." + elif mean_pixel_delta is not None: + summary = "Observer output differs from the authoritative output." + else: + summary = "Observer output differs from the authoritative output and could not be pixel-compared." + + return WorkflowRunComparisonOut( + workflow_run_id=workflow_run.id, + workflow_def_id=workflow_run.workflow_def_id, + order_line_id=workflow_run.order_line_id, + execution_mode=workflow_run.execution_mode, + status=status, + summary=summary, + authoritative_output=authoritative_output.to_schema(), + observer_output=observer_output.to_schema(), + exact_match=exact_match, + dimensions_match=dimensions_match, + mean_pixel_delta=mean_pixel_delta, + ) diff --git a/backend/app/domains/rendering/workflow_router.py b/backend/app/domains/rendering/workflow_router.py index e660bba..778b167 100644 --- a/backend/app/domains/rendering/workflow_router.py +++ b/backend/app/domains/rendering/workflow_router.py @@ -15,8 +15,10 @@ from app.domains.rendering.schemas import ( WorkflowDefinitionCreate, WorkflowDefinitionUpdate, WorkflowDefinitionOut, + WorkflowRunComparisonOut, WorkflowRunOut, ) +from app.domains.rendering.workflow_comparison_service import build_workflow_run_comparison from app.domains.rendering.workflow_config_utils import canonicalize_workflow_config from app.domains.rendering.workflow_node_registry import ( StepCategory, @@ -199,6 +201,18 @@ async def list_workflow_runs( return result.scalars().all() +@router.get("/runs/{run_id}/comparison", response_model=WorkflowRunComparisonOut) +async def get_workflow_run_comparison( + run_id: uuid.UUID, + _user: User = Depends(require_admin_or_pm), + db: AsyncSession = Depends(get_db), +): + comparison = await build_workflow_run_comparison(db, run_id) + if comparison is None: + raise HTTPException(status_code=404, detail="Workflow run not found") + return comparison + + class WorkflowDispatchResponse(BaseModel): workflow_run: WorkflowRunOut context_id: str diff --git a/backend/tests/domains/test_workflow_dispatch_service.py b/backend/tests/domains/test_workflow_dispatch_service.py index 4aa1cb3..06e56cb 100644 --- a/backend/tests/domains/test_workflow_dispatch_service.py +++ b/backend/tests/domains/test_workflow_dispatch_service.py @@ -4,6 +4,7 @@ import uuid from pathlib import Path import pytest +from PIL import Image from sqlalchemy import select from sqlalchemy.orm import selectinload @@ -545,3 +546,90 @@ async def test_workflow_dispatch_endpoint_returns_workflow_run_with_node_results assert node_results["template"]["status"] == "completed" assert node_results["template"]["output"]["use_materials"] is False assert node_results["output"]["status"] == "skipped" + + +@pytest.mark.asyncio +async def test_workflow_run_comparison_endpoint_reports_identical_shadow_output( + client, + db, + admin_user, + auth_headers, + tmp_path, +): + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + workflow_run = WorkflowRun( + order_line_id=order_line.id, + execution_mode="shadow", + status="completed", + ) + db.add(workflow_run) + await db.flush() + + render_dir = tmp_path / "comparison" / str(order_line.id) + render_dir.mkdir(parents=True, exist_ok=True) + authoritative_path = render_dir / "authoritative.png" + shadow_path = render_dir / f"authoritative_shadow-{str(workflow_run.id)[:8]}.png" + + Image.new("RGBA", (8, 8), (0, 128, 255, 255)).save(authoritative_path) + Image.new("RGBA", (8, 8), (0, 128, 255, 255)).save(shadow_path) + + order_line.result_path = str(authoritative_path) + order_line.render_status = "completed" + await db.commit() + + response = await client.get( + f"/api/workflows/runs/{workflow_run.id}/comparison", + headers=auth_headers, + ) + + assert response.status_code == 200 + body = response.json() + assert body["workflow_run_id"] == str(workflow_run.id) + assert body["execution_mode"] == "shadow" + assert body["status"] == "matched" + assert body["exact_match"] is True + assert body["dimensions_match"] is True + assert body["mean_pixel_delta"] == 0.0 + assert body["authoritative_output"]["path"] == str(authoritative_path) + assert body["observer_output"]["path"] == str(shadow_path) + assert body["authoritative_output"]["image_width"] == 8 + assert body["observer_output"]["image_height"] == 8 + + +@pytest.mark.asyncio +async def test_workflow_run_comparison_endpoint_reports_missing_shadow_output( + client, + db, + admin_user, + auth_headers, + tmp_path, +): + order_line = await _seed_renderable_order_line(db, admin_user, tmp_path) + workflow_run = WorkflowRun( + order_line_id=order_line.id, + execution_mode="shadow", + status="completed", + ) + db.add(workflow_run) + await db.flush() + + render_dir = tmp_path / "comparison-missing" / str(order_line.id) + render_dir.mkdir(parents=True, exist_ok=True) + authoritative_path = render_dir / "authoritative.png" + Image.new("RGBA", (4, 4), (255, 64, 64, 255)).save(authoritative_path) + + order_line.result_path = str(authoritative_path) + order_line.render_status = "completed" + await db.commit() + + response = await client.get( + f"/api/workflows/runs/{workflow_run.id}/comparison", + headers=auth_headers, + ) + + assert response.status_code == 200 + body = response.json() + assert body["status"] == "missing_observer" + assert body["exact_match"] is None + assert body["observer_output"]["exists"] is False + assert body["authoritative_output"]["exists"] is True diff --git a/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md b/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md index a6693df..e08eb2d 100644 --- a/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md +++ b/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md @@ -43,6 +43,7 @@ ### Phase 6 - [x] Shadow mode parity execution dispatches real graph observer runs alongside authoritative legacy dispatch +- Progress: Workflow runs now expose a comparison endpoint that resolves authoritative legacy outputs and matching shadow artifacts, including file hashes, image dimensions, and mean pixel delta for parity inspection. - [ ] Golden cases pass against legacy outputs - [ ] Rollout can be enabled per workflow or output type - [ ] Rollback to legacy is immediate diff --git a/docs/workflows/WORKFLOW_IMPLEMENTATION_BACKLOG.md b/docs/workflows/WORKFLOW_IMPLEMENTATION_BACKLOG.md index c9f3b94..f108ed8 100644 --- a/docs/workflows/WORKFLOW_IMPLEMENTATION_BACKLOG.md +++ b/docs/workflows/WORKFLOW_IMPLEMENTATION_BACKLOG.md @@ -93,7 +93,7 @@ ### Tickets - `E6-T1` Add shadow mode parity execution. `completed` -- `E6-T2` Build output comparison tooling. +- `E6-T2` Build output comparison tooling. `completed` - `E6-T3` Define golden test cases. - `E6-T4` Roll out per workflow or output type. - `E6-T5` Keep legacy fallback after rollout.