9f54bc3ab1
Phase 4.1 — Role Hierarchy:
- UserRole enum: add global_admin (platform operator) + tenant_admin
(per-tenant admin); keep legacy 'admin' for backward compat
- Role sets: ADMIN_ROLES, TENANT_ADMIN_ROLES, PM_ROLES, RLS_BYPASS_ROLES
- New auth guards: require_global_admin(), require_tenant_admin_or_above(),
require_pm_or_above(), is_admin(), is_privileged()
- Legacy require_admin / require_admin_or_pm now check both old+new roles
- Migration 049: ADD VALUE global_admin + tenant_admin with AUTOCOMMIT
workaround; backfills admin → global_admin
- Seed: new admin users created with global_admin role
Phase 4.3 — RLS bypass updated for global_admin in get_db + set_tenant_context
Phase 4.4 — Tenant Feature Flags:
- Migration 050: tenant_config JSONB on tenants table
- Tenant model: tenant_config field + get_config() accessor
- Defaults: max_concurrent_renders=3, fallback_material, invoice_prefix etc.
Phase 5.1 — Fallback Material:
- blender_render.py: replace PALETTE_LINEAR/PALETTE_HEX/_assign_palette_material
with _assign_failed_material() → SCHAEFFLER_059999_FailedMaterial (magenta)
- Unmatched parts now logged explicitly before rendering
Phase 5.2 — Remove EEVEE fallback:
- render_blender.py: EEVEE→Cycles silent retry removed; hard failure on EEVEE error
Phase 5.3 — Remove Blender version check:
- render_blender.py: deleted MIN_BLENDER_VERSION = (5, 0, 1) constant
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
150 lines
5.7 KiB
Python
150 lines
5.7 KiB
Python
"""JWT authentication utilities."""
|
|
import uuid
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, HTTPBearer as _HTTPBearer
|
|
from jose import JWTError, jwt
|
|
from passlib.context import CryptContext
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select
|
|
|
|
from app.config import settings
|
|
from app.database import get_db
|
|
from app.models.user import User
|
|
|
|
# Password hashing context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Bearer-token extractor for dependencies that require credentials.
bearer_scheme = HTTPBearer()

# Same extractor with auto_error disabled, used by get_current_user_optional,
# which treats missing credentials as "anonymous" rather than an error.
bearer_scheme_optional = _HTTPBearer(auto_error=False)
|
|
|
|
|
|
def hash_password(password: str) -> str:
    """Hash a plaintext password with the module-level passlib context.

    Args:
        password: The plaintext password.

    Returns:
        The hash string produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
|
|
|
|
|
|
def verify_password(plain: str, hashed: str) -> bool:
    """Check a plaintext password against a stored hash.

    Args:
        plain: The plaintext password supplied by the caller.
        hashed: The previously stored password hash.

    Returns:
        The result of ``pwd_context.verify(plain, hashed)``.
    """
    matches = pwd_context.verify(plain, hashed)
    return matches
|
|
|
|
|
|
def create_access_token(user_id: str, role: str, tenant_id: str | None = None) -> str:
    """Build and sign a JWT access token for a user.

    Args:
        user_id: Stored in the ``sub`` claim.
        role: Stored in the ``role`` claim.
        tenant_id: Stored in the ``tenant_id`` claim when truthy; the claim is
            omitted otherwise.

    Returns:
        The encoded token, signed with ``settings.jwt_secret_key`` using
        ``settings.jwt_algorithm``.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    lifetime = timedelta(minutes=settings.jwt_access_token_expire_minutes)
    # Timezone-aware "now" instead of the deprecated naive datetime.utcnow().
    # NOTE(review): python-jose is expected to convert a datetime ``exp`` to
    # epoch seconds via its UTC time tuple, so the emitted claim is unchanged.
    expires = datetime.now(timezone.utc) + lifetime

    payload: dict = {"sub": user_id, "role": role, "exp": expires}
    if tenant_id:
        payload["tenant_id"] = tenant_id
    return jwt.encode(payload, settings.jwt_secret_key, algorithm=settings.jwt_algorithm)
|
|
|
|
|
|
def decode_token(token: str) -> dict:
    """Decode and verify a JWT, returning its claims.

    Args:
        token: The encoded JWT string.

    Returns:
        The claims dictionary returned by ``jwt.decode``.

    Raises:
        HTTPException: 401 with detail "Invalid token" when ``jwt.decode``
            raises ``JWTError``; the original error is chained as the cause.
    """
    try:
        claims = jwt.decode(
            token,
            settings.jwt_secret_key,
            algorithms=[settings.jwt_algorithm],
        )
    except JWTError as exc:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") from exc
    return claims
|
|
|
|
|
|
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Resolve the authenticated user from the request's Bearer token.

    Args:
        credentials: Bearer credentials extracted by ``bearer_scheme``.
        db: Async database session.

    Returns:
        The active ``User`` whose id matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 "Invalid token" when the token fails to decode, has
            no ``sub`` claim, or carries a ``sub`` that is not a valid UUID;
            401 "User not found or inactive" when no matching active user
            exists.
    """
    payload = decode_token(credentials.credentials)
    user_id = payload.get("sub")
    if not user_id:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")

    # A validly signed token can still carry a malformed or non-string subject;
    # report that as an authentication failure rather than an unhandled error.
    try:
        user_uuid = uuid.UUID(user_id)
    except (ValueError, TypeError, AttributeError) as exc:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") from exc

    result = await db.execute(select(User).where(User.id == user_uuid))
    user = result.scalar_one_or_none()
    if not user or not user.is_active:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found or inactive")
    return user
|
|
|
|
|
|
def _role_value(user: User) -> str:
    """Return the user's role as a plain string.

    If ``user.role`` exposes a ``value`` attribute (e.g. an enum member) that
    attribute is returned; otherwise the role is converted with ``str()``.
    """
    role = user.role
    if hasattr(role, "value"):
        return role.value
    return str(role)
|
|
|
|
|
|
# ── New role-hierarchy guards ─────────────────────────────────────────────────
|
|
|
|
async def require_global_admin(user: User = Depends(get_current_user)) -> User:
    """Dependency guard: allow only users whose role is in ``ADMIN_ROLES``.

    Intended as the replacement for ``require_admin()`` in new code.

    Returns:
        The authenticated user when the role check passes.

    Raises:
        HTTPException: 403 "Global admin access required" otherwise.
    """
    # Imported at call time rather than module import time.
    from app.domains.auth.models import ADMIN_ROLES

    if _role_value(user) in ADMIN_ROLES:
        return user
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Global admin access required")
|
|
|
|
|
|
async def require_tenant_admin_or_above(user: User = Depends(get_current_user)) -> User:
    """Dependency guard: allow only users whose role is in ``TENANT_ADMIN_ROLES``.

    Returns:
        The authenticated user when the role check passes.

    Raises:
        HTTPException: 403 "Tenant admin access required" otherwise.
    """
    # Imported at call time rather than module import time.
    from app.domains.auth.models import TENANT_ADMIN_ROLES

    if _role_value(user) in TENANT_ADMIN_ROLES:
        return user
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Tenant admin access required")
|
|
|
|
|
|
async def require_pm_or_above(user: User = Depends(get_current_user)) -> User:
    """Dependency guard: allow only users whose role is in ``PM_ROLES``.

    Returns:
        The authenticated user when the role check passes.

    Raises:
        HTTPException: 403 "Project manager or above access required"
            otherwise.
    """
    # Imported at call time rather than module import time.
    from app.domains.auth.models import PM_ROLES

    allowed = _role_value(user) in PM_ROLES
    if not allowed:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Project manager or above access required",
        )
    return user
|
|
|
|
|
|
def is_admin(user: User) -> bool:
    """Return True when the user's role value is a member of ``ADMIN_ROLES``."""
    # Imported at call time rather than module import time.
    from app.domains.auth.models import ADMIN_ROLES

    role = _role_value(user)
    return role in ADMIN_ROLES
|
|
|
|
|
|
def is_privileged(user: User) -> bool:
    """Return True when the user's role value is a member of ``PM_ROLES``.

    Per the original note, this marks users who can manage orders/renders.
    """
    # Imported at call time rather than module import time.
    from app.domains.auth.models import PM_ROLES

    role = _role_value(user)
    return role in PM_ROLES
|
|
|
|
|
|
# ── Legacy guards (kept for backward compatibility) ───────────────────────────
|
|
# These now accept both old ("admin") and new ("global_admin") roles.
|
|
|
|
async def require_admin(user: User = Depends(get_current_user)) -> User:
    """Legacy dependency guard kept for backward compatibility.

    Performs the same ``ADMIN_ROLES`` membership check as
    ``require_global_admin()``, which is preferred in new code; only the
    error detail differs.

    Returns:
        The authenticated user when the role check passes.

    Raises:
        HTTPException: 403 "Admin access required" otherwise.
    """
    # Imported at call time rather than module import time.
    from app.domains.auth.models import ADMIN_ROLES

    if _role_value(user) in ADMIN_ROLES:
        return user
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
|
|
|
|
|
|
async def require_admin_or_pm(user: User = Depends(get_current_user)) -> User:
    """Legacy dependency guard kept for backward compatibility.

    Performs the same ``PM_ROLES`` membership check as
    ``require_pm_or_above()``, which is preferred in new code; only the
    error detail differs.

    Returns:
        The authenticated user when the role check passes.

    Raises:
        HTTPException: 403 "Admin or Project Manager access required"
            otherwise.
    """
    # Imported at call time rather than module import time.
    from app.domains.auth.models import PM_ROLES

    allowed = _role_value(user) in PM_ROLES
    if not allowed:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin or Project Manager access required",
        )
    return user
|
|
|
|
|
|
async def get_current_user_optional(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme_optional),
    db: AsyncSession = Depends(get_db),
) -> Optional[User]:
    """Return the current user if a valid Bearer token is provided, else None.

    Unlike ``get_current_user``, this never raises for authentication
    problems: missing credentials, an undecodable token, a missing or
    malformed ``sub`` claim, and an unknown or inactive user all yield None.

    Args:
        credentials: Bearer credentials, or None when the request has none.
        db: Async database session.

    Returns:
        The matching active ``User``, or None.
    """
    if not credentials:
        return None

    try:
        payload = decode_token(credentials.credentials)
    except HTTPException:
        return None

    user_id = payload.get("sub")
    if not user_id:
        return None

    # A malformed or non-string subject is treated like any other bad token:
    # fall back to anonymous instead of letting the UUID parse error escape.
    try:
        user_uuid = uuid.UUID(user_id)
    except (ValueError, TypeError, AttributeError):
        return None

    result = await db.execute(select(User).where(User.id == user_uuid))
    user = result.scalar_one_or_none()
    if not user or not user.is_active:
        return None
    return user
|