kefirjs / kefir

A Reactive Programming library for JavaScript
https://kefirjs.github.io/kefir/
MIT License

Re-subscribing after rebinding/destroying the object or variable #226

Closed graforlock closed 7 years ago

graforlock commented 7 years ago

My problem is that at some point in my code I want to subscribe again, specifically after rebinding or destroying the class, as in this example:

a = Kefir.pool();

a.plug(Kefir.stream(function(emitter) 
{
  return emitter.emit({a: 1})
}));

b = Kefir.pool();

b.plug(Kefir.stream(function(emitter) 
{
  return emitter.emit({b: 2})
}));

c = Kefir.combine([a, b], function(a, b) { return [a, b];});

c.onValue(function(val) {
  console.log(val, '1');
})

c = Kefir.combine([a, b], function(a, b) { return [a, b];});

c.onValue(function(val) {
  console.log(val, '2');
})

However, the second subscription never fires: the log line tagged '2' is never printed. Is there a way to re-subscribe to the re-combined stream?

rpominov commented 7 years ago

This is a common issue. You have created a stream that emits its value in response to the first subscription.

The example can be reduced to this:

a = Kefir.stream(function(emitter) {
  return emitter.emit({a: 1})
})

a.onValue(function(val) {
  console.log(val, '1');
})

a.onValue(function(val) {
  console.log(val, '2');
})

The problem is that the value is emitted while the first subscriber is being added. By the time the second subscriber is added, the value has already been emitted, so the second subscriber never receives it.
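To make the timing concrete, here is a toy model in plain JavaScript. It is not Kefir's implementation: `makeLazyStream` is a hypothetical helper that mimics only the behavior at issue, namely that the producer function runs once, when the first subscriber arrives, and that emitted values are not stored for later subscribers.

```javascript
// Toy model only: NOT Kefir's implementation.
// makeLazyStream is a hypothetical helper used for illustration.
function makeLazyStream(producer) {
  const subscribers = [];
  let active = false;
  return {
    onValue(fn) {
      subscribers.push(fn);
      if (!active) {
        active = true;
        // The producer runs once, triggered by the first subscription.
        producer({
          emit(value) {
            // Deliver to whoever is subscribed right now; nothing is stored.
            subscribers.slice().forEach((s) => s(value));
          },
        });
      }
    },
  };
}

const received = [];
const a = makeLazyStream((emitter) => emitter.emit({ a: 1 }));

a.onValue((val) => received.push(['1', val])); // triggers the producer and gets the value
a.onValue((val) => received.push(['2', val])); // too late: the value was already emitted

console.log(received); // contains only the entry tagged '1'
```

In this sketch the second callback is registered correctly; it simply has nothing left to receive.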

There are various solutions to this. If you want to completely "reboot" the system, you may consider wrapping everything in a function:

function buildC() {
  var a = Kefir.pool();

  a.plug(Kefir.stream(function(emitter) 
  {
    return emitter.emit({a: 1})
  }));

  var b = Kefir.pool();

  b.plug(Kefir.stream(function(emitter) 
  {
    return emitter.emit({b: 2})
  }));

  return Kefir.combine([a, b], function(a, b) { return [a, b];});
}

buildC().onValue(function(val) {
  console.log(val, '1');
})

buildC().onValue(function(val) {
  console.log(val, '2');
})

Another option is to convert a and b to properties. They will then remember the last values that passed through them, so both cs will eventually be able to access those values:

a = Kefir.pool();
a_ = a.toProperty();

...

b = Kefir.pool();
b_ = b.toProperty();

...

c = Kefir.combine([a_, b_], function(a, b) { return [a, b];});

...

c = Kefir.combine([a_, b_], function(a, b) { return [a, b];});
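One rough way to picture what a property adds is the toy model below. It is plain JavaScript, not Kefir's code: `makeProperty` is a hypothetical helper that caches the latest value and hands it to every new subscriber at subscription time.

```javascript
// Toy model only: NOT Kefir's implementation.
// makeProperty is a hypothetical helper used for illustration.
function makeProperty() {
  const subscribers = [];
  let hasCurrent = false;
  let current;
  return {
    emit(value) {
      hasCurrent = true;
      current = value; // remember the latest value
      subscribers.slice().forEach((s) => s(value));
    },
    onValue(fn) {
      subscribers.push(fn);
      if (hasCurrent) fn(current); // late subscribers still see the current value
    },
  };
}

const seen = [];
const aProp = makeProperty();

aProp.onValue((val) => seen.push(['1', val]));
aProp.emit({ a: 1 });                          // emitted before the second subscriber exists
aProp.onValue((val) => seen.push(['2', val])); // still receives the remembered value

console.log(seen); // one entry tagged '1' and one tagged '2'
```

The only difference from a plain stream in this sketch is the cached `current` value, which is what lets a subscriber that arrives late catch up.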

One more option is to emit values only after all subscribers are added:

a.plug(Kefir.stream(function(emitter) 
{
  setTimeout(function() {
    emitter.emit({a: 1})
  }, 0)
}));

... same for b ...

...

See also http://rpominov.github.io/kefir/#current-in-streams

graforlock commented 7 years ago

Thanks.

I think the property conversion does the trick in this case.

The issue is that I have a parent class that passes a stream to a nested, composed-in class whose constructor runs again on every re-render. The parent passes the stream down through the constructor, and two streams are then combined inside the body of the nested class. This means the parent stream is a long-lived, keep-alive stream that has to retain its current value, whereas the nested object's stream is disposable and is destroyed on render.

So the parent class needs to be able to replug its stream into those disposable ones.
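The lifecycle described here could look roughly like the sketch below. It is plain JavaScript with no Kefir calls, and every name in it (`CurrentValue`, `Nested`, `destroy`) is hypothetical: `CurrentValue` stands in for a long-lived property owned by the parent, and `Nested` for the object that is rebuilt on each render.

```javascript
// Sketch only: all names here are hypothetical and no Kefir API is used.
// CurrentValue stands in for a long-lived property held by the parent.
class CurrentValue {
  constructor(initial) {
    this.current = initial;
    this.subscribers = new Set();
  }
  set(value) {
    this.current = value;
    this.subscribers.forEach((fn) => fn(value));
  }
  subscribe(fn) {
    this.subscribers.add(fn);
    fn(this.current); // replay the current value to the newcomer
    return () => this.subscribers.delete(fn); // unsubscribe handle
  }
}

// Nested is created in the parent's render step and thrown away on the next one.
class Nested {
  constructor(parentValue, label, log) {
    this.unsubscribe = parentValue.subscribe((v) => log.push([label, v]));
  }
  destroy() {
    this.unsubscribe();
  }
}

const log = [];
const parentValue = new CurrentValue({ a: 1 }); // lives as long as the parent

let nested = new Nested(parentValue, 'render 1', log);
nested.destroy();                                  // discarded on re-render
parentValue.set({ a: 2 });                         // nobody is listening at this point
nested = new Nested(parentValue, 'render 2', log); // picks up the latest value immediately

console.log(log);
```

The point of the sketch is the division of responsibility: the parent's value outlives every nested object, and each nested object cleans up its own subscription when it is destroyed.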

rpominov commented 7 years ago

Ok, glad that helped.