uNetworking / uWebSockets.js

μWebSockets for Node.js back-ends :metal:
Apache License 2.0

Compression #951

Closed 0x7d8 closed 1 year ago

0x7d8 commented 1 year ago

Can anyone provide an example of zlib compression with uWS? I've tried a lot, but every time I pause a stream and resume it later (backpressure handling, which works 100% fine without compression), the output has some letters flipped and sometimes information left out entirely.

        // Handle Response
        if (ctx.continueSend) try {
            const results = await Promise.all([ ... ctx.response.content.map((c) => parseContent(c, ctx.response.contentPrettify, ctg.logger)) ])
            const response = { content: Buffer.concat(results.map((r) => r.content)), headers: Object.assign({}, ...results.map((r) => r.headers)) }

            Object.assign(ctx.response.headers, response.headers)
            const [ compressMethod, compressHeader, compressWrite ] = getCompressMethod(!ctx.response.isCompressed, ctx.headers.get('accept-encoding', ''), res, response.content.byteLength, ctg)
            ctx.response.headers['content-encoding'] = compressHeader
            if (compressHeader) ctx.response.headers['vary'] = 'accept-encoding'

            let eTag: string | null
            if (ctg.options.performance.eTag) {
                eTag = toETag(response.content, ctx.response.headers, ctx.response.cookies, ctx.response.status)
                ctg.logger.debug('generated etag for content of bytelen', response.content.byteLength)
                if (eTag) ctx.response.headers['etag'] = eTag
            }

            const meta = await writeHTTPMeta(res, ctx)

            if (!ctx.isAborted) return res.cork(() => {
                let endEarly = false
                if (ctg.options.performance.eTag && eTag && ctx.headers.get('if-none-match') === eTag) {
                    ctg.logger.debug('ended etag request early because of match')

                    ctx.response.status = Status.NOT_MODIFIED
                    ctx.response.statusMessage = undefined
                    endEarly = true
                }

                meta()

                if (endEarly) {
                    if (!ctx.isAborted) res.end()
                    return
                }

                // Get Content
                if (compressHeader) ctg.logger.debug('negotiated to use', compressHeader)
                const compression = handleCompressType(compressMethod)
                const destroyStream = () => {
                    compression.destroy()
                }

                // Handle Compression
                compression.on('data', (content: Buffer) => {
                    if (!ctx.isAborted) {
                        try {
                            const arrayBuffer = toArrayBuffer(content)
                            const ok = compressWrite(arrayBuffer)

                            if (!ok) {
                                compression.pause()

                                res.sendContent = arrayBuffer
                                res.contentOffset = res.getWriteOffset()

                                res.onWritable((offset) => {
                                    const sliced = res.sendContent.slice(offset - res.contentOffset)

                                    const ok = compressWrite(sliced)
                                    if (ok) {
                                        ctg.data.outgoing.increase(sliced.byteLength)
                                        ctg.logger.debug('sent http body chunk with bytelen', sliced.byteLength, '(delayed)')
                                        compression.resume()
                                    }

                                    return ok
                                })
                            } else {
                                ctg.data.outgoing.increase(content.byteLength)
                                ctg.logger.debug('sent http body chunk with bytelen', content.byteLength)
                            }
                        } catch { }
                    }
                }).once('close', () => {
                    if (compressHeader && !ctx.isAborted) res.cork(() => res.end())
                    destroyStream()

                    ctx.events.unlist('requestAborted', destroyStream)
                    return
                })

                // Handle Data
                compression.end(response.content)

                // Destroy if required
                ctx.events.listen('requestAborted', destroyStream)
            })
        } catch (err) {
            ctg.logger.debug(`Ending Request ${ctr.url.href} discarded unknown:`, err)
        }
0x7d8 commented 1 year ago

I'm not using an external framework. Do you want me to describe some of the variables so you know more?

0x7d8 commented 1 year ago

But I'm not really here for help with that code anyway. I want an example of backpressure handling where the size is not known in advance.

uNetworkingAB commented 1 year ago

You can't dump application code and expect someone to solve your app problems. Whether you send a video (VideoStreamer.js) or compressed content, is irrelevant to uWS. It's a layer atop that's not related to anything the server does.

If you get VideoStreamer.js working, you can get any other content working.

e3dio commented 1 year ago

@0x7d8 You are calling res.getWriteOffset() after attempting to write the data chunk, which is incorrect. You are supposed to call it before the write to get the chunk's initial offset, as in the VideoStreamer.js example. As written, your code will send repeated chunks of data.
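
For readers following along, here is a minimal sketch of the ordering described above, assuming the total size is known so that `res.tryEnd()` can be used. The `TryEndResponse` interface and the `sendChunk` helper are hypothetical names for illustration, covering only the three response methods the pattern relies on. This is not the VideoStreamer.js code itself.

```typescript
// Hypothetical minimal shape: only the response methods this pattern uses.
interface TryEndResponse {
  getWriteOffset(): number
  tryEnd(chunk: ArrayBuffer, totalSize: number): [boolean, boolean]
  onWritable(handler: (offset: number) => boolean): unknown
}

// Tries to send one chunk. Returns true if it was accepted immediately,
// false if the caller should pause its source until `onDrained` fires.
function sendChunk(
  res: TryEndResponse,
  chunk: ArrayBuffer,
  totalSize: number,
  onDrained: () => void
): boolean {
  // Read the offset BEFORE writing: this is where `chunk` starts in the response.
  const chunkStart = res.getWriteOffset()

  const [ok, done] = res.tryEnd(chunk, totalSize)
  if (ok || done) return true

  res.onWritable((offset) => {
    // `offset` is how much of the response has been written so far, so
    // `offset - chunkStart` is the part of this chunk that already went out.
    const [retryOk, retryDone] = res.tryEnd(chunk.slice(offset - chunkStart), totalSize)
    if (retryOk || retryDone) onDrained()
    return retryOk
  })

  return false
}
```

If `chunkStart` were read after the first `tryEnd()` call instead, `offset - chunkStart` would be too small and the retry would resend bytes that had already been written.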

e3dio commented 1 year ago

But res.getWriteOffset() only works with res.tryEnd(), and you can't use tryEnd because you don't know the length. res.write() buffers the whole chunk for you, so if res.write() returns false, just wait for onWritable and then resume the stream. There is no need to slice chunks.
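
A minimal sketch of that approach for a body of unknown length, such as compressed output. `ResponseLike`, `PausableSource`, `pipeWithBackpressure`, and the `isAborted` callback are hypothetical names for illustration. The interfaces list only the methods the sketch calls, and are assumed to match the corresponding methods on a uWebSockets.js response and a Node.js stream.

```typescript
// Hypothetical minimal shapes for this sketch.
interface ResponseLike {
  write(chunk: ArrayBuffer | Uint8Array): boolean
  onWritable(handler: (offset: number) => boolean): unknown
  end(): unknown
  cork(cb: () => void): unknown
}

interface PausableSource {
  on(event: 'data', listener: (chunk: Uint8Array) => void): unknown
  on(event: 'end', listener: () => void): unknown
  pause(): unknown
  resume(): unknown
}

function pipeWithBackpressure(
  source: PausableSource,
  res: ResponseLike,
  isAborted: () => boolean
): void {
  source.on('data', (chunk) => {
    if (isAborted()) return

    res.cork(() => {
      // write() takes the whole chunk; false only signals backpressure,
      // so the chunk is never sliced or written a second time.
      const ok = res.write(chunk)
      if (!ok) {
        source.pause()
        res.onWritable(() => {
          // The buffered data has drained; let the source produce more.
          source.resume()
          return true
        })
      }
    })
  })

  source.on('end', () => {
    // No more data will arrive: finish the response.
    if (!isAborted()) res.cork(() => res.end())
  })
}
```

In an application, `source` could be a stream such as `zlib.createGzip()` that the response content is written into, `res` the response object of the route handler, and `isAborted` a check against a flag set in `res.onAborted()`.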