Mithronn / rusty_ytdl

A Rust library for searching and downloading YouTube videos
https://docs.rs/rusty_ytdl
MIT License
107 stars, 20 forks

Feature request: Implement `futures::Stream` for NonLiveStream and LiveStream. #34

Open nick42d opened 3 months ago

nick42d commented 3 months ago

Hi there, my main use case is a music player, youtui. I am currently using the rusty_ytdl::Stream trait to provide progress updates while downloads are in progress. However, I thought it would be nice to implement the futures::Stream trait to enable the adaptors in futures::StreamExt. Below is an example of a pattern this could enable.

use futures::StreamExt;

let stream = ... // code to create stream
let song = stream
    .enumerate()
    .map(|(idx, chunk)| {
        println!("Downloaded chunk {idx}!");
        chunk
    }).collect::<Vec<_>>().await;
play(song);

nick42d commented 1 month ago

Hi there @Mithronn, I'm currently taking a look at this. I'd be keen to get your opinion on a problem I'm having.

How critical is the content_length() method and where do you think it belongs? I can see the following options:

  1. Require an intermediate struct between Video and impl futures::Stream that contains the content_length() method.
  2. The stream method could return a tuple of (impl futures::Stream, content_length: u64).
  3. Add the content_length() method to Video itself. Note that calculating the content length is also required when returning the impl futures::Stream, so calling video.content_length().await followed by video.stream().await would calculate it twice. However, if video.content_length() took a mutable reference, we could cache the value.
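
To make the caching idea in option 3 concrete, here is a minimal sketch using only the standard library. Everything in it is an assumption for illustration: this `Video` is a hypothetical stand-in, not the real rusty_ytdl type, the methods are synchronous rather than async, and the "expensive" calculation is simulated by summing chunk sizes.

```rust
/// Hypothetical stand-in for the library's `Video` type (not the real API).
pub struct Video {
    /// Simulated chunk sizes; stands in for data that needs a network lookup.
    chunk_sizes: Vec<u64>,
    /// Cached content length, filled in on first use.
    cached_len: Option<u64>,
    /// Counts how often the calculation actually ran (illustration only).
    pub calculations: u32,
}

impl Video {
    pub fn new(chunk_sizes: Vec<u64>) -> Self {
        Self { chunk_sizes, cached_len: None, calculations: 0 }
    }

    /// Returns the content length, computing it only on the first call.
    /// Taking `&mut self` is what allows the result to be cached.
    pub fn content_length(&mut self) -> u64 {
        if let Some(len) = self.cached_len {
            return len;
        }
        self.calculations += 1;
        let len: u64 = self.chunk_sizes.iter().sum();
        self.cached_len = Some(len);
        len
    }

    /// Stand-in for `stream()`: it needs the length too, but reuses the cache.
    pub fn stream(&mut self) -> std::vec::IntoIter<u64> {
        let _total = self.content_length();
        self.chunk_sizes.clone().into_iter()
    }
}
```

With this shape, calling `content_length()` and then `stream()` on the same value runs the calculation once. The trade-off is that both methods need a mutable borrow of the video.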

nick42d commented 1 month ago

With a bit more playing around, this could be an option too for the existing Stream trait. This is not my recommendation, as:

    fn into_futures_stream(self) -> impl futures::Stream<Item = Result<Bytes, VideoError>>
    where
        Self: Sized,
    {
        // Second value of initialisation tuple represents if the previous iteration of
        // the stream errored. If so, stream will close, as no future iterations of
        // the stream are expected to return Ok.
        futures::stream::unfold((self, false), |(state, err)| async move {
            if err {
                return None;
            };
            let chunk = state.chunk().await;
            match chunk {
                // Return error value on this iteration, on the next iteration return None.
                Err(e) => Some((Err(e), (state, true))),
                // Happy path
                Ok(Some(bytes)) => Some((Ok(bytes), (state, false))),
                // Stream has closed.
                Ok(None) => None,
            }
        })
    }
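
The termination logic above (yield the error once, then close) can be tried without any async runtime. The following is a synchronous analogue built on `std::iter::from_fn`, not the `futures::stream::unfold` version itself. `MockSource` and `into_result_iter` are hypothetical names, and `String` stands in for `VideoError`.

```rust
use std::collections::VecDeque;

/// Hypothetical chunk source mirroring the `Ok(Some)`, `Ok(None)`, `Err`
/// shape of `chunk()`; `String` stands in for `VideoError`.
pub struct MockSource {
    results: VecDeque<Result<Option<Vec<u8>>, String>>,
}

impl MockSource {
    pub fn new(results: Vec<Result<Option<Vec<u8>>, String>>) -> Self {
        Self { results: results.into() }
    }

    /// Returns the next scripted result, or `Ok(None)` once exhausted.
    pub fn chunk(&mut self) -> Result<Option<Vec<u8>>, String> {
        self.results.pop_front().unwrap_or(Ok(None))
    }
}

/// Synchronous analogue of `into_futures_stream`: yields chunks until the
/// source closes, and yields at most one error before stopping.
pub fn into_result_iter(
    mut source: MockSource,
) -> impl Iterator<Item = Result<Vec<u8>, String>> {
    // Tracks whether the previous item was an error.
    let mut errored = false;
    std::iter::from_fn(move || {
        if errored {
            return None;
        }
        match source.chunk() {
            // Emit the error now; the next call returns None.
            Err(e) => {
                errored = true;
                Some(Err(e))
            }
            // Happy path.
            Ok(Some(bytes)) => Some(Ok(bytes)),
            // Source has closed.
            Ok(None) => None,
        }
    })
}
```

If the source is scripted as one good chunk, one error, and another good chunk, collecting the iterator gives the first chunk and the error, and the chunk after the error is never requested.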