MushroomMaula / fastapi_login

FastAPI-Login tries to provide similar functionality as Flask-Login does.
https://pypi.org/project/fastapi-login
MIT License

Exception raised on incorrect scopes #46

Closed: kigawas closed this issue 3 years ago

kigawas commented 3 years ago

Currently, if the token does not carry the required scopes, the manager just raises the generic not_authenticated_exception:

async def __call__(self, request: Request, security_scopes: SecurityScopes = None):
    """
    Provides the functionality to act as a Dependency

    Args:
        request (fastapi.Request):The incoming request, this is set automatically
            by FastAPI

    Returns:
        The user object or None

    Raises:
        LoginManager.not_authenticated_exception: If set by the user and `self.auto_error` is set to False

    """

    token = await self._get_token(request)

    if token is None:
        # No token is present in the request and no Exception has been raised (auto_error=False)
        raise self.not_authenticated_exception

    # when the manager was invoked using fastapi.Security(manager, scopes=[...])
    # we have to check if all required scopes are contained in the token
    if security_scopes is not None and security_scopes.scopes:
        if not self.has_scopes(token, security_scopes):
            raise self.not_authenticated_exception

    return await self.get_current_user(token)

This could be improved by introducing a separate exception for missing scopes. Alternatively, the class could be changed to let users handle the payload themselves.
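A minimal, framework-free sketch of the first idea, assuming a dedicated exception for missing scopes. The names `NotAuthenticatedError`, `InsufficientScopeError`, and `check_access` are hypothetical and not part of fastapi_login; only the control flow mirrors the `__call__` shown above.

```python
from typing import Iterable, Optional


class NotAuthenticatedError(Exception):
    """Hypothetical: no token was supplied with the request."""


class InsufficientScopeError(Exception):
    """Hypothetical: a token is present but lacks required scopes."""

    def __init__(self, missing: Iterable[str]):
        self.missing = sorted(missing)
        super().__init__(f"missing scopes: {', '.join(self.missing)}")


def check_access(token_scopes: Optional[Iterable[str]], required: Iterable[str]) -> None:
    """Raise different exceptions for 'no token' and 'wrong scopes'."""
    if token_scopes is None:
        # Stand-in for "no token is present in the request"
        raise NotAuthenticatedError("no token")

    # Any required scope that the token does not carry is reported back
    missing = set(required) - set(token_scopes)
    if missing:
        raise InsufficientScopeError(missing)
```

With two distinct exception types, a caller could, for example, answer the first case with a 401 response and the second with a 403.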

MushroomMaula commented 3 years ago

Hey, overall I like this idea and have been thinking about an elegant way to let the user "hook" into errors. However, I don't like adding a new exception just for the case when scopes are missing. If different actions need to be taken, you can also write your own dependency, kind of like this:

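from typing import List

from fastapi import Request
from fastapi.security import SecurityScopes

async def required_scopes(request: Request, scopes: List[str]):
    """
    Assures the required scopes are present in the token
    """
    # _get_token is a coroutine (it is awaited in __call__ above)
    token = await manager._get_token(request)
    # __call__ above passes a SecurityScopes object to has_scopes, so wrap the list
    if not manager.has_scopes(token, SecurityScopes(scopes=scopes)):
        # handle this case
        ...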

Please be aware that if you use this in combination with the manager as a dependency, the manager must not be added using fastapi.Security. Otherwise the manager itself will raise an error when the required scopes are not met.

If handling the payload is really needed, manager._get_payload can be used to access the payload of a token. Alternatively, jwt.decode(token, str(manager.secret), algorithms=[manager.algorithm]) can be used directly.
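To illustrate what "the payload of a token" means here, the following is a standard-library-only sketch of an HS256 token round trip. It is not how fastapi_login or PyJWT are implemented, and it skips expiry and algorithm checks, so in practice `manager._get_payload` or `jwt.decode` remains the right tool. The helper names and the demo secret are assumptions made for this example.

```python
import base64
import hashlib
import hmac
import json


def b64url_encode(raw: bytes) -> str:
    # JWT uses URL-safe base64 without trailing padding
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    # Restore the padding that JWT segments drop before decoding
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def make_hs256_token(payload: dict, secret: str) -> str:
    """Build a demo HS256 token (illustration only)."""
    header = b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = b64url_encode(json.dumps(payload).encode())
    signature = hmac.new(secret.encode(), f"{header}.{body}".encode(), hashlib.sha256).digest()
    return f"{header}.{body}.{b64url_encode(signature)}"


def read_payload(token: str, secret: str) -> dict:
    """Verify the HS256 signature, then return the decoded claims."""
    header, body, signature = token.split(".")
    expected = hmac.new(secret.encode(), f"{header}.{body}".encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, b64url_decode(signature)):
        raise ValueError("invalid signature")
    return json.loads(b64url_decode(body))


# Hypothetical claims: "sub" identifies the user, "scopes" lists granted scopes
token = make_hs256_token({"sub": "alice", "scopes": ["read"]}, "demo-secret")
claims = read_payload(token, "demo-secret")
```

Once the claims are available as a dictionary, a custom dependency can inspect `claims["scopes"]` and decide for itself how to react to missing scopes.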

If you have any suggestions on how to let the user handle errors or other kinds of events, please let me know.

Maarcosv99 commented 3 years ago

You can create a callable dependency class that takes the scopes as a constructor parameter and receives request: Request when it is called, which lets you implement your own authentication check.

I have an example, though it is tailored to my application.

from typing import List, Optional

from fastapi import Request, Depends
from fastapi.security import OAuth2PasswordBearer

from app.core.config import settings

from app.security import manager, auth

reusable_oauth2 = OAuth2PasswordBearer(tokenUrl=f"/{settings.API_V1_STR}/auth/token/")

async def DataByToken(request: Request):
    # Read the token from the cookie once; reject the request if it is missing
    token = request.cookies.get(manager.cookie_name)
    if not token:
        raise manager.not_authenticated_exception

    # Decode the payload and keep the raw token alongside it
    data = manager._get_payload(token)
    data.update({"token": token})
    return data

class UserPermissions:
    def __init__(self, roles: Optional[List[str]] = None):
        # Avoid a mutable default argument
        self.roles = roles or []

    async def __call__(self, data = Depends(DataByToken)):
        # Return the user for the first required role found in the token's scopes
        for role in self.roles:
            if role in data.get("scopes", []):
                user = await auth.get_authenticated_user(data["sub"], role)
                if not user:
                    raise manager.not_authenticated_exception
                return user

        # No roles configured, or none of them present in the token
        raise manager.not_authenticated_exception

I use it this way:

@router.get("/me", response_model=schemas.User)
async def user_profile(user = Depends(UserPermissions(["admin", "student"]))):
    return user

kigawas commented 3 years ago

The exception problem was fixed in #48