cloudflare / workerd

The JavaScript / Wasm runtime that powers Cloudflare Workers
https://blog.cloudflare.com/workerd-open-source-workers-runtime/
Apache License 2.0

🐛 Bug Report — unused stream warning on a tee()d tee #754

eminence commented 1 year ago

Consider the following code, part of a worker's fetch handler. It calls tee() on the request body to get three ReadableStreams, then cancels the first two:

const [a, tee] = request.body.tee();
const [b, c] = tee.tee();
await a.cancel();
await b.cancel();
return new Response(c, {
    headers
});

This works (in that the request body is returned to the client as expected), but the following warning is printed:

Your worker called response.clone(), but did not read the body of both clones. This is wasteful, as it forces the system to buffer the entire response body in memory, rather than streaming it through. This may cause your worker to be unexpectedly terminated for going over the memory limit. If you only meant to copy the response headers and metadata (e.g. in order to be able to modify them), use new Response(response.body, response) instead.
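
(For reference, the pattern the warning recommends looks like the following; a minimal sketch where the upstream fetch and the header being modified are hypothetical.)

const resp = await fetch(request);
// new Response(body, init) copies the status and headers from resp while
// letting the body stream straight through, so nothing has to be buffered.
const copy = new Response(resp.body, resp);
copy.headers.set("x-example", "1"); // hypothetical header modification
return copy;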

I don't think this warning is accurate, since all tee branches were either fully read (sent back as a response) or cancelled. Looking at the WarnIfUnusedStream class, I wonder if the call to tryTee should set wasRead = true:

  // No special behavior, just forward these verbatim.
  kj::Maybe<Tee> tryTee(uint64_t limit) override { return inner->tryTee(limit); }

Interestingly, a body that is only tee()d once doesn't issue any warning:

const [a, b] = request.body.tee();
await a.cancel();
return new Response(b, {
    headers
});

Using wrangler v3.1.0

kentonv commented 1 year ago

Hmm... Cancelling probably prevents buffering (though I'd have to verify the implementation gets this right before saying for sure).

But teeing a stream and then canceling one branch of the tee is still redundant and may break optimizations, so the change recommended by the warning is still a good idea.

Is there some use case where you find yourself teeing a stream and then cancelling a branch, where it's difficult to refactor to avoid the tee in the first place?
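
For the common "sniff the first chunk" case, one tee-free refactor could look like the following sketch (hypothetical, not something from this thread): read the first chunk with a reader, then rebuild a stream that re-prepends it.

function prepend(
    chunk: Uint8Array,
    reader: ReadableStreamDefaultReader<Uint8Array>
): ReadableStream<Uint8Array> {
    return new ReadableStream<Uint8Array>({
        start(controller) {
            // Replay the chunk that was already consumed for sniffing.
            controller.enqueue(chunk);
        },
        async pull(controller) {
            const { value, done } = await reader.read();
            if (done) controller.close();
            else controller.enqueue(value);
        },
        cancel(reason) {
            return reader.cancel(reason);
        },
    });
}

const reader = request.body.getReader();
const { value: first, done } = await reader.read();
// ...guess the content type from `first`...
if (done) reader.releaseLock(); // empty body: hand back the original stream
const body = done ? request.body : prepend(first!, reader);
return new Response(body, { headers });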

eminence commented 1 year ago

My actual use case involves teeing a ReadableStream (request.body) three ways: one branch reads the first chunk to guess the content type, one is piped into a DigestStream to hash the body, and one is uploaded to KV.

So a more realistic reproducer is the following:

const digestStream = new crypto.DigestStream("SHA-256");

const [guesser, tee] = request.body.tee();
const [hasher, uploader] = tee.tee();

const r = guesser.getReader();
const chunk = await r.read();
// Try to guess content-type based on chunk.value

await r.cancel(); // we don't need any more data from the reader
r.releaseLock();
await guesser.cancel(); // not sure if we need to cancel both the reader and the stream

hasher.pipeTo(digestStream); // the digestStream should read all bytes from the stream

// put() will also read all bytes from the stream
await env.kv_upload.put(crypto.randomUUID(), uploader, {
    expirationTtl: 60,
});

const final_digest = await digestStream.digest;

return new Response("OK", {
    headers
});

I suppose I could just drain and discard the rest of the chunks from the guesser branch, but cancelling it feels more natural to me.
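
Such a drain might look like this (a sketch; the thread didn't pursue it):

// Hypothetical helper: consume and discard the remaining chunks so the
// branch counts as fully read rather than cancelled.
async function drain(readable: ReadableStream): Promise<void> {
    const reader = readable.getReader();
    while (true) {
        const { done } = await reader.read();
        if (done) return;
    }
}

// e.g. await drain(guesser); in place of the cancel() calls above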

kentonv commented 1 year ago

Interesting.

What if you move the const chunk = await r.read(); down so that it can be concurrent with the KV upload and the digest pipe?

As written right now, the hasher and uploader tees do actually have to buffer data while the guesser is reading the first chunk. If you could make all the branches happen in parallel then no buffering is needed.

eminence commented 1 year ago

What if you move the const chunk = await r.read(); down so that it can be concurrent with the KV upload and the digest pipe?

No apparent change. To be clear, here's what I tried:

const digestStream = new crypto.DigestStream("SHA-256");

const [guesser, tee] = request.body.tee();
const [hasher, uploader] = tee.tee();

hasher.pipeTo(digestStream); // the digestStream should read all bytes from the stream

const upload_promise = env.kv_upload.put(crypto.randomUUID(), uploader, {
    expirationTtl: 60,
});

const digest_promise = digestStream.digest;

async function guess(readable: ReadableStream) {
    const r = readable.getReader();
    const chunk = await r.read();
    // Try to guess content-type based on chunk.value

    await r.cancel(); // we don't need any more data from the reader
    r.releaseLock();
    await readable.cancel(); // not sure if we need to cancel both the reader and the stream

    return Promise.resolve("TODO");
}

const guesser_promise = guess(guesser);

await Promise.all([upload_promise, digest_promise, guesser_promise]);

// send contents directly back to client:
return new Response("OK", {
    headers
});

I'm OK with the hasher and uploader tees doing a little buffering, and based on these docs it seems like the buffers won't grow unbounded anyway.

kentonv commented 1 year ago

Well, the warning is meant to warn about buffering taking place, and it might be hard to distinguish between "a little buffering" and "too much buffering". But in the parallel case I agree the warning seems to be incorrect.

FWIW, this warning is only generated when running with the inspector enabled. In production we don't try to detect this scenario at all. So a fine work-around here is to ignore the warning.

But I agree it would be nice to make it more accurate.

eminence commented 1 year ago

The warning message says this:

This is wasteful, as it forces the system to buffer the entire stream of data in memory

The language about "the entire stream" made me initially think that the message wasn't about "a little buffering", but about a more problematic case where literally the entire stream would be buffered into memory (which is what the canceled tees were supposed to prevent). But having taken a brief look at the code that generates this warning, I can see that it's not that nuanced.

I am indeed ignoring the warning now, but I am interested in having a more accurate one. I worry that the warning might catch a real issue for me in the future, but if I adopt a mental filter to just ignore it, I might get myself into trouble :)