spring-projects / spring-data-r2dbc

Provide support to increase developer productivity in Java when using Reactive Relational Database Connectivity. Uses familiar Spring concepts such as a DatabaseClient for core API usage and lightweight repository style data access.
Apache License 2.0
708 stars 133 forks source link

`AfterConvertCallback` breaks the element order #777

Closed adelabd closed 2 years ago

adelabd commented 2 years ago

When we try to get a list of sorted entities, the order is lost if we use the EntityCallback. I can confirm that the order is well respected in the AfterConvertCallback (I put a breakpoint in the onAfterConvert method and I can see each country sorted by countryName) However, as the AfterConvertCallback is making some reactive call, the order is lost at the end and the method findAllByOrderByCountryName does not return an ordered flux anymore.

If I remove the EntityCallback, the sorting is well respected.

Entity country

public class Country {

    @Id
    private Long id;

    @NotNull
    private String countryName;

    @Transient
    private List<City> cities;

}

Country repository using ReactiveSorting

public interface CountryRepository  extends ReactiveSortingRepository<Country, Long> {
    Flux<Country> findAllByOrderByCountryName();
}
public interface CityRepository extends ReactiveCrudRepository<City, Long> {
    Flux<City> findAllByCountryId(Long countryId);
}
@Component
@AllArgsConstructor
public class CountryAfterConvert implements AfterConvertCallback<Country> {

    @Lazy
    private final CityRepository cityRepository;

    @Override
    public Publisher<Country> onAfterConvert(Country entity, SqlIdentifier table) {
        return cityRepository.findAllByCountryId(entity.getId())
                .collectList()
                .map(l -> {
                    entity.setCities(l);
                    return entity;
                });
    }
}
mp911de commented 2 years ago

That's indeed a bug on our side as we use flatMap on Flux when applying the callback.

adelabd commented 2 years ago

Maybe just using flatMapSequential should fix the problem.

mp911de commented 2 years ago

concatMap is the right choice.

adelabd commented 2 years ago

I don't think so. concatMap => Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.

In this specific case, it's not required to wait the previous inner complete before subscribe to the new one. We just need to keep the order at the end, not to be sequential during the execution of each item. flatMapSequential seems to be more appropriate for this use case.

flatMapSequential => Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, but merge them in the order of their source element.

mp911de commented 2 years ago

With a flatMapSequential operator we would increase concurrency by number of stream elements * 16, and that impact is much more harmful than delaying subscription/completion. In general, callbacks on result streams aren't intended to create a N+1 problem.