6c5873d51f
- @timed_step decorator with wall-clock + RSS tracking (pipeline_logger) - Blender timing laps for sharp edges and material assignment - MeshRegistry pattern: eliminate 13 scene.traverse() calls across viewers - Lazy material cloning (clone-on-first-write in both viewers) - _pipeline_session context manager: 7 create_engine() → 2 in render_thumbnail - KD-tree spatial pre-filter for sharp edge marking (bbox-based pruning) - Batch material library append: N bpy.ops.wm.append → single bpy.data.libraries.load - GMSH single-session batching: compound all solids into one tessellation call - Validate part-materials save endpoints against parsed_objects (prevents bogus keys) - ROADMAP updated with completion status Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
184 lines
6.5 KiB
Python
184 lines
6.5 KiB
Python
"""Structured pipeline logger.
|
|
|
|
Wraps Python logging + Redis SSE streaming for consistent, prefixed log output
|
|
from all Celery pipeline tasks. Every method:
|
|
- emits a Python `logging` line with a [STEP_NAME] prefix
|
|
- publishes to Redis via log_task_event for SSE streaming in the UI
|
|
"""
|
|
import functools
|
|
import logging
|
|
import resource
|
|
import time
|
|
from typing import Any
|
|
|
|
from app.core.task_logs import log_task_event
|
|
|
|
_log = logging.getLogger(__name__)
|
|
|
|
|
|
class PipelineLogger:
|
|
"""Structured logger for a single pipeline execution context.
|
|
|
|
Usage in a Celery task::
|
|
|
|
pl = PipelineLogger(task_id=self.request.id, order_line_id=str(line.id))
|
|
pl.step_start("occ_glb_export", {"cad_file_id": cad_file_id})
|
|
...
|
|
pl.step_done("occ_glb_export", duration_s=8.4, result={"size_bytes": 204800})
|
|
"""
|
|
|
|
def __init__(self, task_id: str | None, order_line_id: str | None = None):
|
|
self.task_id = task_id or "unknown"
|
|
self.order_line_id = order_line_id
|
|
self._step_starts: dict[str, float] = {}
|
|
|
|
# ------------------------------------------------------------------
|
|
# Public API
|
|
# ------------------------------------------------------------------
|
|
|
|
def step_start(self, step: str, context: dict[str, Any] | None = None) -> None:
|
|
self._step_starts[step] = time.time()
|
|
msg = f"[{step}] start"
|
|
if context:
|
|
msg += f" | {context}"
|
|
_log.info(msg)
|
|
log_task_event(self.task_id, msg, level="info")
|
|
|
|
def step_progress(self, step: str, pct: int, msg: str) -> None:
|
|
full = f"[{step}] {pct}% — {msg}"
|
|
_log.info(full)
|
|
log_task_event(self.task_id, full, level="info")
|
|
|
|
def step_done(self, step: str, duration_s: float | None = None, result: dict[str, Any] | None = None) -> None:
|
|
if duration_s is None:
|
|
start = self._step_starts.get(step)
|
|
duration_s = round(time.time() - start, 2) if start else None
|
|
parts = [f"[{step}] done"]
|
|
if duration_s is not None:
|
|
parts.append(f"{duration_s:.1f}s")
|
|
if result:
|
|
parts.append(str(result))
|
|
msg = " | ".join(parts)
|
|
_log.info(msg)
|
|
log_task_event(self.task_id, msg, level="info")
|
|
|
|
def step_error(self, step: str, error: str, exc: Exception | None = None) -> None:
|
|
msg = f"[{step}] ERROR — {error}"
|
|
if exc:
|
|
_log.exception(msg)
|
|
else:
|
|
_log.error(msg)
|
|
log_task_event(self.task_id, msg, level="error")
|
|
|
|
def info(self, step: str, msg: str) -> None:
|
|
full = f"[{step}] {msg}"
|
|
_log.info(full)
|
|
log_task_event(self.task_id, full, level="info")
|
|
|
|
def warning(self, step: str, msg: str) -> None:
|
|
full = f"[{step}] WARNING — {msg}"
|
|
_log.warning(full)
|
|
log_task_event(self.task_id, full, level="warning")
|
|
|
|
# ------------------------------------------------------------------
|
|
# Context manager for a single step
|
|
# ------------------------------------------------------------------
|
|
|
|
def step(self, step_name: str, context: dict[str, Any] | None = None) -> "_StepContext":
|
|
return _StepContext(self, step_name, context)
|
|
|
|
|
|
class _StepContext:
|
|
"""Context manager that auto-calls step_start / step_done / step_error."""
|
|
|
|
def __init__(self, pl: PipelineLogger, step_name: str, context: dict | None):
|
|
self._pl = pl
|
|
self._name = step_name
|
|
self._context = context
|
|
|
|
def __enter__(self) -> "_StepContext":
|
|
self._pl.step_start(self._name, self._context)
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
if exc_type is None:
|
|
self._pl.step_done(self._name)
|
|
else:
|
|
self._pl.step_error(self._name, str(exc_val), exc_val)
|
|
return False # do not suppress exceptions
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# @timed_step decorator
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def timed_step(step_name: str, pipeline_logger: PipelineLogger | None = None):
|
|
"""Decorator that auto-times a function and logs via PipelineLogger.
|
|
|
|
Captures wall-clock duration and peak RSS delta. If a Redis connection
|
|
is available, stores metrics to ``pipeline:metrics:{context_id}`` as a
|
|
hash field ``{step_name}`` → JSON ``{duration_s, rss_delta_kb}``.
|
|
|
|
Usage::
|
|
|
|
pl = PipelineLogger(task_id=self.request.id)
|
|
|
|
@timed_step("extract_objects", pl)
|
|
def do_extraction(step_path):
|
|
...
|
|
|
|
Or without a logger (metrics still stored to Redis if context_id given)::
|
|
|
|
@timed_step("extract_objects")
|
|
def do_extraction(step_path):
|
|
...
|
|
"""
|
|
def decorator(fn):
|
|
@functools.wraps(fn)
|
|
def wrapper(*args, **kwargs):
|
|
pl = pipeline_logger
|
|
rss_before = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
|
|
t0 = time.monotonic()
|
|
if pl:
|
|
pl.step_start(step_name)
|
|
try:
|
|
result = fn(*args, **kwargs)
|
|
duration = round(time.monotonic() - t0, 3)
|
|
rss_after = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
|
|
rss_delta_kb = rss_after - rss_before
|
|
metrics = {
|
|
"duration_s": duration,
|
|
"rss_delta_kb": rss_delta_kb,
|
|
}
|
|
if pl:
|
|
pl.step_done(step_name, duration_s=duration, result=metrics)
|
|
else:
|
|
_log.info(f"[{step_name}] done | {duration:.1f}s | rss_delta={rss_delta_kb}KB")
|
|
_store_metrics(step_name, metrics, kwargs.get("context_id"))
|
|
return result
|
|
except Exception as exc:
|
|
duration = round(time.monotonic() - t0, 3)
|
|
if pl:
|
|
pl.step_error(step_name, str(exc), exc)
|
|
else:
|
|
_log.exception(f"[{step_name}] ERROR — {exc}")
|
|
raise
|
|
return wrapper
|
|
return decorator
|
|
|
|
|
|
def _store_metrics(step_name: str, metrics: dict, context_id: str | None = None) -> None:
|
|
"""Store step metrics to Redis hash (best-effort, never raises)."""
|
|
if not context_id:
|
|
return
|
|
try:
|
|
import json
|
|
from app.config import settings
|
|
import redis
|
|
r = redis.from_url(settings.redis_url)
|
|
key = f"pipeline:metrics:{context_id}"
|
|
r.hset(key, step_name, json.dumps(metrics))
|
|
r.expire(key, 86400) # 24h TTL
|
|
except Exception:
|
|
pass # metrics storage is non-critical
|