Files
HartOMat/backend/app/utils/auth.py
T
Hartmut 251dd703ed feat(B2): add tenant model + migrations 035/036 + RLS policies
Migration 035: tenants table with 'Schaeffler' default seed.
Migration 036: tenant_id FK on all tables, RLS policies, backfill.
New domains/tenants/ with CRUD router (admin only).
All domain models extended with tenant_id FK.
core/database.py: get_db_for_tenant with RLS context setter.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-06 16:30:41 +01:00

94 lines
3.2 KiB
Python

"""JWT authentication utilities."""
import uuid
from datetime import datetime, timedelta
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, HTTPBearer as _HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.config import settings
from app.database import get_db
from app.models.user import User
bearer_scheme_optional = _HTTPBearer(auto_error=False)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
bearer_scheme = HTTPBearer()
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def create_access_token(user_id: str, role: str) -> str:
expires = datetime.utcnow() + timedelta(minutes=settings.jwt_access_token_expire_minutes)
payload = {"sub": user_id, "role": role, "exp": expires}
return jwt.encode(payload, settings.jwt_secret_key, algorithm=settings.jwt_algorithm)
def decode_token(token: str) -> dict:
try:
return jwt.decode(token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm])
except JWTError as exc:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") from exc
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
db: AsyncSession = Depends(get_db),
) -> User:
payload = decode_token(credentials.credentials)
user_id = payload.get("sub")
if not user_id:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
result = await db.execute(select(User).where(User.id == uuid.UUID(user_id)))
user = result.scalar_one_or_none()
if not user or not user.is_active:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found or inactive")
return user
async def require_admin(user: User = Depends(get_current_user)) -> User:
if user.role.value != "admin":
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
return user
async def require_admin_or_pm(user: User = Depends(get_current_user)) -> User:
if user.role.value not in ("admin", "project_manager"):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin or Project Manager access required",
)
return user
async def get_current_user_optional(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme_optional),
db: AsyncSession = Depends(get_db),
) -> Optional[User]:
"""Return current user if a valid Bearer token is provided, otherwise None."""
if not credentials:
return None
try:
payload = decode_token(credentials.credentials)
except HTTPException:
return None
user_id = payload.get("sub")
if not user_id:
return None
result = await db.execute(select(User).where(User.id == uuid.UUID(user_id)))
user = result.scalar_one_or_none()
if not user or not user.is_active:
return None
return user