smol-rs / futures-lite

Futures, streams, and async I/O combinators.
Apache License 2.0

Add `StreamExt::take_until` #99

nanoqsh closed this issue 1 month ago

nanoqsh commented 3 months ago

How about adding a method similar to `StreamExt::take_until` from the `futures` crate? https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.take_until

It would be convenient for cancelling a stream.

nanoqsh commented 2 months ago

I tried to express this combinator using the existing API:

async fn run() {
    use {
        futures_lite::{stream, StreamExt},
        std::{pin, time::Duration},
        tokio::time,
    };

    // the original stream: yields 0..5, one item per second
    let stream = stream::iter(0..5).then(|i| async move {
        time::sleep(Duration::from_secs(1)).await;
        i
    });

    // wrap items in `Some` and append a `None` to mark the end of the original stream
    let stream = stream.map(Some).chain(stream::once(None));

    // in place of `take_until`: a single-item stream that yields `None` on Ctrl-C
    let cancel = stream::once_future(async {
        tokio::signal::ctrl_c().await.unwrap();
        println!("cancelled");
        None
    });

    // merge the two streams, stop at the first `None`, and unwrap the remaining items
    let stream = stream
        .or(cancel)
        .take_while(|opt| opt.is_some())
        .map(|opt| opt.unwrap());

    // use the final stream
    let mut stream = pin::pin!(stream);
    while let Some(item) = stream.next().await {
        println!("item {item}");
    }
}

Overall, this is acceptable to me. It would be especially nice to replace `.take_while(|opt| opt.is_some()).map(|opt| opt.unwrap())` with `.map_while(|opt| opt)`.
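
To illustrate why the two chains are interchangeable, here is a rough sketch using std iterators rather than streams. It assumes a stream-level `map_while` would behave like `Iterator::map_while`; the function names `via_take_while` and `via_map_while` are made up for this example.

```rust
// Stop at the first `None`, then unwrap the remaining `Some` items.
// This mirrors the `.take_while(..).map(..)` chain from the snippet above.
fn via_take_while(items: Vec<Option<i32>>) -> Vec<i32> {
    items
        .into_iter()
        .take_while(|opt| opt.is_some())
        .map(|opt| opt.unwrap())
        .collect()
}

// The same behavior in one step: `map_while` yields the inner values
// until the closure returns `None` for the first time.
fn via_map_while(items: Vec<Option<i32>>) -> Vec<i32> {
    items.into_iter().map_while(|opt| opt).collect()
}
```

Both functions drop everything from the first `None` onward, including any later `Some` items.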

But I think it would be simpler and more convenient to have a first-class method.
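
For discussion, here is a minimal sketch of what such a first-class combinator might look like. It is not the implementation from `futures` or from any PR. To stay std-only it defines a local `Stream` trait as a stand-in for `futures_core::Stream`, and it requires `Unpin` to avoid pin projection. `TakeUntil`, `Iter`, `ReadyAfter`, `collect`, and `demo` are all hypothetical names.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in for `futures_core::Stream` (assumption: same shape).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical combinator: yields items from `stream` until `until` resolves.
struct TakeUntil<S, F> {
    stream: S,
    until: F,
    done: bool,
}

impl<S: Stream + Unpin, F: Future + Unpin> Stream for TakeUntil<S, F> {
    type Item = S::Item;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        let this = &mut *self;
        if this.done {
            return Poll::Ready(None);
        }
        // Poll the stop future first, so cancellation wins over a ready item.
        if Pin::new(&mut this.until).poll(cx).is_ready() {
            this.done = true;
            return Poll::Ready(None);
        }
        match Pin::new(&mut this.stream).poll_next(cx) {
            Poll::Ready(None) => {
                this.done = true;
                Poll::Ready(None)
            }
            other => other,
        }
    }
}

// Test helper: a stream backed by an iterator, always ready.
struct Iter<I>(I);
impl<I: Iterator + Unpin> Stream for Iter<I> {
    type Item = I::Item;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        Poll::Ready(self.0.next())
    }
}

// Test helper: a future that is pending for the given number of polls.
struct ReadyAfter(u32);
impl Future for ReadyAfter {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            return Poll::Ready(());
        }
        self.0 -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Waker that does nothing; enough for the busy-polling driver below.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Drives a stream to completion by polling in a loop (demo only).
fn collect<S: Stream + Unpin>(mut stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => items.push(item),
            Poll::Ready(None) => return items,
            Poll::Pending => continue,
        }
    }
}

// The stop future is pending for three polls, so three items get through.
fn demo() -> Vec<i32> {
    collect(TakeUntil { stream: Iter(0..5), until: ReadyAfter(3), done: false })
}
```

Polling the stop future before the inner stream means no `Option` wrapping or end marker is needed, which is the main simplification over the workaround above. A real implementation would likely use pin projection instead of the `Unpin` bounds.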

LebranceBW commented 1 month ago

Inspired by your idea, I opened this PR. By the way, a code review would be appreciated.