reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

Flux of Mono ResponseSpec (WebClient .retrieve): .onErrorResume cannot substitute a valid Mono when a Mono.error() is returned #3884

Closed skyblackhawk closed 1 month ago

skyblackhawk commented 1 month ago
    @Override
    public Flux<ContentFragmentMainEntity> getRatesFlux(String logHash, String apiUrl) {
        return ratesReactiveRepository
                .findAll()
                .flatMap(rate -> {
                    log.info("{} rate read from MongoDb: {}", logHash, rate);
                    String url = apiUrl + "/" + rate.getRateName();
                    log.debug("{}, START {} URI: {}",
                            logHash,
                            "controller.ratesServiceImpl.getRatesFlux",
                            url);
                    return webClient
                            .post()
                            .uri(url)
                            .accept(MediaType.APPLICATION_JSON)
                            .retrieve()
                            // Mono.error requires a Throwable, not a String
                            .onStatus(  HttpStatusCode::is5xxServerError,
                                        clientResponse -> Mono.error(new RuntimeException("INTERNAL SERVER ERROR for rate " + rate)))
                            .onStatus(  HttpStatusCode::is4xxClientError,
                                        clientResponse -> Mono.error(new RuntimeException("EMPTY for rate " + rate)))
                            .bodyToMono(ContentFragmentMainEntity.class);
                });
    }
The service is called from the controller as follows:
        return aemRatesService
                .getRatesFlux(logHash, apiUrl)
                .onErrorContinue((err, i) -> log.error("ERROR LOAD SOME RATE VERIFY LOG"))
                .flatMap(response -> {
                    log.info("OK LOAD " + response.getCodice());
                    return Mono.just("SUCCESS LOAD RATE " + response.getCodice());
                })
                .collectList();

I tried to use .onErrorResume, but it ends the flow and returns a single value instead of continuing. With the implementation above, the flow also does not continue: it stops after a 500 INTERNAL SERVER ERROR.

How can I handle a Mono.error inside a Flux so that it is replaced with a Mono.just value and the asynchronous flatMap continues?

Thanks a lot for your support.

chemicL commented 1 month ago

Hey, please have a look at our policy on questions.

If you'd like to file a bug report or enhancement, please feel free to do so by opening an issue with minimal dependencies and a reproducible example. The code you provided uses WebClient, and I can only guess what your intentions are and what triggers errors in the pipeline. Consider handling errors in each individual sub-stream inside your flatMap and just returning an empty Mono once you have handled the error appropriately, e.g. you can use a combination of .doOnError and .onErrorReturn. It all depends on your particular case and business requirements.
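A minimal sketch of that suggestion, assuming only reactor-core is on the classpath. The class name PerElementFallback, the fetchRate method, and the "FAILED LOAD RATE" fallback string are hypothetical stand-ins for the WebClient call in the question, not part of the original code. The error handling sits on the inner Mono, inside the flatMap, so one failing element does not terminate the outer Flux:

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class PerElementFallback {

    // Hypothetical stand-in for the remote call: fails for the name "bad".
    static Mono<String> fetchRate(String rateName) {
        if (rateName.equals("bad")) {
            return Mono.error(new IllegalStateException("INTERNAL SERVER ERROR for rate " + rateName));
        }
        return Mono.just("SUCCESS LOAD RATE " + rateName);
    }

    static List<String> loadAll(List<String> rateNames) {
        return Flux.fromIterable(rateNames)
                .flatMap(name -> fetchRate(name)
                        // Side effect only: log the failure of this one inner Mono.
                        .doOnError(err -> System.err.println("Failed: " + err.getMessage()))
                        // Replace the error with a fallback value for this element.
                        .onErrorReturn("FAILED LOAD RATE " + name))
                .collectList()
                .block(); // blocking only for the sake of the demo
    }

    public static void main(String[] args) {
        System.out.println(loadAll(List.of("a", "bad", "c")));
    }
}
```

To drop the failed element instead of substituting a value, the .onErrorReturn(...) line could be swapped for .onErrorResume(err -> Mono.empty()).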

skyblackhawk commented 1 month ago

In my case, .doOnError lets me add a log or a similar side effect, and .onErrorResume substitutes the result with a value, but for a Mono inside Flux(ResponseSpec).flatMap I can't change the Mono object.

The only solution I found is to use .onErrorContinue, but then I lose the WebClient error response for 500 and 404.

I would like to know whether there are techniques for handling an exception or error so that the flatMap continues.

I'm using Java 21, Spring Boot WebFlux 3.2, and Reactive MongoDB on MongoDB v7.
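Regarding the concern above that .onErrorContinue loses the error details, one possible approach is to convert each inner error into a value that carries the error message, so the outer Flux keeps going and nothing is discarded. This is only a sketch under assumptions: it needs reactor-core, and the KeepErrorDetails class, the Outcome record, and the fetchRate method are hypothetical names used for illustration. In the WebClient case, the Throwable reaching onErrorResume would be whatever the onStatus handlers produced.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class KeepErrorDetails {

    // Hypothetical result type: holds either a payload or an error message.
    record Outcome(String rateName, String payload, String error) {
        boolean failed() {
            return error != null;
        }
    }

    // Hypothetical stand-in for the remote call.
    static Mono<String> fetchRate(String rateName) {
        if (rateName.equals("missing")) {
            return Mono.error(new IllegalStateException("404 for rate " + rateName));
        }
        if (rateName.equals("broken")) {
            return Mono.error(new IllegalStateException("500 for rate " + rateName));
        }
        return Mono.just("payload-" + rateName);
    }

    static List<Outcome> loadAll(List<String> rateNames) {
        return Flux.fromIterable(rateNames)
                .flatMap(name -> fetchRate(name)
                        // Success path: wrap the payload.
                        .map(payload -> new Outcome(name, payload, null))
                        // Failure path: keep the error message instead of dropping it.
                        .onErrorResume(err -> Mono.just(new Outcome(name, null, err.getMessage()))))
                .collectList()
                .block(); // blocking only for the sake of the demo
    }

    public static void main(String[] args) {
        loadAll(List.of("a", "missing", "broken")).forEach(System.out::println);
    }
}
```

The caller can then check failed() on each Outcome and decide what to log or return, while every element of the source Flux is still processed.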