RustAudio / rodio

Rust audio playback library
Apache License 2.0
1.8k stars 234 forks source link

Is there support for streaming audio? #159

Open krishnakumar4a4 opened 6 years ago

krishnakumar4a4 commented 6 years ago

Hi,

I have tried integrating rodio with a websocket, as and when 4096 bytes of u8 data comes in, I have tried to pass it as i16 for play_raw api. The speaker is too noisy.

let mut converted: Vec<i16> = data.into_iter().map(|i|i as i16).collect();
rodio::play_raw(&endpoint, SamplesBuffer::new(1,44100, converted).convert_samples());

The same binary stream is sent to a node js server to write to a wav file plays just fine. So the binary stream seems to be proper.

  var fileWriter = new wav.FileWriter(outFile, {
    channels: 1,
    sampleRate: 48000,
    bitDepth: 16
  });

  client.on('stream', function(stream, meta) {
    stream.pipe(fileWriter);

Is there something I am missing here to make it work with rodio?

Xaeroxe commented 6 years ago

You might find more luck by creating your own Source that can have data appended to it after being added to the sink rather than just using the play_raw api. This will allow rodio to control the pace of playback by itself.

tomaka commented 6 years ago

What do you mean exactly by "The speaker is too noisy."? Too loud?

betta-cyber commented 4 years ago

I understand what you’re saying, maybe you can try this code

let device = rodio::default_output_device().unwrap();
let sink = Sink::new(&device);
let mut decoder = Decoder::new(File::open("audio.mp3").unwrap());

let mut count = 0;
loop {
    match decoder.next_frame() {
        Ok(Frame { data, sample_rate, channels, .. }) => {
            println!("Decoded {} samples {} {}", data.len(), sample_rate, channels);
            let buff = SamplesBuffer::new(channels as u16, sample_rate as u32, data);
            sink.append(buff);
            println!("{}", count);
            while count > 26 {
                // sleep and wait for rodio to drain a bit
                count = 0;
                thread::sleep(Duration::from_millis(10));
            }
            count += 1;
            println!("finish append");
            // thread::sleep(Duration::from_secs(1));
        },
        Err(Error::Eof) => break,
        Err(e) => panic!("{:?}", e),
    }
}
sink.sleep_until_end()
bernhardfritz commented 4 years ago

I'm kinda new to rust so I don't know if this is the best way of doing it, but I had success with audio streaming in a pet project of mine using https://github.com/bernhardfritz/pitunes/blob/master/pitunes_client/src/http_stream_reader.rs

Hygdrasil commented 9 months ago

Hey, I'm a little bit late to the party. With the symphonia -Decoder i got streamin to work! But at the moment it seams that rodio does not allow symphonia to select the audio format by it self. Is there any reasen behind that? If not I could make a pull request. I added this funktio to decoder/mod.rs and it seams to work for my usecase.

    pub fn new_symphonia_no_hint(data: R) -> Result<Decoder<R>, DecoderError> {
        let mss = MediaSourceStream::new(
            Box::new(ReadSeekSource::new(data)) as Box<dyn MediaSource>,
            Default::default(),
        );

        match symphonia::SymphoniaDecoder::new(mss, None) {
            Err(e) => Err(e),
            Ok(decoder) => {
                return Ok(Decoder(DecoderImpl::Symphonia(decoder)));
            }
        }
    }```

    Hear is the rest of the code to get it up and running. Please  be aware that I`m new to rust and this is still work in progress. Alsow the stream can not seek so any attempt will result in a panic.

use std::fs::File; use std::io::BufReader; use ringbuf::ring_buffer::RbBase; use rodio::{Decoder, OutputStream, Sink}; use tokio::sync::broadcast; use std::sync::{Arc, Mutex, mpsc}; use bytes::Bytes; use ringbuf::{Rb, StaticRb};

async fn play_url(){ let (data_sender, receiver) = mpsc::channel(); let _handle = play_chunks_in_thread(receiver); fetch_data(data_sender).await; }

async fn fetch_data(data_sender: mpsc::Sender){ let client = reqwest::Client::new();

// Make a request to the stream URL
let url = "http://regiocast.streamabc.net/regc-radiobobmittelalter7445036-mp3-192-8025500?sABC=65p6s021%230%232qqpnss01895rqr0s8oq129o03s183o0%23fgernzf.enqvbobo.qr&aw_0_1st.playerid=streams.radiobob.de&amsparams=playerid:streams.radiobob.de;skey:1707536417"; // Replace with your stream URL
let mut respons = client.get(url).send().await.expect("url error");

// Check if the request was successful

    // Check if the request was successful
if !respons.status().is_success() {
    panic!("Request failed with status: {}", respons.status());
}

while let Some(chunk) = respons.chunk().await.expect("chunk error") {
    println!("Read {} bytes from stream", chunk.len());
    data_sender.send(chunk).expect("sending failed");
}

}

pub struct WebMediaSource { receiver: Arc<Mutex<mpsc::Receiver>>, ring_buffer: ringbuf::StaticRb<u8, 10240>, }

impl WebMediaSource { pub fn new(receiver: mpsc::Receiver) -> Self { Self { receiver: Arc::new(Mutex::new(receiver)), ring_buffer: StaticRb::<u8, 10240>::default()} } }

impl symphonia::core::io::MediaSource for WebMediaSource { fn is_seekable(&self) -> bool{ false } fn byte_len(&self) -> Option{ None } }

impl std::io::Read for WebMediaSource { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { if self.ring_buffer.is_empty() { let receiver = self.receiver.lock().expect("Failed to lock receiver");

        match receiver.recv() {
            Ok(bytes) => {
                let len = *[bytes.len(), buf.len()].iter().min().unwrap();
                buf[..len].copy_from_slice(&bytes[..len]);
                if len < bytes.len(){
                    let mut writer = bytes[len..].into_iter().map(|v| *v);
                    self.ring_buffer.push_iter(&mut writer);
                    if !writer.next().is_none() {
                        println!("data is dropped");
                    }
                }
                return Ok(len);
            }
            Err(_) => {return Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "Receive timeout"));}
        }

    }

    let start_size = self.ring_buffer.capacity() - self.ring_buffer.free_len();
    let len = *[start_size, buf.len()].iter().min().unwrap();
    self.ring_buffer.pop_slice(&mut buf[0..len]);
    let end_size = self.ring_buffer.capacity() - self.ring_buffer.free_len();
    let filled = start_size - end_size;
    Ok(filled)
}

}

impl std::io::Seek for WebMediaSource { fn seek(&mut self, _pos: std::io::SeekFrom) -> std::io::Result{ Err(std::io::Error::other("not seekable")) } }

fn play_chunks_in_thread(data_receiver: mpsc::Receiver) -> tokio::task::JoinHandle<()> {

tokio::task::spawn_blocking(
    {move || {

        let (_stream, stream_handle) = OutputStream::try_default().unwrap();

        let source = Decoder::new_symphonia_no_hint(WebMediaSource::new(data_receiver)).unwrap();
        let sink = Sink::try_new(&stream_handle).unwrap();
        sink.append(source);
        sink.play();
        sink.sleep_until_end();
}})

}