kefirjs / kefir

A Reactive Programming library for JavaScript
https://kefirjs.github.io/kefir/
MIT License

Add .end() to Kefir.pool() #137

Open cefn opened 9 years ago

cefn commented 9 years ago

I've got a puzzle with a framework I'm putting together, where I would like to be able to properly dispose of promises when they are no longer needed (and unsubscribe from the relevant streams).

I believe a subscription is made in order to back the Promise, but when program logic establishes that the promise is no longer needed I can't find anything which can be done to tidy up and unsubscribe from the stream, as the onX handler is hidden within Kefir, meaning the stream would remain subscribed indefinitely with all its underlying resources committed.

If there is some way to e.g. trigger Promise rejection/fulfilment which could cause unsubscription, or to get hold of the subscribed value- and end-handler(s) to manually unsubscribe, that would be very useful to know.

Equally, perhaps there's a Kefiric way to tackle this with a minimum of weirdness (perhaps creating some kind of derived stream, creating the Promise from the derived stream, then manipulating the derived stream causing the unsubscription). I haven't been able to pin down the right transformation of a stream which would permit this, yet. Pool would be promising, but it never ends, so I can't externally induce the toPromise to complete.

If there is no obvious way, perhaps this could be a feature request for there to be some channel for unsubscription and termination of Promises.

rpominov commented 9 years ago

Promises are broken. They don't have any mechanism for clearing resources. I wish promises worked the same way as observables: when all consumers unsubscribe, your callback is called, allowing you to free resources. But unfortunately we can't even unsubscribe from a promise :(

We could manually attach a .kefirDispose() method to the resulting promise. But then we'd have the problem that anyone who holds the promise would be able to dispose of it.

Here's a solution I came up with that uses the current APIs:

function createEcho(source) {
  var unsub = null
  var stream = Kefir.stream(function(em) {
    if (source === null) {
      return function() {}
    }
    source.onAny(em.emitEvent)
    unsub = function() {
      source.offAny(em.emitEvent)
      unsub = null
    }
    return unsub
  })
  return {
    stream: stream,
    disconnect: function() {
      if (unsub) {
        unsub()
      }
      source = null
    }
  }
}

// -----------------------------------------------------------------------------

var source = ... // some stream
var echo = createEcho(source)
var promise = echo.stream.toPromise()

echo.disconnect()
promise = echo = null

// (I didn't test it)

Maybe it's a good idea to add createEcho to the core, or maybe someone could create a separate npm module with it.

cefn commented 9 years ago

It's not unreasonable to expect stream termination to be explicit, but controllable, through an intermediate stream which exposes that control, but currently the API doesn't really allow for streams where you can inject events, except through a kind of hack using Kefir.stream's callback to export the emitter as shown in the below example.

The intent is roughly illustrated by the steps below although this isn't a suggested API signature.

The outputUnsubscribe function always runs, regardless of whether the Promise is hit. I think in the case setTimeout is 100 ms, the Promise triggers unsubscribe, and in the case setTimeout is 1000 ms, the outputPipe.emitter.end() call triggers it.

Some API options suggested by this are...

I understand that there may be no room for a pipe primitive because of Kefir's minimal API intent. A simple pipe primitive could have the signatures of both an emitter and a stream, but extra thinking would be needed to ensure that a downstream pipe which receives an End through its own emitter then unsubscribes from all its upstream sources.

var Kefir = require("kefir");

var input = Kefir.sequentially(500, [0,1,2,3]);

var outputPipe;

var outputSubscribe = function(){
    input.onAny(outputPipe);
};

var outputUnsubscribe = function(){
    input.offAny(outputPipe);
};

var output = Kefir.stream(function(emitter){
    outputPipe = function(event){
        emitter.emitEvent(event);
    }
    outputPipe.emitter = emitter;
    outputSubscribe();
    return outputUnsubscribe;
});
output.onEnd(outputUnsubscribe);

output.take(1).toPromise().then(console.log)

setTimeout(function(){
    outputPipe.emitter.end();
}, 1000);

rpominov commented 9 years ago

Here is another pattern:

var input = ... 
var disposer = ...
var promise = input.takeUntilBy(disposer).toPromise()

If only you could create the disposer somehow naturally, without emitter closure acrobatics :) For example var disposer = Kefir.fromEvents(closeButton, 'click').take(1).

cefn commented 9 years ago

Here's an alternative - using Promises to fight Promises, something like...

var Kefir = require("kefir"),
      Q = require("q");

var input = ...
var disposer = Q.defer();
var promise = input.takeUntilBy(Kefir.fromPromise(disposer.promise)).toPromise();
disposer.resolve();

(untested)

Is there any reason that pool never ends? I understand that it shouldn't end when its contributing streams do. However, couldn't there be an end() call available on the pool itself, which would cause it to unsubscribe from its contributors and issue an end event to its subscribers? That would address the scenario of tidying up promises using just a single indirection through a call to pool().

Opening up a full emitter interface on pool could make it quite a powerful element although this scenario only needs an end() call available.

rpominov commented 9 years ago

Opening up a full emitter interface on pool could make it quite a powerful element

We actually have a deprecated method doing exactly that https://github.com/rpominov/kefir/blob/master/deprecated-api-docs.md#kefirbus (see deprecation story here #88).

I still hope we'll find a better solution for that kind of problem, but perhaps bus / Kefir.emitter() will indeed return in some form eventually.

Btw, native Promises also have only the Promise((resolve, reject) => ...) interface (so we may have to extract resolve from them the same way we extract the emitter from the Kefir.stream() callback, and that's OK) https://developer.mozilla.org/en/docs/Web/JavaScript/Reference/Global_Objects/Promise
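
For comparison, extracting resolve from a native Promise might look like the sketch below. createDeferred is a hypothetical helper name, shown only to illustrate the closure extraction.

```javascript
// Hypothetical helper: exposes the resolve/reject functions that the
// Promise constructor only hands to its executor callback.
function createDeferred() {
  let resolve;
  let reject;
  const promise = new Promise((res, rej) => {
    // The executor runs synchronously, so both are set before we return.
    resolve = res;
    reject = rej;
  });
  return { promise, resolve, reject };
}
```

The object returned here could stand in for the Q.defer() call used earlier in the thread, for example as Kefir.fromPromise(createDeferred().promise).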

The Observable proposal also doesn't have such an API, AFAIK https://github.com/zenparsing/es-observable

cefn commented 9 years ago

I like the clean and minimal API, so I definitely understand you wanting to minimise duplication and steer people away from sloppy practice. I recognise adding a full emitter interface to pool might be a step too far for these reasons.

However, adding the end() method to streams from pool() is complementary to the existing API as it handles the situation that...

Without Pool looking after it, an author without control of downstream subscribers who wants to stop events being forwarded would have to maintain a second, duplicate list of sources and do their own unsubscription to achieve the same result.

https://rpominov.github.io/kefir/#pool currently says Pool never ends, but in the alternative implementation, Pool only ends when pool.end() is called and not before. Does this make sense? Or do you think even adding end() is too much?

rpominov commented 9 years ago

Yeah, I guess adding an .end() method to the pool makes sense. A PR is welcome.

cefn commented 9 years ago

I've been looking into a PR but it's my first! I think I can figure out the right bit of AbstractPool, and the right bit of git, but time will tell.

NeverwinterMoon commented 7 years ago

If only you could create the disposer somehow naturally, without emitter closure acrobatics :)

@rpominov I know this isn't going to sound like the best idea, but what about using the private function #_emitEnd() on Pool?

I am currently trying to use this in tests only: I have a function that takes multiple streams, I then try to test this function by passing multiple pools and just plugging them with data to simulate data flow. I am also using Jest, so it makes sense to return a promise in the spec, so Jest will wait for that promise to resolve and evaluate the expected results. So, what I am doing now is (CS):

streamA = Kefir.pool()
streamB = Kefir.pool()

setTimeout (->
  streamA._emitEnd()
  streamB._emitEnd()
), 10

streamA.plug Kefir.sequentially 0, [ 'something' ]
streamB.plug Kefir.sequentially 0, [ 'somethingElse' ]

expect.assertions 1

functionUnderTest streamA, streamB
  .toPromise()
  .then (value) ->
    expect value
      .toEqual 'something'

Is there any better approach to this for such a case?

rpominov commented 7 years ago

It's probably fine, because if #end() was implemented it would probably just call #_emitEnd() under the hood.

But I wouldn't recommend using the private API too much. After all, it may change in any minor version, although I'm not planning to change it any time soon.

mAAdhaTTah commented 7 years ago

@NeverwinterMoon FWIW, I used to do something similar in my tests but found that it was easier to just pass the streams directly into said functions rather than plugging them into a pool, often using Kefir.merge. There generally are enough ways to compose a stream even for tests that I was able to convert my usage of pool -> plain streams.

NeverwinterMoon commented 7 years ago

@mAAdhaTTah Thanks for the advice! We have been using Kefir in our code base for quite some time now, but I started noticing that we were testing the functionality around the streams in a very awkward manner (or not testing at all in a lot of scenarios). So, now I am trying this and that to find the best approach to writing clean but reliable tests.

mAAdhaTTah commented 7 years ago

@NeverwinterMoon The other thing I'll definitely suggest is using spies. My tests tend to look like this:

it('should do a thing', () => {
  const source$ = Kefir.stream(/* emit values & then end */);
  const error = sinon.spy();
  const value = sinon.spy();

  doThing(source$).observe({
    error,
    value,
    end() {
      // Validate spies' callCount & argument values
    }
  });
});

Hope that helps!

mAAdhaTTah commented 6 years ago

@NeverwinterMoon Update: We now have chai-kefir for testing streams. Please let me know if the documentation is unclear in any way.

NeverwinterMoon commented 6 years ago

@mAAdhaTTah Great news! Sadly, I am no longer involved with the project that was using Kefir. But I'll pass the info to the people still working on it.