nodejs / help


If I want a transform stream to work both as a plain writer and as a true transform, how do I stop pushing data to a possibly non-existent downstream? #4340

Open 1mike12 opened 4 months ago

1mike12 commented 4 months ago

Details

I have a custom transform stream that accumulates data from upstream and runs an async task once it hits a predefined batch size. The use case is streaming files and batch-inserting the data into a database.

My understanding is that a transform stream can be used either as a true transform or just as a writer, i.e. the end of the stream pipeline.
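To make the two modes concrete, here is a minimal sketch (assuming Node.js 15+ for `node:stream/promises`; `CountingStream` and its `forward` flag are made up for illustration). The same kind of transform is run once in the middle of a `pipeline()` with a sink draining it, and once as the last stage, where it never pushes anything.

```typescript
import { Readable, Transform, Writable } from "node:stream"
import type { TransformCallback } from "node:stream"
import { pipeline } from "node:stream/promises"

// Hypothetical transform for illustration: counts chunks and, only when
// `forward` is true, passes each chunk on to its readable side.
class CountingStream extends Transform {
  count = 0
  private readonly forward: boolean

  constructor(forward: boolean) {
    super({ objectMode: true })
    this.forward = forward
  }

  _transform(chunk: unknown, _encoding: BufferEncoding, callback: TransformCallback): void {
    this.count++
    // callback(null, chunk) pushes `chunk`; a bare callback() pushes nothing.
    if (this.forward) callback(null, chunk)
    else callback()
  }
}

async function main() {
  const rows = Array.from({ length: 100 }, (_, i) => i)

  // Mode 1: a true transform. The sink drains the readable side.
  const middle = new CountingStream(true)
  const received: unknown[] = []
  await pipeline(
    Readable.from(rows),
    middle,
    new Writable({
      objectMode: true,
      write(chunk, _encoding, cb) {
        received.push(chunk)
        cb()
      },
    }),
  )

  // Mode 2: "just a writer". It is the last stage and never pushes, so its
  // readable side stays empty and the pipeline settles once writing finishes.
  const terminal = new CountingStream(false)
  await pipeline(Readable.from(rows), terminal)

  return {
    middleCount: middle.count,
    received: received.length,
    terminalCount: terminal.count,
    terminalBuffered: terminal.readableLength,
  }
}

main().then((result) => console.log(result))
```

In the second run only the writable side matters: as far as I can tell, `pipeline()` waits for the last stream's `'finish'` event (which a Transform emits after `_flush()` completes), not for its `'end'`.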

However, if I want a stream that behaves correctly in either mode, does that just work automatically? If I push to the readable-side buffer but nothing is attached to the readable side, wouldn't my transform's readable buffer grow forever? I don't get it, and I can't find any discussion of this anywhere online.
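One way to check the "grows forever" worry empirically is a small experiment like this sketch (`Forwarding` and `fillUntilBackpressure` are made-up names). It writes into a transform that pushes every chunk while nothing reads its readable side. If I understand Transform's backpressure correctly, `readableLength` should stop at `readableHighWaterMark` and `write()` should start returning `false` well before the limit, i.e. the stream stalls rather than buffering without bound.

```typescript
import { Transform } from "node:stream"
import type { TransformCallback } from "node:stream"

// Pushes every chunk it receives to its readable side.
class Forwarding extends Transform {
  constructor() {
    // In object mode each side's highWaterMark defaults to 16 objects.
    super({ objectMode: true })
  }

  _transform(chunk: unknown, _encoding: BufferEncoding, callback: TransformCallback): void {
    callback(null, chunk)
  }
}

// Writes up to `limit` chunks while nothing reads the readable side,
// stopping as soon as write() reports backpressure by returning false.
function fillUntilBackpressure(limit: number) {
  const stream = new Forwarding()
  let accepted = 0
  while (accepted < limit) {
    accepted++
    if (!stream.write(accepted)) break
  }
  return {
    accepted,
    readableLength: stream.readableLength,
    readableHighWaterMark: stream.readableHighWaterMark,
    writableLength: stream.writableLength,
  }
}

console.log(fillUntilBackpressure(1000))
```

If that holds, a pushing transform used as the last stage of a pipeline would eventually stop accepting writes (upstream waits for a `'drain'` that never comes), so the pipeline would hang rather than run out of memory. The `BatchStream` in the example code never calls `push()`, so this limit does not come into play for it.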

Node.js version

Not applicable.

Example code

import { Transform, type TransformCallback } from "stream"

export class BatchStream extends Transform {
  private readonly batchSize: number
  private readonly asyncProcessFunction: (batch: any[]) => Promise<any>
  private batch: any[]

  constructor(batchSize: number, asyncProcessFunction: (batch: any[]) => Promise<any>) {
    super({ objectMode: true })
    this.batchSize = batchSize
    this.asyncProcessFunction = asyncProcessFunction
    this.batch = []
  }

  // Collect the chunk; nothing is pushed to the readable side here.
  _transform(chunk: any, _encoding: BufferEncoding, callback: TransformCallback): void {
    this.batch.push(chunk)

    if (this.batch.length >= this.batchSize) {
      void this.processBatch(callback)
    } else {
      callback()
    }
  }

  _flush(callback: TransformCallback): void {
    if (this.batch.length > 0) {
      void this.processBatch(callback)
    } else {
      callback()
    }
  }

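  private async processBatch(callback: TransformCallback): Promise<void> {
    try {
      await this.asyncProcessFunction(this.batch)
    } catch (err) {
      // `err` is `unknown` under strict TypeScript; TransformCallback expects an Error.
      callback(err as Error)
      return
    }
    this.batch = []
    // Signal completion outside the try block so an exception thrown from
    // callback() cannot lead to the callback being invoked a second time.
    callback()
  }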
}
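A possible dual-mode variant of the class above, as a sketch: the `emitBatches` constructor flag, the `ProcessFn` type and the `fakeInsert` stand-in are hypothetical and not part of the original. Rather than trying to detect whether anything is attached downstream, the caller chooses the mode: with `emitBatches` false nothing is pushed, so the readable side never fills; with it true each processed batch is forwarded and a downstream consumer is expected.

```typescript
import { Readable, Transform, Writable } from "node:stream"
import type { TransformCallback } from "node:stream"
import { pipeline } from "node:stream/promises"

type ProcessFn<T> = (batch: T[]) => Promise<unknown>

// Hypothetical variant: `emitBatches` is an assumption, not part of the
// original class. When false (writer-only use), nothing is ever pushed.
class BatchStream<T> extends Transform {
  private batch: T[] = []
  private readonly batchSize: number
  private readonly processFn: ProcessFn<T>
  private readonly emitBatches: boolean

  constructor(batchSize: number, processFn: ProcessFn<T>, emitBatches = false) {
    super({ objectMode: true })
    this.batchSize = batchSize
    this.processFn = processFn
    this.emitBatches = emitBatches
  }

  _transform(chunk: T, _encoding: BufferEncoding, callback: TransformCallback): void {
    this.batch.push(chunk)
    if (this.batch.length >= this.batchSize) void this.processBatch(callback)
    else callback()
  }

  _flush(callback: TransformCallback): void {
    if (this.batch.length > 0) void this.processBatch(callback)
    else callback()
  }

  private async processBatch(callback: TransformCallback): Promise<void> {
    // Take ownership of the current batch before awaiting.
    const batch = this.batch
    this.batch = []
    try {
      await this.processFn(batch)
    } catch (err) {
      callback(err as Error)
      return
    }
    // callback(null, batch) pushes the batch downstream; callback() does not.
    if (this.emitBatches) callback(null, batch)
    else callback()
  }
}

async function main() {
  const rows = Array.from({ length: 10 }, (_, i) => ({ id: i }))
  const inserted: number[] = []
  // Stand-in for a database insert (hypothetical).
  const fakeInsert = async (batch: { id: number }[]) => {
    inserted.push(batch.length)
  }

  // Writer-only: BatchStream is the last stage of the pipeline.
  await pipeline(Readable.from(rows), new BatchStream(4, fakeInsert))

  // True transform: processed batches flow on to a downstream sink.
  const downstream: { id: number }[][] = []
  await pipeline(
    Readable.from(rows),
    new BatchStream(4, async () => {}, true),
    new Writable({
      objectMode: true,
      write(batch, _encoding, cb) {
        downstream.push(batch)
        cb()
      },
    }),
  )

  return { inserted, downstreamSizes: downstream.map((b) => b.length) }
}

main().then((result) => console.log(result))
```

With `emitBatches` set to true but nothing consuming the readable side, I'd expect the stream to stall once its readable buffer reaches its highWaterMark, so the flag has to match how the stream is wired.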

Operating system

n/a

Scope

code

Module and version

streams