discordjs / discord.js

A powerful JavaScript library for interacting with the Discord API
https://discord.js.org
Apache License 2.0
25.45k stars 3.97k forks source link

Bot prematurely ends output audio stream when I stop speaking #10449

Open recko10 opened 3 months ago

recko10 commented 3 months ago

Which package is this bug report for?

voice

Issue description

Hi everyone, thank you for your time.

I am building a Discord bot that processes and outputs audio in real-time. I take a speaking user's input stream, process it through a WebSocket, and then send the output stream to be played by the Discord bot. However, I'm running into a bug where if I stop speaking, the bot stops outputting audio. Since there is a lag between when I say something and when I receive it from the WebSocket, my desired output never plays fully.

As an example of where I am right now:

user speaks --> input stream is sent to WebSocket --> bot successfully receives processed packets from the WebSocket --> audio output begins to play --> user stops speaking --> audio output immediately ends

When the user continues to speak after that, the bot continues to output from where it left off (so as an example: if the bot is supposed to say "hey what's up" and I stop speaking when the bot says "what's", then when I start speaking again it will make sure to say "up" before continuing).

I can confirm a few things from my testing:

-The input audio stream is never destroyed

-The speaking user has one WebSocket connection established, and it is never closed until the user leaves the channel.

-The WebSocket is receiving all of the user’s input data (because when you stop taking and start again, the bot picks up the translated audio from where it left off, implying that everything the user has said has been processed and sent back in some way).

-I notice that my output queue for playing audio goes empty when I stop speaking. When I continue speaking it is populated again with the correct stream (the left over from the previous output).

When the user starts speaking I subscribe like so:

      receiver.speaking.on('start', (userId) => {
        console.log(`User ${userId} started speaking`);

        const audioStream = receiver.subscribe(userId, {
          end: {
            behavior: 'manual',
          },
        });

        processAudioStream(userId, audioStream, connection);
      });

Here is my function for processing the audio stream:

async function processAudioStream(userId, audioStream, connection) {
      console.log(`Processing audio stream for user ${userId}`);

      let webSocketTransformer;

      if (!userWebSockets.has(userId)) {
        const socket = new WebSocket('WEBSOCKET URL HERE');
        webSocketTransformer = new WebSocketTransformer(socket);
        userWebSockets.set(userId, socket);

        socket.on('open', () => {
          console.log(`WebSocket connection established for user ${userId}`);
        });

        socket.on('message', (data) => {
          console.log(`Received data from WebSocket for user ${userId}`);
        });

        socket.on('error', (error) => {
          console.error(`WebSocket error for user ${userId}:`, error);
        });

        socket.on('close', (code, reason) => {
          console.log(`WebSocket closed for user ${userId} with code ${code}, reason: ${reason}`);
          userWebSockets.delete(userId);
        });
      } else {
        webSocketTransformer = new WebSocketTransformer(userWebSockets.get(userId));
      }

      const opusDecoder = new prism.opus.Decoder({ rate: 48000, channels: 2, frameSize: 960 });
      const resampler_in = new Resampler(48000, 16000);
      const toMono = new StereoToMonoTransformer();
      const resampler_out = new Resampler(16000, 48000);
      const toStereo = new MonoToStereoTransformer();
      const verify = new VerifyAndAdjustTransformer();
      const opusEncoder = new prism.opus.Encoder({ rate: 48000, channels: 2, frameSize: 960 });

      const translatedStream = audioStream
        .pipe(opusDecoder)
        .pipe(resampler_in)
        .pipe(toMono)
        .pipe(webSocketTransformer)
        .pipe(resampler_out)
        .pipe(toStereo)
        .pipe(verify)
        .pipe(opusEncoder);

      audioStream.on('end', () => {
        console.log(`Input audio stream ended for user ${userId}`);
      });

      translatedStream.on('end', () => {
        console.log(`Translated stream ended for user ${userId}`);
        webSocketTransformer.close();
      });

      console.log(`Created translated stream for user ${userId}`);

      translatedStream.on('error', (error) => {
        console.error('Error in translated stream:', error);
      });

      audioQueue.play(connection, translatedStream);
      console.log(`Added translated stream to audio queue for user ${userId}`);

      handleStreamErrors(opusDecoder, opusEncoder, audioStream);
    }

My AudioQueue handling looks like this:

class AudioQueue {
  constructor() {
    this.players = new PlayerPool();
    this.queues = new Map();
    this.playingIn = new Set();
    console.log('AudioQueue initialized');
  }

  init(connection) {
    const player = this.players.get(connection.joinConfig.guildId);
    connection.subscribe(player);
    console.log(`Initialized player for guild ${connection.joinConfig.guildId}`);
    return player;
  }

  extractValue(key) {
    if (!this.queues.has(key)) {
      console.log(`No queue found for key ${key}`);
      return;
    }

    const queue = this.queues.get(key);
    if (!queue) {
      console.log(`Queue is empty for key ${key}`);
      return;
    }

    const value = queue.shift();
    console.log(`Extracted value from queue for key ${key}, remaining items: ${queue.length}`);

    if (queue.length === 0) {
      this.queues.delete(key);
      console.log(`Deleted empty queue for key ${key}`);
    }

    return value;
  }

  playFromQueue(connection) {
    const stream = this.extractValue(connection.joinConfig.channelId);
    if (!stream) {
      this.playingIn.delete(connection.joinConfig.channelId);
      console.log(`No stream to play for channel ${connection.joinConfig.channelId}`);
      return;
    }

    this.playingIn.add(connection.joinConfig.channelId);
    console.log(`Playing in channel ${connection.joinConfig.channelId}`);

    const player = this.players.get(connection.joinConfig.guildId);
    const resource = createAudioResource(stream, { inputType: StreamType.Opus });
    console.log('Created audio resource:', resource);
    player.play(resource);
    console.log(`Started playing audio resource in guild ${connection.joinConfig.guildId}`);

    player.once(AudioPlayerStatus.Idle, () => {
      console.log(`Player became idle, attempting to play next item from queue`);
      this.playFromQueue(connection);
    });

    player.on(AudioPlayerStatus.Playing, () => {
      console.log(`Audio playing in guild ${connection.joinConfig.guildId}`);
    });

    player.on(AudioPlayerStatus.Buffering, () => {
      console.log(`Audio buffering in guild ${connection.joinConfig.guildId}`);
    });

    player.on(AudioPlayerStatus.AutoPaused, () => {
      console.log(`Audio auto-paused in guild ${connection.joinConfig.guildId}`);
    });
  }

  add(channelId, audioStream) {
    if (!this.queues.has(channelId)) {
      this.queues.set(channelId, []);
      console.log(`Created new queue for channel ${channelId}`);
    }

    this.queues.get(channelId).push(audioStream);
    console.log(`Added audio stream to queue for channel ${channelId}, queue size: ${this.queues.get(channelId).length}`);
  }

  play(connection, audioStream) {
    this.add(connection.joinConfig.channelId, audioStream);
    console.log(`Added audio stream to queue for channel ${connection.joinConfig.channelId}`);

    if (!this.playingIn.has(connection.joinConfig.channelId)) {
      console.log(`Starting playback for channel ${connection.joinConfig.channelId}`);
      this.playFromQueue(connection);
    } else {
      console.log(`Already playing in channel ${connection.joinConfig.channelId}, audio stream queued`);
    }
  }

  clear(connection) {
    this.queues.delete(connection.joinConfig.channelId);
    this.playingIn.delete(connection.joinConfig.channelId);
    this.players.get(connection.joinConfig.guildId).pause();
  }

  stop(connection) {
    this.clear(connection);
    this.players.get(connection.joinConfig.guildId).stop();
  }
}

Thank you for your time, and forgive me if I have missed something as I am new to both Discord and Javascript.

Code sample

No response

Versions

-discord.js: 14.15.3 -node: v22.2.0 -OS: Mac Ventura 13.3

Issue priority

Medium (should be fixed soon)

Which partials do you have configured?

Not applicable

Which gateway intents are you subscribing to?

Guilds, GuildMembers, GuildVoiceStates, GuildMessages, DirectMessages

I have tested this issue on a development release

No response

nyapat commented 3 months ago

Try changing the end behaviour type to AfterSilence and setting a duration

Look at this (deleted) guide for some examples

Join the discord to discuss implementation if there's no bug in the library