kefirjs / kefir

A Reactive Programming library for JavaScript
https://kefirjs.github.io/kefir/
MIT License
1.87k stars 97 forks source link

A stream ending while filtered listeners are listening causes TypeError: object is not a function #119

Closed Macil closed 9 years ago

Macil commented 9 years ago

Basic example:

var Kefir = require('kefir');

var _em;
var vce = Kefir.stream(function(emitter) {
  _em = emitter;
  return function() {};
});

vce
  .filter(function(){return true;})
  .onValue(function(e) {
    _em.end();
  });
vce
  .filter(function(){return true;})
  .onValue(function(e) {
    console.log('received', e);
  });

_em.emit('a');
$ node stream.js 

/Users/chris/Coding/kefir-test/node_modules/kefir/dist/kefir.js:2011
        if (fn(x)) {
            ^
TypeError: object is not a function
    at AnonymousObservable._handleValue (/Users/chris/Coding/kefir-test/node_modules/kefir/dist/kefir.js:2011:10)
    at AnonymousObservable._handleAny (/Users/chris/Coding/kefir-test/node_modules/kefir/dist/kefir.js:1844:24)
    at AnonymousObservable._$handleAny (/Users/chris/Coding/kefir-test/node_modules/kefir/dist/kefir.js:1820:21)
    at callSubscriber (/Users/chris/Coding/kefir-test/node_modules/kefir/dist/kefir.js:838:6)
    at Dispatcher.dispatch (/Users/chris/Coding/kefir-test/node_modules/kefir/dist/kefir.js:868:8)
    at S._emitValue (/Users/chris/Coding/kefir-test/node_modules/kefir/dist/kefir.js:644:25)
    at Object.value (/Users/chris/Coding/kefir-test/node_modules/kefir/dist/kefir.js:1397:10)
    at Object.<anonymous> (/Users/chris/Coding/kefir-test/stream.js:20:5)
    at Module._compile (module.js:456:26)
    at Object.Module._extensions..js (module.js:474:10)

Interestingly, removing the two .filter calls causes the error to not happen.

I know the above code example can be done in other ways besides abusing a stream emitter manually. The example was extracted out of a bigger codebase that had a lot of indirection between each of the steps. The issue isn't limited to that and can also happen with .takeUntilBy:

var Kefir = require('kefir');
var EventEmitter = require('events').EventEmitter;

var em = new EventEmitter();
var stopper = new EventEmitter();
var vce = Kefir.fromEvents(em, 'em')
  .takeUntilBy(Kefir.fromEvents(stopper, 'stopper'));

vce
  .filter(function(){return true;})
  .onValue(function(e) {
    stopper.emit('stopper');
  });
vce
  .filter(function(){return true;})
  .onValue(function(e) {
    console.log('received', e);
  });

em.emit('em', 'a');

Kefir 2.7.0

rpominov commented 9 years ago

Seems like a bug. Will look into it.

rpominov commented 9 years ago

It's actually a quite fundamental problem. At a certain conditions, a subscriber can be called after the end of the stream or after it was unsubscribed.

Consider the loop where we call subscribers (let flatten it to a number of calls):

subscribers[0](value);
subscribers[1](value);
subscribers[2](value);

Now let assume that in the second subscriber we unsubscribe the third unsubscribe:

subscribers[0](value);

subscribers[1](value);
  // unsubscribes subscribers[2], 
  // so `dispatcher.subscribers` is replaced with a new array without that item

// but at this point we still loop over old `subscribers` array, 
// which contains subscribers[2], so we call it even though it unsubscribed
subscribers[2](value);

Similar, but even nastier, when we emit end from a subscriber:

subscribers[0](value);

subscribers[1](value);
  // emits `end`
  // so we call all `end` subscribers, even though current loop isn't done yet
  // endSubscribers[1]()
  // endSubscribers[2]()
  // endSubscribers[3]()

// now we get back to the `onValue` loop, 
// but subscribers[2] also already unsubscribed in response to the end event 
subscribers[2](value);

It doesn't look easy fixable to me at the moment, although I didn't have much time to think about it yet. Have two ideas, but both have some issues. Hopefully I'll be able to get back to it at the end of the week.

PS.

Another reproducing example:

var stream = Kefir.later(1000, 1)

function fn1(x) {
  console.log('got in fn1:', x)
}

function fn2(x) {
  console.log('unsubscribe fn1')
  stream.offValue(fn1)
}

stream.onValue(fn2)
stream.onValue(fn1)

// > unsubscribe fn1
// > got in fn1: 1

As you can see it calls fn1 after it was unsubscribed.

PS2.

Fast fix for your case is to call _em.end(); in the next tick via setTimeout(..., 0) or process.nextTick() if it available.

rpominov commented 9 years ago

Should be fixed in 2.7.1! But it probably won't work as you might expect anyway — the second subscriber won't receive the event. We should fix that too, but it's a breaking change — #120