"""Compare a workflow run's observer output against the authoritative legacy output.

The module gathers file-level facts (size, SHA-256, MIME type, image
dimensions) for both artifacts, computes a normalized mean pixel delta, and
turns those into a rollout-gate verdict (``pass`` / ``warn`` / ``fail``).
"""

from __future__ import annotations

import hashlib
import mimetypes
import uuid
from dataclasses import dataclass
from pathlib import Path

from PIL import Image, ImageChops, ImageStat
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import settings
from app.core.render_paths import resolve_result_path, result_path_to_storage_key
from app.domains.media.models import MediaAsset
from app.domains.orders.models import OrderLine
from app.domains.rendering.models import WorkflowRun
from app.domains.rendering.schemas import WorkflowComparisonArtifactOut, WorkflowRunComparisonOut

# Mean pixel delta is normalized to the 0..1 range (see _compute_mean_pixel_delta).
ROLLOUT_PASS_MAX_MEAN_PIXEL_DELTA = 1e-6
ROLLOUT_WARN_MAX_MEAN_PIXEL_DELTA = 0.02


@dataclass(slots=True)
class _ArtifactComparison:
    """File-level facts about one side (authoritative or observer) of a comparison."""

    path: str | None
    storage_key: str | None
    exists: bool
    file_size_bytes: int | None
    sha256: str | None
    mime_type: str | None
    image_width: int | None
    image_height: int | None

    def to_schema(self) -> WorkflowComparisonArtifactOut:
        """Return the API schema object carrying the same field values."""
        return WorkflowComparisonArtifactOut(
            path=self.path,
            storage_key=self.storage_key,
            exists=self.exists,
            file_size_bytes=self.file_size_bytes,
            sha256=self.sha256,
            mime_type=self.mime_type,
            image_width=self.image_width,
            image_height=self.image_height,
        )


def evaluate_rollout_gate(
    *,
    authoritative_output: _ArtifactComparison,
    observer_output: _ArtifactComparison,
    exact_match: bool | None,
    dimensions_match: bool | None,
    mean_pixel_delta: float | None,
) -> dict[str, object]:
    """Derive the rollout verdict from the comparison facts.

    The checks run in priority order: missing authoritative output, missing
    observer output, byte-for-byte match, dimension mismatch, missing pixel
    delta, then the pass/warn thresholds on the pixel delta.

    Returns:
        A dict with the verdict (``"pass"``, ``"warn"`` or ``"fail"``), the
        readiness flag and status, the human-readable reasons, and the
        thresholds used. Readiness and status are repeated under the
        ``workflow_rollout_*`` and ``output_type_rollout_*`` keys. Only a
        ``"pass"`` verdict is considered rollout-ready.
    """
    thresholds = {
        "pass_max_mean_pixel_delta": ROLLOUT_PASS_MAX_MEAN_PIXEL_DELTA,
        "warn_max_mean_pixel_delta": ROLLOUT_WARN_MAX_MEAN_PIXEL_DELTA,
    }
    reasons: list[str] = []

    if not authoritative_output.exists:
        verdict = "fail"
        reasons.append("Authoritative legacy output is missing; keep legacy fallback active.")
    elif not observer_output.exists:
        verdict = "fail"
        reasons.append("Observer workflow output is missing; rollout cannot be approved.")
    elif exact_match:
        verdict = "pass"
        reasons.append("Observer output matches the authoritative legacy output byte-for-byte.")
    elif dimensions_match is False:
        verdict = "fail"
        reasons.append("Observer output dimensions differ from the authoritative legacy output.")
    elif mean_pixel_delta is None:
        verdict = "fail"
        reasons.append("Observer output could not be pixel-compared against the authoritative output.")
    elif mean_pixel_delta <= ROLLOUT_PASS_MAX_MEAN_PIXEL_DELTA:
        verdict = "pass"
        reasons.append("Observer output is visually identical within the pass threshold.")
    elif mean_pixel_delta <= ROLLOUT_WARN_MAX_MEAN_PIXEL_DELTA:
        verdict = "warn"
        reasons.append(
            "Observer output differs slightly from the authoritative output but remains within the warn threshold."
        )
    else:
        verdict = "fail"
        reasons.append(
            "Observer output exceeds the allowed parity threshold; keep legacy fallback active."
        )

    # Report the measured delta whenever one exists and the files are not byte-identical.
    if mean_pixel_delta is not None and not exact_match:
        reasons.append(
            f"Mean pixel delta {mean_pixel_delta:.6f}; "
            f"pass<={ROLLOUT_PASS_MAX_MEAN_PIXEL_DELTA:.6f}, "
            f"warn<={ROLLOUT_WARN_MAX_MEAN_PIXEL_DELTA:.6f}."
        )

    rollout_ready = verdict == "pass"
    rollout_status = "ready_for_rollout" if rollout_ready else "hold_legacy_authoritative"
    return {
        "verdict": verdict,
        "ready": rollout_ready,
        "status": rollout_status,
        "reasons": reasons,
        "thresholds": thresholds,
        "workflow_rollout_ready": rollout_ready,
        "workflow_rollout_status": rollout_status,
        "output_type_rollout_ready": rollout_ready,
        "output_type_rollout_status": rollout_status,
    }


def _normalize_storage_key(path: str | None) -> str | None:
    """Convert a result path to its storage key via ``result_path_to_storage_key``."""
    return result_path_to_storage_key(path)


def _missing_artifact() -> _ArtifactComparison:
    """Return the placeholder used when no path is known for an artifact."""
    return _ArtifactComparison(
        path=None,
        storage_key=None,
        exists=False,
        file_size_bytes=None,
        sha256=None,
        mime_type=None,
        image_width=None,
        image_height=None,
    )


def _build_artifact(path: str | None) -> _ArtifactComparison:
    """Collect size, SHA-256, MIME type and image dimensions for ``path``.

    Size, digest and dimensions are only filled in for regular files; they
    stay ``None`` when the path is missing or is not a file. Image dimensions
    also stay ``None`` when Pillow cannot open the file.
    """
    if not path:
        return _missing_artifact()

    resolved_path = resolve_result_path(path)
    file_path = resolved_path or Path(path)
    exists = file_path.exists()
    mime_type, _ = mimetypes.guess_type(str(file_path))

    sha256 = None
    file_size_bytes = None
    image_width = None
    image_height = None

    if exists and file_path.is_file():
        file_size_bytes = file_path.stat().st_size

        # Hash in 1 MiB chunks so large renders are never loaded whole.
        digest = hashlib.sha256()
        with file_path.open("rb") as handle:
            for chunk in iter(lambda: handle.read(1024 * 1024), b""):
                digest.update(chunk)
        sha256 = digest.hexdigest()

        try:
            with Image.open(file_path) as image:
                image_width, image_height = image.size
        except Exception:
            # Best effort: a file Pillow cannot read simply has no dimensions.
            image_width = None
            image_height = None

    return _ArtifactComparison(
        path=str(file_path),
        storage_key=_normalize_storage_key(str(file_path)),
        exists=exists,
        file_size_bytes=file_size_bytes,
        sha256=sha256,
        mime_type=mime_type,
        image_width=image_width,
        image_height=image_height,
    )


def _compute_mean_pixel_delta(
    authoritative_path: str | None,
    observer_path: str | None,
) -> float | None:
    """Return the mean absolute RGBA difference of two images, scaled to 0..1.

    Returns ``None`` when either path is missing, the image sizes differ, or
    the images cannot be opened or compared.
    """
    if not authoritative_path or not observer_path:
        return None

    authoritative_file = Path(authoritative_path)
    observer_file = Path(observer_path)
    if not authoritative_file.exists() or not observer_file.exists():
        return None

    try:
        with Image.open(authoritative_file) as authoritative_image, Image.open(observer_file) as observer_image:
            authoritative_rgba = authoritative_image.convert("RGBA")
            observer_rgba = observer_image.convert("RGBA")
            if authoritative_rgba.size != observer_rgba.size:
                return None
            diff = ImageChops.difference(authoritative_rgba, observer_rgba)
            mean_channels = ImageStat.Stat(diff).mean
            # Average the per-channel means, then divide by 255 to normalize.
            return sum(mean_channels) / (len(mean_channels) * 255.0)
    except Exception:
        # Best effort: any failure to decode or compare means "not comparable".
        return None


async def _load_shadow_asset_by_workflow_run(
    db: AsyncSession,
    workflow_run_id: uuid.UUID,
) -> str | None:
    """Return the resolved path of the newest ``MediaAsset`` linked to the run.

    Returns ``None`` when the run has no asset or its storage key does not
    resolve to a path.
    """
    asset_result = await db.execute(
        select(MediaAsset)
        .where(MediaAsset.workflow_run_id == workflow_run_id)
        .order_by(MediaAsset.created_at.desc())
        .limit(1)
    )
    asset = asset_result.scalar_one_or_none()
    if asset is None:
        return None
    resolved = resolve_result_path(asset.storage_key)
    return str(resolved) if resolved is not None else None


def _find_shadow_file(order_line: OrderLine, workflow_run: WorkflowRun) -> str | None:
    """Search the render directories for a file written for this workflow run.

    A file matches when its name contains ``shadow-`` followed by the first
    eight characters of the run id. The most recently modified match wins,
    with ties broken by file name in descending order. Returns ``None`` when
    nothing matches.
    """
    shadow_suffix = f"shadow-{str(workflow_run.id)[:8]}"

    candidate_roots: list[Path] = []
    if order_line.result_path:
        resolved_result = resolve_result_path(order_line.result_path)
        if resolved_result is not None:
            candidate_roots.append(resolved_result.parent)

    upload_root = Path(settings.upload_dir)
    candidate_roots.append(upload_root / "renders" / str(order_line.id))
    candidate_roots.append(upload_root / "step_files" / "renders" / str(order_line.id))
    candidate_roots.append(upload_root / "step_files" / "renders")

    seen_roots: set[Path] = set()
    candidates: list[Path] = []
    for root in candidate_roots:
        if root in seen_roots:
            continue
        seen_roots.add(root)
        # is_dir() rather than exists(): iterdir() raises on a regular file.
        if not root.is_dir():
            continue
        candidates.extend(
            path for path in root.iterdir() if path.is_file() and shadow_suffix in path.name
        )

    if not candidates:
        return None

    candidates.sort(key=lambda path: (path.stat().st_mtime, path.name), reverse=True)
    return str(candidates[0])


async def build_workflow_run_comparison(
    db: AsyncSession,
    workflow_run_id: uuid.UUID,
) -> WorkflowRunComparisonOut | None:
    """Build the comparison report for one workflow run.

    The authoritative output is the order line's ``result_path``. The
    observer output is the newest media asset linked to the run, falling back
    to a file search in the render directories.

    Returns:
        The populated comparison schema, or ``None`` when no workflow run has
        the given id.
    """
    run_result = await db.execute(select(WorkflowRun).where(WorkflowRun.id == workflow_run_id))
    workflow_run = run_result.scalar_one_or_none()
    if workflow_run is None:
        return None

    order_line = None
    if workflow_run.order_line_id is not None:
        order_line = await db.get(OrderLine, workflow_run.order_line_id)

    authoritative_path = order_line.result_path if order_line is not None else None
    observer_path = await _load_shadow_asset_by_workflow_run(db, workflow_run.id)
    if observer_path is None and order_line is not None:
        observer_path = _find_shadow_file(order_line, workflow_run)

    authoritative_output = _build_artifact(authoritative_path)
    observer_output = _build_artifact(observer_path)

    exact_match: bool | None = None
    dimensions_match: bool | None = None
    mean_pixel_delta: float | None = None

    if not authoritative_output.exists:
        status = "missing_authoritative"
        summary = "Authoritative legacy output is missing."
    elif not observer_output.exists:
        status = "missing_observer"
        summary = "Observer workflow output is missing."
    else:
        # Two missing digests (paths that exist but are not regular files)
        # must not count as a byte-for-byte match.
        exact_match = (
            authoritative_output.sha256 is not None
            and authoritative_output.sha256 == observer_output.sha256
        )
        dimensions_match = (
            authoritative_output.image_width == observer_output.image_width
            and authoritative_output.image_height == observer_output.image_height
            and authoritative_output.image_width is not None
            and observer_output.image_width is not None
        )
        mean_pixel_delta = _compute_mean_pixel_delta(authoritative_output.path, observer_output.path)

        if exact_match:
            status = "matched"
            summary = "Observer output matches the authoritative legacy output byte-for-byte."
        elif (
            mean_pixel_delta is not None
            and mean_pixel_delta <= ROLLOUT_PASS_MAX_MEAN_PIXEL_DELTA
            and dimensions_match
        ):
            status = "matched"
            summary = "Observer output matches the authoritative legacy output within the visual pass threshold."
        else:
            status = "different"
            if dimensions_match is False:
                summary = "Observer output differs from the authoritative output and the image dimensions changed."
            elif mean_pixel_delta is not None:
                summary = "Observer output differs from the authoritative output."
            else:
                summary = "Observer output differs from the authoritative output and could not be pixel-compared."

    # Evaluated once for every branch; the missing-output branches pass None
    # for all three comparison inputs.
    rollout_gate = evaluate_rollout_gate(
        authoritative_output=authoritative_output,
        observer_output=observer_output,
        exact_match=exact_match,
        dimensions_match=dimensions_match,
        mean_pixel_delta=mean_pixel_delta,
    )

    return WorkflowRunComparisonOut(
        workflow_run_id=workflow_run.id,
        workflow_def_id=workflow_run.workflow_def_id,
        order_line_id=workflow_run.order_line_id,
        execution_mode=workflow_run.execution_mode,
        status=status,
        summary=summary,
        rollout_gate_verdict=str(rollout_gate["verdict"]),
        workflow_rollout_ready=bool(rollout_gate["workflow_rollout_ready"]),
        workflow_rollout_status=str(rollout_gate["workflow_rollout_status"]),
        rollout_reasons=[str(reason) for reason in rollout_gate["reasons"]],
        rollout_thresholds={
            str(key): float(value)
            for key, value in dict(rollout_gate["thresholds"]).items()
        },
        authoritative_output=authoritative_output.to_schema(),
        observer_output=observer_output.to_schema(),
        exact_match=exact_match,
        dimensions_match=dimensions_match,
        mean_pixel_delta=mean_pixel_delta,
    )