kefirjs / kefir

A Reactive Programming library for JavaScript
https://kefirjs.github.io/kefir/
MIT License
1.87k stars 97 forks source link

[question] How does ".combine" work? #224

Closed jeron-diovis closed 7 years ago

jeron-diovis commented 7 years ago

Here is very simple snippet, ready-to-run in console:

console.clear()

// ---

log = str => x => {
    console.log(str, x)
    return x
}

// ---

createSampler = () => Kefir.constant("start")

createS1 = initial => {
    console.log("create S1")
    return Kefir.constant({
        state: initial + "_s1",
        error: initial + "_e1",
    })
}

createS2 = initial => {
    console.log("create S2")
    return Kefir.constant({
        state: initial + "_s2",
        error: initial + "_e2",
    })
}

// ---

combine = streams => {
    const state$ = Kefir.combine(streams.map($ => $.map(x => x.state)))
        .map(log("state"))

    const error$ = Kefir.combine(streams.map($ => $.map(x => x.error)))
        .map(log("error"))

    return Kefir.combine([ state$, error$ ])
}

// ---

function test1() {
    console.log("--- test1 ---")
    const sampler$ = createSampler()
    return combine([ sampler$.flatMap(createS1), sampler$.flatMap(createS2) ])
}

function test2() {
    console.log("--- test2 ---")
    const sampler$ = createSampler()
    return sampler$.flatMap(x => {
        return combine([ createS1(x), createS2(x) ])
    })
}

// ---

test1().log("[test1]")
test2().log("[test2]")

console.log("--- done ---")

It's output is:

--- test1 ---
create S1
create S2
state ["start_s1", "start_s2"]
[test1] <end:current>
--- test2 ---
create S1
create S2
state ["start_s1", "start_s2"]
error ["start_e1", "start_e2"]
[test2] <value:current> [Array[2], Array[2]]
[test2] <end:current>
--- done ---

Why test1 does not emit value, although in both cases S1 and S2 streams was successfully created?

Alternatively, I tried to define my combine function like this:

combine = streams => {
    const combo$ = Kefir.combine(streams)

    const state$ = combo$.map($ => $.map(x => x.state))
        .map(log("state"))

    const error$ = combo$.map($ => $.map(x => x.error))
        .map(log("error"))

    return Kefir.combine([ state$, error$ ])
}

And it became even worse:

--- test1 ---
create S1
create S2
state ["start_s1", "start_s2"]
[test1] <end:current>
--- test2 ---
create S1`
create S2
state ["start_s1", "start_s2"]
[test2] <end:current>
--- done ---

Could you please explain in details, what actually going on in this example? Am I doing something I should never do?

rpominov commented 7 years ago

Hey! It's not so easy to explain or even understand what is happening in this example. There're two things that complicate it: 1) you use Kefir.constant() observables which are kinda corner case, 2) also you create diamond shape with combine. Each of these may need separate explanations. I'll try to explain though.

Diamond shape

Classic diamond shape can be created like this:

const stream = ... // some stream
const diamond = Kefir.combine([stream.map(f), stream.map(g)])

If we draw a dependency graph it will look something like this:

      stream
        /\
       /  \
 .map(f)  .map(g)
      \   /
       \ /
      diamond 

It's good advice to try to avoid diamonds because of glitches: #97

And here is what you're doing in test1:

               sampler$
                 / \
                /   \
.flatMap(createS1)  .flatMap(createS2)
         \       \/       /
          \      /\      /
           \    /  \    /
            \  /    \  /
          state$    error$ 
              \      /
               \    /
                \  /
               result

It's like a very beautiful diamond, that is actually very bad, haha.

One thing to note here is that flatMap(x => Kefir.constant(x + 1)) is the same as map(x => x + 1), so if you would't use Kefir.constant in createS1/createS2 you'd avoid at least one layer of this complicated diamond structure.

Kefir.constant() corner case

What makes the two examples work differently though is that in first one you essentially do:

const stream = Kefir.constant(...).flatMap(...)
Kefir.combine([stream, stream])

But in the second one:

const property = Kefir.constant(...)
Kefir.combine([property, property])

Kefir.constant(...) creates a property, but .flatMap(...) always returns a stream. And what you get in you particular example is a "bad" stream, because it has only one value that it emits synchronously. With a stream like this only first subscriber gets the value, and Kefir.combine() subscribes to it twice but get the value only from one version of the stream.

This problem described in the docs http://rpominov.github.io/kefir/#current-in-streams and was discussed a lot in issues.

jeron-diovis commented 7 years ago

I used Kefir.constant() here just as the simplest example of property. In my real code all that constants are normal property observables (so I can't skip any layers in my "diamond"), but it's extremely important for me that the first emitted value from sampler$ passes through entire structure and in the end result observable also emits.


As for diamond shapes, I understood the problem with them. But sometimes they seems quite useful. Let say I have observables A and B containing structures of the same shape:

{ state: Object, handler: Function }

State can change, handler is constant (and does not depend from state). I want to combine them like this:

{
    state: { ...state_A, ...state_B },
    handler: () => handler_A() && handler_B(),
}

I can easily do this in combinator function. But if handlers never change, why do I need to create new combined handler each time? Instead I can just combine combinedState$ and combinedHandler$.take(1).

How to get same result without a diamond? Probably, with some kind of memoization for combinator function it's possible, but anyway it will leak out of just streams abstraction.


The most importnant thing I realized after reading your answer and related links is that I had totally underestimated (or misunderstood) the value of .toProperty() method. I always used it only as startsWith, providing getCurrent function – while it has a much more power. And I missed this advice from doc:

Also it's a good practice to convert all streams that might emit current values to properties by using the toProperty method.

In my example all that was needed to make it work is:

combine = streams => {
    const combo$ = Kefir.combine(streams)
        .toProperty()                       // <-----

    const state$ = combo$.map($ => $.map(x => x.state))
        .map(log("state"))

    const error$ = combo$.map($ => $.map(x => x.error))
        .map(log("error"))

    return Kefir.combine([ state$, error$ ])
}

Now it emits each time when Kefir.combine subscribes to it. And works even with all those shiny diamonds.

I'll close this, as my problem is solved :)