nodejs / help


Custom N-to-1 Readable implementation is dropping data occasionally #3128

Closed (MHebes closed this issue 4 months ago)

MHebes commented 3 years ago

I also created a Stack Exchange Code Review post about this, but I've since realized that my code has a bug, so it isn't really suited for code review yet.

Below is mostly a summary from that post:

Background

I am writing a Readable implementation that takes multiple input streams (in Buffer mode). It aligns the inputs' data on 16-bit boundaries, reads each input's bytes as 16-bit little-endian integers, and sums them position by position. It then outputs those sums.
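The core summing step can be sketched on its own. This is a minimal sketch, not the mixer's actual code: `mixInt16LE` is a hypothetical helper, and it assumes every input is already aligned to the same sample boundary. Because interleaved stereo is just a sequence of int16 samples, summing every 2-byte position is equivalent to the mixer's separate left/right sums.

```typescript
import { Buffer } from "node:buffer";

const INT16_MAX = 32767;
const INT16_MIN = -32768;

/**
 * Hypothetical helper: sums int16LE samples position by position across
 * `inputs`, clamping each result to the int16 range. Only the overlapping
 * region is mixed: the shortest input, rounded down to whole 16-bit samples.
 */
function mixInt16LE(inputs: Buffer[], gain = 1): Buffer {
  if (inputs.length === 0) return Buffer.alloc(0);
  const shortest = Math.min(...inputs.map((b) => b.length));
  const byteLength = shortest - (shortest % 2); // whole samples only
  const out = Buffer.alloc(byteLength);
  for (let offset = 0; offset < byteLength; offset += 2) {
    let sum = 0;
    for (const input of inputs) sum += gain * input.readInt16LE(offset);
    // writeInt16LE throws on out-of-range values, so clamp first; rounding
    // keeps the written sample an integer when gain is fractional
    const clamped = Math.max(INT16_MIN, Math.min(INT16_MAX, Math.round(sum)));
    out.writeInt16LE(clamped, offset);
  }
  return out;
}
```

For example, mixing the bytes `02 03` with `10 11` yields a single sample equal to `0x1412`, and two samples of 30000 clip to 32767 rather than wrapping around.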

It's essentially a modified version of the default Transform, except that instead of piping into it, you pass each Readable to an addInput(...) method.
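Since the mixer is a plain Readable rather than a Transform, it has to honor the pull side of the stream contract itself: Node calls `_read()` when it wants data, and once `push()` returns `false` the implementation should stop pushing until `_read()` is called again (this is what the Mixer's `thirsty` flag tracks). A minimal sketch of that contract in isolation; `CounterSource` is a hypothetical class, not part of the mixer.

```typescript
import { Readable } from "node:stream";
import { Buffer } from "node:buffer";

/**
 * Hypothetical source: emits `total` 4-byte stereo frames whose left sample
 * is a counter. It only produces data inside _read() and stops as soon as
 * push() returns false, resuming when Node calls _read() again.
 */
class CounterSource extends Readable {
  private next = 0;
  readCalls = 0;

  constructor(private readonly total: number, highWaterMark = 16) {
    super({ highWaterMark });
  }

  _read(): void {
    this.readCalls++;
    while (this.next < this.total) {
      const frame = Buffer.alloc(4);
      frame.writeInt16LE(this.next, 0); // left channel carries the counter
      frame.writeInt16LE(0, 2); // right channel is silent
      this.next++;
      if (!this.push(frame)) return; // buffer is full: wait for the next _read()
    }
    this.push(null); // all frames delivered, signal end of stream
  }
}
```

With a small `highWaterMark` the frames arrive over several `_read()` calls, yet none are lost or reordered, because nothing is pushed after `push()` has returned `false`.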

Streams can be added/removed from the input list at any time while running.

This is used to mix two 16-bit, 2-channel PCM audio streams. The output is fed to a Discord bot, which eventually sends the audio over a stream to a Discord client.
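In 16-bit stereo PCM each frame is 4 bytes (left sample, then right sample, each little-endian), which is where the Mixer's `FRAME_BYTES`, `offsetLeft` and `offsetRight` come from. A minimal sketch of the frame bookkeeping, assuming interleaved samples; `frameOffsets` and `takeWholeFrames` are hypothetical helpers that mirror, but are not copied from, the `Math.floor(shortest / FRAME_BYTES)` and `slice` steps in `_process()`.

```typescript
import { Buffer } from "node:buffer";

const CHANNELS = 2;
const SAMPLE_BYTES = 2; // 16-bit samples
const FRAME_BYTES = SAMPLE_BYTES * CHANNELS; // [L lo, L hi, R lo, R hi]

/** Byte offsets of the left and right samples of frame `f`. */
function frameOffsets(f: number): { left: number; right: number } {
  return { left: f * FRAME_BYTES, right: f * FRAME_BYTES + SAMPLE_BYTES };
}

/**
 * Cuts every input at the same whole-frame boundary. The frame count is set
 * by the shortest input; each input keeps its own leftover bytes (possibly a
 * partial frame) to be combined with its next chunk.
 */
function takeWholeFrames(inputs: Buffer[]): {
  frames: number;
  heads: Buffer[];
  rests: Buffer[];
} {
  const shortest = inputs.length ? Math.min(...inputs.map((b) => b.length)) : 0;
  const frames = Math.floor(shortest / FRAME_BYTES);
  const cut = frames * FRAME_BYTES;
  return {
    frames,
    heads: inputs.map((b) => b.subarray(0, cut)), // mixed this round
    rests: inputs.map((b) => b.subarray(cut)), // carried over
  };
}
```

`subarray` returns a view, like the `slice` call in the mixer, so no bytes are copied; in recent Node versions `Buffer#slice` is documented as deprecated in favor of `subarray`.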

For example:

stream A: <added> 0x00...0x01.......0x02....0x03...........0x04........0x05....0x06
stream B:                |     <added> ..0x10.....0x11........0x12.......0x13......
                         |                        |                      |
                     (just A's 16bLE)          (A + B 16bLE)             |
output:   ...............0x0100...................0x1412.................0x1816....
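Reading each byte pair little-endian (low byte first) reproduces the output values in the diagram; all sums stay well below the int16 maximum of 0x7FFF, so no clipping occurs:

$$
\begin{aligned}
\text{A only:}\quad & \mathtt{0x01}\cdot 256 + \mathtt{0x00} = \mathtt{0x0100} \\
\text{A + B:}\quad & \mathtt{0x0302} + \mathtt{0x1110} = \mathtt{0x1412} \\
\text{A + B:}\quad & \mathtt{0x0504} + \mathtt{0x1312} = \mathtt{0x1816}
\end{aligned}
$$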

Problem

I have this mostly working, but after adding and removing a few test streams, it eventually starts dropping data. It seems to play only the first part of each chunk and then skip to the first part of the next chunk.

This behavior doesn't show up in my test.ts (see the linked Stack Exchange post).

The only way I've been able to reproduce the behavior is by actually running my Discord bot and playing a few test songs. The first few songs play fine, but eventually playback starts skipping forward in the song.
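One way to make dropped data detectable without listening is to feed the mixer a synthetic "ramp" whose left sample counts up by one per frame, then scan the output for discontinuities. With a single input and no `gainDivide`, `_process()` writes each sample back unchanged, so any jump in the counter means frames were lost. A minimal sketch; `rampPCM` and `findGaps` are hypothetical helpers, and this only exercises the mixer in isolation, not the Discord side.

```typescript
import { Buffer } from "node:buffer";

const FRAME_BYTES = 4; // 16-bit stereo

/** Builds `count` stereo frames whose left sample counts up from `start`. */
function rampPCM(count: number, start = 0): Buffer {
  const buf = Buffer.alloc(count * FRAME_BYTES);
  for (let f = 0; f < count; f++) {
    const value = ((start + f) << 16) >> 16; // wrap into int16 by sign-extending the low 16 bits
    buf.writeInt16LE(value, f * FRAME_BYTES); // left: counter
    buf.writeInt16LE(0, f * FRAME_BYTES + 2); // right: silence
  }
  return buf;
}

/** Returns the frame indices where the left-channel counter does not advance by exactly 1. */
function findGaps(pcm: Buffer): number[] {
  const gaps: number[] = [];
  const frames = Math.floor(pcm.length / FRAME_BYTES);
  for (let f = 1; f < frames; f++) {
    const prev = pcm.readInt16LE((f - 1) * FRAME_BYTES);
    const cur = pcm.readInt16LE(f * FRAME_BYTES);
    const expected = prev === 32767 ? -32768 : prev + 1; // counter wraps like int16
    if (cur !== expected) gaps.push(f);
  }
  return gaps;
}
```

To exercise adding and removing inputs while keeping the check valid, the other inputs could be all-zero (silent) streams, since adding zeros leaves the ramp intact.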

Can anyone take a look at mixer.ts below and see if I'm making any obvious errors with how I'm forwarding the input streams' data? I seem to be dropping a lot of it somewhere.

I'll try to record an example of the issue happening.

Code

This code is also in the Discord bot repo (branch rewrite-index, commit d206e9aa at the time of posting) and in the Stack Exchange post.

mixer.ts

import { Readable, ReadableOptions } from "stream";

const CHANNELS = 2;
const BIT_DEPTH = 16;

const SAMPLE_BYTES = BIT_DEPTH / 8;
const FRAME_BYTES = SAMPLE_BYTES * CHANNELS;
const SAMPLE_MAX = Math.pow(2, BIT_DEPTH - 1) - 1;
const SAMPLE_MIN = -SAMPLE_MAX - 1;

/**
 * Combines 16-bit 2-channel PCM audio streams into one.
 *
 * Usage:
 *
 * const mixer = new Mixer()
 *
 * mixer.addInput(somePCMAudioStream)
 * mixer.addInput(anotherPCMAudioStream)
 *
 * mixer.pipe(yourAudioPlayer)
 *
 * // remove one of the streams after 2 seconds
 * setTimeout(() => mixer.removeInput(somePCMAudioStream), 2000)
 */
class Mixer extends Readable {
  // list of input streams, buffers, and event handlers (for cleanup)
  private inputs: {
    stream: Readable | null;
    buffer: Buffer;
    name: string;
    ondata: (chunk: Buffer) => void;
    onend: () => void;
  }[] = [];

  // each sample is multiplied by this
  private gainMultiplier: number;

  // true when _read() is called, false when this.push() later returns false
  private thirsty = false;

  readonly mixerHighWaterMark: number;

  debugPrint = false;

  constructor(
    opts?: ReadableOptions & {
      gainDivide?: number;
      mixerHighWaterMark?: number;
    }
  ) {
    super(opts ? { highWaterMark: opts.highWaterMark } : undefined);
    this.mixerHighWaterMark =
      opts?.mixerHighWaterMark ?? this.readableHighWaterMark;
    this.gainMultiplier = opts?.gainDivide ? 1 / opts.gainDivide : 1;
  }

  private log(message?: any, ...optionalParams: any[]) {
    if (this.debugPrint) {
      console.log(message, ...optionalParams);
    }
  }

  /**
   * Called by Readable.prototype.read() whenever it wants more data
   */
  _read() {
    this.log("_read");
    this.thirsty = true;

    // we need data so resume any streams that don't have a bunch already
    this.inputs.forEach((v) => {
      if (v.buffer.length < this.mixerHighWaterMark) {
        if (v.stream) {
          this.log(
            `  Resuming ${v.name}, need more data (have ${v.buffer.length})`
          );
          v.stream.resume();
        } else {
          this.log(
            `  Would resume ${v.name} but it's removed (have ${v.buffer.length})`
          );
        }
      }
    });

    // have to do this in case all our streams are removed, but there's still
    // some buffers hanging around
    this._doProcessing();
  }

  /**
   * Adds a stream to the list of inputs.
   *
   * @param name is just for debug info
   */
  addInput(stream: Readable, name: string) {
    this.log(`+ Adding ${name}`);
    const obj = {
      stream: stream,
      buffer: Buffer.allocUnsafe(0),
      name: name,
      ondata: (chunk: Buffer) => {
        this.log(
          `  Got ${chunk.length} data for ${name}, (${obj.buffer.length} => ${
            chunk.length + obj.buffer.length
          })`
        );
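        // buffer the chunk; this can also fire while the mixer isn't thirsty
        // (e.g. right after the listener is attached, which starts the stream
        // flowing), in which case _doProcessing() returns immediately and the
        // data just accumulates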
        obj.buffer = Buffer.concat([obj.buffer, chunk]);

        this._doProcessing();
        if (obj.buffer.length >= this.mixerHighWaterMark) {
          // we couldn't keep processing, but we have a lot of data for this
          // particular input buffer, so we pause it until we need to _read
          // again
          this.log(
            `  Pausing ${name}, have enough (have ${obj.buffer.length})`
          );
          stream.pause();
        }
      },
      onend: () => {
        this.log(`  end ${name}`);
        this.removeInput(stream);
      },
    };

    // These are removed in removeInput, so we don't keep getting data events
    // if a user decides to remove an input mid-stream
    stream.on("data", obj.ondata);
    stream.once("end", obj.onend);

    // stream.pause();

    this.inputs.push(obj);
  }

  /**
   * Removes streams, but not necessarily buffers (so they can be used up first)
   */
  removeInput(stream: Readable) {
    this.inputs.forEach((v) => {
      if (v.stream === stream) {
        this.log(`  (delayed) Removing ${v.name}, short length`);
        v.stream.removeListener("data", v.ondata);
        v.stream.removeListener("end", v.onend);
        v.stream = null;
      }
    });
    this._doProcessing();
  }

  /**
   * Calls _process() repeatedly until it returns false (not thirsty, no
   * inputs left, not enough buffered data, or push() signalled backpressure).
   *
   * This runs synchronously within the current call; nothing is deferred to
   * a later tick, and it takes no callback.
   */
  private _doProcessing() {
    while (this._process()) {}
  }

  /**
   * Calculates the sum for the first N bytes of all input buffers, where N is
   * equal to the length of the shortest buffer.
   *
   * @return  true if you should call _process again because there's more data
   *          to process (sort of like this.push())
   */
  private _process(): boolean {
    if (!this.thirsty) return false;

    this.log("Processing...");

    // get the shortest buffer and remove old inputs
    let shortest = Infinity;

    this.inputs = this.inputs.filter((v) => {
      if (v.stream === null && v.buffer.length < FRAME_BYTES) {
        this.log(`- (fulfilled) Removing ${v.name}`);
        return false;
      } else {
        shortest = Math.min(shortest, v.buffer.length);
        return true;
      }
    });

    if (this.inputs.length === 0) {
      this.log("Length 0, stop processing");
      return false;
    }

    if (shortest < FRAME_BYTES) {
      this.log(
        `  Shortest (${this.inputs
          .filter((v) => v.buffer.length === shortest)
          .map((v) => v.name)
          .join()}) is too small, stop processing`
      );
      return false; // don't keep processing, we don't have data
    }
    const frames = Math.floor(shortest / FRAME_BYTES);

    const out = Buffer.allocUnsafe(frames * FRAME_BYTES);

    // sum up N int16LEs
    for (let f = 0; f < frames; f++) {
      const offsetLeft = FRAME_BYTES * f;
      const offsetRight = FRAME_BYTES * f + SAMPLE_BYTES;
      let sumLeft = 0;
      let sumRight = 0;
      this.inputs.forEach((v) => {
        sumLeft += this.gainMultiplier * v.buffer.readInt16LE(offsetLeft);
        sumRight += this.gainMultiplier * v.buffer.readInt16LE(offsetRight);
      });

      this.log(
        `    (left) ${this.inputs
          .map((v) => {
            const x = v.buffer.readInt16LE(offsetLeft);
            return `${x} <0x${x.toString(16).padStart(4, "0")}>`;
          })
          .join(" + ")} = ${sumLeft} <0x${sumLeft
          .toString(16)
          .padStart(4, "0")}>`
      );
      this.log(
        `    (right) ${this.inputs
          .map((v) => {
            const x = v.buffer.readInt16LE(offsetRight);
            return `${x} <0x${x.toString(16).padStart(4, "0")}>`;
          })
          .join(" + ")} = ${sumRight} <0x${sumRight
          .toString(16)
          .padStart(4, "0")}>`
      );

      out.writeInt16LE(
        Math.min(SAMPLE_MAX, Math.max(SAMPLE_MIN, sumLeft)),
        offsetLeft
      );
      out.writeInt16LE(
        Math.min(SAMPLE_MAX, Math.max(SAMPLE_MIN, sumRight)),
        offsetRight
      );
    }

    // shorten all buffers by N
    this.inputs.forEach(
      (v) => (v.buffer = v.buffer.slice(FRAME_BYTES * frames))
    );

    // keep processing if we can push more...
    this.log("Trying push!");
    const ret = this.push(out);
    if (!ret) {
      this.thirsty = false;
      this.inputs.forEach((v) => v.stream?.pause());
    }
    return ret;
  }

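  _destroy(error: Error | null, callback: (error?: Error | null) => void) {
    this.inputs.forEach((v) => {
      v.stream?.removeListener("data", v.ondata);
      v.stream?.removeListener("end", v.onend);
      v.stream?.pause();
    });
    this.inputs = [];
    // Readable requires _destroy to invoke its callback; otherwise "close"
    // (and any "error") is never emitted
    callback(error);
  }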
}

export default Mixer;

I know this is a lot to parse. Any help is appreciated, or even any ideas about how to find the bug would be extremely helpful.
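One way to narrow down where the data disappears is to count bytes at each stage: wrap each source in a counting pass-through before passing it to `addInput`, and put another one between the mixer and the player. A minimal sketch; `CountingTap` is a hypothetical class built on the standard `Transform`, not part of the mixer.

```typescript
import { Transform, type TransformCallback } from "node:stream";
import { Buffer } from "node:buffer";

/**
 * Hypothetical pass-through that counts the chunks and bytes flowing
 * through it and forwards every chunk unchanged.
 */
class CountingTap extends Transform {
  bytes = 0;
  chunks = 0;

  constructor(readonly label: string) {
    super();
  }

  _transform(chunk: Buffer, _encoding: BufferEncoding, callback: TransformCallback): void {
    this.bytes += chunk.length;
    this.chunks += 1;
    callback(null, chunk); // forward the chunk as-is
  }

  _flush(callback: TransformCallback): void {
    console.error(`[${this.label}] ${this.chunks} chunks, ${this.bytes} bytes`);
    callback();
  }
}
```

If songs are played one after another without overlapping, the output total should roughly equal the sum of the input totals (minus at most a few trailing bytes per input). A large shortfall points at the mixer; a match points downstream of it. Because the output stream may never end, logging `tap.bytes` on a timer is probably more useful there than waiting for `_flush`.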

olivbau commented 3 years ago

I'm sorry I can't help; I'm new to Node.js streams. Have you found a solution to your problem? I'm interested in it, because I'm currently working on a Discord bot that retrieves the voice streams of two (or more) people in a channel, mixes those streams together, and then has the bot output the result.

MHebes commented 3 years ago

@olivbau No, I haven’t been able to figure out the problem yet. Once I do, I’ll close this issue. You can also try the code I posted and see how it works for you; maybe the issue doesn’t show up in your use case.

github-actions[bot] commented 5 months ago

It seems there has been no activity on this issue for a while, and it is being closed in 30 days. If you believe this issue should remain open, please leave a comment. If you need further assistance or have questions, you can also search for similar issues on Stack Overflow. Make sure to look at the README file for the most updated links.

github-actions[bot] commented 4 months ago

It seems there has been no activity on this issue for a while, and it is being closed. If you believe this issue should remain open, please leave a comment. If you need further assistance or have questions, you can also search for similar issues on Stack Overflow. Make sure to look at the README file for the most updated links.