Files
Aether/backend/app/auth/dependencies.py
T
2026-05-09 10:18:09 +03:00

57 lines
2.1 KiB
Python

import logging
from typing import Optional
from fastapi import Depends, HTTPException, status
from jose import jwt, JWTError
from app.utils.OAuth2WithCookie import OAuth2PasswordBearerWithCookie
from app.core.config import settings
from app.users.models import UserModel
from app.users.service import UserService
from app.core.exceptions import InvalidTokenException
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
# Token-providing dependency consumed by get_current_user; tokenUrl is the
# login endpoint path.
# NOTE(review): judging by the class name it presumably also reads the bearer
# token from a cookie — confirm in app.utils.OAuth2WithCookie.
oauth2_scheme = OAuth2PasswordBearerWithCookie(tokenUrl="/api/v1/auth/login")
async def get_current_user(token: str = Depends(oauth2_scheme)) -> Optional[UserModel]:
    """Resolve the active user identified by the JWT supplied with the request.

    The token is decoded with ``settings.SECRET_KEY`` / ``settings.ALGORITHM``;
    its ``sub`` claim is interpreted as the integer user id and looked up
    through ``UserService.get_user``.

    Args:
        token: Encoded JWT provided by the ``oauth2_scheme`` dependency.

    Returns:
        The user returned by ``UserService.get_user`` for the token's subject.

    Raises:
        InvalidTokenException: The ``sub`` claim is missing or is not an
            integer.
        JWTError: The token could not be decoded (re-raised after logging).
        HTTPException: 403 when the resolved user is not active.
    """
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=settings.ALGORITHM)
    except JWTError:
        log.error("JWT error")
        raise
    except Exception:
        log.error("Unknown exception")
        raise

    # Validate the claim BEFORE converting it: int(None) raises TypeError, which
    # previously made the "missing subject" branch unreachable.
    subject = payload.get("sub")
    if subject is None:
        log.warning("User id is None")
        raise InvalidTokenException

    try:
        user_id = int(subject)
    except (TypeError, ValueError) as ex:
        # A subject that is not an integer means the token is malformed, not
        # that the server failed.
        log.warning("User id is not an integer")
        raise InvalidTokenException from ex

    log.debug("Successfully get current_user id", extra={"user_id": user_id})

    # NOTE(review): if UserService.get_user can return None for an unknown id,
    # the attribute access below fails — confirm it raises instead.
    current_user = await UserService.get_user(user_id)
    if not current_user.is_active:
        log.debug("User is not active", extra={"user_id": current_user.id})
        raise HTTPException(status.HTTP_403_FORBIDDEN, detail="User is not active")
    return current_user
async def get_current_superuser(current_user: UserModel = Depends(get_current_user)) -> Optional[UserModel]:
    """Let the request proceed only when the authenticated user is a superuser.

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        The same ``current_user`` when its ``is_superuser`` flag is truthy.

    Raises:
        HTTPException: 403 when the user is not a superuser.
    """
    # Happy path first; everything below is the rejection branch.
    if current_user.is_superuser:
        return current_user

    log.debug("User not enough privileges", extra={"user_id": current_user.id})
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Not enough privileges",
    )
async def get_current_verified_user(current_user: UserModel = Depends(get_current_user)):
    """Let the request proceed only when the authenticated user is verified.

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        The same ``current_user`` when its ``is_verified`` flag is truthy.

    Raises:
        HTTPException: 403 when the user is not verified.
    """
    # Happy path first; everything below is the rejection branch.
    if current_user.is_verified:
        return current_user

    log.debug(
        "User has not confirmed the email",
        extra={"user_id": str(current_user.id)},
    )
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Verify email",
    )