serenity-rs / songbird

An async Rust library for the Discord voice API
ISC License
384 stars, 110 forks

Random Playback Speed Issues #180

Closed: maxall41 closed this issue 1 year ago

maxall41 commented 1 year ago

Songbird version: Custom, based on 0.3.2

Rust version (rustc -V): 1.69.0

Serenity/Twilight version: Serenity: 0.11.5 ...

Description: I'm trying to write a custom reader that streams files into a buffer in chunks, so I don't have to front-load the entire file before playback starts. To do this, I created the following custom reader inside the Songbird source code:

```rust
// Imports this snippet assumes inside the songbird fork: byte_len() and
// is_seekable() come from symphonia's MediaSource trait (implemented for
// Cursor), and debug! comes from the tracing crate.
use std::io::{Cursor, Read, Seek, SeekFrom, Write};
use std::sync::{Arc, RwLock};
use std::thread;

use reqwest::header::{HeaderValue, RANGE};
use symphonia_core::io::MediaSource;
use tracing::debug;

impl StreamFromURL {
    pub fn new(source: Cursor<Vec<u8>>, data_source: &str, chunk_size: u64, safety: u64) -> StreamFromURL {
        StreamFromURL {
            // TODO: Replace with Dashmap Hashmap so multiple workers can be working on the next chunk(s) concurrently
            // Next chunk, filled in by the background fetch thread.
            internal_buffer: Arc::new(RwLock::new(Cursor::new(Vec::new()))),
            // Chunk currently being played.
            running_buffer: source,
            safety,
            bytes_read: Arc::new(RwLock::new(0)),
            data_source: Arc::new(RwLock::new(data_source.to_string())),
            chunk_size: Arc::new(RwLock::new(chunk_size)),
            buffer_processing: Arc::new(RwLock::new(false)),
            current_pos: Arc::new(RwLock::new(0)),
            chunks_read: Arc::new(RwLock::new(1)),
        }
    }

    fn read(&mut self, buffer: &mut [u8]) -> std::io::Result<usize> {
        let bytes_read_c = *self.bytes_read.read().unwrap();
        let chunk_size = *self.chunk_size.read().unwrap();
        // Start fetching the next chunk once we are within `safety` bytes of the
        // end of this one; saturating_sub avoids underflow if safety > chunk_size.
        if bytes_read_c > chunk_size.saturating_sub(self.safety) && !*self.buffer_processing.read().unwrap() {
            *self.buffer_processing.write().unwrap() = true;
            let t_datasource = self.data_source.clone();
            let t_bytes_read = self.bytes_read.clone();
            let t_chunk_size = self.chunk_size.clone();
            let t_internal_buffer = self.internal_buffer.clone();
            let t_chunks_read = self.chunks_read.clone();
            // TODO: Probably more performant as tokio task
            thread::spawn(move || {
                debug!("Need new buffer! Current LEN: {}", t_internal_buffer.read().unwrap().byte_len().unwrap());
                let chunk_size = *t_chunk_size.read().unwrap();
                let current_chunk_end = chunk_size * *t_chunks_read.read().unwrap();
                // HTTP byte ranges are inclusive, so this fetches exactly chunk_size bytes.
                let range = HeaderValue::from_str(&format!("bytes={}-{}", current_chunk_end + 1, current_chunk_end + chunk_size))
                    .expect("string provided by format!");
                debug!("RANGE: {:?}", range);
                let client = reqwest::blocking::Client::new();
                let url = t_datasource.read().unwrap().to_string();
                let resp = client.get(url).header(RANGE, range).send().unwrap();
                let chunk = resp.bytes().unwrap().to_vec();
                debug!("Writing all, READ NEW: {}", chunk.len());
                {
                    // Hold a single write lock for the rewind/write/rewind sequence.
                    let mut buf = t_internal_buffer.write().unwrap();
                    buf.seek(SeekFrom::Start(0)).unwrap();
                    buf.write_all(&chunk).unwrap();
                    buf.seek(SeekFrom::Start(0)).unwrap();
                }
                debug!("Wrote all! New Total LEN: {}", t_internal_buffer.read().unwrap().byte_len().unwrap());
                *t_bytes_read.write().unwrap() = 0;
                *t_chunks_read.write().unwrap() += 1;
            });
        }

        let bytes_read = self.running_buffer.read(buffer)?;
        let pos = self.running_buffer.stream_position()?;
        *self.current_pos.write().unwrap() = pos;
        *self.bytes_read.write().unwrap() += bytes_read as u64;
        // Once the playing chunk is exhausted, swap in the prefetched one.
        // NOTE: this does not wait for the background fetch; if it has not
        // finished, an empty buffer is swapped in and the thread's later write
        // lands in the old, now-detached internal_buffer.
        if pos == self.running_buffer.byte_len().unwrap() {
            self.running_buffer = self.internal_buffer.read().unwrap().clone();
            self.running_buffer.seek(SeekFrom::Start(0))?;
            self.internal_buffer = Arc::new(RwLock::new(Cursor::new(Vec::new())));
            *self.buffer_processing.write().unwrap() = false;
        }

        Ok(bytes_read)
    }

    fn seek(&mut self, pos: SeekFrom) -> u64 {
        self.running_buffer.seek(pos).unwrap()
    }

    fn is_seekable(&self) -> bool {
        self.running_buffer.is_seekable()
    }
}
```
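The reader's Range arithmetic depends on HTTP byte ranges being inclusive at both ends (RFC 9110): bytes=0-N returns N + 1 bytes. The url_source request below uses bytes=0-{chunk_size}, so its first chunk is chunk_size + 1 bytes long, and the reader's follow-up requests start at chunk_size * n + 1, which lines up with it. A hypothetical helper using a uniform convention, where every chunk is exactly chunk_size bytes, might look like this (a sketch, not part of songbird):

```rust
/// Hypothetical helper: the Range header value for the `n`-th (0-based) chunk
/// of `chunk_size` bytes. Both ends of an HTTP byte range are inclusive, so
/// chunk `n` covers bytes `n * chunk_size ..= (n + 1) * chunk_size - 1`.
fn chunk_range(n: u64, chunk_size: u64) -> String {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let start = n * chunk_size;
    let end = start + chunk_size - 1;
    format!("bytes={}-{}", start, end)
}

fn main() {
    // Consecutive chunks are contiguous and never overlap.
    for n in 0..3 {
        println!("{}", chunk_range(n, 1000));
    }
}
```

If you adopt this convention, the initial request and the reader have to agree on it; mixing it with the existing +1 offsets would skip or duplicate a byte at each chunk boundary.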

And a custom source:

```rust
use std::io::{Cursor, Read, Seek};
use std::{fmt, thread};
use std::time::Duration;
use bytes::Buf;
use kafka::producer::Producer;
use lofty::{AudioFile, mpeg, ogg, ParseOptions};
use lofty::iff::wav;
use lofty::iff::wav::WavProperties;
use reqwest::header::{HeaderValue, RANGE};
use songbird::Call;
use songbird::input::{Codec, Container, Input, Metadata, Reader};
use songbird::input::codec::OpusDecoderState;
use songbird::input::reader::StreamFromURL;
use symphonia_core::io::{ReadOnlySource};
use tokio::runtime::Builder;
use tokio::sync::MutexGuard;
use tokio::time;
use crate::worker::queue_processor::ErrorReport;
use crate::worker::sources::helpers::{get_extension_from_uri, lofty_wav_codec_to_songbird_codec};
use snafu::{ResultExt, Snafu, Whatever, whatever};

/// Basic URL Player that downloads files from URLs into memory and plays them
pub async fn url_source(url: &str) -> Result<Input, Whatever> {
    // let chunk_size = 250000; // Chunk = 250KB
    let chunk_size = 10000000;
    let range = HeaderValue::from_str(&format!("bytes={}-{}", 0, chunk_size)).with_whatever_context(|_| "Failed to generate range header")?;
    let client = reqwest::Client::new();
    let resp = client.get(url).header(RANGE, range).send().await.with_whatever_context(|_| "Failed to read file from URL")?;
    let mut pre: Vec<u8> = vec![];

    let bytes = resp.bytes().await.with_whatever_context(|_| "Failed to read bytes from file. Malformed?")?;
    let metadata_bytes = bytes.clone(); // This is required because for some reason read_to_end breaks the pre-buf symph

    metadata_bytes.reader().read_to_end(&mut pre).with_whatever_context(|_| "Failed to read bytes into in memory file buffer")?;
    let mock_file: Cursor<Vec<u8>> = Cursor::new(pre);

    let format = get_extension_from_uri(url);
    let mut metadata: Option<Metadata> = None;
    let mut mfp = mock_file.clone();
    let mut stereo = false;
    let mut codec: Option<Codec> = None;
    let container = Container::Raw;
    match format.as_str() {
        "wav" => {
            let parsing_options = ParseOptions::new();
            // Propagate a malformed header as an error instead of panicking.
            let tagged_file = wav::WavFile::read_from(&mut mfp, parsing_options).with_whatever_context(|_| "Failed to parse WAV header")?;
            let properties = tagged_file.properties();
            metadata = Some(Metadata {
                track: None,
                artist: None,
                date: None,
                channels: Some(properties.channels()),
                channel: None,
                start_time: None,
                duration: Some(properties.duration()),
                sample_rate: Some(properties.sample_rate()),
                source_url: None,
                title: None,
                thumbnail: None,
            });
            codec = Some(lofty_wav_codec_to_songbird_codec(properties.format()));
            stereo = properties.channels() >= 2;
        },
        // "mp3" => {
            // We may support this in the future it is not currently supported because songbird does
            // not support the LAME codec
        // },
        _ => whatever!("Invalid file format. Valid file formats are: .wav and .ogg. using .ogg is recommended as .wav will be downscaled to .ogg internally to support the discord api at the cost of extra memory and cpu cycles.")
    }

    let x = Input {
        metadata: Box::new(metadata.unwrap()),
        stereo,
        // reader: Reader::StreamForURL(StreamFromURL::new(mock_file,url, chunk_size,250000)),
        reader: Reader::Extension(Box::new(mock_file)),
        kind: codec.unwrap(),
        container,
        pos: 0,
    };
    println!("{:?}", x);
    Ok(x)
}
```

It works pretty well, except that certain files play back at 5x speed, 2x speed, or just slightly faster; it always seems to be faster, never slower. This does not happen when I load these files with ffmpeg, only with my custom implementation. As you can see in the source above, I have also tested this with the default Reader::Extension type and I'm having the exact same issue. ...

Steps to reproduce:

  1. Create custom source using above code with Reader::Extension
  2. Use this URL to reproduce: https://www.ee.columbia.edu/~dpwe/sounds/music/africa-toto.wav ...

FelixMcFelix commented 1 year ago

You will probably be better off using the next branch, which has built-in support for HTTP sources without using ffmpeg.

I am 99% sure that the issue is due to sample rate: songbird currently requires a sample rate of 48 kHz, as documented on all the relevant Input structs. The next branch handles this for you; otherwise, you will need to look into a crate like rubato for correct audio resampling.
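For the resampling route, the goal is to produce 48 000 samples for every second of audio, whatever the source rate. rubato provides proper sinc- and FFT-based resamplers for this; its API is not reproduced here. To show the idea only, here is a naive linear-interpolation sketch, resample_linear (a hypothetical name), for one channel of f32 samples. It has no anti-aliasing filter, so its quality is well below rubato's, and interleaved stereo would need to be split into channels first.

```rust
/// Naive linear-interpolation resampler for ONE channel of f32 samples.
/// Illustration only: there is no anti-aliasing filter, so downsampling can
/// alias and upsampling is only approximate. Use a real resampler in practice.
fn resample_linear(input: &[f32], from_hz: u32, to_hz: u32) -> Vec<f32> {
    assert!(from_hz > 0 && to_hz > 0, "sample rates must be non-zero");
    if input.is_empty() || from_hz == to_hz {
        return input.to_vec();
    }
    // Output length keeps the duration constant: len * to / from.
    let out_len = (input.len() as u64 * u64::from(to_hz) / u64::from(from_hz)) as usize;
    // Input samples advanced per output sample.
    let step = f64::from(from_hz) / f64::from(to_hz);
    let last = input.len() - 1;
    (0..out_len)
        .map(|i| {
            let pos = i as f64 * step;
            let idx = (pos.floor() as usize).min(last);
            let frac = (pos - idx as f64) as f32;
            let a = input[idx];
            // Past the final sample, hold its value instead of reading out of bounds.
            let b = input[(idx + 1).min(last)];
            a + (b - a) * frac
        })
        .collect()
}

fn main() {
    // One second of 22 050 Hz mono audio becomes one second at 48 kHz.
    let one_second = vec![0.0f32; 22_050];
    let resampled = resample_linear(&one_second, 22_050, 48_000);
    println!("{} -> {} samples", one_second.len(), resampled.len());
}
```

The key property is the output length: a second of 22 050 Hz input becomes 48 000 output samples, so playback at 48 kHz lasts the original duration instead of finishing early.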

EDIT: The file you've linked has a sample rate of 22050 Hz, so it will sound sped up by ~2.18x (48000 / 22050). Other common sample rates include 44.1 kHz (~1.09x), 11025 Hz (~4.35x), and 8 kHz (6x).
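The arithmetic behind these figures: songbird plays the samples as if they were recorded at 48 kHz, so the speed-up is 48000 divided by the file's real rate. The sketch below pairs that with a small std-only reader for the sample rate a WAV header declares, which is one way to spot affected files before playing them. wav_sample_rate, playback_speedup and minimal_wav are hypothetical names, and the parser only handles plain RIFF/WAVE files; lofty's WavProperties::sample_rate(), already used in url_source, should report the same value.

```rust
/// Reads a little-endian u32 from the first four bytes of `b`.
fn le_u32(b: &[u8]) -> u32 {
    u32::from_le_bytes([b[0], b[1], b[2], b[3]])
}

/// Returns the sample rate declared in a RIFF/WAVE header, or `None` if the
/// data is not a WAV file or has no complete `fmt ` chunk.
/// Walks the chunk list instead of assuming `fmt ` sits at a fixed offset.
fn wav_sample_rate(bytes: &[u8]) -> Option<u32> {
    if bytes.len() < 12 || &bytes[0..4] != b"RIFF" || &bytes[8..12] != b"WAVE" {
        return None;
    }
    let mut pos = 12;
    // Each chunk starts with a 4-byte id and a 4-byte little-endian size.
    while pos <= bytes.len() - 8 {
        let id = &bytes[pos..pos + 4];
        let size = le_u32(&bytes[pos + 4..pos + 8]) as usize;
        let body = pos + 8;
        if id == b"fmt " {
            // fmt body: format tag (2 bytes), channels (2), sample rate (4), ...
            return bytes.get(body + 4..body + 8).map(le_u32);
        }
        // Chunk bodies are padded to an even length.
        pos = body.saturating_add(size).saturating_add(size & 1);
    }
    None
}

/// How much faster than intended audio plays when samples recorded at
/// `sample_rate` Hz are treated as 48 kHz.
fn playback_speedup(sample_rate: u32) -> f64 {
    48_000.0 / f64::from(sample_rate)
}

/// Builds a minimal 16-bit mono PCM WAV header (no sample data) for testing.
fn minimal_wav(sample_rate: u32) -> Vec<u8> {
    let mut h: Vec<u8> = Vec::with_capacity(44);
    h.extend_from_slice(b"RIFF");
    h.extend_from_slice(&36u32.to_le_bytes()); // RIFF size for an empty data chunk
    h.extend_from_slice(b"WAVE");
    h.extend_from_slice(b"fmt ");
    h.extend_from_slice(&16u32.to_le_bytes()); // fmt chunk size
    h.extend_from_slice(&1u16.to_le_bytes()); // format tag: PCM
    h.extend_from_slice(&1u16.to_le_bytes()); // channels: mono
    h.extend_from_slice(&sample_rate.to_le_bytes());
    h.extend_from_slice(&(sample_rate * 2).to_le_bytes()); // byte rate
    h.extend_from_slice(&2u16.to_le_bytes()); // block align
    h.extend_from_slice(&16u16.to_le_bytes()); // bits per sample
    h.extend_from_slice(b"data");
    h.extend_from_slice(&0u32.to_le_bytes()); // data chunk size
    h
}

fn main() {
    let rate = wav_sample_rate(&minimal_wav(22_050)).expect("valid header");
    // prints "22050 Hz plays ~2.18x too fast"
    println!("{} Hz plays ~{:.2}x too fast", rate, playback_speedup(rate));
}
```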

maxall41 commented 1 year ago

Oh. Yeah, I looked into sample rate but couldn't find anything in the docs. My bad. Thanks for your help!