whatwg / streams

Streams Standard
https://streams.spec.whatwg.org/

Synchronous WritableStream Writer to avoid heap allocations from Promises returned by WritableStreamDefaultWriter's write #1276

Open btakita opened 1 year ago

btakita commented 1 year ago

I have a synchronous export process for a large xlsx file where many writes occur. The export previously used Node.js's Writable, whose write method is synchronous. I converted the streaming over to WritableStream. This resulted in a FATAL ERROR: Reached heap limit Allocation failed.

Since Promises create heap allocations & synchronous programs do not benefit from Promises, it would be more memory efficient to have a WritableStreamSynchronousWriter or a WritableStreamBufferedWriter.

I believe a WritableStreamBufferedWriter could be implemented as a library, which would presumably delay heap limit errors in synchronous programs; a sketch of the idea follows below. WritableStreamSynchronousWriter does not seem possible without adding support to the Web Streams API. A synchronous write function could also run into memory limit issues, or need to be drained later than an async function would, if the reads are faster than the writes.
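For illustration, here is a minimal sketch of what such a userland buffered writer could look like; the class name, the batching threshold, and the text-only API are assumptions, not a proposed spec surface. write() stays synchronous, and a Promise is only created per filled batch via WritableStreamDefaultWriter.write().

// Hypothetical userland "WritableStreamBufferedWriter": write() is synchronous
// & batches are handed to WritableStreamDefaultWriter.write() only when the
// buffer fills, so one Promise is created per batch rather than per write().
class BufferedTextWriter {
    #writer:WritableStreamDefaultWriter<Uint8Array>
    #encoder = new TextEncoder()
    #chunks:Uint8Array[] = []
    #size = 0
    #pending:Promise<void> = Promise.resolve()
    constructor(stream:WritableStream<Uint8Array>, readonly high_water_mark = 512_000) {
        this.#writer = stream.getWriter()
    }
    // Synchronous from the caller's point of view: no Promise per call.
    write(content:string):void {
        const bytes = this.#encoder.encode(content)
        this.#chunks.push(bytes)
        this.#size += bytes.length
        if (this.#size >= this.high_water_mark) this.#flush()
    }
    // Concatenate the pending chunks & chain one write onto the previous one.
    #flush():void {
        if (this.#size === 0) return
        const batch = new Uint8Array(this.#size)
        let offset = 0
        for (const chunk of this.#chunks) {
            batch.set(chunk, offset)
            offset += chunk.length
        }
        this.#chunks = []
        this.#size = 0
        this.#pending = this.#pending.then(()=>this.#writer.write(batch))
    }
    async close():Promise<void> {
        this.#flush()
        await this.#pending
        await this.#writer.close()
    }
}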

Is this use case something to be considered for Web Streams API?

ricea commented 1 year ago

I don't think it's the promises that are causing you to run out of memory, but the large number of pending writes holding onto buffers. So awaiting the results of your writes (or awaiting writer.ready) would be your best option.

Synchronous blocking operations are harmful in a browser context, so we would prefer not to add them to the standard.
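For reference, a minimal sketch of that suggestion, awaiting writer.ready (and the write itself) so that pending chunks never pile up; the function and parameter names are illustrative only:

// Illustrative: respect backpressure instead of fire-and-forget writes.
async function export_rows(
    stream:WritableStream<Uint8Array>,
    rows:Iterable<string>
):Promise<void> {
    const writer = stream.getWriter()
    const encoder = new TextEncoder()
    for (const row of rows) {
        // Wait until the stream's internal queue has room again.
        await writer.ready
        // Awaiting the write keeps at most one chunk in flight.
        await writer.write(encoder.encode(row))
    }
    await writer.close()
}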

btakita commented 1 year ago

I can confirm that the promises have an effect. I went ahead & implemented a synchronous buffered write using ArrayBuffer, Uint8Array, & a write queue. I was able to complete the process while limiting memory usage.

Here is an article that supports the claim that Promises cause excessive heap allocations:

> In addition to storing information about the Promise instance itself (such as its properties and methods), the JavaScript runtime also dynamically allocates more memory to keep track of the asynchronous activity associated with each promise.

Edit: I was able to achieve the same performance as Node.js's writer.write using the synchronous buffered write. When I first wrote this post, I was excessively calling Uint8Array's slice.

> Synchronous blocking operations are harmful in a browser context, so we would prefer not to add them to the standard.

I agree that synchronous blocking operations are harmful on the main thread in a Web UI. However, blocking synchronous operations are the most time-efficient approach for a CPU-bound export process on a server/dev machine or in a Web Worker. I'm looking for a fire-and-forget approach, and it seems like this has been achieved by creating the synchronous buffered write function.

In case you are interested, here is the source code:

import { queue_ } from '@ctx-core/queue'
import { createWriteStream } from 'fs'
import { Writable } from 'stream'
export const stream__promise_ = web__stream__promise_
// Buffers synchronous writes into a 512 KB ArrayBuffer & flushes each filled
// buffer to the WritableStream writer through a queue of async write tasks.
async function web__stream__promise_(
    path:string,
    fn:(write:(content:string)=>void)=>any
):Promise<void> {
    console.info(performance.now(), path, 'start!')
    const writer = Writable.toWeb(
        createWriteStream(path)
    ).getWriter()
    const buffer_length = 512_000
    let buffer = new ArrayBuffer(buffer_length)
    let view = new Uint8Array(buffer)
    const encoder = new TextEncoder()
    let byte_idx = 0
    let total__byte_idx = 0
    const queue = queue_(1)
    // Synchronous: appends encoded content to the current buffer & flushes the
    // buffer to the write queue when it would overflow. Assumes a single chunk
    // is never larger than buffer_length.
    function write(content:string) {
        if (!content) return
        const encoded = encoder.encode(content)
        const encoded_length = encoded.length
        if (byte_idx + encoded_length > buffer_length) {
            queue__add(view, byte_idx, total__byte_idx)
            buffer = new ArrayBuffer(buffer_length)
            view = new Uint8Array(buffer)
            byte_idx = 0
        }
        view.set(encoded, byte_idx)
        byte_idx += encoded_length
        total__byte_idx += encoded_length
    }
    fn(write)
    // write the remaining bytes in buffer
    queue__add(view, byte_idx, total__byte_idx)
    await queue.close()
    console.info(performance.now(), path, 'done!')
    // Enqueues an async task that writes the filled portion of the buffer.
    function queue__add(
        view:Uint8Array,
        byte_idx:number,
        total__byte_idx:number
    ) {
        queue.add(()=>
            writer.write(view.slice(0, byte_idx))
                .then(()=>
                    console.info(performance.now(), path, total__byte_idx, 'bytes written'))
        ).then()
    }
}
// Node.js Writable version, for comparison: writer.write is synchronous
// from the caller's perspective (fire-and-forget).
async function node__stream__promise_(
    path:string,
    fn:(writer:(content:string)=>void)=>any
):Promise<void> {
    console.info(performance.now(), path, 'start!')
    const writer = createWriteStream(path)
    function write(content:string) {
        if (!content) return
        writer.write(content)
    }
    fn(write)
    await new Promise(res=>
        writer.end(()=>{
            console.info(performance.now(), path, 'done!')
            res(null)
        }))
}
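
For context, a hypothetical call site for stream__promise_ above; the data and row formatting are placeholders rather than the real xlsx export logic, and it assumes a module where top-level await is available:

// Hypothetical usage of stream__promise_: the export logic receives the
// synchronous write() callback & emits content with plain function calls;
// only the buffered flushes touch the WritableStream.
const rows = [['a', '1'], ['b', '2']] // placeholder data
await stream__promise_('./export.csv', write=>{
    for (const row of rows) {
        write(row.join(',') + '\n') // synchronous, no Promise per call
    }
})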