Skip to content

Instantly share code, notes, and snippets.

@vladmeh
Forked from RomanAVolodin/auth.py
Created May 5, 2024 09:06
Show Gist options
  • Select an option

  • Save vladmeh/9228eec93ccba9b966a3c50617b46f08 to your computer and use it in GitHub Desktop.

Select an option

Save vladmeh/9228eec93ccba9b966a3c50617b46f08 to your computer and use it in GitHub Desktop.

Revisions

  1. @RomanAVolodin RomanAVolodin revised this gist Aug 9, 2023. No changes.
  2. @RomanAVolodin RomanAVolodin revised this gist Aug 9, 2023. 1 changed file with 7 additions and 0 deletions.
    7 changes: 7 additions & 0 deletions main.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,7 @@
    ...

    app.include_router(
    users_router, prefix='/api/v1/users', tags=['users'], dependencies=[Depends(get_current_user_global)],
    )

    ...
  3. @RomanAVolodin RomanAVolodin created this gist Aug 9, 2023.
    45 changes: 45 additions & 0 deletions auth.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,45 @@
    from functools import wraps

    from async_fastapi_jwt_auth import AuthJWT
    from fastapi import Depends, Request, status
    from fastapi.security import HTTPBearer
    from sqlalchemy.ext.asyncio import AsyncSession

    from db.db import get_session
    from helpers.auth_request import AuthRequest
    from helpers.exceptions import AuthException
    from models.user import UserRole
    from schemas.user import UserInDb
    from services.user_repository import users_crud


    def roles_required(roles_list: list[UserRole]):
    def decorator(function):
    @wraps(function)
    async def wrapper(*args, **kwargs):
    user: UserInDb = kwargs.get('request').custom_user
    if not user or user.role not in [x.value for x in roles_list]:
    raise AuthException(
    'This operation is forbidden for you', status_code=status.HTTP_403_FORBIDDEN,
    )
    return await function(*args, **kwargs)
    return wrapper
    return decorator


    class JWTBearer(HTTPBearer):
    def __init__(self, auto_error: bool = True):
    super().__init__(auto_error=auto_error)

    async def __call__(self, request: Request, db: AsyncSession = Depends(get_session)) -> UserInDb | None:
    authorize = AuthJWT(req=request)
    await authorize.jwt_optional()
    user_id = await authorize.get_jwt_subject()
    if not user_id:
    return None
    user = await users_crud.get(db=db, id=user_id)
    return UserInDb.from_orm(user)


    async def get_current_user_global(request: AuthRequest, user: AsyncSession = Depends(JWTBearer())):
    request.custom_user = user
    7 changes: 7 additions & 0 deletions auth_request.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,7 @@
    from fastapi import Request

    from schemas.user import UserInDb


    class AuthRequest(Request):
    custom_user: UserInDb
    29 changes: 29 additions & 0 deletions endpoints.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,29 @@
    import uuid

    from fastapi import APIRouter, Depends, HTTPException, Query, status
    from sqlalchemy.ext.asyncio import AsyncSession

    from core.settings import settings
    from db.db import get_session
    from helpers.auth import roles_required
    from helpers.auth_request import AuthRequest
    from models.user import UserRole
    from schemas.login_history import LoginHistoryResponse
    from schemas.user import UserInDb, UserResponse, UserShort, UserUpdateRoleDto
    from services.login_history_repository import history_crud
    from services.user_repository import users_crud

    router = APIRouter()


    @router.get('/', response_model=list[UserShort])
    @roles_required(roles_list=[UserRole.admin, UserRole.privileged_user])
    async def read_users(
    *,
    request: AuthRequest,
    db: AsyncSession = Depends(get_session),
    skip: int = Query(0, description='Items to skip', ge=0),
    limit: int = Query(settings.pagination_limit, description='Items amount on page', ge=1),
    ) -> list[UserShort]:
    entities = await users_crud.get_multi(db=db, skip=skip, limit=limit)
    return entities