reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0
4.92k stars 1.19k forks source link

Implementation of static method "limitRate" to Flux that deals with Publisher from BiFunction with exposed offset and limit variables #2154

Closed maxim-bandurko-lsvt closed 3 years ago

maxim-bandurko-lsvt commented 4 years ago

Motivation

@simonbasle Proposing to implement additional variation of limitRate operator in the static method way, that accepts ability to pass BiFunction that returns Publisher to be subscribed to and exposes 2 variables calculations that can be passed to this publisher originator: long offset and long limit. Limit Rate will be passing data from Publisher in batches and returns new Flux.

@mp911de This enhancement will be beneficial, as allows to control how many items were produced, and how many items can be produced more by passed stream reference that BiFunction returns.

This will allow to have different use cases, but most commonly used will be streams that deals with databases, as having limit and offset values calculations that come from Flux signals, will allow to use them inside database queries and split process of data retrieval to rate limited batches.

Also that can be the first step for Spring Data in implementation of Pageable, that listens to native Flux signals. In case of WebFlux, that mostly uses Pageable from PageRequest, it may allow to change logic to native Flux requests qty. But in case of RSocket, with persistent streams, this new Pageable that right now can be generated inside function provided to static limitRate, will allow be utilized right before streams that deals with databases. That will secure server from breaking because of non-controllable high amount originated request numbers that may overload database usage.

Sure, actual proposal to "Flux Pageable" (or can say "Reactive Pageable", or some other term) that can deal with this static method should go to other Spring project, as in final result, Spring Data Repositories should be able automatically deal with this method, so that will be other topic for enhancement. But wanted to give basic example, that we can get once this static method be implemented to Reactor Core with existing Spring Data logic that we have:

Flux.limitRate(
    (long offset, long limit) -> {
        Pageable pageable = FluxRequest.of(offset, limit);
        return repository.findAll(pageable);
    },
    100
)

In my example, I have custom FluxRequest class that generates Pageable that allows to pass offset qty instead of page number (in case of simple demo, this class extends AbstractPageRequest and overrides getOffset method that works directly with passed offset value).

That is very basic example of usage of this limitRate static method that will allow to be plugged right before repositories streams calls. I've used this method with R2DBC and Elasticsearch, and that simplified the developing workflow a lot for me (at my FluxRequest I've utilized both variants offset and page number, in this case I can just skip to certain point of "start from" for data and after that just doing pages for next batches).

More agnostic example:

Flux.limitRate(
    (long offset, long limit) -> {
        return ............ some Flux .............;
    },
    100
)

My example of this method was based on @OlegDokuka enhancement proposal of `limitRate' https://github.com/reactor/reactor-core/pull/1879 that allows to not use prefetch, so would be nice to have this static method in several more variants too.

For future, next enhancement will be good of having implemented real reactive pageable that can work with native Flux signals, so developer won't be needed to generate new one Pageable for each batch request from limitRate. I can come and write this proposal also.

Desired solution

Considered alternatives

Additional context

mp911de commented 4 years ago

From a database perspective I’m missing the link where multiple queries actually make sense. Reactive drivers provide backpressure-aware streaming capabilities out of the box (MongoDB, R2DBC, Cassandra, Elasticsearch). Issuing multiple queries bypasses driver optimizations.

Prefetch translates pretty well to batch size/fetch size which is available on almost every driver.

True pagination has its place in reactive data access. Pagination for the sake of consuming a lot of data is an antipattern.

I’d like to learn more about the cases where this type of operator was used.

maxim-bandurko-lsvt commented 4 years ago

@mp911de Ok, let me provide more explanation and use cases on all this.

With WebFlux was straight forward of using reactive drivers with all out of box functionality, as all connected to request that spawns a stream for a short period of time. But when it comes of using persistent Flux streams that can be established and consumed data partially per demand (situation with RSocket as an example), everything got to be not so easy to be implemented as to not get server break with querying huge data sets (it is anti-pattern, but there are not so many applications that not deals with huge data sets in databases).

With persistent streams client can deals with huge amount of data to consume, but the actual process of consuming can be very time long. Client can request at one time 100 items, and in couple of mins more 10 items, and in 20 mins - 1k more items and in 5 mins later - 20 items, and so on, but stream should be always active and stay alive.

Here are several examples of use cases that I got trapped once started implementing RSocket transport to application.

  1. Elasticsearch

Provides very good access to huge data sets utilizing scroller, but requires data to be cached at server in stream that takes server resources. With persistent stream, once stream subscribed and got first request, it can stay in memory for a very long time till client cancel it or drops connection transport (can be hours, days, months...).

Idea of canceling the stream after some period of time is not good one, as it breaks the main concept logic and requires the way to re-stream with resume functionality (that can be sometimes very problematic in certain situations). So the only way is do massive prefetch or split the driver access to database into multiple queries, that will smooth server resources usage. That happened in my situation, when I had to implement the RSocket client be able to list all users from database in almost "infinity" way (front end using RSocket transport and was requesting certain small amount of more items to show at browser per mouse scroll event, mostly like 5-10 items per event). Data set was around 3 million records of users and scroller at Elasticsearch did a good job with prefetching all data to server, but it was consuming too much resources, that even data in average, had very low consuming rating by client ( < 1% were really used). In other words - server had to store data in memory that mostly won't be populated to client (not talking about bandwidth usage between server and database).

At those moment I had to use Pageable with Elasticsearch query and split stream requests onto batches not canceling the transport stream with client. I've modified limitRate as to be able to generate the Pageable based on consumed data as an offset and requested data (including pre-fetch value as to utilize scroller better and not have overloading the database with offsets in queries). That approach worked very good, as it was prefetching only 10k items per batch and I could play with prefetch rate as to get better server resources consumption.

  1. R2DBC

Has very huge issue with retrieving even not so big data set within one simple SELECT query (of course, everything is related to amount of columns that should be inside rows). In my situation it even failed with a stream of 1k rows.

Right with Elasticsearch data stream transport, on most beginning, I got to issue of how to migrate data from SQL to Elasticsearch dynamically and only with Spring server, so that migration procedure can run when it is needed natively by server. It is not possible to initialize a stream for 3 million records from SQL. Had to split it to batches using Pageable.

After that I was still in need to have a persistent stream from client and server be up for application, and have the ability to populate data per demand not prefetching whole stream from SQL. Same as I described in Elasticsearch paragraph, but with R2DBC I could have different patterns limiting rate for driver. I shouldn't do prefetch at all (as data constantly changes and can't populate old version of data from cache). I had to limit the rate to be highTide 100 only.

So that was one more use case, when you can't have prefetch in persistent stream and server has to surf only "fresh" data from SQL database.

If I am missing something, of how reactive drivers that we have at current moment can deal with native backpressure signals, I could be wrong with my use cases, but I wasn't able to find other ways of how to have drivers be "lazy" with data querying. And as I mentioned about WebFlux, when it comes to developing client applications to use persistent transport with RSocket (example, javascript application that connects to server via websockets, and both of them have Reactive Streams implementation), whole design pattern of data access logic at server changes to different direction. With such symbiosis of Reactive Streams at font end and back end, it requires to have everything to be as "lazy" as possible, and keeping the same stream that starts from database driver and ends in UI render process always up and hot for a very long time.

Also, here some use cases that are not connected with databases:

Let me know if needed some more use cases.

mp911de commented 4 years ago

Thanks for the detailed answer and for surfacing the issues that come with long-lived subscriptions. Keeping a server-side cursor open/connection busy for a long time is indeed an issue. Drivers work regarding backpressure pretty different already. MongoDB and Cassandra fetch a chunk of data into memory and emit items as they are requested. R2DBC, Redis, and Elasticsearch bring backpressure onto the transport channel by suspending reads. I can see in long-running subscriptions why this is a problem.

Probably there's no ideal approach though. Using pagination to minimize resource contention requires draining results from the resource and re-emitting those (collectList().flatMapIterable(Function.identity())) on the calling side. So from that perspective, the suggested operator makes sense. There should be a notion of when the overall stream terminates (e.g. when the resulting Publisher terminates empty).

maxim-bandurko-lsvt commented 4 years ago

Agree. Thing is that long live subscriptions will come more spread very soon, and would be really nice to have at least near ideal approach. On that, I have one more proposal for Reactive Pageable, that can be plugged at least to some drivers and will allow to work with these scenarios. For this pageable can be added additional option for "re-streaming" long live subscriptions (that is needed when RSocket connection was lost for some period of time and having servers scaled horizontally that not allows to use native RSocket "resume"). This option can include some kind "shift" value that can be passed to pageable, as to skip the certain portion of data in no-prefetching way (using same limit/offset).

Any way, mostly all proposals comes to implementation of this static method to reactor as a starting point. And may be, as for design pattern, would be even better to name this method as "fromLimitRate" or something like that (as it tells that method creates new Flux).

simonbasle commented 3 years ago

there is currently no bandwidth to address that issue, and it does need quite some exploration and design. if you still think this is very important and have ideas that you'd like to explore, I'd encourage you to do so in a dedicated experiment repository, creating your own operator (to be applied to Flux via transform). depending on how it goes, you can then either revive this discussion, incorporate it in your own codebase or invalidate the idea.

maxim-bandurko-lsvt commented 3 years ago

Had updated proposal with more agnostic example:

Flux.limitRate(
    (long offset, long limit) -> {
        return ............ some Flux .............;
    },
    100
)

As my original idea was just to be able to construct merged Fluxes based on offset and limit variables.

simonbasle commented 3 years ago

it is still a bit unclear to me. I would be interested in a more specification-like description of the operator's inputs and outputs:

limitRate is a way to rewrite the downstream requests and divide them into smaller batches. I'm not sure this concern applies here. I have a feeling that what you are trying to achieve is "generate a Flux given the current pending demand and overall number of produced items.

Another potential issue I see with this generalizing proposal is offset: even considering pagination alone, some use cases might be interested in the number of pages so far rather than the number of elements extracted from all pages so far...

So I'm wondering if we can actually come up with a generic enough yet useful enough abstraction.

maxim-bandurko-lsvt commented 3 years ago

@simonbasle Sure, you are correct:

I have a feeling that what you are trying to achieve is "generate a Flux given the current pending demand and overall number of produced items.

Going to provide more details and explanations.

My proposal came of one operator that Oleg was trying to implement couple years ago at: https://github.com/reactor/reactor-core/pull/1879 . But time passed and Reactor got a lot of changes, so looks like right now my original explanation can be more confusing.

Main idea was to have the same limitRate operator version (that allows to divide requests into batches), but with additional option to provide a function that accepts 2 variables: offset and limit:

Function has to return the generated Flux batch. Once produced items in current batch reached the limit, operator generates new batch and provides to function the new offset value of total produced items and same limit value, and so on, till some batch produce the complete signal and Flux will be completed. Marble diagram for this will be exactly the same like you have for fromPaginated: https://github.com/reactor/reactor-core/blob/75236dcc98ad035e387962d0f778b984225920cf/reactor-core/src/main/java/reactor/core/publisher/doc-files/marbles/fromPaginated_unbounded.svg

Can say that the better title for this operator would be something like fromBatched. Here is how it can be described in Flux class:

    public static final <T> Flux<T> fromBatched(BiFunction<Long, Long, Flux<? extends T>> supplier, long offset, long limit) {
          return new FluxFromBatched<>(supplier, offset, limit);
    }

Example of using it:

Flux.fromBatched(
    (long offset, long limit) -> {
        return ............ some Flux .............;
    },
    100,
    50
)

value 100 is an original offset for first batch and value 50 is the limit that will be passed all the time to supplier function.

I understand that could be 2 types of usage for such "batched" operator, but qty of items produced is a base number that can be used to calculate a page number using org.springframework.data.domain.PageRequest. In this way operator can be used to all possible usecases.

My custom implementation of it also has such method:

    public static final <T> Flux<T> fromBatched(BiFunction<Long, Long, Flux<? extends T>> supplier, long offset, List<Long> limit) {
          return new FluxFromBatched<>(supplier, offset, limit);
    }

That accepts limit as a list of limit values to each batch part index. Example:

Flux.fromBatched(
    (long offset, long limit) -> {
        return ............ some Flux .............;
    },
    100,
    List.of(50, 500, 5000)
)

The flow will be: batch 1 - will produce 50 items, batch 2 - 500 items, batch 3 and other batches - 5000 items.

maxim-bandurko-lsvt commented 3 years ago

@simonbasle I can come up with operator implementation draft for it if you want to include it to Reactor Core