mostjs / core

Most.js core event stream
http://mostcore.rtfd.io
MIT License

multicast streams can lose their initial start-up events #672

Closed semmel closed 10 months ago

semmel commented 10 months ago

Start-up events of a multicast stream are swallowed by simultaneous events of derived streams.

Demo

var
  M = require('@most/core'),
  MS = require('@most/scheduler'),
  log = t => M.tap(val => { console.log(t, val); }),

  s = M.multicast(M.startWith(1, M.at(2000, 2))),
  a = log("a")(s),
  b = log("b")(M.startWith(0, s));
Promise.all([M.runEffects(a, MS.newDefaultScheduler()), M.runEffects(b, MS.newDefaultScheduler())]);
// s: 1 --- 2
// a: 1 --- 2
// b: 0 --- 2  !! 
// expected b: 0 1 --- 2 

When you remove multicast from s, everything is fine: b: 0 1 --- 2.

Surprisingly, extending a with startWith also "repairs" things:

s = M.multicast(M.startWith(1, M.at(2000, 2)));
a = log("a")(M.startWith(0, s));
b = log("b")(M.startWith(0, s));
// s:   1 --- 2
// a: 0 1 --- 2
// b: 0 1 --- 2
TylorS commented 10 months ago

Hey @semmel, I'm sorry things aren't totally intuitive around the use of multicast. The confusion is pretty reasonable, as it's the only operator in the standard library that makes a stream graph impure (i.e. two subscribers might see different things), and it surfaces design choices most has made for various reasons.

I'll try to explain using the examples you provided, but this is "to be expected" from any push-based reactive abstraction, like most, rxjs, etc., when sharing source streams with multiple subscribers.

In the first example, you have the "source" stream M.startWith(1, M.at(2000, 2)): a stream that starts with a value of 1 and, after 2 seconds, emits a value of 2. This is effectively sugar for the following, which I'll use going forward for clarity as I pull it apart in more and more detail:

const source = M.continueWith(() => M.at(2000, 2), M.now(1))

Let's quickly unpack this. Because of the always-async guarantee in most, M.now(1) effectively schedules the emission of 1 using

Promise.resolve(1).then((n) => {
  sink.event(scheduler.currentTime(), n); 
  sink.end(scheduler.currentTime())
})

at which point the continueWith operator intercepts the first sink.end() call and starts running M.at(2000, 2), which effectively runs using

setTimeout(() => {
  sink.event(scheduler.currentTime(), 2)
  sink.end(scheduler.currentTime()) // the real end of the source stream
}, 2000)
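The three snippets above can be tied together in a toy model that runs without most. This is a hedged sketch, not most's implementation. `toyNow`, `toyAt`, `toyContinueWith` and the `{ event, end }` sink shape are hypothetical stand-ins: real most sinks also receive a time and have an `error` method, and real streams are run with a scheduler. Disposal is omitted, and the 2-second delay is shortened to 50 ms. The model shows two things: synchronous code runs before `1` is delivered, and continueWith swaps in the timer-based stream when the first one ends.

```javascript
// Toy stream model (an assumption for illustration, not @most/core's Stream
// interface): a stream is a function that starts producing values into a
// sink { event, end }. Disposal is omitted for brevity.

// Like M.now(x): deliver x on a microtask, never synchronously.
const toyNow = (value) => (sink) => {
  Promise.resolve(value).then((n) => {
    sink.event(n);
    sink.end();
  });
};

// Like M.at(ms, x): deliver x after a timer fires.
const toyAt = (delayMs, value) => (sink) => {
  setTimeout(() => {
    sink.event(value);
    sink.end(); // the real end of the source stream
  }, delayMs);
};

// Like M.continueWith(f, stream): forward events, but intercept the end of
// `stream` and start the stream returned by f() in its place.
const toyContinueWith = (f, stream) => (sink) => {
  stream({
    event: (x) => sink.event(x),
    end: () => f()(sink),
  });
};

const DELAY_MS = 50; // shortened from the 2000 ms in the example
const log = [];
const source = toyContinueWith(() => toyAt(DELAY_MS, 2), toyNow(1));
const done = new Promise((resolve) => {
  source({
    event: (x) => log.push(x),
    end: () => resolve(log),
  });
  // Runs before 1 is delivered: emission always waits for the event loop.
  log.push("sync code done");
});
done.then((entries) => console.log(entries)); // [ 'sync code done', 1, 2 ]
```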

Then s = M.multicast(source) creates a stream we can share with multiple subscribers. This is impure because the returned Stream uses reference counting: it starts running your source Stream when the reference count goes from 0 to 1, and disposes of any resources when the reference count goes from 1 back to 0.
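A minimal sketch of the reference counting described above follows. It assumes a toy stream shape where a stream is a function `(sink) => dispose`. `toyMulticast` is a hypothetical name, and this is not @most/core's code. The sketch shows three things: the source starts when the count goes from 0 to 1, it is disposed when the count goes from 1 to 0, and a subscriber added later does not see events emitted before it arrived.

```javascript
// Hypothetical sketch of reference-counted sharing in the spirit of
// multicast; not the real @most/core implementation.
// A "source" here is a function (sink) => dispose.
function toyMulticast(source) {
  const sinks = new Set();
  let dispose = null;
  return (sink) => {
    sinks.add(sink);
    if (sinks.size === 1) {
      // Count 0 -> 1: start running the shared source exactly once.
      dispose = source({
        event: (x) => sinks.forEach((s) => s.event(x)),
        end: () => sinks.forEach((s) => s.end()),
      });
    }
    return () => {
      sinks.delete(sink);
      if (sinks.size === 0 && dispose !== null) {
        // Count 1 -> 0: tear down the shared source.
        const d = dispose;
        dispose = null;
        d();
      }
    };
  };
}

const stats = { starts: 0, disposes: 0 };
let emit = () => {};
// A manually driven source that counts how often it is started and disposed.
const source = (sink) => {
  stats.starts += 1;
  emit = (x) => sink.event(x);
  return () => {
    stats.disposes += 1;
  };
};

const shared = toyMulticast(source);
const aSeen = [];
const bSeen = [];
const disposeA = shared({ event: (x) => aSeen.push(x), end() {} }); // 0 -> 1: source starts
emit(1); // only a is subscribed, so only a sees 1
const disposeB = shared({ event: (x) => bSeen.push(x), end() {} }); // 1 -> 2: no restart
emit(2); // both subscribers see 2
disposeA(); // 2 -> 1
disposeB(); // 1 -> 0: source disposed
console.log(stats, aSeen, bSeen); // { starts: 1, disposes: 1 } [ 1, 2 ] [ 2 ]
```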

Knowing all of this, we can get into the meat of this example, written here in "long form":

const M = require('@most/core')
const MS = require('@most/scheduler')

const log = t => M.tap(val => { console.log(t, val); })

const source = M.continueWith(() => M.at(2000, 2), M.now(1))
const s = M.multicast(source)
const a = log("a")(s)
const b = log("b")(M.continueWith(() => s, M.now(0)))

Promise.all([M.runEffects(a, MS.newDefaultScheduler()), M.runEffects(b, MS.newDefaultScheduler())])

Given all that we now know about the parts involved, let's describe the runtime behavior.

1.) a is subscribed to
2.) a subscribes to s
3.) the reference count within s goes from 0 to 1
4.) s subscribes to source
5.) source schedules M.now(1) for emission, which yields to the event loop
6.) b is subscribed to
7.) b schedules M.now(0) for emission, which yields to the event loop
8.) all sync code has run, so the event loop ticks
9.) source emits 1 through s into a, causing a log message
10.) source schedules M.at(2000, 2) for emission, which yields to the event loop
11.) b emits 0, causing a log message
12.) b subscribes to s
13.) the reference count within s goes from 1 to 2, keeping the subscription to source stable
14.) the event loop ticks and 2 seconds pass
15.) source emits 2 through s
16.) s emits 2, in order of subscription, through a first and then through b
17.) source signals to s that it's done emitting events
18.) s signals to a that it's done emitting events
19.) the reference count within s goes from 2 to 1
20.) s signals to b that it's done emitting events
21.) the reference count within s goes from 1 to 0
22.) cleanup occurs and the process ends

Hopefully, amid these details, you can see that by the time stream b subscribes to the multicast stream s in step 12, the event with the value 1 has already been emitted through s to a in step 9, which leads to the behavior you're witnessing.

Okay, now that we've unpacked all the details of the first example, hopefully it's also clearer why the second example "repairs" the situation by making a equivalent to b:

1.) a is subscribed to
2.) a schedules M.now(0) for emission, which yields to the event loop
3.) b is subscribed to
4.) b schedules M.now(0) for emission, which yields to the event loop
5.) all sync code has run, so the event loop ticks
6.) a emits 0, causing a log message
7.) a subscribes to s
8.) the reference count within s goes from 0 to 1
9.) s subscribes to source
10.) source schedules M.now(1) for emission, which yields to the event loop
11.) b emits 0, causing a log message
12.) b subscribes to s
13.) the reference count within s goes from 1 to 2, keeping the subscription to source stable
14.) all sync code has run, so the event loop ticks
15.) source emits 1 through s
16.) s emits 1 through a, logging a message
17.) s emits 1 through b, logging a message
18.) source schedules M.at(2000, 2) for emission, which yields to the event loop
19.) the event loop ticks and 2 seconds pass
20.) source emits 2 through s, which emits it through a first and then through b
21.) source signals to s that it's done emitting events
22.) s signals to a that it's done emitting events
23.) the reference count within s goes from 2 to 1
24.) s signals to b that it's done emitting events
25.) the reference count within s goes from 1 to 0
26.) cleanup occurs and the process ends
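The toy model below reproduces both walkthroughs without @most/core. It is a sketch under stated assumptions. The operators are hypothetical stand-ins named after the real ones, and a stream is just a function taking a sink `{ event, end }`. There is no scheduler and no disposal, and the 2-second delay is shortened to 50 ms. Only the ordering of microtasks and timers is modeled, which is enough to show b missing the 1 in the first example and receiving it in the second.

```javascript
// Toy stand-ins for the @most/core operators of the same name (assumption:
// a stream is (sink) => void with sink = { event, end }; no scheduler, no
// disposal). Only the ordering of microtasks and timers is modeled.
const now = (x) => (sink) => {
  Promise.resolve(x).then((v) => {
    sink.event(v);
    sink.end();
  });
};
const at = (ms, x) => (sink) => {
  setTimeout(() => {
    sink.event(x);
    sink.end();
  }, ms);
};
const continueWith = (f, stream) => (sink) => {
  stream({ event: (x) => sink.event(x), end: () => f()(sink) });
};
const startWith = (x, stream) => continueWith(() => stream, now(x));

// Reference-counted sharing: the source is started by the first subscriber
// only; later subscribers just join the list of sinks.
const multicast = (source) => {
  const sinks = [];
  return (sink) => {
    sinks.push(sink);
    if (sinks.length === 1) {
      source({
        event: (x) => sinks.forEach((s) => s.event(x)),
        end: () => sinks.forEach((s) => s.end()),
      });
    }
  };
};

// Runs a stream, recording its events; resolves with them when it ends.
const collect = (stream) =>
  new Promise((resolve) => {
    const seen = [];
    stream({ event: (x) => seen.push(x), end: () => resolve(seen) });
  });

const DELAY_MS = 50; // stands in for the 2000 ms used in the issue

// First example: a = s, b = startWith(0, s)
function example1() {
  const s = multicast(startWith(1, at(DELAY_MS, 2)));
  return Promise.all([collect(s), collect(startWith(0, s))]);
}

// Second example: a = startWith(0, s), b = startWith(0, s)
function example2() {
  const s = multicast(startWith(1, at(DELAY_MS, 2)));
  return Promise.all([collect(startWith(0, s)), collect(startWith(0, s))]);
}

example1()
  .then(([a, b]) => {
    console.log("example 1:", a, b); // a is [1, 2], b is [0, 2]
    return example2();
  })
  .then(([a, b]) => {
    console.log("example 2:", a, b); // a is [0, 1, 2], b is [0, 1, 2]
  });
```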

TL;DR

All of this said, it is a fairly common kind of issue in applications that have multiple subscriptions. Dealing with it usually involves replaying some number of past events to "late" subscribers, and there exists an implementation for replaying exactly 1 event here - https://github.com/mostjs/hold
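The sketch below illustrates replaying the latest event. `toyHold` is a hypothetical operator that adds one-value replay on top of the same kind of reference-counted sharing. It is not the @most/hold implementation or API. The toy stream shape, the synchronous replay on subscription, and the manually driven source are all illustrative choices.

```javascript
// Hypothetical sketch of "hold": reference-counted sharing (as in multicast)
// plus replay of the most recent event to late subscribers. Not the
// @most/hold implementation; disposal is omitted for brevity.
function toyHold(source) {
  const sinks = [];
  let hasValue = false;
  let latest;
  return (sink) => {
    sinks.push(sink);
    if (hasValue) {
      sink.event(latest); // replay the latest value to the late subscriber
    }
    if (sinks.length === 1) {
      source({
        event: (x) => {
          hasValue = true;
          latest = x;
          sinks.forEach((s) => s.event(x));
        },
        end: () => sinks.forEach((s) => s.end()),
      });
    }
  };
}

// A manually driven source, standing in for M.startWith(1, M.at(2000, 2)).
let emit = () => {};
const source = (sink) => {
  emit = (x) => sink.event(x);
};

const shared = toyHold(source);
const aSeen = [];
const bSeen = [];
shared({ event: (x) => aSeen.push(x), end() {} });
emit(1); // only a is subscribed at this point
shared({ event: (x) => bSeen.push(x), end() {} }); // b arrives late but gets 1
emit(2);
console.log(aSeen, bSeen); // [ 1, 2 ] [ 1, 2 ]
```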

Hopefully that was helpful in understanding the results you are getting, but definitely feel free to ask more questions if anything is unclear

TylorS commented 10 months ago

One other piece of advice, not really related to the issue at hand, about this line in the example:

Promise.all([M.runEffects(a, MS.newDefaultScheduler()), M.runEffects(b, MS.newDefaultScheduler())]);

there are 2 separate schedulers being created here. This is generally not recommended. Most's scheduler goes pretty far to ensure the ordering and timing of events as efficiently as possible (both in memory and temporally), and this guarantee is best upheld when reusing a single scheduler. I usually define an alias something like this:

const scheduler = newDefaultScheduler()
const run = <A>(stream: Stream<A>) => runEffects(stream, scheduler)

I'm not positive if it was just a side-effect of making this example on the fly or not, but I figured I'd mention it either way just in case 😄

semmel commented 10 months ago

Hello @TylorS, that is an awesome, comprehensive explanation! I still need to invest some quality time to digest your answer.

At first glance, I am glad it's not a bug; instead, it seems I've been sloppy and put too much burden on my stream. I'll come back here to wrap up this issue by explaining my use case and how it should be done better.

Also I very much appreciate your advice to use a single scheduler! Indeed this was my carelessness.

Thanks again for your swift and valuable response!

TylorS commented 10 months ago

If you do have a more concrete use-case, I could likely give better advice on how to handle it. If you have any other questions though, feel free to shoot them my way.

semmel commented 10 months ago

My use-case is:

I combineArray some data streams into my UI state stream, which I then consume to render the HTML on the page (via uhtml, but that does not matter).

Obviously, before the HTML can be rendered (i.e. before combineArray produces its first event), every data stream must have produced its first event. Therefore, to get the first page render, I slap e.g. a startWith("") or merge(now("")) onto every data stream that ends up in the UI.
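The following is a toy, synchronous model of the behavior described above. The combined output stays silent until every input has produced a value. Seeding every input, as startWith("") does, gets the first render out immediately. `toyCombineArray` and its push-function interface are hypothetical and ignore most's scheduling; the real operator is @most/core's combineArray.

```javascript
// Toy, synchronous model of combineArray's "wait until every input has a
// value" behavior; a hypothetical sketch, not the @most/core implementation.
// Returns one push function per input; onOutput receives f(...latestValues).
function toyCombineArray(f, count, onOutput) {
  const values = new Array(count);
  const has = new Array(count).fill(false);
  return Array.from({ length: count }, (_, i) => (x) => {
    values[i] = x;
    has[i] = true;
    if (has.every(Boolean)) onOutput(f(...values));
  });
}

const render = (title, body) => `${title}: ${body}`;

// Without seeds: nothing renders until both inputs have produced a value.
const renders = [];
const [title1, body1] = toyCombineArray(render, 2, (html) => renders.push(html));
title1("Hello"); // renders is still empty here
body1("world");

// With a startWith("")-style seed pushed into every input up front,
// the first render happens as soon as the seeds arrive.
const seeded = [];
const [title2, body2] = toyCombineArray(render, 2, (html) => seeded.push(html));
title2("");
body2("");
title2("Hello");

console.log(renders, seeded); // [ 'Hello: world' ] [ ': ', 'Hello: ' ]
```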

In the concrete case, an original data stream had an optional immediate initial event, depending on a value found in the browser's LocalStorage. I did not model that as a Maybe, but instead (conditionally) merged in a now(localStorageData) or an empty stream. Thus the derived UI stream could now have two initial events. (Because startWith("") comes after merge(now(localStorageData)), I figured it should still work.) Having to examine the sequence of startWith calls to find out which one "wins" is already a code smell, I guess.

In the end, I'll model that data as a Stream Maybe (a stream carrying a Maybe of the data), which I'll seed with startWith(nothing()) and then map(getOrElse("")). That should get me out of the mess.
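The Stream Maybe plan can be sketched with hypothetical Maybe helpers. `nothing`, `just` and `getOrElse` are defined inline here rather than taken from a specific library, and arrays stand in for the events of a stream.

```javascript
// Hypothetical Maybe helpers (defined here, not from a particular library).
const nothing = () => ({ isJust: false });
const just = (value) => ({ isJust: true, value });
const getOrElse = (fallback) => (m) => (m.isJust ? m.value : fallback);

// Events of a Stream Maybe, written as arrays for illustration:
// startWith(nothing()) guarantees exactly one initial event, and a
// LocalStorage hit (if any) arrives afterwards as just(data).
const withStoredData = [nothing(), just("stored text")];
const withoutStoredData = [nothing()];

// map(getOrElse("")) turns each Maybe into something renderable.
const rendered1 = withStoredData.map(getOrElse(""));
const rendered2 = withoutStoredData.map(getOrElse(""));
console.log(rendered1, rendered2); // [ '', 'stored text' ] [ '' ]
```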

Takeaway for me:

TylorS commented 10 months ago

@semmel It sounds like you came to the same conclusion I would have. startWith/continueWith produce a visible temporal effect that can cause missed events. When you're keeping state and sharing it, I'd recommend an operator like hold from @most/hold. It's identical to multicast (its implementation actually extends multicast), but because it always holds onto the latest value and replays it to "late" subscribers, it makes me feel safer against scenarios where your screen would otherwise stay blank or fail to update as expected.

semmel commented 10 months ago

@TylorS Thanks for recommending hold, I'll use that. (It serves the same purpose as a Property stream in baconjs, which I am used to.)

I came to really appreciate the laziness (and thus purity) of most streams (e.g. compared with a baconjs EventStream, which always carries some internal state). However, with sharing, as you said, the purity goes away. In my case, that was just a consequence of populating my streams with performance-sensitive web API calls. (Obviously I did not want those to execute for every stream subscriber, but just once.)

Having the ability to add state to a stream via hold (or multicast) definitely helps, but now I'll have to carefully consider the consequences.