reactive-streams / reactive-streams-js

Reactive Streams standardization for JavaScript
MIT No Attribution

Basic subscription behavior. #4

Closed benlesh closed 7 years ago

benlesh commented 9 years ago

We need to decide on what a common spec will be for basic subscription. What I know so far from my discussions with @jhusain is that the TC39 spec for observables will look a lot like RxJS's Observables.

The basic API looks like this:


function nextHandler(x) {
  console.log('next: %o', x);
}

function errorHandler(err) {
  console.error('error: %o', err);
}

function completedHandler() {
  console.info('done');
}

// subscription
var disposable = observable.forEach(nextHandler, errorHandler, completedHandler);

// disposal
disposable.dispose();
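
Here is a minimal sketch of an observable whose forEach takes the three handlers and returns a disposable, so the snippet above can actually run. `createObservable` and its producer/sink shape are hypothetical names for illustration. This is not RxJS's actual implementation. Like the RxJS behavior described in this issue, it sets up the stream synchronously.

```javascript
// Hypothetical minimal observable, for illustration only (not RxJS source).
// `producer` receives a sink and may return a cleanup function.
// This version sets up the stream synchronously, the RxJS behavior questioned in this issue.
function createObservable(producer) {
  return {
    forEach(nextHandler, errorHandler, completedHandler) {
      let closed = false;
      let cleanup = null;

      // Marks the subscription closed and runs cleanup at most once.
      function close() {
        if (closed) return false;
        closed = true;
        if (cleanup) cleanup();
        return true;
      }

      const returned = producer({
        next(x) { if (!closed) nextHandler(x); },
        error(err) { if (close()) errorHandler(err); },
        completed() { if (close()) completedHandler(); }
      });
      cleanup = typeof returned === 'function' ? returned : null;

      // If the producer already terminated synchronously, cleanup could not
      // run inside close() yet, so run it now.
      if (closed && cleanup) cleanup();

      return { dispose() { close(); } };
    }
  };
}

// Usage: a cold source that emits three values and completes.
const log = [];
const observable = createObservable(sink => {
  [1, 2, 3].forEach(x => sink.next(x));
  sink.completed();
  return () => log.push('cleanup');
});
const disposable = observable.forEach(
  x => log.push(`next ${x}`),
  err => log.push(`error ${err}`),
  () => log.push('done')
);
disposable.dispose(); // no-op here: the source has already completed
```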

forEach

Actually subscribes to the observable. If the observable is "cold", it sets up the underlying data stream. The RxJS implementation does this synchronously (which I'm not sure I like). It returns a disposable object with a dispose method on it.

The only problem I have with RxJS's forEach implementation is that I think it should be asynchronous by default. (I'm not talking about event emission, only subscription, i.e. setting up the underlying stream.)

disposable return

The reason for returning a disposable rather than just a function is that we can get better performance by leveraging prototypal inheritance and calling the superclass's dispose() method than we can with closures (if we were to return only a dispose function), which are more expensive. In other words, I believe (and @mattpodwysocki can correct me) it creates a new Disposable that subclasses the previous disposable and calls _super() inside its dispose method.
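
Here is a rough sketch of the pattern being described. It uses hypothetical `Disposable` and `TimeoutDisposable` classes, not RxJS's real ones. The subclass's dispose() does its own teardown and then calls the superclass's dispose(). The method is shared through the prototype instead of being a fresh closure per subscription. The sketch shows the structure only and does not measure the performance claim.

```javascript
// Hypothetical classes illustrating the pattern; not RxJS's actual source.
class Disposable {
  constructor() {
    this.isDisposed = false;
  }
  dispose() {
    this.isDisposed = true;
  }
}

// A subclass adds its own teardown, then defers to the superclass.
class TimeoutDisposable extends Disposable {
  constructor(handle) {
    super();
    this.handle = handle; // per-instance state only; methods live on the prototype
  }
  dispose() {
    if (this.isDisposed) return;
    clearTimeout(this.handle);
    super.dispose();
  }
}

// The closure-based alternative allocates a new function per subscription,
// capturing `handle` in its scope.
function timeoutDisposer(handle) {
  return () => clearTimeout(handle);
}

const fired = [];
const d = new TimeoutDisposable(setTimeout(() => fired.push('timer'), 0));
d.dispose(); // cancels the timer and marks the disposable as disposed

const undo = timeoutDisposer(setTimeout(() => fired.push('closure timer'), 0));
undo(); // same effect, via a per-subscription closure
```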

The disposal of the stream is the biggest point of divergence from one implementation to the next.

I'm really not sure what to do about this, honestly. RxJS returns an object, Bacon returns a function, and Most expects composition to explicitly end a stream programmatically (I think?)
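
An interop layer could bridge the two return styles with an adapter like the hypothetical `toDisposable` below. It does not call any Bacon or RxJS API. It only assumes the subscribe call returned one of three things: a function, an object with a `dispose` method, or nothing.

```javascript
// Hypothetical adapter: wraps whatever a library's subscribe call returns
// in an object with a dispose() method.
function toDisposable(teardown) {
  if (typeof teardown === 'function') {
    // Function style: wrap it so calling dispose() twice runs it once.
    let done = false;
    return {
      dispose() {
        if (done) return;
        done = true;
        teardown();
      }
    };
  }
  if (teardown && typeof teardown.dispose === 'function') {
    // Already object style: use as-is.
    return teardown;
  }
  // Nothing returned, e.g. a stream that is only ended through composition.
  return { dispose() {} };
}

let calls = 0;
const fromFunction = toDisposable(() => { calls++; });
fromFunction.dispose();
fromFunction.dispose(); // second call is ignored
const objectStyle = { dispose() {} };
const fromObject = toDisposable(objectStyle);
toDisposable(undefined).dispose(); // safe no-op
```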

What are your thoughts?

briancavalier commented 9 years ago

The RxJS implementation does this synchronously (which I'm not sure I like). It returns a disposable object with a dispose method on it.

I prefer async start as well, for a couple reasons:

  1. Having some streams emit in the current call stack, and others not, can create hazards for the developer. It's in the same vein as promises never invoking their callbacks in the current call stack.
  2. It made it easier to ensure that the disposable chain was established before the first event is ever processed. For example, if the first event causes the stream to end (e.g. if there's a take(1) in the mix), then it's possible to guarantee the disposer chain is in a valid state.
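
Here is a small sketch of point 2, using hypothetical `fromArrayAsync` and `take1` helpers. The source starts in a later microtask. By the time the first event arrives, forEach has already returned its disposable, so take1 can dispose upstream safely.

```javascript
// Hypothetical sketch: the producer starts in a later microtask, so forEach
// always returns its disposable before the first event can arrive.
function fromArrayAsync(items) {
  return {
    forEach(onNext, onError, onDone) {
      let disposed = false;
      queueMicrotask(() => {
        for (const x of items) {
          if (disposed) return;
          onNext(x);
        }
        if (!disposed) onDone();
      });
      return { dispose() { disposed = true; } };
    }
  };
}

// take1: ends after the first event by disposing the upstream subscription.
function take1(source) {
  return {
    forEach(onNext, onError, onDone) {
      const upstream = source.forEach(
        x => { upstream.dispose(); onNext(x); onDone(); },
        onError,
        onDone
      );
      return upstream;
    }
  };
}

const seen = [];
take1(fromArrayAsync([1, 2, 3])).forEach(
  x => seen.push(x),
  err => seen.push('error'),
  () => seen.push('done')
);
```

With a synchronous source, the first event would arrive while `source.forEach(...)` is still running. `upstream` would still be uninitialized, so `upstream.dispose()` would throw a ReferenceError.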

The reason for the disposable return and not just a function is that we can get better performance out of leveraging prototypal inheritance

Yes, exactly. While I like the idea of just returning a function, using an object helps avoid capturing scope chains. In my mind that's the primary advantage, rather than inheritance, since my personal preference is composition over inheritance to chain disposables.
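
For comparison, here is a minimal sketch of chaining disposables by composition rather than inheritance. `CompositeDisposable` and `Resource` are hypothetical names chosen for illustration.

```javascript
// Hypothetical sketch: chaining disposables by composition. A composite holds
// the disposables it owns and disposes them together, with no subclassing.
class CompositeDisposable {
  constructor(...disposables) {
    this.disposables = disposables;
    this.isDisposed = false;
  }
  add(disposable) {
    // Anything added after disposal is released immediately.
    if (this.isDisposed) disposable.dispose();
    else this.disposables.push(disposable);
  }
  dispose() {
    if (this.isDisposed) return;
    this.isDisposed = true;
    for (const d of this.disposables) d.dispose();
    this.disposables.length = 0;
  }
}

// A plain object-based resource, standing in for a timer or event listener.
class Resource {
  constructor(name, log) {
    this.name = name;
    this.log = log;
  }
  dispose() {
    this.log.push(this.name);
  }
}

const released = [];
const subscription = new CompositeDisposable(
  new Resource('timer', released),
  new Resource('listener', released)
);
subscription.dispose();
subscription.dispose(); // idempotent
subscription.add(new Resource('late', released));
```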

Most expects composition to explicitly end a stream programmatically

Most.js encourages declarative over imperative wherever possible. Callers typically arrange, using take, until, etc., to lose interest when needed. Imperative event source adapters, like most.create, provide a way for the source to imperatively end an event stream, but otherwise, it must be done declaratively.
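
Here is a rough sketch of declaratively losing interest. The consumer states when to stop with an `until(signal, source)` combinator instead of holding on to a disposable. `createSubject`, `run`, and the `{ event, end }` sink shape are assumptions for illustration. This is not most.js's actual API.

```javascript
// Hypothetical push-stream model for illustration (not most.js's API):
// a stream has run(sink), where sink is { event(x), end() }, and run returns
// a function that detaches the sink.

// An imperative source adapter, in the spirit of the adapters described above.
function createSubject() {
  const sinks = new Set();
  return {
    push(x) {
      for (const sink of [...sinks]) sink.event(x); // copy: sinks may detach mid-loop
    },
    run(sink) {
      sinks.add(sink);
      return () => sinks.delete(sink);
    }
  };
}

// until(signal, source): pass events from source until signal emits, then end.
function until(signal, source) {
  return {
    run(sink) {
      let ended = false;
      let stopSource = () => {};
      let stopSignal = () => {};
      function end() {
        if (ended) return;
        ended = true;
        stopSource();
        stopSignal();
        sink.end();
      }
      stopSignal = signal.run({ event: end, end() {} }); // the signal ending alone does not end output
      stopSource = source.run({
        event(x) { if (!ended) sink.event(x); },
        end
      });
      return end;
    }
  };
}

const clicks = createSubject();
const stop = createSubject();
const seen = [];
// The consumer declares when it loses interest; it never calls dispose itself.
until(stop, clicks).run({ event: x => seen.push(x), end: () => seen.push('end') });
clicks.push(1);
clicks.push(2);
stop.push(true);
clicks.push(3); // ignored: the composed stream has already ended
```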

var disposable = observable.forEach(nextHandler, errorHandler, completedHandler);

My perspective is that nextHandler has different characteristics than the other two: it is intended to be called zero or more times, while the other two will be called at most once. I think in both RxJS and most.js, a typical sequence would be zero or more nextHandler calls followed by exactly one call to either errorHandler or completedHandler. IOW, the final overall outcome of success or failure is much like a promise.

That was the reasoning behind most.js's forEach only taking 1 function argument, and returning a promise for the final outcome. Callers can observe the promise, using then and catch, to deal with the final outcome. Obviously, returning a promise means that forEach cannot return a disposable. Hence, the preference for declaratively losing interest.
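
Here is a minimal sketch of that shape, assuming a hypothetical `Stream` class. It is not most.js's implementation. Its forEach takes only the event function and returns a promise. The promise fulfills when the stream ends and rejects if it fails.

```javascript
// Hypothetical stream type for illustration (not most.js's implementation).
// `producer(emit)` pushes events and returns a promise that settles when the
// stream ends (fulfilled) or fails (rejected).
class Stream {
  constructor(producer) {
    this.producer = producer;
  }
  // forEach takes one function, for events only, and returns a promise for
  // the final outcome. Starting in a microtask means f never runs before
  // forEach returns.
  forEach(f) {
    return Promise.resolve().then(() => this.producer(f));
  }
}

const ok = [];
const failed = [];

new Stream(async emit => {
  for (const x of [1, 2, 3]) emit(x);
})
  .forEach(x => ok.push(x))
  .then(() => ok.push('end'));

new Stream(async emit => {
  emit('a');
  throw new Error('boom');
})
  .forEach(x => failed.push(x))
  .catch(err => failed.push(err.message));
```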

benlesh commented 9 years ago

Obviously, returning a promise means that forEach cannot return a disposable.

I see this being the biggest point of contention, ongoing. The TC39 spec will be a lot like RxJS and what I outlined above. The open standard should accommodate that spec as much as possible.

So if there were native Observables in the browser that match this spec, do you think you'd try to have your observables match the interface?

Another side to this is that if we ever come up with compelling reasons to change this interface, we can always make suggestions to the TC39 effort.

briancavalier commented 9 years ago

The open standard should accommodate that spec as much as possible.

It seems like the mission of this group should be to find the best possible interop API, e.g. how do we make things like this work: most.of(123).flatMap(Rx.Observable.of). At the end of that process, maybe we'll have decided that the TC39 API is the best fit for interop, but maybe not.

I could see coming at it from at least 2 angles:

  1. Start with the current forEach TC39 spec, and think about alternatives and improvements, such as enforcing that no event/error/end may occur in the same call stack as the call to forEach (ie async start), or passing in an object instead of 3 functions in order to avoid scope capturing.
  2. Think about an interop API from first principles. Besides just bridging event/error/end signals between libs, eg. do we need to consider timestamps, consumer demand (aka backpressure), etc.?

So if there were native Observables in the browser that match this spec, do you think you'd try to have your observables match the interface?

Since supporting it would be a benefit to users, yeah, I'd consider it. However, I disagree with the current forEach proposal as a primary public API for the reasons I described above: I think it conflates too much into a single signature, and I think it would be better to encourage declarative limits rather than the imperative use of a returned disposable.

briancavalier commented 9 years ago

@blesh I have an idea for an alternative interop proposal. I'll try to put together a quick outline this weekend.

benlesh commented 9 years ago

Great! Sorry I wasn't very responsive. I was at ngconf the tail end of last week. Just getting back into the swing of things. @jhusain was at that conference with me as well. I'm very interested in his feedback here, because he's the one working on the TC39 spec draft.

There was a LOT of talk about this particular issue at ngconf. That is, standard interop and desired behaviors of Observables. Angular 2 may (or may not) ship with a lite version of Observables.

briancavalier commented 9 years ago

@blesh Cool, no worries. I agree @jhusain's feedback would be hugely valuable here.

I ended up not having time to actually write anything down over the weekend, so everything's still in my head :( But I'll try to make time this week.

briancavalier commented 9 years ago

Ok, here's another strawman. This is def intended for interop, and not necessarily as a friendly public API. It's similar to the original proposal above, but uses a subscriber object instead of functions. That removes the need for any capturing when interoperating.

I also tried to capture some of the operational requirements of the subscribe method.

I'm not sold on any of the names here; naming can be another discussion if this proposal seems interesting. I did borrow some of the naming from the reactive-streams-jvm spec.

// subscriber
interface Subscriber {
    void event(x)
    void error(e)
    void end(x)
}

// subscribe
// - observable must not call subscriber.event/error/end
//   in the current call stack
// - observable must not call subscriber.event/error/end after calling
//   subscriber.end or subscriber.error
// - observable may call subscriber.event zero or more times.
// - observable may call subscriber.end at most once
// - observable may call subscriber.error at most once
let subscription = observable.subscribe(subscriber)

// forcibly unsubscribe
// Not sure yet what this should return (undefined?)
subscription.unsubscribe()

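
Here is a sketch of how the rules above might be enforced, using a hypothetical `createObservable` wrapper. The wrapper and its producer callback are assumptions. The subscriber methods follow the strawman's `event`/`error`/`end` names. Signals emitted before subscribe returns are buffered and flushed in a microtask.

```javascript
// Hypothetical helper (names are assumptions, not part of the strawman):
// wraps a raw producer so every subscriber sees the rules listed above.
// producer(sink) may call sink.event/error/end at any time, even synchronously,
// and may return a function that releases its resources.
function createObservable(producer) {
  return {
    subscribe(subscriber) {
      let started = false; // set once the first microtask flush has finished
      let closed = false;  // set after end, error, or unsubscribe
      const pending = [];  // signals emitted before delivery may start
      let release = () => {};

      // Delivers one signal, enforcing "nothing after end/error" and
      // "end/error at most once".
      function deliver(method, value) {
        if (closed) return;
        if (method !== 'event') {
          closed = true;
          release();
        }
        subscriber[method](value);
      }

      // Never deliver in the call stack of subscribe(): buffer until started.
      function signal(method, value) {
        if (started) deliver(method, value);
        else if (!closed) pending.push([method, value]);
      }

      release = producer({
        event: x => signal('event', x),
        error: e => signal('error', e),
        end: x => signal('end', x)
      }) || release;

      queueMicrotask(() => {
        // Flush in order; signals emitted during the flush are appended.
        while (pending.length > 0) {
          const [method, value] = pending.shift();
          deliver(method, value);
        }
        started = true;
      });

      return {
        // Forcibly unsubscribe; returns undefined, one answer to the open question.
        unsubscribe() {
          if (closed) return;
          closed = true;
          pending.length = 0;
          release();
        }
      };
    }
  };
}

const log = [];
const observable = createObservable(sink => {
  sink.event(1); // emitted synchronously, delivered later
  sink.event(2);
  sink.end('done');
  sink.event(3); // dropped: comes after end
  return () => log.push('released');
});
observable.subscribe({
  event: x => log.push(`event ${x}`),
  error: e => log.push(`error ${e}`),
  end: x => log.push(`end ${x}`)
});
log.push('subscribe returned');
```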
jhusain commented 9 years ago

Hey Brian,

The proposal I'm working on is nearly identical to yours. The only difference is that it uses the generator interface introduced in ES6 for the Observer interface:

{ next(value), throw(e), return(v) }
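
ES6 generator objects already expose `next`, `throw`, and `return`, so a generator can stand in for such an observer directly. Here is a sketch, with `collect` as a hypothetical example observer:

```javascript
// A generator object already has next/throw/return methods, so it can play
// the observer role directly. `collect` is a hypothetical example observer.
function* collect(log) {
  try {
    while (true) {
      const value = yield; // each observer.next(value) resumes here
      log.push(`next ${value}`);
    }
  } catch (err) {
    log.push(`throw ${err.message}`); // reached via observer.throw(err)
  } finally {
    log.push('finished'); // also runs on observer.return(v)
  }
}

const log = [];
const observer = collect(log);
observer.next(); // prime: runs to the first yield; this call's argument is discarded
observer.next(1);
observer.next(2);
observer.return('done');

const failingLog = [];
const failing = collect(failingLog);
failing.next();
failing.throw(new Error('boom'));
```

One wrinkle: a fresh generator must be primed with an initial `next()` call before it is paused at a `yield`. The value passed to that first call is discarded.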

I think the most logical decision for interop is to use the type that's natively in the language. It looks like we agree on semantics though (with the exception of always subscribing asynchronously).

Do you have time for a hangout today? Say 10:30?

briancavalier commented 9 years ago

@jhusain Hey, cool, thanks for jumping in here!

I think the most logical decision for interop is to use the type that's natively in the language

That makes sense to me. Like you said, they only differ in the method names.

(with the exception of always subscribing asynchronously)

I should try to clarify a bit more what I meant. The way this works in most.js is that a new subscriber is subscribed synchronously (eg literally added to an Array in the multicast case), but no event is ever delivered to that new subscriber in the same call stack. In fact, all events are delivered in a trampoline outside of normal user space (ie in some platform async call stack, like setTimeout, etc).

So far, it has worked out very well because it allows subscription setup to complete in the current call stack before any events are delivered.
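
Here is a minimal sketch of that arrangement. It assumes a hypothetical `MulticastSource` class, not most.js's source, and uses `setTimeout` as the platform async call stack.

```javascript
// Hypothetical sketch of the behavior described above (not most.js source):
// subscribe adds to the list synchronously, but events are always delivered
// later, from a queue drained in a platform async call stack.
class MulticastSource {
  constructor() {
    this.subscribers = [];
    this.queue = [];        // pending deliveries: [subscriber, value]
    this.scheduled = false;
  }

  subscribe(subscriber) {
    this.subscribers.push(subscriber); // synchronous: setup completes right here
    return {
      unsubscribe: () => {
        const i = this.subscribers.indexOf(subscriber);
        if (i >= 0) this.subscribers.splice(i, 1);
      }
    };
  }

  emit(value) {
    for (const s of this.subscribers) this.queue.push([s, value]);
    if (this.queue.length > 0 && !this.scheduled) {
      this.scheduled = true;
      setTimeout(() => this.drain(), 0);
    }
  }

  // Trampoline: deliveries queued while draining (e.g. by a subscriber that
  // emits) are handled by this same loop instead of growing the call stack.
  drain() {
    while (this.queue.length > 0) {
      const [s, value] = this.queue.shift();
      if (this.subscribers.includes(s)) s.next(value); // skip if unsubscribed
    }
    this.scheduled = false;
  }
}

const source = new MulticastSource();
const seen = [];
source.emit('a'); // no subscribers yet, so nobody receives 'a'
source.subscribe({ next: x => seen.push(x) });
source.emit('b'); // queued; delivered from the timer's call stack
```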

Also, Promises/A+ discussions, and real world promise usage has convinced me (and most every promise implementor) that "sometimes sync, sometimes async" is a pretty big footgun for developers.

What timezone are you in? I'm EST. I probably can't do a hangout today, but I can try to make time for one tomorrow if there's a time that works for you. Let me know, it'd be great to chat.

benlesh commented 9 years ago

I'm a proponent of subscription occurring on the next codeblock. This is primarily to reduce the chances of releasing Zalgo.

It's a strong opinion weakly held. I'm willing to be convinced otherwise if the arguments are solid.

briancavalier commented 9 years ago

@blesh I'm not quite sure what you mean by "subscription occurring on the next codeblock".

I think there are two things at play here:

  1. "subscription" - the act of a caller subscribing to an observable's events by calling subscribe
  2. the act of the observable propagating an event by calling subscriber.next

I'll try to explain what I mean by using promises as an analog. Calling then(f) on a promise causes f to be placed into an internal queue before then returns (ie in the current call stack). However, f will never be invoked before then returns, even if the promise is already fulfilled. There's no zalgo in that scenario. IOW, zalgo is not caused by placing f into an internal promise queue before then returns.

In contrast, a promise-like implementation in which f may or may not be called before then returns would certainly unleash zalgo. The silly, but illustrative classic example is the following, which might log 0 or might log 1.

var i = 0;
promise.then(() => ++i);
console.log(i);

Similarly in the stream/observable case, I don't believe that putting a subscriber into an internal subscription list is a source of nondeterminism (and in fact, may help developers reason about things better--see 2 below). The key is never calling subscriber.next/end/error before subscribe(subscriber) returns. That is, never deliver an event to a subscriber in the same microturn in which it subscribed.

I believe this has 2 advantages:

  1. The code to put the subscriber in a list is simpler than the code to wait a tick and then put the subscriber in a list.
  2. Waiting a tick to put a subscriber in a list may create unintuitive behavior: events that are already in a trampoline could be "missed" by subscribers who are also sitting in a trampoline waiting to be added to a subscription list.
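
Here is a sketch of the hazard in point 2, with a hypothetical `createSource`. It delivers events from a microtask in both modes. The two modes differ only in when a new subscriber joins the list.

```javascript
// Hypothetical comparison of the two strategies (illustration only).
// Both deliver events from a microtask; they differ only in when a new
// subscriber joins the list.
function createSource(deferSubscribe) {
  const subscribers = [];
  return {
    subscribe(fn) {
      if (deferSubscribe) queueMicrotask(() => subscribers.push(fn)); // wait a tick
      else subscribers.push(fn); // join the list immediately
    },
    emit(x) {
      // Recipients are read when the queued delivery runs, not when emit is called.
      queueMicrotask(() => {
        for (const fn of subscribers) fn(x);
      });
    }
  };
}

function run(deferSubscribe) {
  const source = createSource(deferSubscribe);
  const seen = [];
  source.emit('tick');                 // delivery is already queued...
  source.subscribe(x => seen.push(x)); // ...when this subscriber arrives
  return seen;
}

const eager = run(false);   // receives 'tick' once the microtask runs
const deferred = run(true); // misses 'tick': it joins after delivery
```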

Sorry to be a bit long winded. Did that make sense?

briancavalier commented 9 years ago

Thinking about it a bit more: I think the only thing that matters is never calling subscriber.next/end/error before subscribe() returns. That's enough to avoid zalgo. When the actual subscription "happens" is probably just an implementation detail.

benlesh commented 9 years ago

I'm saying that...

var s = 0;
var observable = Observable.from([1,2,3]);
observable.forEach(x => { s += x; });
console.log(s);

should never log 6. It should always log 0.

To mitigate this, I'm basically saying that the stream setup that occurs when forEach is called should be deferred to the next frame. However, all emitted "next" events from that observable should happen synchronously (in this case) or asynchronously as necessary.
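
Here is a sketch contrasting the two setups, with hypothetical `fromSync` and `fromDeferred` functions standing in for `Observable.from`:

```javascript
// Hypothetical `from` implementations for illustration only.
// Synchronous setup: values are emitted before forEach returns.
function fromSync(items) {
  return {
    forEach(onNext) {
      for (const x of items) onNext(x);
    }
  };
}

// Deferred setup: the stream starts in a later microtask; once started,
// the array's values are still emitted synchronously within that task.
function fromDeferred(items) {
  return {
    forEach(onNext) {
      let disposed = false;
      queueMicrotask(() => {
        for (const x of items) {
          if (disposed) break;
          onNext(x);
        }
      });
      return { dispose() { disposed = true; } };
    }
  };
}

let syncSum = 0;
fromSync([1, 2, 3]).forEach(x => { syncSum += x; });
console.log(syncSum); // 6: the behavior argued against above

let s = 0;
fromDeferred([1, 2, 3]).forEach(x => { s += x; });
console.log(s); // 0: nothing is delivered before forEach returns
```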

I think we're talking apples and apples. But I'm not expressing my apples very well.

benlesh commented 9 years ago

In other words we completely agree.

What I'm most interested in is @jhusain's position that it should not be like this. I've heard him mention it several times, but I haven't heard the reasoning. I assume it has something to do with performance? But is performance worth Zalgo? ¯\_(ツ)_/¯

jhusain commented 9 years ago

Sorry for the late reply. I'm in Pacific time. I can call you at 7:00 AM Pacific (10 AM EST) if you're in Eastern. If not, can you chat at 7 PM PST tomorrow?

JH

jhusain commented 9 years ago

Yes. :) "Don't release Zalgo" falls into the same category as "don't use threads" and "avoid mutation." These are all good practices in general, but we need to preserve the flexibility to opt out of them in order to achieve an acceptable level of performance.

I think there is a solution where everyone can get (pretty much) what they want. Still working on a larger document, because I figure this will come up in the TC-39 as well. Super busy at the moment, appreciate your patience.

JH

briancavalier commented 9 years ago

@blesh Great, yep, I think we're effectively saying the same thing.

@jhusain No worries. I could chat from 7-7:30pm PST. Maybe the 3 of us can do a google hangout then?

briancavalier commented 9 years ago

"Don't release Zalgo" falls into the same category as "don't use threads" and "avoid mutation."

I completely agree that these are likely impossible constraints in the general/global case. However, I think we're talking about a very specific case, the instant of subscription, in which the spec can remove a developer hazard.

This was discussed at great length in Promises/A+, and those discussions included several TC-39 members, as well as other very smart folks. The conclusion there was that preventing the hazard at the instant of calling then was worthwhile. There were similar concerns about performance at the time. The most popular current implementations (Bluebird, RSVP, and when.js) have proven that it isn't a concern at all. Similarly, I believe that most.js and kefir prove that it isn't a concern for reactive streams either.

In my experience, the number of calls to subscribe tends to be a drop in the ocean compared to the number of events propagated through a stream. So making subscribe safe, and making event propagation fast seems like the right combo to me.

All of that said, I kind of forgot that my proposal above was intended as an interop API rather than a public API (tho I'm not convinced we could enforce that in JS!) IOW, I'm not sure if allowing subscriber.event/end/error before subscribe() returns would be ok purely for interop, but my intuition is to err again on the side of safety.

jhusain commented 9 years ago

We still on for hangout?

JH

briancavalier commented 9 years ago

I may be a few minutes late, but I can still do 7 PM PST.

briancavalier commented 9 years ago

Email me at brian at hovercraftstudios dot com

viktorklang commented 9 years ago

Sorry for the delayed response here, I've been caught up in a promotion.

May I suggest we first discuss for which reasons the JVM interfaces ended up the way they did (none of it is accidental, a lot of effort was poured into getting the signatures just right, and the spec took a very long time to distill to the essentials). That'd save everyone a ton of time I think.

viktorklang commented 9 years ago

Any news here?

benlesh commented 9 years ago

A lot of this is now being covered at http://github.com/zenparsing/es-observable, since the ES7 Observable spec has some traction. Once that settles down a bit, and RxJS "v Next" is in full swing, this will be the place for either 1. the interop spec, or 2. a JavaScript client/server implementation of the reactive streams IO protocol. Thoughts, @viktorklang?

viktorklang commented 9 years ago

@blesh If we can get the reactive-stream-io protocol and TCK in place then interop via it will be extremely nice. But JS-lib-to-JS-lib RS interop would be preferable too (orthogonal). Wdyt?

benlesh commented 9 years ago

It seems like @jhusain's traction with Observable in ES7 will do some of the work of providing an interop spec or starting point between JS libs. Perhaps we should focus this repo on Reactive Streams IO connectivity for JavaScript? I'm unsure. Actually, the delineation between the Reactive-Streams, ReactiveX and Reactive-Extensions orgs is very, very confusing to me personally.

viktorklang commented 9 years ago

@blesh I spoke with @jhusain in New York ~1 week ago. The big open question for me is how ES7 Observable will deal with back pressure (especially for IO and handoffs across an async boundary). As for reactive-streams-io, I think the need for it grows stronger and stronger.

benlesh commented 9 years ago

@viktorklang is it your opinion/position that the current ES7 Observable spec doesn't have the ability to be "updated/upgraded" to support a more RxJava-y approach to backpressure? (via the addition of some sort of request mechanism) I've not looked into it, honestly.
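
For reference, here is a minimal sketch of what a request-style mechanism looks like. It is loosely modeled on the JVM Reactive Streams names (`onSubscribe`, `onNext`, `onComplete`, `request`, `cancel`). `fromArrayWithDemand` is hypothetical, and this is not part of the ES7 Observable proposal.

```javascript
// Hypothetical JavaScript rendering of JVM-style demand (onSubscribe, onNext,
// onComplete, Subscription.request/cancel). Not part of any ES Observable
// proposal; a minimal sketch only.
function fromArrayWithDemand(items) {
  return {
    subscribe(subscriber) {
      let index = 0;
      let demand = 0;
      let done = false;     // completed or cancelled
      let emitting = false; // guards against reentrant request() calls

      const subscription = {
        request(n) {
          // Simplification: the JVM spec signals onError for n <= 0.
          if (done || n <= 0) return;
          demand += n;
          if (emitting) return; // the running loop will use the extra demand
          emitting = true;
          while (demand > 0 && index < items.length && !done) {
            demand--;
            subscriber.onNext(items[index++]);
          }
          emitting = false;
          if (!done && index === items.length) {
            done = true;
            subscriber.onComplete();
          }
        },
        cancel() {
          done = true;
        }
      };

      subscriber.onSubscribe(subscription);
    }
  };
}

const got = [];
let subscription;
fromArrayWithDemand([1, 2, 3, 4, 5]).subscribe({
  onSubscribe(s) {
    subscription = s;
    s.request(2); // initial demand: two items
  },
  onNext(x) { got.push(x); },
  onComplete() { got.push('complete'); }
});
const afterFirstRequest = got.slice(); // [1, 2]: the source waits for more demand
subscription.request(10);              // got becomes [1, 2, 3, 4, 5, 'complete']
```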

viktorklang commented 9 years ago

@blesh I am not familiar enough with the current ES7 proposal to be sure.

headinthebox commented 9 years ago

Since JavaScript has async await, IMHO it makes much more sense to use async iterables for back pressure (reactive streams are actually a form of async pull that could not leverage async await).
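
Here is a small sketch of the idea, using async generators and `for await...of` (async iteration, which was later standardized in ES2018). `numbers` and `consume` are hypothetical names. The producer only runs when the consumer asks for the next value, so a slow consumer naturally slows the producer.

```javascript
// Hypothetical sketch: with an async generator, the producer only runs when the
// consumer asks for the next value, so it can never run ahead of the consumer.
async function* numbers(log) {
  for (let i = 1; i <= 3; i++) {
    log.push(`produce ${i}`);
    yield i;
  }
}

async function consume(log) {
  for await (const x of numbers(log)) {
    log.push(`consume ${x}`);
    await new Promise(resolve => setTimeout(resolve, 5)); // a slow consumer
  }
}

const log = [];
const finished = consume(log);
```

Production and consumption strictly alternate here (produce 1, consume 1, produce 2, ...), because each `yield` waits for the loop to request the next value.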

viktorklang commented 9 years ago

@headinthebox You mean something similar to https://news.ycombinator.com/item?id=9742967 ?

chicoxyzzy commented 8 years ago

It seems like the subscription/disposal issue is solved: https://github.com/zenparsing/es-observable/issues/48

benlesh commented 7 years ago

This went stale forever ago.