"""RenderJobDocument — structured JSONB job ticket stored in order_lines.render_job_doc. Acts as the single source of truth for a render job's state machine. Stored as JSONB in order_lines.render_job_doc; keep order_lines.render_log for backward compat (deprecated, removed in Phase 3). Usage:: from app.domains.rendering.job_document import RenderJobDocument, JobState, StepRecord doc = RenderJobDocument.new(order_line_id=str(line.id), celery_task_id=self.request.id) doc.begin_step("occ_glb_export") ... doc.finish_step("occ_glb_export", output={"glb_path": str(glb), "size_bytes": sz}) doc.set_state(JobState.COMPLETED, result={"output_path": str(out)}) # Persist to DB (inside Celery sync task): line.render_job_doc = doc.to_dict() db.commit() """ import time from datetime import datetime, timezone from enum import StrEnum from typing import Any from pydantic import BaseModel, Field def _now_iso() -> str: return datetime.now(timezone.utc).isoformat() # ── State machine ───────────────────────────────────────────────────────────── class JobState(StrEnum): PENDING = "pending" QUEUED = "queued" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" class StepStatus(StrEnum): PENDING = "pending" RUNNING = "running" DONE = "done" FAILED = "failed" SKIPPED = "skipped" # ── Data models ─────────────────────────────────────────────────────────────── class StepRecord(BaseModel): name: str status: StepStatus = StepStatus.PENDING started_at: str | None = None completed_at: str | None = None duration_s: float | None = None output: dict[str, Any] | None = None error: str | None = None class RenderJobDocument(BaseModel): version: int = 1 job_id: str # == order_line_id created_at: str = Field(default_factory=_now_iso) updated_at: str = Field(default_factory=_now_iso) state: JobState = JobState.PENDING celery_task_id: str | None = None steps: list[StepRecord] = Field(default_factory=list) error: str | None = None result: dict[str, Any] | None = None # ── Factory ────────────────────────────────────────────────────── @classmethod def new(cls, order_line_id: str, celery_task_id: str | None = None) -> "RenderJobDocument": return cls(job_id=order_line_id, celery_task_id=celery_task_id) @classmethod def from_dict(cls, d: dict | None) -> "RenderJobDocument | None": if not d: return None try: return cls.model_validate(d) except Exception: return None # ── Mutation helpers ───────────────────────────────────────────── def set_state(self, state: JobState, result: dict[str, Any] | None = None, error: str | None = None) -> None: self.state = state self.updated_at = _now_iso() if result is not None: self.result = result if error is not None: self.error = error def begin_step(self, step_name: str) -> StepRecord: """Mark a step as running. Creates it if not present.""" rec = self._get_or_create_step(step_name) rec.status = StepStatus.RUNNING rec.started_at = _now_iso() self.updated_at = _now_iso() if self.state == JobState.PENDING or self.state == JobState.QUEUED: self.state = JobState.RUNNING return rec def finish_step( self, step_name: str, output: dict[str, Any] | None = None, duration_s: float | None = None, ) -> StepRecord: rec = self._get_or_create_step(step_name) rec.status = StepStatus.DONE rec.completed_at = _now_iso() if duration_s is not None: rec.duration_s = round(duration_s, 2) elif rec.started_at: try: start = datetime.fromisoformat(rec.started_at) rec.duration_s = round((datetime.now(timezone.utc) - start).total_seconds(), 2) except Exception: pass if output is not None: rec.output = output self.updated_at = _now_iso() return rec def fail_step(self, step_name: str, error: str) -> StepRecord: rec = self._get_or_create_step(step_name) rec.status = StepStatus.FAILED rec.completed_at = _now_iso() rec.error = error self.updated_at = _now_iso() return rec def skip_step(self, step_name: str, reason: str | None = None) -> StepRecord: rec = self._get_or_create_step(step_name) rec.status = StepStatus.SKIPPED if reason: rec.output = {"reason": reason} self.updated_at = _now_iso() return rec # ── Serialisation ──────────────────────────────────────────────── def to_dict(self) -> dict: return self.model_dump(mode="json") # ── Internal ───────────────────────────────────────────────────── def _get_or_create_step(self, step_name: str) -> StepRecord: for rec in self.steps: if rec.name == step_name: return rec rec = StepRecord(name=step_name) self.steps.append(rec) return rec