mirror of
https://github.com/lorsanstand/Aether.git
synced 2026-06-19 12:05:16 +03:00
56 lines
2.0 KiB
Python
56 lines
2.0 KiB
Python
import logging
|
|
from typing import Optional
|
|
import uuid
|
|
|
|
from fastapi import Depends, HTTPException, status
|
|
from jose import jwt, JWTError
|
|
|
|
from app.utils.OAuth2WithCookie import OAuth2PasswordBearerWithCookie
|
|
from app.config import settings
|
|
from app.users.models import UserModel
|
|
from app.users.service import UserService
|
|
from app.exceptions import InvalidTokenException
|
|
|
|
log = logging.getLogger(__name__)
|
|
oauth2_scheme = OAuth2PasswordBearerWithCookie(tokenUrl="/api/v1/auth/login")
|
|
|
|
async def get_current_user(token: str = Depends(oauth2_scheme)) -> Optional[UserModel]:
|
|
try:
|
|
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=settings.ALGORITHM)
|
|
user_id = int(payload.get("sub"))
|
|
log.debug("Successfully get current_user id", extra={"user_id": user_id})
|
|
|
|
if user_id is None:
|
|
log.warning("User id is None")
|
|
raise InvalidTokenException
|
|
except (Exception, JWTError) as ex:
|
|
if isinstance(ex, InvalidTokenException):
|
|
raise ex
|
|
|
|
if isinstance(ex, JWTError):
|
|
log.error("JWT error")
|
|
raise ex
|
|
|
|
log.error("Unknown exception")
|
|
raise ex
|
|
|
|
current_user = await UserService.get_user(user_id)
|
|
|
|
if not current_user.is_active:
|
|
log.debug("User is not active", extra={"user_id": current_user.id})
|
|
raise HTTPException(status.HTTP_403_FORBIDDEN, detail="User is not active")
|
|
|
|
return current_user
|
|
|
|
|
|
async def get_current_superuser(current_user: UserModel = Depends(get_current_user)) -> Optional[UserModel]:
|
|
if not current_user.is_superuser:
|
|
log.debug("User not enough privileges", extra={"user_id": current_user.id})
|
|
raise HTTPException(status.HTTP_403_FORBIDDEN, detail="Not enough privileges")
|
|
|
|
return current_user
|
|
|
|
async def get_current_verified_user(current_user: UserModel = Depends(get_current_user)):
|
|
if not current_user.is_verified:
|
|
log.debug("User has not confirmed the email.", extra={"user_id": current_user.id})
|
|
raise HTTPException(status.HTTP_403_FORBIDDEN, detail="verify email") |