kefirjs / kefir

A Reactive Programming library for JavaScript
https://kefirjs.github.io/kefir/
MIT License
1.87k stars 97 forks source link

Ending a parent stream when child streams end? #101

Closed ospatil closed 9 years ago

ospatil commented 9 years ago

I am trying to create quite a convoluted stream structure for reading and processing files in a directory and am stuck at one point and not sure how to proceed. Here is how my code looks like -

var Kefir   = require('kefir');
var fs      = require('fs');
var glob    = require('glob');
var acorn = require('acorn');

//Create a glob to read all JS files from dirPath
var jsFiles = glob(dirPath + '/**/*.js');

/* The glob object created above is a nodejs event emitter
 * It emits 'end' event with array of file paths as data
 * The processing is as follows -
 * 1 -> create a stream out of end event and concat all the resultant streams
 * 2 -> Create a Kefir sequential stream for all paths in path array, create
 *          and concat a stream for each of the file paths. 
 * 3 -> Create a stream from node callback for each file path and read and process contents
 */
var matchStream = Kefir.fromEvents(jsFiles, 'end').flatMapConcat(function(pathArr) { //1
    return Kefir.sequentially(0, pathArr).flatMapConcat(function(path) { //2
      return Kefir.fromNodeCallback(fs.readFile.bind(fs, path, 'utf8')).map(function(content) { //3
          return {
            name: path,
            content: acorn.parse(content, {locations: true})
          };
        });
    });
  });

While the above code works fine and I can get the contents of files as expected, what I need is a stream end event to know that I can all files have been read. The matchStream simply doesn't end in above example and I have verified that the inner streams end successfully. Is there any way I can make matchStream end when all the "child" streams end? For example, if the size of pathArr in above example is 2, I can do matchStream.take(2) to make it end after picking up two results, but how can it be made dynamic based on pathArr length?

Am I going in right direction or this can be done in some better way? I would appreciate any help on this.

rpominov commented 9 years ago

Hi, I think the problem is that the very first part Kefir.fromEvents(jsFiles, 'end') never ends. Try to add .take(1) to it: Kefir.fromEvents(jsFiles, 'end').take(1).flatMapConcat(...

ospatil commented 9 years ago

That was a super quick response :). Thank you. I used take(1) in the meantime and it works as expected in this case. I have a follow-up question though. I suppose streams created using fromEvents won't end on their own, would they? One way of end handling of streams with variable number of events could be creating and merging streams and use withHandler as shown below as most event emitters emit some sort of "end" events along with "data" events, right?

var EventEmitter = require("events").EventEmitter;
var emitter = new EventEmitter();

var counter = 0;

var interval = setInterval(function() {
  if(counter < 3) {
    emitter.emit('data', ++counter);
  } else {
    emitter.emit('end', -1);
    clearInterval(interval);
  }
}, 100);

var dataStream = Kefir.fromEvents(emitter, 'data');
var endStream = Kefir.fromEvents(emitter, 'end').map(function() {
  return "END";
});

var unifiedStream = Kefir.merge([dataStream, endStream]);

unifiedStream.withHandler(function(emitter, event) {
  if (event.value === 'END') {
    emitter.end();
  } else {
    emitter.emit(event.value);
  }
})
.log();

BTW, thanks a ton for kefir. It's an amazing library with intuitive "just what you need" API and excellent documentation. In fact, the code in the question comes from an application that I created first in rxjs before discovering kefir and moving on to it. Great work :+1:.

rpominov commented 9 years ago

Actually it much easier:

var streamThatEnds = Kefir.fromEvents(emitter, 'data')
  .takeUntilBy(Kefir.fromEvents(emitter, 'end'));

And if you already have something like unifiedStream, you can use .takeWhile instead of .withHandler:

var streamThatEnds = unifiedStream.takeWhile(function(x) {
  return x !== 'END';
});

BTW, thanks a ton for kefir. It's an amazing library with intuitive "just what you need" API and excellent documentation. In fact, the code in the question comes from an application that I created first in rxjs before discovering kefir and moving on to it. Great work :+1:.

Thanks, that means a lot!