"""Structured pipeline logger. Wraps Python logging + Redis SSE streaming for consistent, prefixed log output from all Celery pipeline tasks. Every method: - emits a Python `logging` line with a [STEP_NAME] prefix - publishes to Redis via log_task_event for SSE streaming in the UI """ import functools import logging import resource import time from typing import Any from app.core.task_logs import log_task_event _log = logging.getLogger(__name__) class PipelineLogger: """Structured logger for a single pipeline execution context. Usage in a Celery task:: pl = PipelineLogger(task_id=self.request.id, order_line_id=str(line.id)) pl.step_start("occ_glb_export", {"cad_file_id": cad_file_id}) ... pl.step_done("occ_glb_export", duration_s=8.4, result={"size_bytes": 204800}) """ def __init__(self, task_id: str | None, order_line_id: str | None = None): self.task_id = task_id or "unknown" self.order_line_id = order_line_id self._step_starts: dict[str, float] = {} # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def step_start(self, step: str, context: dict[str, Any] | None = None) -> None: self._step_starts[step] = time.time() msg = f"[{step}] start" if context: msg += f" | {context}" _log.info(msg) log_task_event(self.task_id, msg, level="info") def step_progress(self, step: str, pct: int, msg: str) -> None: full = f"[{step}] {pct}% — {msg}" _log.info(full) log_task_event(self.task_id, full, level="info") def step_done(self, step: str, duration_s: float | None = None, result: dict[str, Any] | None = None) -> None: if duration_s is None: start = self._step_starts.get(step) duration_s = round(time.time() - start, 2) if start else None parts = [f"[{step}] done"] if duration_s is not None: parts.append(f"{duration_s:.1f}s") if result: parts.append(str(result)) msg = " | ".join(parts) _log.info(msg) log_task_event(self.task_id, msg, level="info") def step_error(self, step: str, error: str, exc: Exception | None = None) -> None: msg = f"[{step}] ERROR — {error}" if exc: _log.exception(msg) else: _log.error(msg) log_task_event(self.task_id, msg, level="error") def info(self, step: str, msg: str) -> None: full = f"[{step}] {msg}" _log.info(full) log_task_event(self.task_id, full, level="info") def warning(self, step: str, msg: str) -> None: full = f"[{step}] WARNING — {msg}" _log.warning(full) log_task_event(self.task_id, full, level="warning") # ------------------------------------------------------------------ # Context manager for a single step # ------------------------------------------------------------------ def step(self, step_name: str, context: dict[str, Any] | None = None) -> "_StepContext": return _StepContext(self, step_name, context) class _StepContext: """Context manager that auto-calls step_start / step_done / step_error.""" def __init__(self, pl: PipelineLogger, step_name: str, context: dict | None): self._pl = pl self._name = step_name self._context = context def __enter__(self) -> "_StepContext": self._pl.step_start(self._name, self._context) return self def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is None: self._pl.step_done(self._name) else: self._pl.step_error(self._name, str(exc_val), exc_val) return False # do not suppress exceptions # --------------------------------------------------------------------------- # @timed_step decorator # --------------------------------------------------------------------------- def timed_step(step_name: str, pipeline_logger: 

# ---------------------------------------------------------------------------
# @timed_step decorator
# ---------------------------------------------------------------------------

def timed_step(step_name: str, pipeline_logger: PipelineLogger | None = None):
    """Decorator that auto-times a function and logs via PipelineLogger.

    Captures wall-clock duration and peak RSS delta. If a Redis connection
    is available, stores metrics to ``pipeline:metrics:{context_id}`` as a
    hash field ``{step_name}`` → JSON ``{duration_s, rss_delta_kb}``.

    Usage::

        pl = PipelineLogger(task_id=self.request.id)

        @timed_step("extract_objects", pl)
        def do_extraction(step_path):
            ...

    Or without a logger (metrics are still stored to Redis if ``context_id``
    is passed to the wrapped function as a keyword argument)::

        @timed_step("extract_objects")
        def do_extraction(step_path):
            ...
    """

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            pl = pipeline_logger
            # ``ru_maxrss`` is the *peak* RSS so far, so the delta can only
            # grow, never shrink. It is reported in KiB on Linux (bytes on
            # macOS).
            rss_before = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
            t0 = time.monotonic()
            if pl:
                pl.step_start(step_name)
            try:
                result = fn(*args, **kwargs)
                duration = round(time.monotonic() - t0, 3)
                rss_after = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
                rss_delta_kb = rss_after - rss_before
                metrics = {
                    "duration_s": duration,
                    "rss_delta_kb": rss_delta_kb,
                }
                if pl:
                    pl.step_done(step_name, duration_s=duration, result=metrics)
                else:
                    _log.info(
                        f"[{step_name}] done | {duration:.1f}s | rss_delta={rss_delta_kb}KB"
                    )
                _store_metrics(step_name, metrics, kwargs.get("context_id"))
                return result
            except Exception as exc:
                duration = round(time.monotonic() - t0, 3)
                if pl:
                    pl.step_error(step_name, str(exc), exc)
                else:
                    _log.exception(f"[{step_name}] ERROR — {exc}")
                raise

        return wrapper

    return decorator


def _store_metrics(step_name: str, metrics: dict, context_id: str | None = None) -> None:
    """Store step metrics to a Redis hash (best-effort, never raises)."""
    if not context_id:
        return
    try:
        import json

        import redis

        from app.config import settings

        r = redis.from_url(settings.redis_url)
        key = f"pipeline:metrics:{context_id}"
        r.hset(key, step_name, json.dumps(metrics))
        r.expire(key, 86400)  # 24h TTL
    except Exception:
        pass  # metrics storage is non-critical
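
# ---------------------------------------------------------------------------
# Local smoke test (illustrative)
# ---------------------------------------------------------------------------
if __name__ == "__main__":  # pragma: no cover
    # A minimal sketch, not part of this module's API: it assumes the
    # ``app`` package is importable and a dev Redis is reachable at
    # ``settings.redis_url``; the step name and context_id are made up.
    import json

    import redis

    from app.config import settings

    pl = PipelineLogger(task_id="local-demo")

    @timed_step("demo_step", pl)
    def _demo_work(*, context_id: str) -> None:
        time.sleep(0.1)

    # ``context_id`` must be passed as a keyword for metrics storage to
    # fire, since the wrapper reads ``kwargs.get("context_id")``.
    _demo_work(context_id="demo-context")

    r = redis.from_url(settings.redis_url)
    raw = r.hgetall("pipeline:metrics:demo-context")
    # redis-py returns bytes keys/values unless decode_responses=True is set.
    print({field.decode(): json.loads(value) for field, value in raw.items()})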