cujojs / most

Ultra-high performance reactive programming
MIT License

A newbie's question about how this works at all #204

Closed by jeron-diovis 8 years ago

jeron-diovis commented 8 years ago

I am a novice with reactive streams, and while playing with most.js I found that I don't understand something essential about how streams should work. I hope you can help me a bit.

I'm trying to implement a simple dispatcher and a store that updates its state on each event. The scan method seems to be exactly what is needed for this:

var dispatch;
var events = require("most").create(add => dispatch = add); 
events.drain();
// I know that there is a "most-subject" package which does exactly what is done above, 
// but don't want to add dependencies here

events.observe(v => console.log("new event", v));

dispatch(1);

var store = events.scan(
  (state, patch) => {
    console.log("scanner", patch);
    // update state here
    return state;
  },
  { foo: 0 }
);

store.observe(v => console.log("store update", v));

dispatch(2);

As I understand it, the first observer should be called twice (once per event), and the scanner callback and the second observer once each (because they were added after the first event was triggered).

In practice, however, the console shows this:

new event 1
new event 2
store update { foo: 0 }

The scanner is never called, no matter how many events I push into the stream. But if I remove the first dispatch call (before creating the store), everything works just as I expected.

Why is this, and what exactly happens here? There must be some fundamental principle that I'm missing, but reading the docs hasn't given me an answer so far.

briancavalier commented 8 years ago

Hi @jeron-diovis. "Exporting" the add, end, and error functions out of the create closure isn't a supported use case. The intent of most.create is to represent a self-contained process that generates events. The documentation could probably be clearer on that... sorry! I'll add a note about not pulling those functions out of the closure.

In general, most.js takes a declarative, rather than imperative, approach. Reasoning about the timing of imperatively generated events is difficult. But, this is JavaScript, so there are times when you need to adapt to other imperative APIs, etc. In those cases, @TylorS's most-subject is the best solution.

jeron-diovis commented 8 years ago

So I was trying to make a dirty hack work :) OK, thanks for clarifying this.

However, with most-subject things aren't so simple. That package provides two functions, subject and holdSubject; the difference is that the second one "replays" the last X events to each new consumer added. With holdSubject I managed to make my "scanner" work. With subject I did not. I can see one difference from the previous approach: events are now processed synchronously with the observer.next() calls. But the scanner still isn't called:

function test(events, dispatch) {
  console.log(">>>> start test");

  events.observe(v => console.log("new event", v));

  dispatch(1);

  var store = events.scan(
    (state, e) => {
      console.log("scanner", e);
      return state;
    },
    { foo: 0 }
  );

  store.observe(v => console.log("store update", v));

  dispatch(2);

  console.log("<<<< end test");
}

function run() {
  var S = require("most-subject").subject();
  test(S.stream, function (data) {
    console.log("dispatch(" + data + ")");
    S.observer.next(data);
  });
}

run();

This produces the following output:

>>>> start test
dispatch(1)
new event 1
dispatch(2)
new event 2
<<<< end test
store update { foo: 0 }

I read the sources and found that with holdSubject it works precisely because of its "replay" feature. In particular, here is what happens:

Trace
    at events.observe.events.scan.foo (/Users/jeron_diovis/dev/projects/reactive-flux/src/test.js:11:15)
    at ScanSink.event (/Users/jeron_diovis/dev/projects/reactive-flux/node_modules/most/lib/combinator/accumulate.js:34:15)
    at pushEvents (/Users/jeron_diovis/dev/projects/reactive-flux/node_modules/most-subject/lib/Replay.js:24:10)
    at MulticastSource.replayAdd [as add] (/Users/jeron_diovis/dev/projects/reactive-flux/node_modules/most-subject/lib/Replay.js:31:5)
    at MulticastSource.run (/Users/jeron_diovis/dev/projects/reactive-flux/node_modules/most/lib/source/MulticastSource.js:16:15)
    at Accumulate.run (/Users/jeron_diovis/dev/projects/reactive-flux/node_modules/most/lib/combinator/accumulate.js:63:21)
    at ContinueWithSink._continue (/Users/jeron_diovis/dev/projects/reactive-flux/node_modules/most/lib/combinator/continueWith.js:66:21)
    at ContinueWithSink.end (/Users/jeron_diovis/dev/projects/reactive-flux/node_modules/most/lib/combinator/continueWith.js:54:10)
    at PropagateTask.emit (/Users/jeron_diovis/dev/projects/reactive-flux/node_modules/most/lib/source/core.js:25:7)
    at PropagateTask.run (/Users/jeron_diovis/dev/projects/reactive-flux/node_modules/most/lib/scheduler/PropagateTask.js:36:7)

With subject, the steps pushEvents => ScanSink.event => events.observe.events.scan.foo are omitted.
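
One possible reading of the two traces, not confirmed anywhere in this thread: scan emits its seed on a later scheduler tick and subscribes to its source only after that, so an event pushed synchronously in between reaches the scanner only if the source replays it. Below is a toy model of that timing in plain JavaScript. The names createToySubject, toyScan, defer, flush and runToy are hypothetical and are not most.js or most-subject APIs; the sketch only imitates the assumed ordering.

```javascript
// Toy model only: none of this is most.js or most-subject code.

// Hypothetical stand-in for a scheduler: deferred tasks run when flush() is called.
const pendingTasks = [];
const defer = task => pendingTasks.push(task);
const flush = () => {
  while (pendingTasks.length > 0) pendingTasks.shift()();
};

// A hot subject: next() pushes synchronously to whoever is subscribed right now.
// With replay > 0 it also keeps the last `replay` events for late subscribers.
function createToySubject(replay = 0) {
  const sinks = [];
  const buffer = [];
  return {
    next(value) {
      if (replay > 0) {
        buffer.push(value);
        if (buffer.length > replay) buffer.shift();
      }
      sinks.slice().forEach(sink => sink(value));
    },
    subscribe(sink) {
      buffer.forEach(value => sink(value)); // replay buffered events, if any
      sinks.push(sink);
    }
  };
}

// Assumed timing: the seed is emitted on a later tick, and only then
// does the scan subscribe to its source.
function toyScan(subject, reducer, seed, sink) {
  defer(() => {
    let state = seed;
    sink(state);
    subject.subscribe(value => {
      state = reducer(state, value);
      sink(state);
    });
  });
}

function runToy(replay) {
  const log = [];
  const subject = createToySubject(replay);
  subject.next(1);
  toyScan(subject, (sum, n) => sum + n, 0, state => log.push(state));
  subject.next(2); // synchronous: happens before toyScan has subscribed
  flush();         // the "later tick"
  return log;
}

const plainLog = runToy(0); // [0]: event 2 was missed, reducer never ran
const holdLog = runToy(1);  // [0, 2]: event 2 was replayed on subscribe
console.log(plainLog, holdLog);
```

In this model the plain subject reproduces the "seed only" output above, while a replay buffer of one event is enough for the reducer to run.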

So, my questions here are:

  1. What does "running a stream" actually mean? What is the difference between "running a stream" and a stream "emitting events"?
  2. Is the holdSubject approach the only correct solution for what I'm trying to do?

jeron-diovis commented 8 years ago

Well, it does not matter. I switched to Kefir.js, and it satisfies all my needs.