Open joelrbrandt opened 1 month ago
For reference, a smaller reproduction is:
import { Readable, Transform } from 'node:stream';
Readable.from([{}]).pipe(new Transform({
transform(chunk, _, cb) {
this.push(chunk);
}
}));
AFAICT this is working as intended. Per the docs, if object mode is false, "streams created by Node.js APIs operate exclusively on strings, \
CC @nodejs/streams
@RedYetiDev Thanks for taking a look!
AFAICT this is working as intended.
The part that I feel is not working as intended is that there is no way to catch the error that is produced when the pipe attempts to start flowing.
It should be possible (and, in my opinion, preferable) to throw synchronously in the Readable.pipe() call when the modes don't match between source and destination.
Individual streams cannot change their object mode once constructed. From the documentation on object mode:
Stream instances are switched into object mode using the objectMode option when the stream is created. Attempting to switch an existing stream into object mode is not safe.
So, it should be sound to throw in Readable.pipe(). (I.e., there shouldn't be a case where the check would fail in Readable.pipe() but things would have actually worked out right when data eventually started flowing.)
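As a userland illustration of that argument, here is a minimal sketch of a wrapper that performs the check before delegating to pipe(). The name checkedPipe is hypothetical and not part of Node.js. It relies on the public readableObjectMode and writableObjectMode getters and assumes both arguments are real node:stream instances. It rejects only an object-mode source feeding a byte-mode destination, on the assumption that this is the combination that fails once data flows.

```javascript
const { Readable, PassThrough } = require('node:stream');

// Hypothetical helper (not part of Node.js): validate modes, then pipe.
function checkedPipe(src, dest, pipeOpts) {
  // objectMode is fixed at construction, so a check made here stays valid
  // for the lifetime of the pipe.
  if (src.readableObjectMode === true && dest.writableObjectMode === false) {
    throw new TypeError(
      'Cannot pipe an object-mode source into a non-object-mode destination'
    );
  }
  return src.pipe(dest, pipeOpts);
}

const source = Readable.from([{ id: 1 }]); // Readable.from() defaults to object mode
const passThrough = new PassThrough(); // byte mode (objectMode: false)

try {
  checkedPipe(source, passThrough);
} catch (e) {
  // The failure surfaces at the call site, where the streams are in scope.
  console.error('caught error when calling pipe', e.message);
}
```

Because the comparison uses strict equality against true and false, a destination that does not expose writableObjectMode at all is passed through to pipe() unchanged.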
The reason I provided a longer repro step was to demonstrate different places in the control flow where an error could be caught.
I agree we are missing a try/catch in pipe.
@ronag
I agree we are missing a try/catch in pipe.
Thanks for taking a look!
I assume you mean "we're missing a check and a throw in Readable.pipe()"? There's no exception propagating through .pipe, so a catch won't address the issue. The exception happens when the pipe is flowing and the Readable attempts to write to the Writable. That happens here, and there's no way to catch that exception (that I am aware of).
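To make the failure point concrete, the following sketch (not taken from the thread) calls write() directly on a byte-mode stream with a plain object. On recent Node.js releases, including the 22.x line this issue was filed against, the call is expected to throw synchronously. Inside pipe() the same call runs from a 'data' listener, so the code that called pipe() is no longer on the stack to catch it. The variable names are illustrative only.

```javascript
const { PassThrough } = require('node:stream');

const byteSink = new PassThrough(); // objectMode defaults to false

let caught = null;
try {
  // The same kind of call that pipe()'s internal 'data' handler makes.
  byteSink.write({ not: 'a chunk' });
} catch (err) {
  caught = err;
}

// Report the error type and code without assuming the exact message text.
console.log(caught instanceof TypeError, caught && caught.code);
```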
Anyway, if folks feel this is indeed a bug, and that the right fix is to add a check and throw in Readable.prototype.pipe(), I'm happy to put up a PR by EOD tomorrow. This'll be my first time contributing to node, though, so I might need some guidance. 😄
function ondata(chunk) {
  debug('ondata');
  try {
    const ret = dest.write(chunk);
    debug('dest.write', ret);
    if (ret === false) {
      pause();
    }
  } catch (err) {
    dest.destroy(err);
  }
}
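The effect of that guard can be tried outside Node.js internals. The sketch below is an approximation under stated assumptions: makeGuardedOnData is a hypothetical factory, and its pause argument stands in for the internal pause() that pipe() closes over. It shows the synchronous write() failure being routed into destroy(err), where an ordinary 'error' listener on the destination can observe it.

```javascript
const { PassThrough } = require('node:stream');

// Hypothetical factory mirroring the guarded handler above; `pause` is a
// stand-in for pipe()'s internal pause() function.
function makeGuardedOnData(dest, pause) {
  return function ondata(chunk) {
    try {
      if (dest.write(chunk) === false) {
        pause();
      }
    } catch (err) {
      // Route the synchronous failure through the stream's error path.
      dest.destroy(err);
    }
  };
}

const guardedDest = new PassThrough(); // byte mode
guardedDest.on('error', (err) => {
  console.error('destination emitted error:', err.code);
});

const guardedOnData = makeGuardedOnData(guardedDest, () => {});
guardedOnData({ not: 'a chunk' }); // write() throws, so destroy(err) is called
console.log('destroyed:', guardedDest.destroyed);
```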
@ronag got it. Thanks.
Should there also be a check and throw at Readable.prototype.pipe()?
I'll add some tests and put up a PR.
I think it will add a noticeable overhead if that is done for every data chunk. Also, I think this is an unrecoverable programmer error so a crash is expected. Can't we just add a check for the source and destination streams in Readable.prototype.pipe()? I think this is also not necessary, but at least it is less invasive.
I mean something like this:
diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js
index 17f8e53ad5..75dc350b59 100644
--- a/lib/internal/streams/readable.js
+++ b/lib/internal/streams/readable.js
@@ -910,6 +910,17 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
   const src = this;
   const state = this._readableState;

+  if (
+    ((state[kState] & kObjectMode) !== 0 &&
+      (dest._writableState[kState] & kObjectMode) === 0) ||
+    ((state[kState] & kObjectMode) === 0 &&
+      (dest._writableState[kState] & kObjectMode) !== 0)
+  ) {
+    throw new Error(
+      'The piped streams do not have the same objectMode setting'
+    );
+  }
+
   if (state.pipes.length === 1) {
     if ((state[kState] & kMultiAwaitDrain) === 0) {
       state[kState] |= kMultiAwaitDrain;
Just note that you can pipe non object mode into object mode but not the other way around.
@lpinca @ronag in my incredibly limited (only one machine / architecture) and not statistically valid (only run a few times, manually) benchmarking, I don't see a performance difference due to the addition of the try/catch in ondata. (See some manual runs below.)
Assuming there isn't performance overhead, is there a preferred approach (try/catch in ondata vs. check in .pipe)?
Happy to put up a PR for either.
Also, are these instructions still current for running benchmarks on CI? (I see the repo they are in is archived.) Also, will I have permissions to trigger such a run on CI? Or does a maintainer need to do that?
Note: I built node with ./configure --node-builtin-modules-path "$(pwd)". Then, I added a console.log("[local build]...") to Readable.prototype.pipe to ensure I was actually running the modified code in the benchmark.
Unmodified ondata
[~/devel/node] (main) $ ./node --version
v23.0.0-pre
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with unmodified ondata
streams/pipe.js n=5000000: 29,249,159.338211797
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with unmodified ondata
streams/pipe.js n=5000000: 29,197,094.442971323
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with unmodified ondata
streams/pipe.js n=5000000: 29,029,514.30719613
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with unmodified ondata
streams/pipe.js n=5000000: 28,273,879.594224814
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with unmodified ondata
streams/pipe.js n=5000000: 29,225,075.268889103
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with unmodified ondata
streams/pipe.js n=5000000: 27,992,464.42857583
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with unmodified ondata
streams/pipe.js n=5000000: 28,337,883.925193656
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with unmodified ondata
streams/pipe.js n=5000000: 29,188,259.898103785
ondata with try/catch
[~/devel/node] (main) $ ./node --version
v23.0.0-pre
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with modified ondata
streams/pipe.js n=5000000: 28,868,575.624709625
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with modified ondata
streams/pipe.js n=5000000: 29,417,366.983859424
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with modified ondata
streams/pipe.js n=5000000: 28,880,720.458452556
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with modified ondata
streams/pipe.js n=5000000: 28,618,137.460068755
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with modified ondata
streams/pipe.js n=5000000: 28,907,380.567654
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with modified ondata
streams/pipe.js n=5000000: 29,311,473.430494733
[~/devel/node] (main) $ node --version
v22.8.0
[~/devel/node] (main) $ node benchmark/streams/pipe.js
streams/pipe.js n=5000000: 28,344,122.315635882
[~/devel/node] (main) $ node benchmark/streams/pipe.js
streams/pipe.js n=5000000: 28,975,323.889792357
[~/devel/node] (main) $ node benchmark/streams/pipe.js
streams/pipe.js n=5000000: 28,917,007.4656484
[~/devel/node] (main) $ node benchmark/streams/pipe.js
streams/pipe.js n=5000000: 28,974,708.162682924
[~/devel/node] (main) $ node benchmark/streams/pipe.js
streams/pipe.js n=5000000: 28,985,794.415783517
[~/devel/node] (main) $ node benchmark/streams/pipe.js
streams/pipe.js n=5000000: 28,888,451.761676144
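The runs above can be summarized with a few lines of arithmetic. This is only a sketch: the figures are copied from the output above, the variable and function names are made up for illustration, and with six to eight manual runs per build the standard deviation is a rough guide rather than a significance test.

```javascript
// Throughput figures copied from the runs above, as printed by
// benchmark/streams/pipe.js with n=5000000.
const runs = {
  unmodified: [
    29249159.338211797, 29197094.442971323, 29029514.30719613,
    28273879.594224814, 29225075.268889103, 27992464.42857583,
    28337883.925193656, 29188259.898103785,
  ],
  tryCatch: [
    28868575.624709625, 29417366.983859424, 28880720.458452556,
    28618137.460068755, 28907380.567654, 29311473.430494733,
  ],
  v22_8_0: [
    28344122.315635882, 28975323.889792357, 28917007.4656484,
    28974708.162682924, 28985794.415783517, 28888451.761676144,
  ],
};

const mean = (xs) => xs.reduce((a, b) => a + b, 0) / xs.length;

// Sample standard deviation (divides by n - 1).
const sampleStdDev = (xs) => {
  const m = mean(xs);
  const squares = xs.map((x) => (x - m) ** 2);
  return Math.sqrt(squares.reduce((a, b) => a + b, 0) / (xs.length - 1));
};

for (const [label, xs] of Object.entries(runs)) {
  console.log(label, 'mean:', mean(xs).toFixed(0), 'stddev:', sampleStdDev(xs).toFixed(0));
}

// Relative change of the try/catch build against the unmodified build.
const relativeChange =
  (mean(runs.tryCatch) - mean(runs.unmodified)) / mean(runs.unmodified);
console.log('relative change:', (relativeChange * 100).toFixed(2) + '%');
```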
If there is no performance overhead I would argue that both suggestions should be applied.
With the check in place, under what circumstances might writable.write() throw an error?
Also, are these instructions still current for running benchmarks on CI? (I see the repo they are in is archived.) Also, will I have permissions to trigger such a run on CI? Or does a maintainer need to do that?
See https://github.com/nodejs/node/blob/main/doc/contributing/writing-and-running-benchmarks.md. A collaborator needs to start the benchmark CI.
With the check in place, under what circumstances might writable.write() throw an error?
"Streamlike" objects.
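One possible reading of that answer, sketched under assumptions: a destination that only imitates the stream interface has no object-mode state for a check in pipe() to inspect, yet its write() can still throw. LegacySink below is a hypothetical class invented for this illustration and is not taken from the thread or from Node.js.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical "streamlike" destination: it offers write()/end() and events,
// but it is not a stream.Writable and has no _writableState.
class LegacySink extends EventEmitter {
  write(chunk) {
    if (typeof chunk !== 'string') {
      throw new TypeError('LegacySink only accepts strings');
    }
    return true;
  }

  end() {}
}

const legacySink = new LegacySink();

// There is no mode to compare against on an object like this.
console.log('writableObjectMode:', legacySink.writableObjectMode);

let legacyError = null;
try {
  legacySink.write({ not: 'a string' });
} catch (err) {
  legacyError = err;
}
console.log('write() threw:', legacyError.message);
```

For a destination like this, only a guard around the write() call itself can intercept the failure.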
Version
22.8.0
Platform
Subsystem
stream
What steps will reproduce the bug?
Run the following code:
How often does it reproduce? Is there a required condition?
Always
What is the expected behavior? Why is that the expected behavior?
Probably Readable.pipe() should compare the objectMode state of the source and the write side of the destination, and if they don't match, it should synchronously throw. So, in the code above, I would expect to catch an error on the line that reads
console.error("caught error when calling pipe", e);
Alternatively, the writable (or transform) stream could emit an error. In that case, in the code above, I would expect to see an error logged by the passThrough.on("error") callback.
What do you see instead?
There is an uncaught exception. It is caught by the callback to process.setUncaughtExceptionCaptureCallback. That exception is:
I don't believe there is any way to catch this exception in the code that has the streams in context. (But if there is, then that'd be swell! And there's no issue here! And in that case, apologies for not understanding the right part of the streams API to handle these errors correctly.)
Additional information
Broadly, the stream module is great ❤️ ! I love building stuff with it. Thanks for your hard work.