uNetworking / uWebSockets.js

μWebSockets for Node.js back-ends :metal:
Apache License 2.0

Correct Way of writing large amounts of dynamic data while respecting backpressure (http) #905

Closed 0x7d8 closed 1 year ago

0x7d8 commented 1 year ago

So I'm trying to write relatively large amounts of compressed data, and since I obviously can't know the size in advance, I settled on chunked transfer using write instead of tryEnd. For the tryEnd backpressure handling I used the VideoStream example. I tried to copy that basically 1:1, replacing tryEnd with write and adding a .end(), but the data arrives with flipped letters on the other side.

This is the code I currently have. The getCompressMethod() function returns the method (none, gzip, ...), the header value, and a function that wraps tryEnd & write: if the method is none it uses tryEnd, otherwise it uses write. It returns the first element of tryEnd's return tuple, or the return value of write, so in both cases the caller gets back whether backpressure is ok.

```ts
// Handle Response
if (ctx.continueSend) try {
    const response = await parseContent(ctx.response.content, ctx.response.contentPrettify, ctg.logger)
    Object.assign(ctx.response.headers, response.headers)
    const [ compressMethod, compressHeader, compressWrite ] = getCompressMethod(!ctx.response.isCompressed, ctx.headers.get('accept-encoding', ''), res, response.content.byteLength, ctg)
    ctx.response.headers['content-encoding'] = compressHeader
    if (compressHeader) ctx.response.headers['vary'] = 'accept-encoding'

    const parsedHeaders = await parseHeaders(ctx.response.headers, ctg.logger)

    let eTag: string | null = null
    if (ctg.options.performance.eTag) {
        eTag = toETag(response.content, parsedHeaders, ctx.response.status)
        ctg.logger.debug('generated etag for content of bytelen', response.content.byteLength)
        if (eTag) parsedHeaders['etag'] = Buffer.from(`W/"${eTag}"`)
    }

    if (!ctx.isAborted) return res.cork(() => {
        let endEarly = false
        if (ctg.options.performance.eTag && eTag && ctx.headers.get('if-none-match') === `W/"${eTag}"`) {
            ctg.logger.debug('ended etag request early because of match')

            ctx.response.status = Status.NOT_MODIFIED
            ctx.response.statusMessage = undefined
            endEarly = true
        }

        // Write Headers & Status
        if (!ctx.isAborted) res.writeStatus(parseStatus(ctx.response.status))
        for (const header in parsedHeaders) {
            if (!ctx.isAborted) res.writeHeader(header, parsedHeaders[header])
        }

        if (endEarly) {
            if (!ctx.isAborted) res.end()
            return
        }

        // Get Content
        const compression = handleCompressType(compressMethod)
        const destroyStream = () => {
            compression.destroy()
        }

        // Handle Compression
        compression.on('data', (content: Buffer) => {
            const contentArrayBuffer = toArrayBuffer(content)

            if (!ctx.isAborted) {
                try {
                    const lastOffset = res.getWriteOffset()
                    const ok = compressWrite(contentArrayBuffer)
                    console.log(contentArrayBuffer, ok)

                    if (!ok) {
                        compression.pause()

                        console.log('Paused', contentArrayBuffer.byteLength, compression.isPaused())

                        res.onWritable((offset) => {
                            const sliced = contentArrayBuffer.slice(offset - lastOffset)

                            const ok = compressWrite(sliced)
                            console.log('Resumed,', ok, sliced.byteLength, compression.isPaused())
                            if (ok) {
                                ctg.data.outgoing.increase(sliced.byteLength)
                                ctg.logger.debug('sent http body chunk with bytelen', sliced.byteLength)
                                compression.resume()
                                console.log('Resumed intern,', ok, sliced.byteLength, compression.isPaused())
                            }

                            return ok
                        })
                    } else {
                        ctg.data.outgoing.increase(content.byteLength)
                        ctg.logger.debug('sent http body chunk with bytelen', content.byteLength)
                    }
                } catch { }
            }
        }).once('close', () => {
            ctx.events.unlist('requestAborted', destroyStream)
            if (compressHeader && !ctx.isAborted) res.end()
            return
        })

        // Handle Data
        compression.end(response.content)

        // Destroy if required
        ctx.events.listen('requestAborted', destroyStream)
    })
} catch (err) {
    ctg.logger.debug(`Ending Request ${ctr.url.href} discarded unknown:`, err)
}
```

(sorry for the indentation)
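For reference, the tryEnd/write normalization described in the opening post could be sketched as a small helper. This is a hypothetical reconstruction, not the actual getCompressMethod (the real function also returns the method and header value); the names and shapes here are assumptions:

```javascript
// Hedged sketch of the writer described above: tryEnd returns an [ok, done]
// tuple while write returns a plain boolean, and both get normalized to a
// single "is backpressure ok?" boolean for the streaming loop.
function makeCompressWrite(method, res, totalSize) {
  if (method === 'none') {
    // uncompressed: total size is known up front, so tryEnd can be used
    return (chunk) => res.tryEnd(chunk, totalSize)[0]
  }
  // compressed: size is unknown, fall back to chunked write
  return (chunk) => res.write(chunk)
}
```

Normalizing the two return shapes like this is what lets one streaming loop treat the tryEnd and write paths uniformly.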

What I've found so far is that the logs end at a "Paused" log: the write returns false, but the .onWritable callback is never executed afterwards. Not sure if that's related, but it may be good to know.

uNetworkingAB commented 1 year ago

https://github.com/uNetworking/uWebSockets.js/blob/master/examples/Backpressure.js

0x7d8 commented 1 year ago

https://github.com/uNetworking/uWebSockets.js/blob/master/examples/Backpressure.js

Isn't that just for WebSockets?

uNetworkingAB commented 1 year ago

drain = onWritable, send = write

Same story otherwise; see the docs.
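To make that mapping concrete, here is a self-contained simulation of the HTTP side of the pattern. `FakeRes` is a stand-in for the real uWS.js HttpResponse, assuming (as the VideoStreamer example does) that a write returning false may have accepted only a prefix of the chunk, that getWriteOffset() reports bytes actually accepted, and that onWritable(offset) fires once the socket can take more. This is an illustration of the protocol, not the library itself:

```javascript
// FakeRes: a mock sink with the assumed write/getWriteOffset/onWritable
// contract. It accepts at most `capacityPerWrite` bytes per write before
// signaling backpressure.
class FakeRes {
  constructor(capacityPerWrite) {
    this.capacity = capacityPerWrite
    this.sent = []      // chunks that made it "onto the wire"
    this.offset = 0     // total bytes accepted so far
    this.writableCb = null
  }
  getWriteOffset() { return this.offset }
  write(chunk) {
    const n = Math.min(this.capacity, chunk.byteLength)
    const accepted = chunk.slice(0, n)
    this.sent.push(accepted)
    this.offset += accepted.byteLength
    return accepted.byteLength === chunk.byteLength // false => backpressure
  }
  onWritable(cb) { this.writableCb = cb }
  drain() {
    // simulate the socket becoming writable again with room to spare
    this.capacity = Infinity
    if (this.writableCb) this.writableCb(this.offset)
  }
}

// The retry pattern the thread is discussing: remember the offset before the
// failed write, then on onWritable resume from where the sink actually
// stopped, by slicing at (offset - lastOffset).
function sendWithBackpressure(res, chunk) {
  const lastOffset = res.getWriteOffset()
  if (!res.write(chunk)) {
    res.onWritable((offset) => {
      return res.write(chunk.slice(offset - lastOffset))
    })
  }
}
```

The slice arithmetic is the whole trick: `offset - lastOffset` is exactly how many bytes of this chunk were already accepted, so the retry never resends or skips bytes.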

0x7d8 commented 1 year ago

Thanks! Turns out my actual problem with the flipped letters was caused by pausing & resuming the zlib stream, for whatever reason. I still haven't figured out why it does that, but backpressure works fine.