uNetworking / uWebSockets.js

μWebSockets for Node.js back-ends :metal:
Apache License 2.0

uWS.HttpResponse.onWritable incorrectly calls the handler for a previous failed chunk when handling backpressure #961

Closed kartikk221 closed 11 months ago

kartikk221 commented 11 months ago

Recently, I noticed multiple odd occurrences where large files served over a local network from a macOS machine to a Windows machine would randomly hang midway and never finish. After spending many hours debugging the server-side code of hyper-express, I observed very weird behavior with uWebSockets.js:

Implementation:

Debug modifications to make to Response.js for logging:

    request_id = Math.round(Math.random() * 1_000_000);

    _stream_chunk(chunk, total_size) {
        // Ensure this request has not been completed yet
        if (this.completed) return Promise.resolve();

        // Return a Promise which resolves once the chunk has been fully sent to the client
        return new Promise((resolve) =>
            this.#raw_response.cork(() => {
                // Ensure the client is still connected
                if (this.completed) return resolve();

                // Remember the initial write offset for future backpressure sliced chunks
                // Write the chunk to the client using the appropriate uWS chunk writing method
                const write_offset = this.write_offset;
                const [sent] = this._uws_write_chunk(chunk, total_size);
                if (sent) {
                    // The chunk was fully sent, we can resolve the promise
                    resolve();
                } else {
                    // Bind a drain handler to relieve backpressure
                    // Note! This callback may be called as many times as necessary to send a full chunk when using the tryEnd method
                    let drain_id = Math.round(Math.random() * 1_000_000);
                    console.log(this.request_id, drain_id, 'draining', '...');
                    this.drain((offset) => {
                        console.log(this.request_id, drain_id, 'draining', ' at ', offset, '/', total_size, 'bytes');
                        // Check if the response has been completed / connection has been closed since we can no longer write to the client
                        if (this.completed) {
                            resolve();
                            return true;
                        }

                        // Attempt to write the remaining chunk to the client
                        const remaining = chunk.slice(offset - write_offset);
                        const [flushed] = this._uws_write_chunk(remaining, total_size);
                        if (flushed) resolve();

                        // Return the flushed boolean as not flushed means we are still waiting for more drain events from uWS
                        return flushed;
                    });
                }
            })
        );
    }

    async stream(readable, total_size) {
        console.log('\n\n', this.request_id, 'streaming', total_size, 'bytes from a readable stream...');
        // Ensure readable is an instance of a stream.Readable
        if (!(readable instanceof stream.Readable))
            this.throw(
                new Error('HyperExpress: Response.stream(readable, total_size) -> readable must be a Readable stream.')
            );

        // Do not allow streaming if response has already been aborted or completed
        if (!this.completed) {
            // Initiate response as we will begin writing body chunks
            this._initiate_response();

            // Bind a 'close' event handler which will destroy the consumed stream if the request is closed
            this.once('close', () => (!readable.destroyed ? readable.destroy() : null));

            // Bind an 'end' event handler on the readable stream to send the response if no total size was provided hence chunked encoding is used
            if (total_size === undefined) readable.once('end', () => this.send());

            // Define a while loop to consume chunks from the readable stream until it is fully consumed or the response has been completed
            let read_bytes = 0;
            let written_bytes = 0;
            while (!this.completed && !readable.destroyed) {
                // Attempt to read a chunk from the readable stream
                let chunk = readable.read();
                if (!chunk) {
                    // Wait for the readable stream to emit a 'readable' event if no chunk was available
                    await new Promise((resolve) => readable.once('readable', resolve));

                    // Attempt to read a chunk from the readable stream again
                    chunk = readable.read();
                }
                read_bytes += chunk ? chunk.length : 0;
                console.log(this.request_id, 'read', read_bytes, '/', total_size, 'bytes');

                if (chunk) {
                    // Stream the chunk to the client
                    console.log(this.request_id, 'streaming chunk', chunk.length, 'bytes');
                    await this._stream_chunk(chunk, total_size);
                    console.log(this.request_id, 'streamed chunk', chunk.length, 'bytes');
                    written_bytes += chunk.length;
                    console.log(this.request_id, 'wrote', written_bytes, '/', total_size, 'bytes');

                    // This is a workaround due to a uWebsockets.js bug where the drain callback gets called in the context of the current chunk write for the failure of the next chunk write
                    // By waiting for the next tick, we can ensure that the drain callback is called in the context of the next chunk write
                    await new Promise((resolve) => setImmediate(resolve));
                }
            }
        }
    }

With the above modifications made to Response.js, if you stream a large file over the network such that it encounters backpressure multiple times (you can verify this from the drain logs), you will observe the following: the first chunk that hits backpressure is retried successfully, but the next chunk that has to be retried hangs, because the drain() call invokes the handler bound for the previous chunk. You can confirm that the wrong onWritable/drain() handler was called by the mismatched drain_id in the drain logs for the second chunk.

The solution / workaround for this problem that I came up with was simply waiting for the next event loop / tick with setImmediate between serving each chunk. I assume it could be optimized further by only waiting when the chunk actually encountered backpressure. The fact that this workaround solves the problem is the main reason I opened this issue: I don't fully understand why waiting for the next tick works, but I figure this is either a race condition in my code or uWS, or a major source of confusion in the Backpressure / VideoStreamer example that can cause very hard-to-trace bugs in production.

Hope the above provides sufficient information to look into this problem. I would be glad to provide more information if needed.

uNetworkingAB commented 11 months ago

I don't provide any debugging or fixing of third party libraries and I have no idea what Response.js is or does.

onWritable does not give you any guarantees of any kind, it's simply a hint that the socket could be writable. There is no connection between "a previous failed chunk" and an invocation of that callback. There is no guarantee of any sort, that you wouldn't get spurious calls, duplicate calls or similar.

onWritable simply means that it's a good idea to continue feeding the socket more data, but you should still call getBufferedAmount() or similar to determine if you actually need to send stuff.
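As a rough illustration of treating onWritable as a hint, the sketch below derives the remaining bytes from the offset passed to the handler, so a duplicate or spurious call does nothing harmful. `MockResponse`, `fireWritable` and `sendBody` are hypothetical names invented for this example; the mock only imitates `tryEnd`, `getWriteOffset` and `onWritable` so the code runs without uWS, and it is not how uWS is implemented. Corking and abort handling are left out.

```javascript
// Hypothetical stand-in for a uWS HttpResponse, so the sketch runs without uWS.
// It accepts at most `capacity` bytes per tryEnd call to simulate backpressure.
class MockResponse {
    constructor(capacity) {
        this.capacity = capacity;
        this.offset = 0; // total bytes accepted so far
        this.handler = null;
    }
    getWriteOffset() {
        return this.offset;
    }
    tryEnd(chunk, totalSize) {
        const accepted = Math.min(chunk.length, this.capacity);
        this.offset += accepted;
        return [accepted === chunk.length, this.offset === totalSize];
    }
    onWritable(handler) {
        this.handler = handler;
        return this;
    }
    // Test helper (not a uWS method): pretend the socket reported "writable"
    fireWritable() {
        return this.handler ? this.handler(this.offset) : true;
    }
}

// Send one buffer as the entire response body, tolerating spurious onWritable calls
function sendBody(res, body) {
    const baseOffset = res.getWriteOffset();
    const writeFrom = (offset) => {
        // Work out what is left from the reported offset, not from call order
        const remaining = body.subarray(offset - baseOffset);
        if (remaining.length === 0) return true; // nothing left: spurious call
        const [ok] = res.tryEnd(remaining, body.length);
        return ok;
    };
    // Only register the handler if the first attempt hit backpressure
    if (!writeFrom(baseOffset)) res.onWritable(writeFrom);
}

const res = new MockResponse(4);
sendBody(res, Buffer.from('0123456789'));
while (res.getWriteOffset() < 10) res.fireWritable();
res.fireWritable(); // an extra call after everything was sent is harmless
console.log(res.getWriteOffset());
```

The handler never assumes which call it is serving; it only trusts the offset it is given, which is the property the comment above asks for.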

The same goes for WebSocket drain handler. There is also a section in the user manual re. this:

Backpressure in websockets

Similarly to for Http, methods such as ws.send(...) can cause backpressure. Make sure to check ws.getBufferedAmount() before sending, and check the return value of ws.send before sending any more data. WebSockets do not have .onWritable, but instead make use of the .drain handler of the websocket route handler.

Inside of .drain event you should check ws.getBufferedAmount(), it might have drained, or even increased. Most likely drained but don't assume that it has, .drain event is only a hint that it has changed.
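A minimal sketch of that advice, assuming a per-connection queue and an arbitrary threshold: `pump` sends only while `getBufferedAmount()` is below the limit, and it is meant to be called both when data is queued and from the `.drain` handler. `MockWebSocket`, `drainBytes`, `pump` and `MAX_BUFFERED` are hypothetical names for illustration; the mock stands in for a uWS WebSocket so the example runs on its own.

```javascript
const MAX_BUFFERED = 1024; // assumed threshold in bytes, tune for your workload

// Hypothetical stand-in for a uWS WebSocket; every send is buffered until drained.
class MockWebSocket {
    constructor() {
        this.buffered = 0;
        this.sent = [];
    }
    getBufferedAmount() {
        return this.buffered;
    }
    send(message) {
        this.sent.push(message);
        this.buffered += message.length;
        return 0; // assumed here to mean "buffered, will drain later"
    }
    // Test helper (not a uWS method): the socket flushed `bytes` of buffered data
    drainBytes(bytes) {
        this.buffered = Math.max(0, this.buffered - bytes);
    }
}

// Send queued messages only while the socket's buffer is below the threshold.
// The buffered amount is re-checked before every send, so a drain event that
// arrives while the buffer is still full simply results in no sends.
function pump(ws, queue) {
    while (queue.length > 0 && ws.getBufferedAmount() < MAX_BUFFERED) {
        ws.send(queue.shift());
    }
}

const ws = new MockWebSocket();
const queue = Array.from({ length: 5 }, () => 'x'.repeat(400));
pump(ws, queue); // sends until at least MAX_BUFFERED bytes are buffered
ws.drainBytes(100);
pump(ws, queue); // "drain" fired, but the buffer is still too full: sends nothing
ws.drainBytes(2000);
pump(ws, queue); // buffer is empty again, the rest goes out
console.log(ws.sent.length, queue.length);
```

In a real route the second and third `pump` calls would live inside the `drain` handler. The sketch ignores the return value of `send` because the loop re-checks the buffered amount each time; production code should also handle messages that are dropped.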

uNetworkingAB commented 11 months ago

I read your post again and it sounds like you have misunderstood how onWritable works. There is no "per-chunk" onWritable and you cannot have 2 onWritable. The onWritable is per-socket, not per-chunk.

Maybe it's not even your misunderstanding, but the misunderstanding of third party. But it doesn't matter because third party is not uWS.js, it is some third party project and therefore I cannot look at it since I have provided working examples that reliably stream entire HD videos without any issues.

uNetworkingAB commented 11 months ago

Aha, it is your project.

In that case,

The solution / workaround for this problem that I came up with was simply waiting for the next event loop / tick with setImmediate between serving each chunk.

This is definitely not correct, don't rely on this. What you need is integration tests that stress your usage and make sure it works 100% correctly, always.

kartikk221 commented 11 months ago

Yeah @uNetworkingAB 100% right. I completely missed the fact that local tests will not encounter backpressure (at least not to the extent where something like this would be triggered), and I never encountered this issue in production until now, with another high-traffic file delivery library I built on top of hyper-express/uWS.

I understand how onWritable works now, and yeah, it definitely makes sense that if it's per socket (I'm guessing per HttpResponse), you should only bind it once, not multiple times as my implementation currently does.

I do wonder though why is it that simply waiting for the next event loop / tick with setImmediate worked when binding multiple onWritables in my incorrect implementation? I tested this workaround with even a 3 GB transfer that encountered a ton of backpressure at times, and it all arrived at the destination properly with the exact same md5 hash as the original file.

I am re-writing the whole implementation as we speak according to how you described above but curious as to how things may be working under the hood so I can have better insight for the future.

uNetworkingAB commented 11 months ago

I do wonder though why is it that simply waiting for the next event loop / tick with setImmediate worked when binding multiple onWritables in my incorrect implementation?

If you dance and it rains, will dancing bring rain again? No, because they are unrelated. No idea, but it's definitely not a solution

porsager commented 11 months ago

Any callback from uWebSockets (like onWritable) is already corked, and I bet you're naively monkey patching any uWS method to always cork any call, resulting in this behavior because you're suddenly corking twice: once from already being in onWritable and once yourself.

uNetworkingAB commented 11 months ago

You can't cork twice; corking while already corked is a no-op.

porsager commented 11 months ago

Exactly, so his code to continue in onWritable probably never runs ;)

kartikk221 commented 11 months ago

Exactly, so his code to continue in onWritable probably never runs ;)

Feel free to review my code but I actually only added corks after the warnings update to only patch up the uncorked warnings throughout the test suite.

It's safe to say the behavior I observed was not caused by multiple corks or anything like that. The actual culprit was that you cannot assign more than one handler to onWritable, so my second backpressure handler would not work properly since it would never receive drain events.
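For readers hitting the same problem, one way to respect the "one onWritable per response" rule is sketched below. This is not the actual hyper-express fix, only an illustration: a single handler is registered once, and the chunk currently waiting on backpressure is kept in mutable state. `MockResponse`, `fireWritable` and `ChunkWriter` are hypothetical names; the mock imitates `tryEnd`, `getWriteOffset` and `onWritable` so the example runs without uWS, and the sketch assumes the registered handler stays in place for the life of the response. Corking and abort handling are omitted.

```javascript
// Hypothetical stand-in for a uWS HttpResponse with a single onWritable slot.
class MockResponse {
    constructor(capacity) {
        this.capacity = capacity; // bytes accepted per tryEnd call
        this.offset = 0;
        this.handler = null;
    }
    getWriteOffset() {
        return this.offset;
    }
    tryEnd(chunk, totalSize) {
        const accepted = Math.min(chunk.length, this.capacity);
        this.offset += accepted;
        return [accepted === chunk.length, this.offset === totalSize];
    }
    onWritable(handler) {
        this.handler = handler;
        return this;
    }
    // Test helper (not a uWS method): pretend the socket reported "writable"
    fireWritable() {
        return this.handler ? this.handler(this.offset) : true;
    }
}

class ChunkWriter {
    constructor(res, totalSize) {
        this.res = res;
        this.totalSize = totalSize;
        this.pending = null; // { chunk, baseOffset, resolve } while waiting on drain
        // Registered exactly once; every chunk shares this handler
        res.onWritable((offset) => this.handleWritable(offset));
    }
    handleWritable(offset) {
        const pending = this.pending;
        if (!pending) return true; // nothing waiting: treat the call as a mere hint
        const remaining = pending.chunk.subarray(offset - pending.baseOffset);
        const [ok] = this.res.tryEnd(remaining, this.totalSize);
        if (ok) {
            this.pending = null;
            pending.resolve();
        }
        return ok;
    }
    // Resolves once the chunk has been fully handed to the response
    write(chunk) {
        if (this.pending) throw new Error('previous chunk is still pending');
        return new Promise((resolve) => {
            const baseOffset = this.res.getWriteOffset();
            const [ok] = this.res.tryEnd(chunk, this.totalSize);
            if (ok) return resolve();
            this.pending = { chunk, baseOffset, resolve };
        });
    }
}

const res = new MockResponse(4);
const writer = new ChunkWriter(res, 12);
writer.write(Buffer.from('aaaaaa')); // 4 bytes accepted, 2 left pending
res.fireWritable(); // the shared handler flushes the remaining 2 bytes
writer.write(Buffer.from('bbbbbb')); // second chunk also hits backpressure
res.fireWritable(); // the same handler serves the second chunk
console.log(res.getWriteOffset(), writer.pending);
```

Because the handler reads `this.pending` at call time, there is no per-chunk closure that can go stale, which is the failure mode described earlier in the thread.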

It's all been fixed properly now and I appreciate the help and clarification @uNetworkingAB