ea31ed657c
Phase 1 of PLAN_REFACTOR.md — all four sub-tasks implemented:
1.1 PipelineLogger (backend/app/core/pipeline_logger.py)
- Structured step_start/step_done/step_error/step_progress API
- Publishes to Python logging AND Redis SSE via log_task_event
- Context manager `pl.step("name")` for auto-timing
1.2 RenderJobDocument (backend/app/domains/rendering/job_document.py)
- Pydantic JSONB schema: state machine + per-step records + timing
- begin_step/finish_step/fail_step/skip_step helpers
- Migration 048: adds render_job_doc JSONB column to order_lines
- OrderLine model updated with render_job_doc field
1.3 TenantContextMiddleware (backend/app/core/middleware.py)
- Decodes JWT, stores tenant_id + role in request.state
- get_db updated to auto-apply RLS SET LOCAL from request.state
- Registered in main.py (runs before every request)
- JWT now embeds tenant_id claim via create_access_token()
- Login endpoint passes tenant_id to token creation
1.4 ProcessStep Registry (backend/app/core/process_steps.py)
- StepName StrEnum with all 20 pipeline step names
- Single source of truth for log prefixes, DB records, UI labels
Also adds db_utils.py with set_tenant_sync() + get_sync_session()
for use inside Celery tasks (bypass-safe RLS helper).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
72 lines
2.3 KiB
Python
72 lines
2.3 KiB
Python
"""Database utilities for use inside Celery tasks (sync context).
|
|
|
|
Celery tasks bypass FastAPI middleware, so tenant RLS context must be set
|
|
manually. Use set_tenant_sync() immediately after creating a sync DB session.
|
|
|
|
Usage::
|
|
|
|
from app.core.db_utils import set_tenant_sync, get_sync_session
|
|
|
|
with get_sync_session() as db:
|
|
set_tenant_sync(db, tenant_id, role)
|
|
# queries here will be RLS-filtered
|
|
"""
|
|
import contextlib
|
|
from typing import Generator
|
|
|
|
from sqlalchemy import create_engine, text
|
|
from sqlalchemy.orm import Session, sessionmaker
|
|
|
|
from app.config import settings
|
|
|
|
|
|
def set_tenant_sync(session: Session, tenant_id: str | None, role: str | None = None) -> None:
    """Set the RLS tenant context on a *synchronous* SQLAlchemy session.

    Call this at the start of a sync DB block inside a Celery task when
    tenant isolation is needed.

    Args:
        session: Sync session whose current transaction receives the setting.
        tenant_id: Tenant to scope to. When falsy (``None`` or ``""``) the
            call is a no-op and nothing is sent to the database. This check
            runs before the role check, so it applies to admins as well.
        role: Role of the caller. ``"admin"`` stores the literal ``bypass``
            in ``app.current_tenant_id``; any other value stores
            ``tenant_id``.

    Note:
        The value is transaction-local (the equivalent of ``SET LOCAL``), so
        PostgreSQL discards it when the current transaction commits or rolls
        back. If the caller commits mid-block, the context must be set again
        before issuing further tenant-scoped queries.
    """
    if not tenant_id:
        return

    scope = "bypass" if role == "admin" else str(tenant_id)

    # set_config(name, value, is_local => true) is the function form of
    # SET LOCAL. Unlike the SET statement it is an ordinary SELECT, so the
    # value can travel as a real bound parameter with any driver, not only
    # with drivers that interpolate parameters client-side.
    session.execute(
        text("SELECT set_config('app.current_tenant_id', :tid, true)"),
        {"tid": scope},
    )
|
|
|
|
|
|
# Process-wide sync engine, built on first use and then shared by every
# caller in the same worker process.
_sync_engine = None


def _get_sync_engine():
    """Return the shared synchronous engine, creating it on the first call.

    The sync URL is derived from ``settings.database_url`` by removing the
    async driver suffixes ``+asyncpg`` and ``+aiosqlite``.
    """
    global _sync_engine

    if _sync_engine is not None:
        return _sync_engine

    url = settings.database_url
    for async_driver in ("+asyncpg", "+aiosqlite"):
        url = url.replace(async_driver, "")

    _sync_engine = create_engine(
        url,
        pool_pre_ping=True,
        pool_size=5,
        max_overflow=10,
    )
    return _sync_engine
|
|
|
|
|
|
@contextlib.contextmanager
def get_sync_session(tenant_id: str | None = None, role: str | None = None) -> Generator[Session, None, None]:
    """Yield a synchronous DB session, optionally scoped to a tenant.

    Intended for Celery tasks; FastAPI routes should keep using the existing
    async session patterns.

    When ``tenant_id`` is truthy, the tenant context is applied via
    ``set_tenant_sync`` before the session is handed to the caller. The
    session is committed if the ``with`` body finishes normally, and rolled
    back (with the exception re-raised) if the body raises an ``Exception``.
    The session is closed on exit in either case.
    """
    make_session = sessionmaker(bind=_get_sync_engine(), expire_on_commit=False)
    db = make_session()

    with db:
        if tenant_id:
            set_tenant_sync(db, tenant_id, role)

        try:
            yield db
        except Exception:
            # Undo whatever the caller did, then let the error propagate.
            db.rollback()
            raise

        # Reached only when the caller's block completed without raising.
        db.commit()
|