smol-rs / futures-lite

Futures, streams, and async I/O combinators.
Apache License 2.0

Add a Stream::try_iter() method #70

Closed notgull closed 7 months ago

notgull commented 1 year ago

This PR adds a try_iter method to StreamExt that consumes all of the values until it gets Pending, at which point it returns None. The goal is to get a method similar to Receiver::try_iter() on the async_channel::Receiver type that would allow users to consume all of the values in the channel without waiting.
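
To make the described behaviour concrete, here is a minimal std-only sketch, not the PR's actual code. The local `Stream` trait, the `Buffered` toy stream, the `ReadyOnly` adapter, and `drain_ready_demo` are all hypothetical names introduced for illustration. The no-op waker is only a placeholder so that the sketch can be polled without an executor.

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the real `Stream` trait, reduced to `poll_next`.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Toy stream: yields its buffered items, then reports `Pending`,
/// much like an open channel that is currently empty.
struct Buffered(VecDeque<u32>);

impl Stream for Buffered {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        match self.0.pop_front() {
            Some(v) => Poll::Ready(Some(v)),
            None => Poll::Pending,
        }
    }
}

/// Hypothetical adapter: turns the inner stream's `Pending` into end-of-stream.
struct ReadyOnly<'a, S>(&'a mut S);

impl<S: Stream + Unpin> Stream for ReadyOnly<'_, S> {
    type Item = S::Item;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        match S::poll_next(Pin::new(&mut *self.0), cx) {
            Poll::Pending => Poll::Ready(None), // nothing ready right now: stop
            ready => ready,                     // pass items and real end-of-stream through
        }
    }
}

/// Placeholder waker that does nothing when woken.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drains everything that is immediately available from a toy stream.
fn drain_ready_demo() -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut inner = Buffered(VecDeque::from([1, 2, 3]));
    let mut adapter = ReadyOnly(&mut inner);
    let mut out = Vec::new();
    while let Poll::Ready(Some(v)) = Pin::new(&mut adapter).poll_next(&mut cx) {
        out.push(v);
    }
    out
}
```

The adapter never returns `Pending` itself, which is why it can be drained in a plain loop without waiting.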

taiki-e commented 1 year ago

I'm confused about why this is named _iter. The implementation has no relationship to the Iterator trait.

notgull commented 1 year ago

I basically just took the name from libstd's try_iter. Maybe a better name for this would be s.ready() or s.immediate()?

taiki-e commented 1 year ago

poll_immediate and now_or_never in futures are somewhat similar to this, but I don't think they are exactly the same.

Well, this doesn't even require implementing Stream in the first place. Implementing Iterator might actually work in more contexts.

notgull commented 1 year ago

> Well, this doesn't even require implementing Stream in the first place. Implementing Iterator might actually work in more contexts.

Unfortunately we still need a Context to pass down to the underlying Stream, just like for future::poll_once(). We could add functionality to create a "no-op waker" for this if we wanted to.

taiki-e commented 1 year ago

> Unfortunately we still need a Context to pass down to the underlying Stream, just like for future::poll_once().

Oh, good point. I forgot that because now_or_never uses noop_context.

fogti commented 1 year ago

Using this would probably introduce spurious wake-ups due to the lack of something like noop_context.

notgull commented 1 year ago

Yes, it would be nice to have a noop_waker or a panic_waker for futures that are guaranteed to not return Pending. Unfortunately defining that using only safe code requires an unnecessary allocation.
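
As a rough illustration of the allocation being discussed, a no-op waker can be written in safe code with the standard library's `Wake` trait, at the cost of one `Arc`. This is a sketch under that assumption; `NoopWake`, `noop_waker`, and `poll_now` are hypothetical names, not part of futures-lite.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker target that ignores every wake-up.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Builds a no-op waker using only safe code.
fn noop_waker() -> Waker {
    // `Arc::new` allocates even for a zero-sized type, because the
    // reference counts live on the heap. This is the allocation in question.
    Waker::from(Arc::new(NoopWake))
}

/// Polls a future exactly once and reports its output if it was ready.
fn poll_now<F: Future>(fut: F) -> Option<F::Output> {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let fut = pin!(fut);
    match fut.poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}
```

Calling `poll_now` in a hot loop would allocate and free one `Arc` per call, which is the overhead the comment above wants to avoid.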

fogti commented 1 year ago

Could the allocation be cached (similar to the caching in block_on)?

notgull commented 1 year ago

It could, but you would still need an atomic operation to clone it out of the thread local. While this is acceptable overhead for block_on, I imagine that try_iter() would be called much more frequently.

I don't think there's anything wrong with just passing try_iter() to stream::block_on, just like poll_once() is passed to future::block_on or spin_on.
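
The thread-local caching idea raised above might look roughly like the following. This is only a sketch and does not reproduce how `block_on` is actually implemented; `NoopWake`, `CACHED`, `cached_waker`, and `live_handles` are hypothetical names. It shows that the allocation happens once per thread, while every waker handed out still costs an atomic reference-count increment.

```rust
use std::sync::Arc;
use std::task::{Wake, Waker};

/// A waker target that ignores every wake-up.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

thread_local! {
    // Allocated once per thread, on first use.
    static CACHED: Arc<NoopWake> = Arc::new(NoopWake);
}

/// Hands out a waker backed by the cached allocation.
/// `Arc::clone` performs an atomic increment; no new allocation is made.
fn cached_waker() -> Waker {
    CACHED.with(|arc| Waker::from(Arc::clone(arc)))
}

/// Number of live references to the cached allocation on this thread.
fn live_handles() -> usize {
    CACHED.with(|arc| Arc::strong_count(arc))
}
```

Each `cached_waker()` call raises the count by one and dropping the waker lowers it again, so the per-call cost is the pair of atomic operations rather than a heap allocation.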

fogti commented 7 months ago

I think the concern about the name still applies here. I'd propose try_stream, by analogy with try_iter, but idk if that's already in use...

fogti commented 7 months ago

Also, what about pinning? How does this interact with that?

notgull commented 7 months ago

> Also, what about pinning? How does this interact with that?

It only takes Unpin streams. However, it's possible to make it take a !Unpin stream by first pinning the stream to the stack:

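use futures_lite::{pin, stream, StreamExt};
// The async block makes this stream !Unpin.
let x = stream::once_future(async { 1 });
// Shadows x with a Pin<&mut _> to the stack value; that handle is Unpin.
pin!(x);
let y = x.try_stream();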
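
The reason stack pinning satisfies an `Unpin` bound can be sketched without any external crate. Everything below is hypothetical illustration: a local `Stream` trait, a deliberately `!Unpin` `Countdown` stream, and a `next_ready` helper standing in for any API that demands `Unpin`. The key assumption, which mirrors what stream libraries typically provide, is a forwarding impl of `Stream` for `Pin<&mut S>`.

```rust
use std::cell::Cell;
use std::marker::PhantomPinned;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the real `Stream` trait.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Forwarding impl: a pinned mutable reference to a stream is itself a stream.
impl<S: Stream + ?Sized> Stream for Pin<&mut S> {
    type Item = S::Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        // `Pin<&mut S>` is always `Unpin`, so `get_mut` is allowed here.
        S::poll_next(self.get_mut().as_mut(), cx)
    }
}

/// A stream that is deliberately `!Unpin` because of `PhantomPinned`.
struct Countdown {
    left: Cell<u32>,
    _pin: PhantomPinned,
}

impl Stream for Countdown {
    type Item = u32;
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        let left = self.left.get();
        if left == 0 {
            return Poll::Ready(None);
        }
        self.left.set(left - 1);
        Poll::Ready(Some(left))
    }
}

/// Stand-in for an API that only accepts `Unpin` streams.
fn next_ready<S: Stream + Unpin>(stream: &mut S, cx: &mut Context<'_>) -> Option<S::Item> {
    match S::poll_next(Pin::new(stream), cx) {
        Poll::Ready(item) => item,
        Poll::Pending => None,
    }
}

/// Placeholder waker that does nothing when woken.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn pin_demo() -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    // Passing `&mut Countdown` directly to `next_ready` would not compile.
    // After `pin!`, the variable is a `Pin<&mut Countdown>`, which is `Unpin`.
    let mut pinned = pin!(Countdown { left: Cell::new(2), _pin: PhantomPinned });
    let mut out = Vec::new();
    while let Some(v) = next_ready(&mut pinned, &mut cx) {
        out.push(v);
    }
    out
}
```

The stream itself never moves after `pin!`; only the pinned handle is passed around, which is what lets an `Unpin`-only method accept it.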

fogti commented 7 months ago

Error: cannot find function block_on in module futures_lite::future