dvlsg / async-csp

CSP style channels using ES7 async/await
MIT License

Puttable or alternative APIs #9

Closed ivan-kleshnin closed 8 years ago

ivan-kleshnin commented 8 years ago

I'm trying to replace the Node Stream API with the Channel API and get real backpressure (not the buffering, throttling, highWaterMark, or other heuristic approximations that Node and RxJS use), for learning purposes.

Here is my attempt with comments about the problem I've encountered:

// Assumed setup, so the snippet is self-contained (the import form follows the async-csp README):
import Fs from "fs";
import Channel from "async-csp";
// Small sleep helper used by writeFile below.
const timeout = ms => new Promise(resolve => setTimeout(resolve, ms));

function readFile(path, opts) {
  let outChan = new Channel(1);
  let inStream = Fs.createReadStream(path, {encoding: "utf-8"});
  console.log("== READ START ==");
  inStream
    .on("readable", async function () {
      console.log("readable!");
      let chunk;
      do {
        console.log("reading...");
        // inStream.pause(); does not help, flowing mode only?!

        ////
        // await outChan.puttable(); -- need something like this
        //// -- imaginary Promise which resolves when you can put into Channel

        chunk = inStream.read(); // may immediately cause another "readable" event
        await outChan.put(chunk);
        // inStream.resume(); does not help, flowing mode only?!
      } while (chunk !== null);
    })
    .on("end", function () {
      console.log("== READ DONE ==");
    });
  return outChan;
}

function writeFile(path, opts, inChannel) {
  let outStream = Fs.createWriteStream(path, opts);
  console.log("== WRITE START ==");
  (async function () {
    let chunk;
    while ((chunk = await inChannel.take()) !== null) {
      console.log("writing...");
      await timeout(1000); // should stop readable for a second
      // ... write implementation ...
    }
    console.log("== WRITE DONE ==");
  })();
}

readFile("sample.jpg", {}); // -- problem is here already
// writeFile("sample2.jpg", {}, readFile("sample.jpg", {}));

The ReadableStream API is designed so that as soon as you call .read(), the stream immediately resumes with the next chunk. I need to pause reading somehow, but there is no API to check whether the Channel is writable right now. I can't just await channel.put(foo); in this case. Am I missing something?

dvlsg commented 8 years ago

Readable streams are tricky to work with. To properly support their expectations, I would potentially need a #write() method which returns either true or false, depending on whether or not backpressure should be assumed. Then we would also potentially need a way to support Channel#on('drain', callback) to tell the reads to continue piping. Both of these items would be very specific to the nodejs stream expectations. I believe they could be implemented, but it would tie us to supporting a specific external library as well.
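
Just to sketch the idea (none of this exists in async-csp today): rather than adding #write()/'drain' to Channel itself, the same contract could be borrowed from Node by wrapping a channel in a stream.Writable whose write callback waits on channel.put(). The channelWritable name below is made up for illustration, and the only assumption about Channel is that put() returns a promise which resolves once the value has been accepted.

  import { Writable } from "stream";

  // Illustration only: adapt a Channel to Node's Writable interface, so that
  // inStream.pipe(...) reuses Node's own write()/'drain' backpressure handling.
  function channelWritable(channel) {
    return new Writable({
      objectMode: true,
      highWaterMark: 1, // keep at most one chunk queued inside the Writable itself
      write(chunk, encoding, callback) {
        // Node stops calling write() (and pauses a piped Readable) until
        // callback() fires, i.e. until the channel has accepted the chunk.
        channel.put(chunk)
          .then(() => callback())
          .catch(callback);
      }
    });
  }

  // Usage: inStream.pipe(channelWritable(outChan));

That would keep the nodejs-specific expectations in a thin adapter instead of baking them into the channel API.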

If we do decide to support something like your proposal, I would probably call it #drain(), and mimic how nodejs streams handle the drain event. I'm not convinced it would be enough given your example code, though -- I'm really not sure how a ReadStream#on('readable', callback) would handle a callback which returns a promise. You could try something like this instead of your callback for #on('readable') (note that attaching a data event handler puts the stream into flowing mode):

  inStream.on('data', async data => {
    inStream.pause();
    await outChan.put(data);
    inStream.resume();
  });

But a glance at that has me slightly concerned about performance, especially if you have to pause/resume the instream every time.
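
For comparison, the hypothetical #drain() from above might be used in your original 'readable' handler roughly like this (purely a sketch; the method does not exist yet, and the idea is that it resolves once the channel can accept another value):

  inStream.on("readable", async () => {
    let chunk;
    while (true) {
      await outChan.drain(); // imaginary: wait until the channel has room
      chunk = inStream.read();
      if (chunk === null) break; // buffer empty; wait for the next 'readable'
      await outChan.put(chunk); // should now resolve without blocking
    }
  });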

ivan-kleshnin commented 8 years ago

But a glance at that has me slightly concerned about performance, especially if you have to pause/resume the instream every time.

Absolutely. This way we can try to achieve radical memory efficiency (no buffer usage) at the cost of awful performance. As I said, it's just for learning purposes.

In general, I agree. The Node Stream API is very specific, and this single use case should not dictate what the library's API looks like. We definitely need to collect more usage experience. So let's close this issue for now, and if (or when) we hit something similar, it may be reconsidered.