Closed himalacharya closed 3 years ago
You must pass an instance of fastapi-jwt-auth's AuthJWT into the function through dependency injection to get the current user, for example:
def get_user_from_headers(authorize: AuthJWT = Depends()):
    # you don't need to check whether the Authorization header exists, because fastapi-jwt-auth handles that for you
    return authorize.get_jwt_subject() or get_remote_address
I hope this answer can help you 🙏
It doesn't work. I have used slowapi (https://pypi.org/project/slowapi/) to rate limit the endpoint based on IP address. That worked, and now I want to limit based on the user. Using authorize.get_jwt_subject() throws an error.
class LimiterClass:
    def __init__(self):
        self.limiter = Limiter(key_func=get_user_id_or_ip, strategy="moving-window", default_limits=[DEFAULT_RATE_LIMIT])

def get_user_id_or_ip(authorize: AuthJWT = Depends()):
    print("here", decrypt_data(authorize.get_jwt_subject()))
    return decrypt_data(authorize.get_jwt_subject()) or get_remote_address
If I remove decrypt_data(authorize.get_jwt_subject()), it works. Is this issue related to this library or to slowapi?
Ah, I see. The issue is that get_user_id_or_ip() cannot use dependency injection. Instead, you can pass the request instance manually to the AuthJWT class:
def get_user_id_or_ip(request: Request):
    authorize = AuthJWT(request)  # initial instance fastapi-jwt-auth
    authorize.jwt_optional()  # for validation jwt token
    return authorize.get_jwt_subject() or get_remote_address
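To illustrate why the Depends() version fails: Depends() only puts a marker object in the default argument, and FastAPI swaps it for a real instance when it resolves a route's parameters. A key function is called by the limiter itself, so nothing is swapped. The sketch below uses plain-Python stand-ins only. The Depends class, the dict request, and both function names are hypothetical, and it assumes the limiter calls key_func(request):

```python
class Depends:
    """Hypothetical stand-in for fastapi.Depends: only a marker object."""


def key_with_injection(authorize=Depends()):
    # Nothing resolves the marker here, so the caller's argument
    # (or the bare marker) comes straight through.
    return authorize


def key_with_request(request):
    # Taking the request explicitly works no matter who calls the function.
    return request["client_host"]


fake_request = {"client_host": "203.0.113.7"}  # hypothetical request stand-in

no_argument = key_with_injection()               # the Depends marker itself
with_request = key_with_injection(fake_request)  # the request, not an AuthJWT
explicit = key_with_request(fake_request)

print(type(no_argument).__name__)    # Depends
print(with_request is fake_request)  # True
print(explicit)                      # 203.0.113.7
```

In both direct calls the function never receives an AuthJWT instance, which is consistent with get_jwt_subject() failing in the Depends() version.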
Final code I tested on my machine:
from fastapi import FastAPI, Depends, Request, HTTPException
from fastapi.responses import JSONResponse
from fastapi_jwt_auth import AuthJWT
from fastapi_jwt_auth.exceptions import AuthJWTException
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from pydantic import BaseModel

class User(BaseModel):
    username: str
    password: str

class Settings(BaseModel):
    authjwt_secret_key: str = "secret"

@AuthJWT.load_config
def get_config():
    return Settings()

def get_user_id_or_ip(request: Request):
    authorize = AuthJWT(request)  # initial instance fastapi-jwt-auth
    authorize.jwt_optional()  # for validation jwt token
    return authorize.get_jwt_subject() or get_remote_address

limiter = Limiter(key_func=get_user_id_or_ip)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.exception_handler(AuthJWTException)
def authjwt_exception_handler(request: Request, exc: AuthJWTException):
    return JSONResponse(
        status_code=exc.status_code,
        content={"detail": exc.message}
    )

@app.post('/login')
def login(user: User, Authorize: AuthJWT = Depends()):
    if user.username != "test" or user.password != "test":
        raise HTTPException(status_code=401, detail="Bad username or password")
    access_token = Authorize.create_access_token(subject=user.username)
    return {"access_token": access_token}

@app.get("/")
@limiter.limit("5/minute")
async def homepage(request: Request):
    return {"message": "success"}
Thank you very much
It works fine for an unexpired JWT access token. When the token expires, it doesn't work. The following error is generated for an expired access token.
ERROR:uvicorn.error:Exception in ASGI application
Traceback (most recent call last):
File "C:\****\anaconda3\lib\site-packages\uvicorn\protocols\http\h11_impl.py", line 394, in run_asgi
result = await app(self.scope, self.receive, self.send)
File "C:\****\anaconda3\lib\site-packages\uvicorn\middleware\proxy_headers.py", line 45, in __call__
return await self.app(scope, receive, send)
File "C:\****\anaconda3\lib\site-packages\fastapi\applications.py", line 199, in __call__
await super().__call__(scope, receive, send)
File "C:\****\anaconda3\lib\site-packages\starlette\applications.py", line 111, in __call__
await self.middleware_stack(scope, receive, send)
File "C:\****\anaconda3\lib\site-packages\starlette\middleware\errors.py", line 181, in __call__
raise exc from None
File "C:\****\anaconda3\lib\site-packages\starlette\middleware\errors.py", line 159, in __call__
await self.app(scope, receive, _send)
File "C:\****\anaconda3\lib\site-packages\starlette\middleware\base.py", line 25, in __call__
response = await self.dispatch_func(request, self.call_next)
File "C:\****\anaconda3\lib\site-packages\slowapi\middleware.py", line 51, in dispatch
return exception_handler(request, e)
File "C:\****l\anaconda3\lib\site-packages\slowapi\extension.py", line 88, in _rate_limit_exceeded_handler
{"error": f"Rate limit exceeded: {exc.detail}"}, status_code=429
AttributeError: 'JWTDecodeError' object has no attribute 'detail'
For an expired token, I need to return HTTP 403 Forbidden.
Can you give me more detailed information about your code? The error occurs because the JWTDecodeError exception only has message and status_code attributes.
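A minimal sketch of that mismatch, using stand-in exception classes rather than the real libraries: the traceback shows slowapi's handler reading exc.detail, while the JWT exception only carries message and status_code. One possible workaround is a helper that does not assume which attribute exists. The class bodies, the describe_error name, and the 422 status used below are illustrative assumptions:

```python
class RateLimitExceeded(Exception):
    """Stand-in for slowapi's exception, which exposes `detail`."""

    def __init__(self, detail):
        super().__init__(detail)
        self.detail = detail


class JWTDecodeError(Exception):
    """Stand-in for the JWT exception: `status_code` and `message` only."""

    def __init__(self, status_code, message):
        super().__init__(message)
        self.status_code = status_code
        self.message = message


def describe_error(exc):
    """Return (status_code, text) without assuming which attributes exist."""
    text = getattr(exc, "detail", None) or getattr(exc, "message", str(exc))
    status = getattr(exc, "status_code", 429)  # default to Too Many Requests
    return status, text


limit_result = describe_error(RateLimitExceeded("5 per 1 minute"))
jwt_result = describe_error(JWTDecodeError(422, "Signature has expired"))

print(limit_result)  # (429, '5 per 1 minute')
print(jwt_result)    # (422, 'Signature has expired')
```

A custom handler registered for RateLimitExceeded could build its JSON response from such a tuple, so a JWT error reaching it would no longer raise AttributeError.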
In test.py, from where the app runs:
import uvicorn
from app.main import app

if __name__ == "__main__":
    uvicorn.run("test:app", host="0.0.0.0", port=8000, reload=True)
In main.py
from fastapi import FastAPI
import os
from fastapi.exceptions import RequestValidationError
from starlette.exceptions import HTTPException
from app.utility.error import http422_error_handler, http_error_handler,authjwt_exception_handler
from app.utility.config import API_PREFIX, DEBUG, PROJECT_NAME, VERSION, Secret
from app.api.api_v1.api import router as api_router
from app.core.services.events import create_start_app_handler, create_stop_app_handler
from pydantic import BaseModel
from fastapi_jwt_auth import AuthJWT
from fastapi_jwt_auth.exceptions import AuthJWTException
from app.core.services import limiter
from slowapi.middleware import SlowAPIMiddleware
private_key = """
-----BEGIN RSA PRIVATE KEY-----
*****
-----END RSA PRIVATE KEY-----
"""
public_key = """
-----BEGIN PUBLIC KEY-----
*****
-----END PUBLIC KEY-----
"""
class JwtSettings(BaseModel):
    authjwt_secret_key: str = str(Secret)

@AuthJWT.load_config
def get_config():
    return JwtSettings()

def get_application() -> FastAPI:
    application = FastAPI(title=PROJECT_NAME, debug=DEBUG, version=VERSION)
    application.add_event_handler(
        "startup", create_start_app_handler(application))
    application.add_event_handler(
        "shutdown", create_stop_app_handler(application))
    application.add_exception_handler(AuthJWTException, authjwt_exception_handler)
    application.add_exception_handler(HTTPException, http_error_handler)
    application.add_exception_handler(
        RequestValidationError, http422_error_handler)
    # Rate limit on endpoint using slowapi adapted from https://github.com/laurents/slowapi
    application.state.limiter = limiter.LimiterClass().limiter
    application.add_exception_handler(limiter.RateLimitExceeded, limiter._rate_limit_exceeded_handler)
    application.add_middleware(SlowAPIMiddleware)
    application.include_router(api_router, prefix=API_PREFIX)
    return application

app = get_application()
In app.core.services, the limiter.py file is:
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from app.utility.config import DEFAULT_RATE_LIMIT
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from fastapi_jwt_auth import AuthJWT
from app.core.security.security_utils import decrypt_data
class LimiterClass:
    def __init__(self):
        self.limiter = Limiter(key_func=get_user_id_or_ip, strategy="moving-window", default_limits=[DEFAULT_RATE_LIMIT])

def get_user_id_or_ip(request: Request):
    """
    Method : Get user_id for JWT access token, IP address when there is no token
    @Param :
        1. request : type -> Request
    Return: User id or IP address
    """
    authorize = AuthJWT(request)  # initial instance fastapi-jwt-auth
    authorize.jwt_optional()  # for validation jwt token
    return decrypt_data(authorize.get_jwt_subject()) or get_remote_address
After applying the rate limit, it does not show the AuthJWT exception message for an expired token. Instead it shows this error message:
AttributeError: 'JWTDecodeError' object has no attribute 'detail'
It also throws an error for an unexpired refresh token, in the same way as for an expired access token, but the AttributeError is:
AttributeError: 'AccessTokenRequired' object has no attribute 'detail'
The code works fine if I rate limit based only on IP address.
Can you share the code in the authjwt_exception_handler function? I tested it on my machine and it works very well.
async def authjwt_exception_handler(request: Request, exc: AuthJWTException):
    return JSONResponse(
        status_code=401,
        content={
            'data': None,
            'code': 401,
            'status': False,
            'message': exc.message
        }
    )
I don't know exactly why the error is appearing. To me the code looks good and should run fine.
The problem in my application is due to the AuthJWT exception's detail message. When an expired token is provided, it tries to find the user but gets an AuthJWT exception. However, in the example code you provided above, rate limiting does not work after the signature has expired. If I click too fast, it should rate limit that user. But the following message always comes
Rate limiting doesn't work. Here is the code, based on your code above:
from fastapi import FastAPI, Depends, Request, HTTPException
from fastapi.responses import JSONResponse
from fastapi_jwt_auth import AuthJWT
from fastapi_jwt_auth.exceptions import AuthJWTException
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from pydantic import BaseModel
import uvicorn
from datetime import timedelta
class User(BaseModel):
    username: str
    password: str

class Settings(BaseModel):
    authjwt_secret_key: str = "secret"

@AuthJWT.load_config
def get_config():
    return Settings()

def get_user_id_or_ip(request: Request):
    authorize = AuthJWT(request)  # initial instance fastapi-jwt-auth
    authorize.jwt_optional()  # for validation jwt token
    return authorize.get_jwt_subject() or get_remote_address

limiter = Limiter(key_func=get_user_id_or_ip)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.exception_handler(AuthJWTException)
def authjwt_exception_handler(request: Request, exc: AuthJWTException):
    return JSONResponse(
        status_code=exc.status_code,
        content={"detail": exc.message}
    )

@app.post('/login')
def login(user: User, Authorize: AuthJWT = Depends()):
    if user.username != "test" or user.password != "test":
        raise HTTPException(status_code=401, detail="Bad username or password")
    access_token = Authorize.create_access_token(
        subject=user.username, expires_time=timedelta(minutes=1))
    return {"access_token": access_token}

@app.get('/user')
@limiter.limit("5/minute")
async def user(request: Request, Authorize: AuthJWT = Depends()):
    Authorize.jwt_required()
    current_user = Authorize.get_jwt_subject()
    return {"user": current_user}

@app.get("/")
@limiter.limit("5/minute")
async def homepage(request: Request):
    return {"message": "success"}

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
Yes, it is expected not to work once the signature has expired. You should refresh the token before accessing the endpoint, because authorize.jwt_optional() validates the token before the rate limiter gets the id or IP.
By the way, the example above is slightly wrong in how it gets the id or IP for the rate-limit library. You should return request.client.host directly, because get_remote_address written without a call returns the function object itself rather than an address.
def get_user_id_or_ip(request: Request):
    authorize = AuthJWT(request)  # initial instance fastapi-jwt-auth
    authorize.jwt_optional()  # for validation jwt token
    return authorize.get_jwt_subject() or request.client.host
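The difference is easy to see in plain Python. In this small sketch, get_remote_address is a stand-in with the same shape as the slowapi helper, the dict request is hypothetical, and the subject is assumed to be None when no token is sent:

```python
def get_remote_address(request):
    """Stand-in with the same shape as slowapi.util.get_remote_address."""
    return request["client_host"]


request = {"client_host": "203.0.113.7"}  # hypothetical request stand-in
subject = None  # assumed value of get_jwt_subject() when no token is sent

wrong_key = subject or get_remote_address           # the function object
right_key = subject or get_remote_address(request)  # the address string

print(callable(wrong_key))  # True
print(right_key)            # 203.0.113.7
```

With the uncalled function as the fallback, every anonymous client would share one key that is not an address at all, so returning the host string is what gives per-IP limits.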
You can inspect the limiter's storage to check whether you are doing it right:
@app.get("/user/me")
@limiter.limit("5/minute")
async def v_2(request: Request):
    print(request.app.state.limiter._storage.storage)  # to check storage on limiter
    return {"message": "success"}
OK. One last question on this issue to get my application working: if JWT token validation fails, then returning request.client.host could solve the problem in some way.
def get_user_id_or_ip(request: Request):
    authorize = AuthJWT(request)  # initial instance fastapi-jwt-auth
    authorize.jwt_optional()  # for validation jwt token
    # Needs logic here: if validation fails (i.e. AuthJWTException), return request.client.host
    # If validation works, return the user
    # If there is no token, return request.client.host
    return decrypt_data(authorize.get_jwt_subject()) or request.client.host
How do I write logic to check whether authorize.jwt_optional() validates or raises an AuthJWTException?
There are some approaches that can be used, e.g. like this:
def get_user_id_or_ip(request: Request):
    authorize = AuthJWT(request)  # initial instance fastapi-jwt-auth
    try:
        authorize.jwt_optional()  # for validation jwt token
        return authorize.get_jwt_subject() or request.client.host
    except AuthJWTException:
        return request.client.host
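The fallback can be checked without a running server. This sketch replaces AuthJWT with a hypothetical FakeAuthorize stand-in and defines its own AuthJWTException, so only the control flow matches the snippet above. The rate_limit_key name and the sample values are also made up for illustration:

```python
class AuthJWTException(Exception):
    """Stand-in for fastapi_jwt_auth.exceptions.AuthJWTException."""


class FakeAuthorize:
    """Hypothetical stand-in for AuthJWT(request)."""

    def __init__(self, subject=None, error=None):
        self._subject = subject
        self._error = error

    def jwt_optional(self):
        # Raise when the stand-in was configured with a validation error.
        if self._error is not None:
            raise self._error

    def get_jwt_subject(self):
        return self._subject


def rate_limit_key(authorize, client_host):
    """Use the JWT subject when it validates, otherwise the client IP."""
    try:
        authorize.jwt_optional()
        return authorize.get_jwt_subject() or client_host
    except AuthJWTException:
        return client_host


ip = "203.0.113.7"
valid = rate_limit_key(FakeAuthorize(subject="test"), ip)
anonymous = rate_limit_key(FakeAuthorize(), ip)
expired = rate_limit_key(
    FakeAuthorize(error=AuthJWTException("Signature has expired")), ip)

print(valid, anonymous, expired)  # test 203.0.113.7 203.0.113.7
```

An expired or malformed token is then counted against the caller's IP instead of aborting the request inside the limiter.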
Thank you
I have consulted the author of slowapi about the above method and got the following feedback, which raises a security implication: https://github.com/laurentS/slowapi/issues/31#issuecomment-758260137
Yes, there is a security concern, since you catch the JWT error and mix access control into rate limiting, but it's okay because you validate again in your endpoint.
# the JWT error is caught here, which has a security impact
def get_user_id_or_ip(request: Request):
    authorize = AuthJWT(request)  # initial instance fastapi-jwt-auth
    try:
        authorize.jwt_optional()  # for validation jwt token
        return authorize.get_jwt_subject() or request.client.host
    except AuthJWTException:
        return request.client.host

# but you validate again in your endpoint
@app.get("/user/me")
@limiter.limit("5/minute")
async def v_2(request: Request, authorize: AuthJWT = Depends()):
    authorize.jwt_required()
    print(request.app.state.limiter._storage.storage)
    return {"message": "success"}
For rate limiting the endpoint I want to use two approaches:
1) Based on IP address (unprotected endpoint, no JWT access token). It works fine.
2) Based on the current user, who has to be retrieved from the JWT access token. The JWT access token is created using fastapi-jwt-auth, and the user is in get_jwt_subject().
Doing this, I couldn't find the current user. How do I find current_user if request.headers has Authorization?