"""JWT authentication utilities."""

import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from fastapi.security import HTTPBearer as _HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import settings
from app.database import get_db
from app.models.user import User

# Bearer scheme that does not reject requests lacking credentials; used by
# get_current_user_optional, which treats missing credentials as "anonymous".
bearer_scheme_optional = _HTTPBearer(auto_error=False)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
bearer_scheme = HTTPBearer()


def hash_password(password: str) -> str:
    """Return the hash of *password* produced by the module's ``pwd_context``."""
    return pwd_context.hash(password)


def verify_password(plain: str, hashed: str) -> bool:
    """Return whether *plain* matches the previously computed hash *hashed*."""
    return pwd_context.verify(plain, hashed)


def create_access_token(user_id: str, role: str) -> str:
    """Create a signed JWT access token.

    The payload carries ``sub`` (the user id), ``role`` and ``exp``. The
    lifetime, signing key and algorithm are read from ``settings``.

    Args:
        user_id: Identifier stored in the ``sub`` claim.
        role: Role name stored in the ``role`` claim.

    Returns:
        The encoded token.
    """
    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    expires = datetime.now(timezone.utc) + timedelta(
        minutes=settings.jwt_access_token_expire_minutes
    )
    # str() keeps "sub" a JSON-serializable string even if a UUID is passed.
    payload = {"sub": str(user_id), "role": role, "exp": expires}
    return jwt.encode(
        payload, settings.jwt_secret_key, algorithm=settings.jwt_algorithm
    )


def decode_token(token: str) -> dict:
    """Decode and verify *token*, returning its claims.

    Raises:
        HTTPException: 401 "Invalid token" when decoding raises ``JWTError``.
    """
    try:
        return jwt.decode(
            token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm]
        )
    except JWTError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
        ) from exc


def _parse_user_id(raw: object) -> Optional[uuid.UUID]:
    """Return the ``sub`` claim as a UUID, or None if it is missing or malformed."""
    if not isinstance(raw, str) or not raw:
        return None
    try:
        return uuid.UUID(raw)
    except ValueError:
        # A token can be validly signed yet carry a non-UUID subject; callers
        # treat that as an invalid token rather than letting it surface as a 500.
        return None


async def _fetch_active_user(db: AsyncSession, user_id: uuid.UUID) -> Optional[User]:
    """Return the user with primary key *user_id* if present and active, else None."""
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user or not user.is_active:
        return None
    return user


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Resolve the authenticated, active user for the request's Bearer token.

    Raises:
        HTTPException: 401 "Invalid token" if the token cannot be decoded or
            its ``sub`` claim is missing or not a valid UUID string;
            401 "User not found or inactive" if no active user matches.
    """
    payload = decode_token(credentials.credentials)
    user_id = _parse_user_id(payload.get("sub"))
    if user_id is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
        )

    user = await _fetch_active_user(db, user_id)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found or inactive",
        )
    return user


async def require_admin(user: User = Depends(get_current_user)) -> User:
    """Return *user* if its role value is ``"admin"``.

    Raises:
        HTTPException: 403 "Admin access required" otherwise.
    """
    if user.role.value != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required"
        )
    return user


async def require_admin_or_pm(user: User = Depends(get_current_user)) -> User:
    """Return *user* if its role value is ``"admin"`` or ``"project_manager"``.

    Raises:
        HTTPException: 403 "Admin or Project Manager access required" otherwise.
    """
    if user.role.value not in ("admin", "project_manager"):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin or Project Manager access required",
        )
    return user


async def get_current_user_optional(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme_optional),
    db: AsyncSession = Depends(get_db),
) -> Optional[User]:
    """Return current user if a valid Bearer token is provided, otherwise None.

    Never raises for authentication problems: missing credentials, an
    undecodable token, a missing or malformed ``sub`` claim, and an unknown
    or inactive user all yield None.
    """
    if not credentials:
        return None

    try:
        payload = decode_token(credentials.credentials)
    except HTTPException:
        return None

    user_id = _parse_user_id(payload.get("sub"))
    if user_id is None:
        return None
    return await _fetch_active_user(db, user_id)