nodejs / node-v0.x-archive

Moved to https://github.com/nodejs/node

enable stream.Writable#_flush([callback]) #5315

Closed networkimprov closed 11 years ago

networkimprov commented 11 years ago

Optional method for stream.Writable subclasses. Called when end() is called by the stream client, before 'finish' is emitted. Similar to Transform#_flush()

Discussed here: https://groups.google.com/d/topic/nodejs/ydWIyPrDVB4/discussion
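
For illustration, a minimal sketch of the existing Transform#_flush hook that this request is modeled on (the Upcase class and the trailing marker are just placeholders, not part of the proposal):

var util = require('util');
var Transform = require('stream').Transform;

function Upcase(options) {
  Transform.call(this, options);
}
util.inherits(Upcase, Transform);

Upcase.prototype._transform = function (chunk, encoding, cb) {
  this.push(chunk.toString().toUpperCase());
  cb();
};

// Runs after end() and after all _transform callbacks, before 'end' is
// emitted -- the equivalent hook for Writable is what this issue asks for.
Upcase.prototype._flush = function (cb) {
  this.push('-- done --\n');
  cb();
};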

isaacs commented 11 years ago

So, it sounds like your Writable is calling the _write() callback before it's actually done writing that chunk.

Why are you doing that, exactly?

> Flushing on 'finish' works only if there's no one else listening on 'finish'. And then our stream subclass doesn't conform to the Writable API.

Why doesn't it work if others are listening to 'finish'? Events can have lots of listeners, that's fine.

The way that your stream doesn't comply with the Writable paradigm is that it's calling the write() callback (and, perhaps, the end() callback) before it's actually written or ended.

Maybe what you need is a Transform that takes arbitrary-length chunks, and writes out fixed-length chunks, but even then, a reader can read(n) in whatever sizes they like...

Still somewhat missing the intent here.

TooTallNate commented 11 years ago

@isaacs For node-speaker, the native interface offers an API that fits almost perfectly with the Writable interface. You create an audio "handle" and with that you can call write() with char* buffers, but when that's done there's this close() function I need to call. Right now I'm doing that by hooking onto the 'finish' event, but ideally, if there were a _flush() as part of the Writable interface, this close() function would run and invoke its callback before the 'finish' event is emitted.
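
A rough sketch of that workaround; openHandle/writeChunk/closeHandle are hypothetical stand-ins for the native binding, not node-speaker's actual API:

var util = require('util');
var Writable = require('stream').Writable;

function AudioWritable(options) {
  Writable.call(this, options);
  this._handle = openHandle(); // hypothetical native handle
  var self = this;
  // Workaround: close the handle once 'finish' fires; with a Writable
  // _flush() hook this could instead run before 'finish' is emitted.
  this.once('finish', function () {
    closeHandle(self._handle, function () {
      self.emit('close');
    });
  });
}
util.inherits(AudioWritable, Writable);

AudioWritable.prototype._write = function (chunk, encoding, cb) {
  writeChunk(this._handle, chunk, cb); // hypothetical async native write
};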

networkimprov commented 11 years ago

@isaacs, at present 'finish' really means 'finishing'. The underlying resource can't be re-used during 'finishing' so emitting 'finish' to listeners outside the stream at that stage would be wrong. We'd have to tell our listeners to listen for a different event.

Re write() callbacks: if we don't call back on nextTick, a client that issues its next write() or end() from inside the callback never delivers more input, which would stall the stream.

isaacs commented 11 years ago

You can call the _write callback now or whenever. The Writable machinery will detect a sync call, and defer whatever it has to for your stream to behave correctly. How are you getting the stream to stall?

'finish' means: end() was called, and all the _write callbacks were called, indicating that the stream should be fully flushed. It sounds like you're calling the _write callbacks early, before the chunk is actually "done".

There's no guarantee that all the resources are cleaned up when "finish" is emitted, only that everything you gave it to do is done. Most streams in node-core emit a "close" event when they've cleaned up their underlying resources.

I'm sorry, I'm still not understanding what the problem is with something like this:

var util = require('util');
var Writable = require('stream').Writable;

function MyWritable(options) {
  if (!(this instanceof MyWritable)) return new MyWritable(options);
  Writable.call(this, options);

  this._resource = getResourceThingieOrWhatever();
  this.once('finish', this.destroy);
}
util.inherits(MyWritable, Writable);
// (a real subclass would also define _write; omitted here)

MyWritable.prototype.destroy = function(cb) {
  if (cb) this.once('close', cb);
  var me = this;
  this._resource.close(function() {
    me.emit('close');
  });
};

// This is kind of silly, I mean, you're only saving the cost of a
// single JavaScript instance object, which is SUPER cheap.
MyWritable.prototype.reuse = function() {
  var me = this;
  this.destroy(function() {
    me._resource = getResourceThingieOrWhatever();
  });
};

What's the problem with self-listening? Why does this assume that no one else can listen to the 'finish' event? We do stuff like this all the time in node-core.

I still feel like I must be missing something.

networkimprov commented 11 years ago

The stream only stalls if we defer the write() callback until all of that buffer is flushed, and the client is calling subsequent write/end via the write callback. We fire the callback on nextTick, so it's not a problem.
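
A minimal sketch of that approach (class and field names are just for illustration):

var util = require('util');
var Writable = require('stream').Writable;

function AckEarlyWritable(options) {
  Writable.call(this, options);
  this._pending = []; // chunks handed to the engine later
}
util.inherits(AckEarlyWritable, Writable);

AckEarlyWritable.prototype._write = function (chunk, encoding, cb) {
  this._pending.push(chunk);
  process.nextTick(cb); // ack immediately so a client that writes again
                        // from inside the callback never stalls
};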

Oh, I didn't realize 'close' was an option! It's not mentioned in the stream.Writable docs. That does the trick. I'm not opposed to self-listening.

@TooTallNate does this solve your case?

isaacs commented 11 years ago

> Oh, I didn't realize 'close' was an option! It's not mentioned in the stream.Writable docs. That does the trick. I'm not opposed to self-listening.

It's not mentioned in the stream.Writable docs because it's not really a requirement of the API. Not all streams emit it, but perhaps it would be worthwhile to mention it anyway as an option. The streams that DO emit 'close' generally do so to indicate that the underlying resource is completely cleaned up: for sockets and fs streams, this means the handle or file descriptor is closed; for zlib, it's after zlib_free; for crypto, it's after disposing of the OpenSSL object; etc.

> The stream only stalls if we defer the write() callback until all of that buffer is flushed, and the client is calling subsequent write/end via the write callback.

Right, but isn't that just another way to say that you're completely giving up on backpressure? What happens if you pipe a giant file into it, or a socket that sends you many terabytes of information? The "stalling" until the write callback is called is so that you can accurately communicate the information back to the client.
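
A minimal sketch of what deferring the callback buys you; consume() stands in for whatever async work the underlying resource does:

var util = require('util');
var Writable = require('stream').Writable;

function SlowSink(options) {
  Writable.call(this, options);
}
util.inherits(SlowSink, Writable);

SlowSink.prototype._write = function (chunk, encoding, cb) {
  consume(chunk, function (err) {
    // Only once this fires does the Writable machinery drain its buffer and
    // ask the piped source for more data; if the buffer has filled past
    // highWaterMark, write() returns false and pipe() pauses the source
    // until then. That's the backpressure signal.
    cb(err);
  });
};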

networkimprov commented 11 years ago

If 'close' is a common event that a Writable subclass implementer should consider, then please do document it. Had it been documented, you'd have saved yourself this entire discussion :-)

I'll edit this issue to request docs for 'close' unless @TooTallNate has further input.

We can apply backpressure as necessary by deferring write() callbacks when we don't need the next batch of data immediately.

isaacs commented 11 years ago

@networkimprov Better to create a new issue, I'll close this one.

Out of curiosity, what is this actual Writable?

networkimprov commented 11 years ago

The Writable in question is an interface to the Xdelta (binary diff tool) patch mechanism. You stream the patch into the Writable and it reads from a source fd and writes to a destination fd. https://github.com/networkimprov/node-xdelta3/

New issue filed: https://github.com/joyent/node/issues/5336

isaacs commented 11 years ago

I see. Yeah, that sounds a bit tricky. So, when you are done writing, perhaps the last bit of the patch is consumed, but it's still not "done", since it has to finish reading the input fd and writing to the output fd.

Why is there a minimum amount that you can write, though? I mean... why not wait on the _write() callback until that bit of the patch is actually fully consumed? Do you just not get a notice of this, or does the underlying lib specify that you have to write in X-sized blocks?

networkimprov commented 11 years ago

We need a complete patch to generate output. When we get a buffer from write, it may not be a whole patch, in which case we'd call back to ack it and keep the stream going. We could defer callbacks once we have a whole patch to hand to the engine, but I'm not sure whether we know where the patch boundaries are.

@mtibeica, can you shed more light on this?

isaacs commented 11 years ago

I see, so the chunk you wrote is "fully consumed", as far as you know, it's just that the internal lib is buffering it or something until they get a full patch, so it makes sense to just do the callback immediately. Am I understanding that correctly?

If so, yeah, more and more, it sounds like you should listen on finish and emit 'close' when it's cleaned up, and you're probably doing the right thing. Thanks for the added details, that is very illuminating.

mtibeica commented 11 years ago

I need to feed the xdelta lib XD3_ALLOCSIZE bytes at a time, so when a chunk is fed to _write I must keep a part of it for further processing (which I will concatenate with the next chunk). The problem is that I need to know when the last _write was called so that I can process the remaining chunk.
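
A sketch of that buffering (names and the block size value are placeholders; feedBlock() stands in for the binding call):

var util = require('util');
var Writable = require('stream').Writable;
var XD3_ALLOCSIZE = 16384; // placeholder; the real value comes from the lib

function BlockWritable(options) {
  Writable.call(this, options);
  this._buf = new Buffer(0); // leftover bytes smaller than one block
}
util.inherits(BlockWritable, Writable);

BlockWritable.prototype._write = function (chunk, encoding, cb) {
  this._buf = Buffer.concat([this._buf, chunk]);
  while (this._buf.length >= XD3_ALLOCSIZE) {
    feedBlock(this._buf.slice(0, XD3_ALLOCSIZE)); // placeholder binding call
    this._buf = this._buf.slice(XD3_ALLOCSIZE);
  }
  cb();
};
// The leftover in this._buf can only be handed to the lib once we know no
// more input is coming -- which is exactly what the missing hook would signal.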

markstos commented 10 years ago

I've reviewed this thread and I'm not sure it addresses the use case where a Writable stream has an internal buffer that needs to be flushed. To flush it, the writable stream needs to be able to detect when there's no more input coming.

The solutions proposed above cover signaling that the final flush has happened, which is already handled; what's still missing is a trigger that it's time to flush in the first place.

My specific use case is that I have an incoming stream of addresses which need geocoding. I have a Writable stream which will send them to a remote geocoding service, but ideally in large batches. I need to know when I've received the last bit of data, so I can flush the last few addresses to the geocoder.

I can add a listener to the Readable stream to explicitly flush the Writable stream, but it seems cleaner if the Writable stream could be self-contained and not require this.

markstos commented 10 years ago

Actually, I've decided to revise my design to use a Transform stream instead of Writable Stream. Transform already has _flush, so I'm all set.
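
A sketch of that revised design, with geocodeBatch() as a placeholder for the remote service call:

var util = require('util');
var Transform = require('stream').Transform;

function GeocodeStream(batchSize) {
  Transform.call(this, { objectMode: true });
  this._batchSize = batchSize || 100;
  this._batch = [];
}
util.inherits(GeocodeStream, Transform);

GeocodeStream.prototype._sendBatch = function (cb) {
  var self = this;
  var batch = this._batch;
  this._batch = [];
  if (!batch.length) return cb();
  geocodeBatch(batch, function (err, results) { // placeholder remote call
    if (err) return cb(err);
    results.forEach(function (r) { self.push(r); });
    cb();
  });
};

GeocodeStream.prototype._transform = function (address, encoding, cb) {
  this._batch.push(address);
  if (this._batch.length >= this._batchSize) return this._sendBatch(cb);
  cb();
};

// _flush runs after the last _transform and before 'end' -- the hook that
// lets the final partial batch go out without help from the Readable side.
GeocodeStream.prototype._flush = function (cb) {
  this._sendBatch(cb);
};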