Files
HartOMat/backend/app/core/pipeline_logger.py
T
Hartmut ea31ed657c feat(refactor/phase1): foundation infrastructure for modular pipeline
Phase 1 of PLAN_REFACTOR.md — all four sub-tasks implemented:

1.1 PipelineLogger (backend/app/core/pipeline_logger.py)
  - Structured step_start/step_done/step_error/step_progress API
  - Publishes to Python logging AND Redis SSE via log_task_event
  - Context manager `pl.step("name")` for auto-timing

1.2 RenderJobDocument (backend/app/domains/rendering/job_document.py)
  - Pydantic JSONB schema: state machine + per-step records + timing
  - begin_step/finish_step/fail_step/skip_step helpers
  - Migration 048: adds render_job_doc JSONB column to order_lines
  - OrderLine model updated with render_job_doc field

1.3 TenantContextMiddleware (backend/app/core/middleware.py)
  - Decodes JWT, stores tenant_id + role in request.state
  - get_db updated to auto-apply RLS SET LOCAL from request.state
  - Registered in main.py (runs before every request)
  - JWT now embeds tenant_id claim via create_access_token()
  - Login endpoint passes tenant_id to token creation

1.4 ProcessStep Registry (backend/app/core/process_steps.py)
  - StepName StrEnum with all 20 pipeline step names
  - Single source of truth for log prefixes, DB records, UI labels

Also adds db_utils.py with set_tenant_sync() + get_sync_session()
for use inside Celery tasks (bypass-safe RLS helper).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-08 19:25:08 +01:00

107 lines
3.7 KiB
Python

"""Structured pipeline logger.
Wraps Python logging + Redis SSE streaming for consistent, prefixed log output
from all Celery pipeline tasks. Every method:
- emits a Python `logging` line with a [STEP_NAME] prefix
- publishes to Redis via log_task_event for SSE streaming in the UI
"""
import logging
import time
from typing import Any
from app.core.task_logs import log_task_event
_log = logging.getLogger(__name__)
class PipelineLogger:
    """Structured logger for a single pipeline execution context.

    Every logging method does two things:
      - emits a Python ``logging`` line with a ``[STEP_NAME]`` prefix, and
      - publishes the same message to Redis via ``log_task_event`` for SSE
        streaming in the UI.

    Usage in a Celery task::

        pl = PipelineLogger(task_id=self.request.id, order_line_id=str(line.id))
        pl.step_start("occ_glb_export", {"cad_file_id": cad_file_id})
        ...
        pl.step_done("occ_glb_export", duration_s=8.4, result={"size_bytes": 204800})
    """

    def __init__(self, task_id: str | None, order_line_id: str | None = None):
        # Fall back to "unknown" so log_task_event always gets a usable key.
        self.task_id = task_id or "unknown"
        self.order_line_id = order_line_id
        # step name -> time.time() recorded by step_start; consumed by
        # step_done to auto-compute the duration.
        self._step_starts: dict[str, float] = {}

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def step_start(self, step: str, context: dict[str, Any] | None = None) -> None:
        """Record the start time of *step* and log it (with optional context)."""
        self._step_starts[step] = time.time()
        msg = f"[{step}] start"
        if context:
            msg += f" | {context}"
        _log.info(msg)
        log_task_event(self.task_id, msg, level="info")

    def step_progress(self, step: str, pct: int, msg: str) -> None:
        """Log intermediate progress (*pct* percent) for a running step."""
        full = f"[{step}] {pct}% — {msg}"
        _log.info(full)
        log_task_event(self.task_id, full, level="info")

    def step_done(self, step: str, duration_s: float | None = None, result: dict[str, Any] | None = None) -> None:
        """Log successful completion of *step*.

        If *duration_s* is not supplied, it is derived from the timestamp
        recorded by :meth:`step_start` (if any).
        """
        if duration_s is None:
            # Pop (not get) so that a repeated run of the same step name
            # cannot reuse a stale start time; compare with `is not None`
            # rather than truthiness, since a timestamp is just a float.
            start = self._step_starts.pop(step, None)
            duration_s = round(time.time() - start, 2) if start is not None else None
        else:
            # Caller supplied the duration explicitly — still discard any
            # recorded start so the bookkeeping dict does not grow stale.
            self._step_starts.pop(step, None)
        parts = [f"[{step}] done"]
        if duration_s is not None:
            parts.append(f"{duration_s:.1f}s")
        if result:
            parts.append(str(result))
        msg = " | ".join(parts)
        _log.info(msg)
        log_task_event(self.task_id, msg, level="info")

    def step_error(self, step: str, error: str, exc: Exception | None = None) -> None:
        """Log a step failure; attaches *exc*'s traceback when provided."""
        msg = f"[{step}] ERROR — {error}"
        if exc:
            # Pass the exception explicitly via exc_info. The previous
            # `_log.exception(msg)` read sys.exc_info(), which logs
            # "NoneType: None" when this method is called outside an
            # active `except` block.
            _log.error(msg, exc_info=exc)
        else:
            _log.error(msg)
        log_task_event(self.task_id, msg, level="error")

    def info(self, step: str, msg: str) -> None:
        """Log a free-form info line under *step*'s prefix."""
        full = f"[{step}] {msg}"
        _log.info(full)
        log_task_event(self.task_id, full, level="info")

    def warning(self, step: str, msg: str) -> None:
        """Log a free-form warning line under *step*'s prefix."""
        full = f"[{step}] WARNING — {msg}"
        _log.warning(full)
        log_task_event(self.task_id, full, level="warning")

    # ------------------------------------------------------------------
    # Context manager for a single step
    # ------------------------------------------------------------------
    def step(self, step_name: str, context: dict[str, Any] | None = None) -> "_StepContext":
        """Return a context manager that auto-times and auto-logs one step."""
        return _StepContext(self, step_name, context)
class _StepContext:
    """Context manager wrapping a single pipeline step.

    Entering reports ``step_start`` on the owning :class:`PipelineLogger`;
    a clean exit reports ``step_done``, and an exceptional exit reports
    ``step_error``. Exceptions are never suppressed.
    """

    def __init__(self, pl: PipelineLogger, step_name: str, context: dict | None):
        self._logger = pl
        self._step = step_name
        self._ctx = context

    def __enter__(self) -> "_StepContext":
        self._logger.step_start(self._step, self._ctx)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            self._logger.step_error(self._step, str(exc_val), exc_val)
        else:
            self._logger.step_done(self._step)
        # Returning False lets any in-flight exception propagate to the caller.
        return False