Closed vladimir-narkevich-idf closed 1 year ago
I investigated this case in a little more detail; the problem started with Spring Boot 2.7.6.
Same issue here.
Same issue for me on Spring Boot 2.7.10.
After some investigation, I found the cause. It lies in SimpleReactiveRetryPolicy and RetryPublisherHttpClient, which zip the retry signal publisher with an unbounded Flux.range publisher. This didn't cause any problems with older versions of reactor-core, but reactor released an update in v3.4.25 that added discard support for FluxZip and MonoZip.
So during the time the application appears blocked, the CPU is busy discarding all the published integers (1 to MAX) from the ZipInner subscriber's queue.
Long story short: if you are using SimpleReactiveRetryPolicy, you will hit this issue with newer versions of reactor.
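To get an intuition for why the unbounded range makes cancellation so expensive, here is a stdlib-only sketch. It is not reactor's actual implementation; `discardAll` is just an illustrative stand-in for the per-element discard walk that reactor performs over the queued-but-unconsumed values when the zip is cancelled:

```java
public class DiscardCostSketch {
    // Models reactor's discard support for FluxZip: on cancellation,
    // every value still sitting in the inner subscriber's queue is
    // handed to the discard hook one element at a time.
    static long discardAll(int remaining) {
        long discarded = 0;
        for (int i = 0; i < remaining; i++) {
            discarded++; // stand-in for one onDiscard callback invocation
        }
        return discarded;
    }

    public static void main(String[] args) {
        // With a bounded range, draining the leftovers is instant.
        System.out.println(discardAll(10)); // 10
        // With Flux.range(1, Integer.MAX_VALUE), the unconsumed tail is
        // ~2 billion elements, and this walk is the "blocked" phase
        // observed in the issue. (Not executed here for obvious reasons.)
    }
}
```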
As a workaround until this is resolved, you can override the SimpleReactiveRetryPolicy `retry()` method to limit the range to a number your retry config will never exceed:
```java
@Override
public Retry retry() {
    return Retry.from(errors -> errors
        .zipWith(Flux.range(1, MAX_ALLOWED_RETRIES + 1), (signal, index) -> {
            long delay = retryDelay(signal.failure(), index);
            if (delay >= 0) {
                return Tuples.of(delay, signal);
            } else {
                throw Exceptions.propagate(signal.failure());
            }
        })
        .concatMap(tuple2 -> tuple2.getT1() > 0
            ? Mono.delay(Duration.ofMillis(tuple2.getT1()), scheduler)
                .map(time -> tuple2.getT2().failure())
            : Mono.just(tuple2.getT2().failure())));
}
```
To make this work, you'll also need to override RetryPublisherHttpClient.wrapWithOutOfRetriesLog and do the same thing there. This can be tricky since it is a private method in reactive-feign; I had to import the entire package to modify it.
```java
private Retry wrapWithOutOfRetriesLog(ReactiveHttpRequest request) {
    return new Retry() {
        @Override
        public Publisher<?> generateCompanion(Flux<RetrySignal> retrySignals) {
            return Flux.<Object>from(retry.generateCompanion(retrySignals))
                .onErrorResume(throwable -> Mono.just(new OutOfRetriesWrapper(throwable, request)))
                .zipWith(Flux.range(1, MAX_ALLOWED_RETRIES + 1), (object, index) -> {
                    if (object instanceof OutOfRetriesWrapper) {
                        OutOfRetriesWrapper wrapper = (OutOfRetriesWrapper) object;
                        if (index == 1) {
                            throw Exceptions.propagate(wrapper.getCause());
                        } else {
                            logger.debug("[{}]---> USED ALL RETRIES", feignMethodKey, wrapper.getCause());
                            throw Exceptions.propagate(
                                exceptionPropagationPolicy == ExceptionPropagationPolicy.UNWRAP
                                    ? wrapper.getCause()
                                    : new OutOfRetriesException(wrapper.getCause(), request));
                        }
                    } else {
                        return object;
                    }
                });
        }
    };
}
```
This requires a change in Playtika reactive-feign to either allow a configurable retry range or add support for delegating the discard handling to the queue holder. Hope this helps.
This puts a cap on max retries; I'd prefer a fix like this one: https://github.com/PlaytikaOSS/feign-reactive/pull/607
I noticed that `Flux.range(1, MAX_RETRIES)` should actually be `Flux.range(1, MAX_RETRIES + 1)`. This is because the range is first consumed when the initial call fails, and only then do the retries start.
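The off-by-one can be sketched with plain Java (no reactor types; `pairedSignals` is an illustrative model of how FluxZip pairs elements until the shorter source is exhausted, not reactive-feign code):

```java
public class RetryRangeOffByOne {
    // FluxZip emits pairs only while both sources have elements, so the
    // number of retry signals the companion can process is capped at
    // min(failure signals, range size).
    static int pairedSignals(int failureSignals, int rangeSize) {
        return Math.min(failureSignals, rangeSize);
    }

    public static void main(String[] args) {
        int maxRetries = 3;
        // Each failed attempt emits one retry signal: the initial call
        // plus maxRetries retries = maxRetries + 1 signals in the worst case.
        int failureSignals = 1 + maxRetries;
        System.out.println(pairedSignals(failureSignals, maxRetries));     // 3: last signal never paired
        System.out.println(pairedSignals(failureSignals, maxRetries + 1)); // 4: every signal paired
    }
}
```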
@dimaradchenko thanks for pointing that out, I've updated the pull request.
Hi, I am having an interesting issue. I have written a simple REST controller that makes a call to an external service with a Feign client. When I enable the retry policy, I get a successful response from the Feign client, but my REST controller gets blocked for about 2 minutes; if I disable the retry policy, I receive the response without blocking. My code looks like this: