Files
Hartmut 9f54bc3ab1 feat(phase4+5): role hierarchy, tenant config, fallback material, dead code removal
Phase 4.1 — Role Hierarchy:
  - UserRole enum: add global_admin (platform operator) + tenant_admin
    (per-tenant admin); keep legacy 'admin' for backward compat
  - Role sets: ADMIN_ROLES, TENANT_ADMIN_ROLES, PM_ROLES, RLS_BYPASS_ROLES
  - New auth guards: require_global_admin(), require_tenant_admin_or_above(),
    require_pm_or_above(), is_admin(), is_privileged()
  - Legacy require_admin / require_admin_or_pm now check both old+new roles
  - Migration 049: ADD VALUE global_admin + tenant_admin with AUTOCOMMIT
    workaround; backfills admin → global_admin
  - Seed: new admin users created with global_admin role

Phase 4.3 — RLS bypass updated for global_admin in get_db + set_tenant_context

Phase 4.4 — Tenant Feature Flags:
  - Migration 050: tenant_config JSONB on tenants table
  - Tenant model: tenant_config field + get_config() accessor
  - Defaults: max_concurrent_renders=3, fallback_material, invoice_prefix etc.

Phase 5.1 — Fallback Material:
  - blender_render.py: replace PALETTE_LINEAR/PALETTE_HEX/_assign_palette_material
    with _assign_failed_material() → SCHAEFFLER_059999_FailedMaterial (magenta)
  - Unmatched parts now logged explicitly before rendering

Phase 5.2 — Remove EEVEE fallback:
  - render_blender.py: EEVEE→Cycles silent retry removed; hard failure on EEVEE error

Phase 5.3 — Remove Blender version check:
  - render_blender.py: deleted MIN_BLENDER_VERSION = (5, 0, 1) constant

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-08 19:42:10 +01:00

150 lines
5.7 KiB
Python

"""JWT authentication utilities."""
import uuid
from datetime import datetime, timedelta
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, HTTPBearer as _HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.config import settings
from app.database import get_db
from app.models.user import User
# Password hashing context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Bearer-token extractor for endpoints where authentication is mandatory.
bearer_scheme = HTTPBearer()

# Bearer-token extractor built with auto_error=False, used by
# get_current_user_optional(), which treats missing credentials as "anonymous".
bearer_scheme_optional = _HTTPBearer(auto_error=False)
def hash_password(password: str) -> str:
    """Hash a plaintext password with the module-level ``pwd_context``.

    Args:
        password: The plaintext password to hash.

    Returns:
        The hash string produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain: str, hashed: str) -> bool:
    """Check a plaintext password against a stored hash.

    Args:
        plain: The plaintext password supplied by the caller.
        hashed: The previously stored hash to compare against.

    Returns:
        Whatever ``pwd_context.verify`` reports for the pair.
    """
    matches = pwd_context.verify(plain, hashed)
    return matches
def create_access_token(user_id: str, role: str, tenant_id: str | None = None) -> str:
    """Create a signed JWT access token for a user.

    Args:
        user_id: Stored in the ``sub`` claim.
        role: Stored in the ``role`` claim.
        tenant_id: Stored in the ``tenant_id`` claim; the claim is omitted
            when the value is falsy (``None`` or empty).

    Returns:
        The encoded token, signed with ``settings.jwt_secret_key`` using
        ``settings.jwt_algorithm``.
    """
    from datetime import timezone

    # Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12
    # and returns a naive value that is easy to misinterpret as local time.
    now = datetime.now(timezone.utc)
    expires = now + timedelta(minutes=settings.jwt_access_token_expire_minutes)
    payload: dict = {"sub": user_id, "role": role, "exp": expires}
    if tenant_id:
        payload["tenant_id"] = tenant_id
    return jwt.encode(payload, settings.jwt_secret_key, algorithm=settings.jwt_algorithm)
def decode_token(token: str) -> dict:
    """Decode and verify a JWT, returning its claims.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded claims dictionary.

    Raises:
        HTTPException: 401 "Invalid token" when ``jwt.decode`` raises
            ``JWTError``; the original error is chained as the cause.
    """
    try:
        claims = jwt.decode(
            token,
            settings.jwt_secret_key,
            algorithms=[settings.jwt_algorithm],
        )
    except JWTError as exc:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") from exc
    return claims
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Resolve the authenticated user from the request's Bearer token.

    Args:
        credentials: Bearer credentials extracted by ``bearer_scheme``.
        db: Async database session.

    Returns:
        The active ``User`` whose id matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, carries a ``sub`` that is not a valid UUID, or refers to
            a user that does not exist or is inactive.
    """
    payload = decode_token(credentials.credentials)
    user_id = payload.get("sub")
    if not user_id:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
    # A correctly signed token can still carry a malformed subject; treat it
    # as an invalid token rather than letting the parse error surface as a 500.
    try:
        user_uuid = uuid.UUID(user_id)
    except (ValueError, AttributeError, TypeError) as exc:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") from exc
    result = await db.execute(select(User).where(User.id == user_uuid))
    user = result.scalar_one_or_none()
    if not user or not user.is_active:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found or inactive")
    return user
def _role_value(user: User) -> str:
    """Return the user's role as a plain string.

    Handles both representations of ``user.role``: an object exposing a
    ``value`` attribute (e.g. an enum member) and anything else, which is
    converted with ``str()``.
    """
    role = user.role
    if hasattr(role, "value"):
        return role.value
    return str(role)
# ── New role-hierarchy guards ─────────────────────────────────────────────────
async def require_global_admin(user: User = Depends(get_current_user)) -> User:
    """Dependency guard: allow only users whose role is in ``ADMIN_ROLES``.

    Intended for platform-operator (global admin) endpoints; prefer this
    over ``require_admin()`` in new code.

    Returns:
        The authenticated user, unchanged.

    Raises:
        HTTPException: 403 when the user's role is not in ``ADMIN_ROLES``.
    """
    # Imported inside the function, as in the other guards in this module.
    from app.domains.auth.models import ADMIN_ROLES

    if _role_value(user) in ADMIN_ROLES:
        return user
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Global admin access required")
async def require_tenant_admin_or_above(user: User = Depends(get_current_user)) -> User:
    """Dependency guard: allow only users whose role is in ``TENANT_ADMIN_ROLES``.

    Per the original docstring this covers TenantAdmin, GlobalAdmin and the
    legacy admin role; the authoritative set lives in
    ``app.domains.auth.models``.

    Returns:
        The authenticated user, unchanged.

    Raises:
        HTTPException: 403 when the user's role is not in
            ``TENANT_ADMIN_ROLES``.
    """
    from app.domains.auth.models import TENANT_ADMIN_ROLES

    if _role_value(user) in TENANT_ADMIN_ROLES:
        return user
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Tenant admin access required")
async def require_pm_or_above(user: User = Depends(get_current_user)) -> User:
    """Dependency guard: allow only users whose role is in ``PM_ROLES``.

    Per the original docstring this covers ProjectManager, TenantAdmin,
    GlobalAdmin and the legacy admin role; the authoritative set lives in
    ``app.domains.auth.models``.

    Returns:
        The authenticated user, unchanged.

    Raises:
        HTTPException: 403 when the user's role is not in ``PM_ROLES``.
    """
    from app.domains.auth.models import PM_ROLES

    if _role_value(user) in PM_ROLES:
        return user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Project manager or above access required",
    )
def is_admin(user: User) -> bool:
    """Return True when the user's role is one of ``ADMIN_ROLES``.

    Non-raising counterpart of the admin guards, for use in ordinary code
    paths rather than as a FastAPI dependency.
    """
    from app.domains.auth.models import ADMIN_ROLES

    role_name = _role_value(user)
    return role_name in ADMIN_ROLES
def is_privileged(user: User) -> bool:
    """Return True when the user's role is one of ``PM_ROLES``.

    Per the original docstring, these are the roles that can manage
    orders/renders (project manager or above).
    """
    from app.domains.auth.models import PM_ROLES

    role_name = _role_value(user)
    return role_name in PM_ROLES
# ── Legacy guards (kept for backward compatibility) ───────────────────────────
# These now accept both old ("admin") and new ("global_admin") roles.
async def require_admin(user: User = Depends(get_current_user)) -> User:
    """Legacy guard: allow only users whose role is in ``ADMIN_ROLES``.

    Performs the same role check as ``require_global_admin()`` but keeps
    the original "Admin access required" error message. Prefer
    ``require_global_admin()`` in new code.

    Returns:
        The authenticated user, unchanged.

    Raises:
        HTTPException: 403 when the user's role is not in ``ADMIN_ROLES``.
    """
    from app.domains.auth.models import ADMIN_ROLES

    if _role_value(user) in ADMIN_ROLES:
        return user
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
async def require_admin_or_pm(user: User = Depends(get_current_user)) -> User:
    """Legacy guard: allow only users whose role is in ``PM_ROLES``.

    Performs the same role check as ``require_pm_or_above()`` but keeps the
    original "Admin or Project Manager access required" error message.
    Prefer ``require_pm_or_above()`` in new code.

    Returns:
        The authenticated user, unchanged.

    Raises:
        HTTPException: 403 when the user's role is not in ``PM_ROLES``.
    """
    from app.domains.auth.models import PM_ROLES

    if _role_value(user) in PM_ROLES:
        return user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin or Project Manager access required",
    )
async def get_current_user_optional(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme_optional),
    db: AsyncSession = Depends(get_db),
) -> Optional[User]:
    """Return the current user if a valid Bearer token is provided, otherwise None.

    Unlike ``get_current_user()``, this dependency never rejects the request
    for authentication reasons: every failure mode yields ``None``.

    Args:
        credentials: Bearer credentials, or ``None`` when the request
            carried none.
        db: Async database session.

    Returns:
        The active ``User`` matching the token's ``sub`` claim, or ``None``
        when credentials are absent, the token is invalid, the ``sub`` claim
        is missing or not a valid UUID, or the user is missing or inactive.
    """
    if not credentials:
        return None
    try:
        payload = decode_token(credentials.credentials)
    except HTTPException:
        # decode_token signals a bad token with a 401; here that means "anonymous".
        return None
    user_id = payload.get("sub")
    if not user_id:
        return None
    # A malformed subject is just another kind of invalid token: fall back to
    # anonymous instead of letting the UUID parse error surface as a 500.
    try:
        user_uuid = uuid.UUID(user_id)
    except (ValueError, AttributeError, TypeError):
        return None
    result = await db.execute(select(User).where(User.id == user_uuid))
    user = result.scalar_one_or_none()
    if not user or not user.is_active:
        return None
    return user