Closed thekalinga closed 5 years ago
`RateLimiterSubscriber#isCallPermitted` is a blocking operation. So if several threads share the same rate limiter, all of those calls (and their threads) will be blocked when the rate limiter does not have enough tokens to issue (blocking a thread is not a good idea to begin with).

There are a total of three aspects to rate limiting in reactive streams in general
`Flux.limitRequest` & `Flux.limitRate`? The only use case I can think of is sharing a single rate limiter in multiple places, but I'm not sure how useful this will be]

Each of them should be opt-in. Currently (2) is not considered part of rate limiting at all, while (1) & (3) come in a single package (they are combined rather than opt-in), and they work by blocking a thread.

So, the current implementation handles none of the three scenarios correctly.
Hi @thekalinga, thank you for bringing up this conversation. I think we should wait for @madgnome to comment. If I understand your ideas correctly, it should be possible to construct your solution on top of `Metrics.getAvailablePermissions` and `RateLimiter.reservePermission`.
Anyhow, if you want to create a PR and demonstrate the benefits of your solution, you are more than welcome.
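To make the reservation idea a bit more concrete, here is a minimal sketch of a limiter that hands out a wait time instead of parking the caller, assuming a fixed number of permits per cycle. `ReservingLimiter`, its fields, and the explicit `nowNanos` parameter are hypothetical names used only for illustration; this is not the resilience4j `RateLimiter`, and the real `reservePermission` signature may differ.

```java
// Hypothetical stand-in for a reservation-style limiter (NOT the resilience4j API).
final class ReservingLimiter {
    private final int limitForPeriod;     // permits granted per cycle
    private final long cyclePeriodNanos;  // length of one cycle in nanoseconds
    private long reservationCycle = 0;    // cycle that the next permit will come from
    private int usedInCycle = 0;          // permits already reserved in reservationCycle

    ReservingLimiter(int limitForPeriod, long cyclePeriodNanos) {
        this.limitForPeriod = limitForPeriod;
        this.cyclePeriodNanos = cyclePeriodNanos;
    }

    /**
     * Reserves one permit and returns how many nanoseconds the caller has to
     * wait before using it (0 means "go ahead now"). Never blocks.
     * The clock value is passed in so that the sketch stays deterministic.
     */
    synchronized long reservePermission(long nowNanos) {
        long nowCycle = nowNanos / cyclePeriodNanos;
        if (nowCycle > reservationCycle) {
            // Time has moved past the cycle we were filling: start a fresh one.
            reservationCycle = nowCycle;
            usedInCycle = 0;
        }
        if (usedInCycle == limitForPeriod) {
            // Current cycle is exhausted: reserve from the following cycle.
            reservationCycle++;
            usedInCycle = 0;
        }
        usedInCycle++;
        long cycleStart = reservationCycle * cyclePeriodNanos;
        return Math.max(0L, cycleStart - nowNanos);
    }
}
```

The caller decides what to do with the returned wait time, for example scheduling the work on a timer instead of sleeping on the current thread.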
@storozhukBM I'm attempting to modify the existing source to see if I can make the changes. But I've hit a couple of hurdles
wasPermitGranted
I'm attempting to modify it now. Will share an update if I make any progress
Sure, thanks.
It seems that you are mixing 2 different concerns in the same issue and labelling everything broken.
As you have mentioned, `RateLimiterSubscriber#isCallPermitted` is a blocking operation.
This is not the most efficient solution, but it is not wrong, and it is compliant with the reactive streams specification since publishers are not blocked (thanks to the `publishOn()` usage).
When I implemented the Reactor API, a non-blocking API for the rate limiter was not available. Now that `RateLimiter#reservePermission` is available, we may optimize the current solution. I will have a look.
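As a rough illustration of what non-blocking could mean here, the sketch below delays a call by a given wait time using only the JDK (`CompletableFuture.delayedExecutor`, Java 9+) rather than Reactor. `NonBlockingRateLimit`, `callAfter`, and the `waitNanos` parameter are hypothetical names; `waitNanos` stands in for whatever a reservation-style limiter would report, and none of this is the actual operator implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper: defers a call instead of sleeping on the caller's thread.
final class NonBlockingRateLimit {

    /**
     * Runs `call` once `waitNanos` have elapsed and returns immediately.
     * Assumption: `waitNanos` is the wait time reported by a limiter's
     * reservation; a value <= 0 means a permit is available right away.
     */
    static <T> CompletableFuture<T> callAfter(long waitNanos, Supplier<T> call) {
        if (waitNanos <= 0) {
            // Permit available now: run the call asynchronously without delay.
            return CompletableFuture.supplyAsync(call);
        }
        // Permit available later: a timer triggers the call, no thread is parked.
        Executor delayed = CompletableFuture.delayedExecutor(waitNanos, TimeUnit.NANOSECONDS);
        return CompletableFuture.supplyAsync(call, delayed);
    }
}
```

The guarded call is not executed at all until the wait has elapsed, which is the same effect a delayed subscription would have in a reactive chain.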
The current Reactor implementation follows the RxJava implementation; it was implemented this way at the time for consistency with the existing API.
As you have said rate limiting can happen in multiple places: 1) subscription (flux and mono) 2) upstream requests 3) elements flowing downstream (for flux)
The current implementation does 1 and 3.
callRemoteService(request)
.compose(RateLimiterOperator.of(rateLimiter))
`callRemoteService` will only be subscribed to if permits are available, and elements will only flow if permits are available.
You can achieve something similar to 2 by modifying the chain of operators:
Flux.just(request1, request2...)
.compose(RateLimiterOperator.of(rateLimiter))
.flatMap(r -> callRemoteService(r))
Here `callRemoteService` will be rate limited.
The current operator makes more sense with `Mono` than it does with `Flux`, since we would only check for a permit once, on subscription.
For Flux we could change the implementation (or introduce a new operator) to instead rate limit the requests.
Do you have examples of use cases where you want to rate limit? That would help design a better API.
How do we proceed?
I think we should modify rateLimit to rate limit on the subscription and change the implementation to be non-blocking using `reservePermission`.
I can raise a PR for it if we agree on the plan.
Yes, a PR would be awesome
I totally agree with what @thekalinga said.
I fixed the Reactor and RxJava implementation. Will be solved in 0.15.0
The new RateLimiterOperator will only limit upstream subscription and not downstream events.
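To illustrate the difference between limiting the subscription and limiting downstream events, here is a small Reactor-free sketch that simply counts how many permits each approach takes. `GuardTheCall`, `CountingLimiter`, `limitCall`, and `limitResults` are hypothetical names made up for this example, and a plain `List` stands in for the elements a `Flux` would emit; it is an analogy, not the operator's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class GuardTheCall {

    /** Hypothetical limiter stand-in: it only records how many permits were taken. */
    static final class CountingLimiter {
        final AtomicInteger permitsTaken = new AtomicInteger();

        void acquire() {
            permitsTaken.incrementAndGet();
        }
    }

    /** Limits the call itself: one permit per invocation, whatever the result size. */
    static <T> List<T> limitCall(CountingLimiter limiter, Supplier<List<T>> remoteCall) {
        limiter.acquire();           // permit is taken before the call executes
        return remoteCall.get();
    }

    /** Limits the results: the call runs unguarded, then one permit per element. */
    static <T> List<T> limitResults(CountingLimiter limiter, Supplier<List<T>> remoteCall) {
        List<T> out = new ArrayList<>();
        for (T item : remoteCall.get()) {  // the remote call has already happened here
            limiter.acquire();
            out.add(item);
        }
        return out;
    }
}
```

With a call that returns three elements, `limitCall` takes one permit while `limitResults` takes three and never protects the call itself.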
Please ignore this completely. Just look at the comment below, as it makes more sense than this.
A rate limiter is supposed to protect a resource against an unexpected number of calls, for example the number of times an external API can be called per second. Meaning, we guard the actual calls, not the results from the external call.

In our current Reactor implementation (I assume the same is true for RxJava2 as well), we are rate limiting the downstream results, not the actual upstream subscription (refer to `MonoCallable#subscribe`).

A rate limiter should limit the number of times a method is called, so for `Mono.fromCallable` the upstream subscription should be delayed (using `delaySubscription`) so that the actual method under rate limit never gets executed until a rate limit token is available.

In the current implementation~~
`ResilienceBaseSubscriber#onSubscribe` - is a blind request to upstream without any rate limit
`RateLimiterSubscriber#hookOnNext` - we are rate limiting downstream elements as well, which is completely wrong, as we should never care about how many elements upstream is producing; we are interested only in rate limiting the upstream calls~~

For rate limiting downstream elements within a `Flux`, we have `Flux.limitRate` & `Flux.limitRequest`.

So the current limiter is completely broken.