Files
HartOMat/backend/app/domains/rendering/job_document.py
T
Hartmut ea31ed657c feat(refactor/phase1): foundation infrastructure for modular pipeline
Phase 1 of PLAN_REFACTOR.md — all four sub-tasks implemented:

1.1 PipelineLogger (backend/app/core/pipeline_logger.py)
  - Structured step_start/step_done/step_error/step_progress API
  - Publishes to Python logging AND Redis SSE via log_task_event
  - Context manager `pl.step("name")` for auto-timing

1.2 RenderJobDocument (backend/app/domains/rendering/job_document.py)
  - Pydantic JSONB schema: state machine + per-step records + timing
  - begin_step/finish_step/fail_step/skip_step helpers
  - Migration 048: adds render_job_doc JSONB column to order_lines
  - OrderLine model updated with render_job_doc field

1.3 TenantContextMiddleware (backend/app/core/middleware.py)
  - Decodes JWT, stores tenant_id + role in request.state
  - get_db updated to auto-apply RLS SET LOCAL from request.state
  - Registered in main.py (runs before every request)
  - JWT now embeds tenant_id claim via create_access_token()
  - Login endpoint passes tenant_id to token creation

1.4 ProcessStep Registry (backend/app/core/process_steps.py)
  - StepName StrEnum with all 20 pipeline step names
  - Single source of truth for log prefixes, DB records, UI labels

Also adds db_utils.py with set_tenant_sync() + get_sync_session()
for use inside Celery tasks (bypass-safe RLS helper).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-08 19:25:08 +01:00

162 lines
5.8 KiB
Python

"""RenderJobDocument — structured JSONB job ticket stored in order_lines.render_job_doc.
Acts as the single source of truth for a render job's state machine.
Stored as JSONB in order_lines.render_job_doc; keep order_lines.render_log
for backward compat (deprecated, removed in Phase 3).
Usage::
from app.domains.rendering.job_document import RenderJobDocument, JobState, StepRecord
doc = RenderJobDocument.new(order_line_id=str(line.id), celery_task_id=self.request.id)
doc.begin_step("occ_glb_export")
...
doc.finish_step("occ_glb_export", output={"glb_path": str(glb), "size_bytes": sz})
doc.set_state(JobState.COMPLETED, result={"output_path": str(out)})
# Persist to DB (inside Celery sync task):
line.render_job_doc = doc.to_dict()
db.commit()
"""
import time
from datetime import datetime, timezone
from enum import StrEnum
from typing import Any
from pydantic import BaseModel, Field
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
# ── State machine ─────────────────────────────────────────────────────────────
class JobState(StrEnum):
    """Lifecycle states of a whole render job (stored as `state` in the JSONB doc).

    Values are lowercase strings so the JSONB document stays human-readable.
    """
    PENDING = "pending"      # document created, job not yet handed to the queue
    QUEUED = "queued"        # submitted to Celery, waiting for a worker
    RUNNING = "running"      # a worker has begun at least one step
    COMPLETED = "completed"  # terminal: all steps finished, `result` populated
    FAILED = "failed"        # terminal: aborted with `error` set
    CANCELLED = "cancelled"  # terminal: stopped on request, not an error
class StepStatus(StrEnum):
    """Status of one pipeline step inside a job (see `StepRecord.status`)."""
    PENDING = "pending"  # step known but not started
    RUNNING = "running"  # begin_step() called, no outcome yet
    DONE = "done"        # finished successfully (finish_step)
    FAILED = "failed"    # finished with an error (fail_step)
    SKIPPED = "skipped"  # intentionally not executed (skip_step)
# ── Data models ───────────────────────────────────────────────────────────────
class StepRecord(BaseModel):
    """Execution record for a single pipeline step, embedded in RenderJobDocument.steps."""

    name: str                              # step identifier (see process_steps.StepName registry)
    status: StepStatus = StepStatus.PENDING
    started_at: str | None = None          # ISO-8601 UTC timestamp, set by begin_step()
    completed_at: str | None = None        # ISO-8601 UTC timestamp, set on done/failed
    duration_s: float | None = None        # wall-clock seconds, rounded to 2 decimals
    output: dict[str, Any] | None = None   # step-specific JSON-safe payload (or skip reason)
    error: str | None = None               # error message when status == FAILED
class RenderJobDocument(BaseModel):
    """Structured job ticket persisted as JSONB in ``order_lines.render_job_doc``.

    Single source of truth for a render job's state machine plus per-step
    execution records.  All mutation helpers refresh ``updated_at``; callers
    persist the document with ``line.render_job_doc = doc.to_dict()``.
    """

    version: int = 1                       # schema version of this document shape
    job_id: str                            # == order_line_id
    created_at: str = Field(default_factory=_now_iso)
    updated_at: str = Field(default_factory=_now_iso)
    state: JobState = JobState.PENDING
    celery_task_id: str | None = None
    steps: list[StepRecord] = Field(default_factory=list)
    error: str | None = None               # job-level error message (terminal failure)
    result: dict[str, Any] | None = None   # final output payload on success

    # ── Factory ──────────────────────────────────────────────────────
    @classmethod
    def new(cls, order_line_id: str, celery_task_id: str | None = None) -> "RenderJobDocument":
        """Create a fresh PENDING document for *order_line_id*."""
        return cls(job_id=order_line_id, celery_task_id=celery_task_id)

    @classmethod
    def from_dict(cls, d: dict | None) -> "RenderJobDocument | None":
        """Revive a document from its JSONB dict.

        Deliberately best-effort: a missing or corrupt document yields
        ``None`` rather than raising, so readers never crash on bad data.
        """
        if not d:
            return None
        try:
            return cls.model_validate(d)
        except Exception:
            # Invalid/legacy shape — treat as "no document".
            return None

    # ── Mutation helpers ─────────────────────────────────────────────
    def set_state(self, state: JobState, result: dict[str, Any] | None = None, error: str | None = None) -> None:
        """Transition the job to *state*, optionally attaching a result or error."""
        self.state = state
        if result is not None:
            self.result = result
        if error is not None:
            self.error = error
        self._touch()

    def begin_step(self, step_name: str) -> StepRecord:
        """Mark a step as running. Creates it if not present."""
        rec = self._get_or_create_step(step_name)
        rec.status = StepStatus.RUNNING
        rec.started_at = _now_iso()
        # The first running step implicitly moves the whole job to RUNNING.
        if self.state in (JobState.PENDING, JobState.QUEUED):
            self.state = JobState.RUNNING
        self._touch()
        return rec

    def finish_step(
        self,
        step_name: str,
        output: dict[str, Any] | None = None,
        duration_s: float | None = None,
    ) -> StepRecord:
        """Mark a step DONE, recording its duration (explicit or derived) and output."""
        rec = self._get_or_create_step(step_name)
        rec.status = StepStatus.DONE
        rec.completed_at = _now_iso()
        if duration_s is not None:
            rec.duration_s = round(duration_s, 2)
        else:
            derived = self._elapsed_since(rec.started_at)
            if derived is not None:
                rec.duration_s = derived
        if output is not None:
            rec.output = output
        self._touch()
        return rec

    def fail_step(self, step_name: str, error: str) -> StepRecord:
        """Mark a step FAILED with *error*.

        Also derives ``duration_s`` from ``started_at`` (consistent with
        finish_step) so failed steps keep their timing information.
        """
        rec = self._get_or_create_step(step_name)
        rec.status = StepStatus.FAILED
        rec.completed_at = _now_iso()
        rec.error = error
        derived = self._elapsed_since(rec.started_at)
        if derived is not None:
            rec.duration_s = derived
        self._touch()
        return rec

    def skip_step(self, step_name: str, reason: str | None = None) -> StepRecord:
        """Mark a step SKIPPED, optionally stashing the reason in its output."""
        rec = self._get_or_create_step(step_name)
        rec.status = StepStatus.SKIPPED
        if reason:
            rec.output = {"reason": reason}
        self._touch()
        return rec

    # ── Serialisation ────────────────────────────────────────────────
    def to_dict(self) -> dict:
        """Return a JSON-safe dict suitable for the JSONB column."""
        return self.model_dump(mode="json")

    # ── Internal ─────────────────────────────────────────────────────
    def _touch(self) -> None:
        """Refresh ``updated_at`` to the current UTC time."""
        self.updated_at = _now_iso()

    @staticmethod
    def _elapsed_since(started_at: str | None) -> float | None:
        """Seconds elapsed since ISO timestamp *started_at*, or None if unusable.

        Swallows parse/arithmetic errors: timing is best-effort metadata and
        must never fail a step transition.
        """
        if not started_at:
            return None
        try:
            start = datetime.fromisoformat(started_at)
            return round((datetime.now(timezone.utc) - start).total_seconds(), 2)
        except Exception:
            return None

    def _get_or_create_step(self, step_name: str) -> StepRecord:
        """Return the existing record for *step_name*, appending a new one if absent."""
        for rec in self.steps:
            if rec.name == step_name:
                return rec
        rec = StepRecord(name=step_name)
        self.steps.append(rec)
        return rec