reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0
4.97k stars 1.2k forks source link

[Suggestion] Create an operator that merges multiple ordered flux's into a single flow with optional fields for flux's with gaps in there keys #3645

Open vibbix opened 11 months ago

vibbix commented 11 months ago

Combine a Flux.zip-akin operator with a key-selecting variant of Flux.mergeComparing for publishers that should be merged based on keys, for both finite and unbounded sources, of any combination in length.

Motivation

I use Reactor everyday in my data pipeline work, to pretty great success. The lazy operators are amazing at handling complex merge operations across many distinct sources. One of the things I run into however is the case when I am trying to fan-in multiple sources of data that have different lengths. and mismatched (but ordered) keys.

Example use-case

An example of this would be merging in 4 different JSON arrays, where a "match-key" would be missing from some of the sets, or that some of the sets have totally different lengths, and would short circuit early.

I have used Flux.groupBy in the past, but that doesn't work in a unbounded Flux case I tend to create a custom interleave for these situations, but a generic solution would be incredibly helpful.

Desired solution

An example signature for this kind of operator that I have experimented with:

    /**
     * This operator merges 4 different flux's together into a single flux based on matching keys.
     * In the case of a source either not having a matched key, or ending early, an empty optional is returned.
     * The Flux's do not have to be the same length, and may have different(but ordered) keys
     * <br>
     * Each source is read until their end.
     * It's assumed that all the sources are already ordered, and that K is comparable
     * @param <K> the key type; Required to be comparable. The smallest value is picked to combine
     * @param <T1> type of the value from source1
     * @param <T2> type of the value from source2
     * @param <T3> type of the value from source3
     * @param <T4> type of the value from source4
     * @param source1 The first Publisher source to combine values from
     * @param source2 The second Publisher source to combine values from
     * @param source3 The third Publisher source to combine values from
     * @param source4 The forth Publisher source to combine values from
     * @param prefetch the minimum size of the internal queue per flux
     * @return a flux based on the produced combinations
     */
    public static <K extends Comparable<? super K>, T1, T2, T3, T4>
    Flux<Tuple5<K, Optional<T1>, Optional<T2>, Optional<T3>, Optional<T4>>>
    zipOnKeyOptional(Flux<? extends Map.Entry<K,T1>> source1,
                     Flux<? extends Map.Entry<K,T2>> source2,
                     Flux<? extends Map.Entry<K,T3>> source3,
                     Flux<? extends Map.Entry<K,T4>> source4, int prefetch);

Desired output

---
title: s
---
stateDiagram-v2    
    sourceOne --> Combiner
    sourceTwo --> Combiner
    sourceThree --> Combiner
    sourceFour --> Combiner

    state sourceOne {
        s11: (1,1)
        s12: (2,2)
        s13: (3,3)
        s14: (4,4)
        s15: (5,5)

        [*] --> s11
        s11 --> s12
        s12 --> s13
        s13 --> s14
        s14 --> s15
        s15 --> [*]
    }

    state sourceTwo {
        [*] --> [*]
    }

    state sourceThree {
        s31: (1,1)
        s32: (2,2)
        s33: (4,4)

        [*] --> s31
        s31 --> s32
        s32 --> s33 
        s33 --> [*]
    }

    state sourceFour {
        s41: (1,1)
        s42: (3,3)
        s43: (4,4)

        [*] --> s41
        s41 --> s42
        s42 --> s43 
        s43 --> [*]
    }

    state Combiner {
        sc1: 1 [1, null, 1,    1]
        sc2: 2 [2, null, 2,    null]  
        sc3: 3 [3, null, null, 3]
        sc4: 4 [4, null, 4,    4]
        sc5: 5 [5, null, null, null]
        [*] --> sc1
        sc1 --> sc2
        sc2 --> sc3
        sc3 --> sc4
        sc4 --> sc5
        sc5 --> [*]
    }

Test Case

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import reactor.util.function.Tuple5;
import reactor.util.function.Tuples;

import java.util.Map;
import java.util.Optional;

import static java.util.Map.entry;
import static java.util.Optional.of;
import static java.util.Optional.empty;

public class ReactiveUtilsTest {
    @Test
    void testZipOnKeyOptional() {
        Flux<Map.Entry<Integer, Integer>> fluxOne = Flux.range(1,5).map(i -> entry(i,i));
        Flux<Map.Entry<Integer, Integer>> fluxTwo = Flux.empty();
        Flux<Map.Entry<Integer, Integer>> fluxThree = Flux.just(entry(1,1), entry(2,2), entry(4,4));
        Flux<Map.Entry<Integer, Integer>> fluxFour = Flux.just(entry(1,1), entry(3,3), entry(4,4));

        Flux<Tuple5<Integer, Optional<Integer>, Optional<Integer>, Optional<Integer>, Optional<Integer>>>
                actual = Flux.zipOnKeyOptional(fluxOne, fluxTwo, fluxThree, fluxFour, 4);
        StepVerifier.create(actual)
                .expectNext(Tuples.of(1, of(1), empty(), of(1), of(1)))
                .expectNext(Tuples.of(2, of(2), empty(), of(2), empty()))
                .expectNext(Tuples.of(3, of(3), empty(), empty(), of(3)))
                .expectNext(Tuples.of(4, of(4), empty(), of(4), of(4)))
                .expectNext(Tuples.of(5, of(5), empty(), empty(), empty()))
                .verifyComplete();
    }
}

Considered alternatives

  • Flux.groupBy doesn't work in unbounded / infinite publisher situations.
  • groupedFlux's also can't be joined in a structured-concurrency kind of way, like Mono.zip
  • I typically implement these functions by having:
    1. Having each flux be mapped to a marker interface that allows me to apply them to a POJO builder
    2. Merging the Flux's that now are cast to the marker interface with an operator like Flux.mergeComparingDelayError(...)
    3. Use Flux.windowUntilChanged to group the entities
    4. flatMap with reduceWith accumulator to build the tuple out
    5. the mapped object is aligned key wise, and has sane default Optional.empty() for unmatched fields
OlegDokuka commented 11 months ago

Hi, @vibbix!

Thanks for sharing your interesting use case! Am I understanding correctly that you need a zip version which will be zipping until the longest source is done, while the other sources which has ended should return fallback or null value?

Cheers, Oleh

chemicL commented 11 months ago

Wondering whether the existing API ideas can be used to achieve this result with the combinator variants, e.g.

zip(Function<? super Object[],? extends O> combinator, Publisher<? extends I>... sources)

when made configurable to wait for the last active source instead of terminating upon the first finishing one. The combinator would need to probably return a structure that contains both the output tuple and a decision object that says for particular source whether the value was consumed or should be reused for another zipping round.

The notion of zipping on key is quite limiting potentially.

vibbix commented 11 months ago

@OlegDokuka Having a Flux.zipDelayComplete Being able to zip data sources with default fallback values for mismatched lengths would be very useful too. I tend to hack this together by using along the line of Flux.concat(source1, Flux.create(...)) that produces Map.Entry<Integer,Optional<T>>(Integer.MAX_VALUE,Optional.empty()) until the parent subscription cancels. It involves a lot of unboxing, but I built some static helpers to make it easier.

@chemicL

The combinator would need to probably return a structure that contains both the output tuple and a decision object that says for particular source whether the value was consumed or should be reused for another zipping round.

This would be great, and if there was something like this where I can request that the output tuple "replenish" the producer slot that I consumed, I would build these sort of functions on top of that.

in Considered Alternatives I built out this functionality today using Flux.mergeOrderComparing + flux.WindowUntilChanged with generic Map.Entry's & Marker interfaces, but we lose some type-safety and the concept of which "slot"/source publisher the result comes from.

In my case it's more about keeping a row-level/horizontal data structure in-tact. Making this a built-in Reactor operator would ensure the entire workflow is type-safe, that each source publisher is having it's backpressure dealt with correctly, and that buffer bloat is minimized.

The notion of zipping on key is quite limiting potentially.

It could be a great shortcut for this common use case. I tend to have different types in each zip'd source publisher, and even cases where I read different keys from the same object(although this is certainly a more unusual case). I have been workshopping different method signatures for a couple months, and this is the closest I got to a clean signature for a external implementation. Otherwise, each source flux would need a corresponding Function<? extends T1, ? extends K> key combinator.

chemicL commented 11 months ago

@vibbix would you be so kind to provide some more test cases with some corner cases to help us better understand and consider a possible design? For one, I'm wondering if the keys can appear more than once.

If at all you'd be willing to provide the code for the implementation that works so far that would also be beneficial.

vibbix commented 11 months ago

@chemicL I created a example of what I tend to use now here: vibbix/rx-experiments. The attached README.md has a description of my thought process in the design as well. I have been working on some examples on what a coordinator structure could look like to handle the incoming values as well.

For one, I'm wondering if the keys can appear more than once.

In my design, I assume that any publisher that has multiple of the same keys incoming have to be grouped prior.

chemicL commented 10 months ago

@vibbix thank you. We appreciate your input. We are in the planning process currently and will get back when we have some priorities. Just to get a sense of work involved - are you interested in contributing something once we settle on design or would you expect the team or community to provide an implementation?