"""JWT authentication utilities.""" import uuid from datetime import datetime, timedelta from typing import Optional from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, HTTPBearer as _HTTPBearer from jose import JWTError, jwt from passlib.context import CryptContext from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.config import settings from app.database import get_db from app.models.user import User bearer_scheme_optional = _HTTPBearer(auto_error=False) pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") bearer_scheme = HTTPBearer() def hash_password(password: str) -> str: return pwd_context.hash(password) def verify_password(plain: str, hashed: str) -> bool: return pwd_context.verify(plain, hashed) def create_access_token(user_id: str, role: str, tenant_id: str | None = None) -> str: expires = datetime.utcnow() + timedelta(minutes=settings.jwt_access_token_expire_minutes) payload: dict = {"sub": user_id, "role": role, "exp": expires} if tenant_id: payload["tenant_id"] = tenant_id return jwt.encode(payload, settings.jwt_secret_key, algorithm=settings.jwt_algorithm) def decode_token(token: str) -> dict: try: return jwt.decode(token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm]) except JWTError as exc: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") from exc async def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme), db: AsyncSession = Depends(get_db), ) -> User: payload = decode_token(credentials.credentials) user_id = payload.get("sub") if not user_id: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") result = await db.execute(select(User).where(User.id == uuid.UUID(user_id))) user = result.scalar_one_or_none() if not user or not user.is_active: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User 
not found or inactive") return user def _role_value(user: User) -> str: """Return the string value of a user's role (works for both enum and plain str).""" role = user.role return role.value if hasattr(role, "value") else str(role) # ── New role-hierarchy guards ───────────────────────────────────────────────── async def require_global_admin(user: User = Depends(get_current_user)) -> User: """GlobalAdmin only (platform operator). Replaces require_admin() for new code.""" from app.domains.auth.models import ADMIN_ROLES if _role_value(user) not in ADMIN_ROLES: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Global admin access required") return user async def require_tenant_admin_or_above(user: User = Depends(get_current_user)) -> User: """TenantAdmin, GlobalAdmin, or legacy admin.""" from app.domains.auth.models import TENANT_ADMIN_ROLES if _role_value(user) not in TENANT_ADMIN_ROLES: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Tenant admin access required") return user async def require_pm_or_above(user: User = Depends(get_current_user)) -> User: """ProjectManager, TenantAdmin, GlobalAdmin, or legacy admin.""" from app.domains.auth.models import PM_ROLES if _role_value(user) not in PM_ROLES: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Project manager or above access required", ) return user def is_admin(user: User) -> bool: """Helper: True if user has any admin-level role.""" from app.domains.auth.models import ADMIN_ROLES return _role_value(user) in ADMIN_ROLES def is_privileged(user: User) -> bool: """Helper: True if user can manage orders/renders (PM or above).""" from app.domains.auth.models import PM_ROLES return _role_value(user) in PM_ROLES # ── Legacy guards (kept for backward compatibility) ─────────────────────────── # These now accept both old ("admin") and new ("global_admin") roles. 
async def require_admin(user: User = Depends(get_current_user)) -> User:
    """Backward-compat alias for require_global_admin().

    Prefer require_global_admin() in new code.

    Raises:
        HTTPException: 403 when the user's role is not in ``ADMIN_ROLES``.
    """
    from app.domains.auth.models import ADMIN_ROLES

    if _role_value(user) in ADMIN_ROLES:
        return user
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")


async def require_admin_or_pm(user: User = Depends(get_current_user)) -> User:
    """Backward-compat alias for require_pm_or_above().

    Prefer require_pm_or_above() in new code.

    Raises:
        HTTPException: 403 when the user's role is not in ``PM_ROLES``.
    """
    from app.domains.auth.models import PM_ROLES

    if _role_value(user) in PM_ROLES:
        return user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin or Project Manager access required",
    )


async def get_current_user_optional(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme_optional),
    db: AsyncSession = Depends(get_db),
) -> Optional[User]:
    """Return current user if a valid Bearer token is provided, otherwise None.

    None is returned when no credentials are supplied, the token fails to
    decode, the ``sub`` claim is missing or not a valid UUID, or the user
    does not exist or is not active.
    """
    if not credentials:
        return None

    try:
        payload = decode_token(credentials.credentials)
    except HTTPException:
        return None

    user_id = payload.get("sub")
    if not user_id:
        return None

    # A malformed subject must yield "no user", like every other invalid-token
    # path here, rather than raising out of an optional dependency.
    try:
        user_uuid = uuid.UUID(user_id)
    except (ValueError, AttributeError, TypeError):
        return None

    result = await db.execute(select(User).where(User.id == user_uuid))
    user = result.scalar_one_or_none()
    if not user or not user.is_active:
        return None
    return user