kefirjs / kefir

A Reactive Programming library for JavaScript
https://kefirjs.github.io/kefir/
MIT License

Sequencing things asynchronously #114

Closed myndzi closed 9 years ago

myndzi commented 9 years ago

It looks like flatMapConcurLimit is the only means of controlling concurrency, but it calls its callback eagerly. It would be nice if there was a useful way to do something like map a stream of URLs to their contents, but be able to control concurrency in a way that prevents hammering on an API. That is to say, if I wanted to make one request at a time, and wait until one finishes before starting the next one, there doesn't seem to be a way to do that.

Promises seem to be a useful way to govern "when" to consume the next value, but returning a property via Kefir.fromPromise doesn't help in this case because of the eager calling. It'd be convenient to be able to return promises directly, but I can understand why that might be out of scope for the module.

Am I failing to understand some obvious way to handle this need? What do you think about providing a method or two for lower level composition, similar to how "map" and "concat" make up "mapConcat", but for controlling this sort of thing?

rpominov commented 9 years ago

You can simply use flatMapConcurLimit for this. The callback is called eagerly, but the stream that it returns is activated lazily. So something like this should work:

foo.flatMapConcurLimit(function(url) { // this callback is called eagerly

  return Kefir.stream(function(emitter) { // this callback is called lazily

    // assumes `then` accepts onSuccess, onError, and onAlways callbacks
    makeRequest(url).then(emitter.emit, emitter.error, emitter.end);
  });
}, 2);

myndzi commented 9 years ago

I don't know that I would call the above, 'simply', but it mostly worked when I tried it. With a concurrency of 1 it still called the function twice for some reason on the first round. Might be a bug, I'm not sure how to run it down.

Either way, thanks; this confirms at least that I wasn't misunderstanding something, and I wrote a little helper that essentially subscribes/unsubscribes repeatedly as needed to a source stream to 'serialize' the requests.

rpominov commented 9 years ago

> I don't know that I would call the above, 'simply'

Point taken :)

> With a concurrency of 1 it still called the function twice for some reason on the first round. Might be a bug, I'm not sure how to run it down.

I also couldn't reproduce this:

Kefir.later(1000, 1).flatMapConcurLimit(function() {
  console.log('in callback 1'); 
  return Kefir.stream(function(em) {
    console.log('in callback 2'); 
    em.emit(42); 
    em.end();
  });
}, 1).log()

Output:
in callback 1
in callback 2
[later.flatMapConcurLimit] <value> 42
[later.flatMapConcurLimit] <end>

I guess there are other things involved.

myndzi commented 9 years ago

I don't have the code I had anymore but will try to reproduce later for debugging purposes...

myndzi commented 9 years ago

When I said it called the function twice, I didn't mean a duplicate call (two times with the same parameter); I meant it didn't delay between the first and second calls of a sequence. I played around with it a bit, but I think my understanding is tainted now; I couldn't reproduce it and can't think of what I would have done. It might have had to do with properties, since I was using fromPromise. I wouldn't worry about it.

Some kind of 'lazyMap' method would be helpful, but without the ability to consume promises naturally, I'm not sure it's all that useful since you'd still have to do a bunch of wrapping.

rpominov commented 9 years ago

I'll try to explain in detail: when we create a stream using Kefir.stream(callback), the callback is called only when the stream gets its first subscriber (i.e. activates). When flatMapConcurLimit gets a new source stream, it checks whether it is already subscribed to n streams; if so, it puts the new stream in the queue, otherwise it subscribes to the new source immediately.
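For readers who want to see the queueing described above in isolation, here is a minimal sketch in plain JavaScript. It is a simplified model, not Kefir's actual implementation: `createLimiter`, `makeSource`, and the "lazy source" shape (a function that starts work only when called) are all hypothetical names made up for this illustration.

```javascript
// Simplified model of a concurrency-limited queue (NOT Kefir's real code).
// A "lazy source" is a function start(onDone) that begins work only when called,
// which mirrors how Kefir.stream(callback) runs its callback only on activation.
function createLimiter(limit) {
  const queue = []; // sources waiting for a free slot (not yet started)
  let active = 0;   // sources currently running

  function start(source) {
    active += 1;
    source(function onDone() {
      active -= 1;
      // A slot just freed up: activate the next queued source, if any.
      if (queue.length > 0) {
        start(queue.shift());
      }
    });
  }

  return {
    add: function (source) {
      if (active < limit) {
        start(source);      // room available: activate immediately
      } else {
        queue.push(source); // limit reached: hold it without starting it
      }
    },
  };
}

// Usage: record the order in which sources start and end.
const log = [];
const pendingDone = [];
function makeSource(name) {
  return function (onDone) {
    log.push('start ' + name);
    pendingDone.push(function () {
      log.push('end ' + name);
      onDone();
    });
  };
}

const limiter = createLimiter(1);
limiter.add(makeSource('a')); // starts right away
limiter.add(makeSource('b')); // queued, because 'a' is still running
pendingDone.shift()();        // finish 'a', which lets 'b' start
// log is now ['start a', 'end a', 'start b']
```

The point of the model is that the queue holds sources that have not started yet, so adding one eagerly costs nothing until a slot frees up.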

myndzi commented 9 years ago

I believe I get it, it's just unwieldy for this sort of thing. When you pointed out explicitly the 'eager' vs 'lazy' callback, it was clear what happens and why. I think there's a valid use case for making this specific sort of thing easier on the library user, which is why I mention it; whether you feel the same, is up to you :)

rpominov commented 9 years ago

I think the laziness that Kefir.stream(cb) provides is powerful enough. I wish promises worked similarly and called their callback only on the first "subscriber"; then we wouldn't have to wrap promises in Kefir.stream and could just use Kefir.fromPromise.
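The difference between eager promises and lazy activation can be shown without Kefir at all. The sketch below is only an illustration: `makeRequest` is a hypothetical stand-in for a real HTTP call, and the plain function wrapper plays the role that Kefir.stream(cb) plays in the examples in this thread.

```javascript
// Hypothetical stand-in for a real HTTP call; it records when work starts.
const started = [];
function makeRequest(url) {
  started.push(url);
  return Promise.resolve('response for ' + url);
}

// Eager: calling makeRequest starts the work immediately,
// before anything has asked for the result.
const eager = makeRequest('/a');

// Lazy: wrapping the call in a function defers the work
// until the wrapper is invoked (the "first subscriber" moment).
const lazy = function () {
  return makeRequest('/b');
};

const startedBeforeSubscribe = started.slice(); // ['/a']
lazy();
const startedAfterSubscribe = started.slice();  // ['/a', '/b']
```

By the time a promise exists, its request is already in flight, so a concurrency limit applied afterwards has nothing left to hold back. Deferring the call itself is what makes the limit effective.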

Perhaps the documentation could be improved to better highlight the power of lazy activation of streams. If only I knew how to improve it :)

BTW, I've built another example. I see you've already "got it", but maybe it will be helpful for others:

var urls = Kefir.stream(function(em) {
  em.emit(1)
  em.emit(2)
  em.emit(3)
  em.end()
})

function loadContent(url) {
  return Kefir.stream(function(em) {
    console.log('sending request for ' + url)
    setTimeout(function() {
      em.emit('response for ' + url)
      em.end()
    }, 1000)
  })
}

var contents = urls.flatMapConcurLimit(loadContent, 1)

contents.log()

Output:
sending request for 1
[stream.flatMapConcurLimit] <value> response for 1
sending request for 2
[stream.flatMapConcurLimit] <value> response for 2
sending request for 3
[stream.flatMapConcurLimit] <value> response for 3
[stream.flatMapConcurLimit] <end>