Files
HartOMat/backend/app/main.py
T
Hartmut ea31ed657c feat(refactor/phase1): foundation infrastructure for modular pipeline
Phase 1 of PLAN_REFACTOR.md — all four sub-tasks implemented:

1.1 PipelineLogger (backend/app/core/pipeline_logger.py)
  - Structured step_start/step_done/step_error/step_progress API
  - Publishes to Python logging AND Redis SSE via log_task_event
  - Context manager `pl.step("name")` for auto-timing

1.2 RenderJobDocument (backend/app/domains/rendering/job_document.py)
  - Pydantic JSONB schema: state machine + per-step records + timing
  - begin_step/finish_step/fail_step/skip_step helpers
  - Migration 048: adds render_job_doc JSONB column to order_lines
  - OrderLine model updated with render_job_doc field

1.3 TenantContextMiddleware (backend/app/core/middleware.py)
  - Decodes JWT, stores tenant_id + role in request.state
  - get_db updated to auto-apply RLS SET LOCAL from request.state
  - Registered in main.py (runs before every request)
  - JWT now embeds tenant_id claim via create_access_token()
  - Login endpoint passes tenant_id to token creation

1.4 ProcessStep Registry (backend/app/core/process_steps.py)
  - StepName StrEnum with all 20 pipeline step names
  - Single source of truth for log prefixes, DB records, UI labels

Also adds db_utils.py with set_tenant_sync() + get_sync_session()
for use inside Celery tasks (bypass-safe RLS helper).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-08 19:25:08 +01:00

149 lines
5.7 KiB
Python

from contextlib import asynccontextmanager
import uuid
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pathlib import Path
from app.config import settings
from app.database import engine, Base
from app.core.websocket import manager as ws_manager
from app.core.middleware import TenantContextMiddleware
# Import routers from domain locations
from app.domains.auth.router import router as auth_router
from app.domains.imports.router import uploads_router, templates_router
from app.domains.orders.router import orders_router, order_items_router
from app.domains.admin.router import admin_router, analytics_router, worker_router
from app.domains.products.router import products_router, cad_router
from app.domains.materials.router import router as materials_router
from app.domains.rendering.router import render_templates_router, output_types_router
from app.domains.notifications.router import router as notifications_router
from app.domains.billing.router import pricing_router, invoice_router
from app.domains.tenants.router import router as tenants_router
from app.domains.rendering.workflow_router import router as workflows_router
from app.domains.media.router import router as media_router
from app.api.routers.asset_libraries import router as asset_libraries_router
from app.domains.admin.dashboard_router import router as dashboard_router
from app.api.routers.task_logs import router as task_logs_router
@asynccontextmanager
async def lifespan(app: FastAPI):
# Create upload directories
for subdir in ("step_files", "excel_files", "thumbnails", "renders", "blend-templates"):
Path(settings.upload_dir, subdir).mkdir(parents=True, exist_ok=True)
# Start WebSocket Redis subscriber
await ws_manager.start_redis_subscriber()
yield
await ws_manager.stop()
app = FastAPI(
title="Schaeffler Automat API",
version="0.1.0",
description="Media-creation pipeline for Schaeffler CAD/bearing product orders",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5173", "http://localhost:3000", "http://frontend:5173", "http://localhost:8888"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.add_middleware(TenantContextMiddleware)
# Mount static files for thumbnails (dir created in lifespan; skip if not writable)
thumbnails_dir = Path(settings.upload_dir) / "thumbnails"
try:
thumbnails_dir.mkdir(parents=True, exist_ok=True)
app.mount("/thumbnails", StaticFiles(directory=str(thumbnails_dir)), name="thumbnails")
except (PermissionError, OSError):
pass # Running outside Docker without upload dir — thumbnails won't be served statically
# Mount static files for renders
renders_dir = Path(settings.upload_dir) / "renders"
try:
renders_dir.mkdir(parents=True, exist_ok=True)
app.mount("/renders", StaticFiles(directory=str(renders_dir)), name="renders")
except (PermissionError, OSError):
pass
# Include routers (via domain locations)
app.include_router(auth_router, prefix="/api")
app.include_router(uploads_router, prefix="/api")
app.include_router(orders_router, prefix="/api")
app.include_router(templates_router, prefix="/api")
app.include_router(admin_router, prefix="/api")
app.include_router(order_items_router, prefix="/api")
app.include_router(cad_router, prefix="/api")
app.include_router(materials_router, prefix="/api")
app.include_router(worker_router, prefix="/api")
app.include_router(analytics_router, prefix="/api")
app.include_router(pricing_router, prefix="/api")
app.include_router(invoice_router, prefix="/api")
app.include_router(products_router, prefix="/api")
app.include_router(output_types_router, prefix="/api")
app.include_router(render_templates_router, prefix="/api")
app.include_router(notifications_router, prefix="/api")
app.include_router(tenants_router, prefix="/api")
app.include_router(workflows_router)
app.include_router(media_router)
app.include_router(asset_libraries_router, prefix="/api")
app.include_router(dashboard_router, prefix="/api")
app.include_router(task_logs_router, prefix="/api")
@app.get("/health")
async def health():
return {"status": "ok", "service": "schaefflerautomat-backend"}
@app.websocket("/api/ws")
async def websocket_endpoint(
websocket: WebSocket,
token: str = Query(..., description="JWT access token"),
):
"""WebSocket endpoint for real-time events.
Clients connect with ?token=<jwt>. Events are scoped by tenant_id.
"""
from app.utils.auth import decode_token
from app.database import AsyncSessionLocal
from sqlalchemy import select
from app.models.user import User
# Authenticate via token query param (WS cannot send Authorization header)
try:
payload = decode_token(token)
user_id = payload.get("sub")
if not user_id:
await websocket.close(code=4001)
return
except HTTPException:
await websocket.close(code=4001)
return
# Load user to get tenant_id
async with AsyncSessionLocal() as db:
result = await db.execute(select(User).where(User.id == uuid.UUID(user_id)))
user = result.scalar_one_or_none()
if not user or not user.is_active:
await websocket.close(code=4001)
return
tenant_id = str(user.tenant_id) if user.tenant_id else user_id
await ws_manager.connect(websocket, tenant_id)
try:
while True:
# Keep alive — clients send periodic pings as text
await websocket.receive_text()
except WebSocketDisconnect:
await ws_manager.disconnect(websocket, tenant_id)
except Exception:
await ws_manager.disconnect(websocket, tenant_id)