zebulonj / callbag-subscribe

A callbag sink (listener) that connects an Observer a-la RxJS. 👜
MIT License
18 stars 5 forks source link

Complete #4

Closed aboeglin closed 6 years ago

aboeglin commented 6 years ago

Hi there !

I came about your operator. It seems really good. I just have a question though. I try to do some computation on complete, but the complete is never happening. That is my current pipe :

const unsubscribe = pipe(
    source,
    scan(stateReducer, initialState),
    subscribe({
        complete: () => console.log('complete'),//update({...state, position: 0, currentPan: 0}, element)
        next: state => update(state, element),
        error: error => console.log(error)
    })
);

Calling unsubscribe does not trigger the complete. I checked your code and I don't understand why, I'll probably pull your code in my project to log things there. But I thought maybe you have a hint on what may be happening here.

aboeglin commented 6 years ago

So, after an hour playing with it, I realised that the source needs to push a type 2 for the subscribe operator to receive it. The source you define in your tests helped me figure it out. So the way the sources are made from the callbag-basics packages don't allow for that to work right now.

Here is the code of my source, which is the same as fromEvent, but for HammerJS events :

export const fromHammerEvent = (hammerHandler, name) => (start, sink) => {
    if(start !== 0)
        return;

    const handler = ev => sink(1, ev);

    const talkback = t => {
        if(t === 2) {
            hammerHandler.off(name, handler);
            sink(2); // That line did the trick, it allows the subscribe to call the complete callback
        }
    };

    sink(0, talkback);

    hammerHandler.on(name, handler);
};

Except if I'm missing something I suggest you just add a line specifying that in your readme, as I believe most of the sources in the callbag-basics packages don't call sink(2) when being terminated.

staltz commented 6 years ago

Can you give an example?

aboeglin commented 6 years ago

fromEvent for instance, the code is pretty much the one I posted above except it makes use of addEventListener, but in the definition of the talkback, it's not calling the sink with type 2.

I update that answer in a minute with the code example.

Edit:

const fromEvent = (node, name) => (start, sink) => {
  if (start !== 0) return;
  const handler = ev => sink(1, ev);
  sink(0, t => {
    if (t === 2) node.removeEventListener(name, handler); // Here the sink(2) call is missing
  });
  node.addEventListener(name, handler);
};
staltz commented 6 years ago

fromEvent source should never complete (that is, call sink(2)) because it's a data source that may always have a next event coming up next.

And complete should not be called after an unsubscribe (that's the HammerJS code) because the observer canceled the listening from the source, the source is not notifying about end of data. This is also the case in RxJS, most.js, xstream.

aboeglin commented 6 years ago

Also, I'm not sure, is it going to unsubscribe every observer then ? That's not something we'd want I believe.

aboeglin commented 6 years ago

Well, I thought about the same thing I guess. But if you remove the listener, you won't be getting any more event right ? How would you then handle the situation where you need to unsubscribe and want to throw the source ?

zebulonj commented 6 years ago

Catching up. Agree 100% with @staltz response. I'm not sure what you're referring to in these follow-up questions.

...is it going to unsubscribe every observer...?

Could you give an example? In general, when you subscribe in a (at the end of a) pipe, you're dealing in a single observer and a single subscription. When you dispose of that subscription, the resulting termination ( type === 2 ) is transmitted through all the piped operators and received by the source. If the source was created with something like fromEvent(...), then it probably removes the internal event listener to clean up. If the source is a Subject (or BehaviorSubject), then only the terminating sink is removed. Any upstream sources continue to dispatch data, and any other subscribers on that Subject will continue to receive data.

...want to throw the source...

Explain what you mean?

aboeglin commented 6 years ago

Hi there,

I've never played much with complete on Observables, and I believe my understanding is a bit off. So, a complete should be fired only when the source is never going to push any new data to the stream, ever, to any observer. Is that correct ? In that case I think I just got confused with things and the current implementation of the sources in the callbag-basics must be correct. By throwing the source I meant freeing the memory and allowing GC to kick in.