austinjones / postage-rs

The feature-rich, portable async channel library
MIT License

`MergeStream` does not work correctly with one stream `Pending`, one stream `Closed` #59

Open seritools opened 1 year ago

seritools commented 1 year ago

Extending the `swap_closed` test in `merge.rs`:

#[test]
fn swap_closed_pending() {
    let left = from_poll_iter(vec![PollRecv::Closed, PollRecv::Closed, PollRecv::Closed]);
    let right = from_poll_iter(vec![PollRecv::Ready(1), PollRecv::Pending, PollRecv::Ready(2)]);
    let mut find = MergeStream::new(left, right);

    let mut cx = Context::empty();

    assert_eq!(PollRecv::Ready(1), Pin::new(&mut find).poll_recv(&mut cx));

    // ↓ this fails because it returns PollRecv::Closed instead
    assert_eq!(PollRecv::Pending, Pin::new(&mut find).poll_recv(&mut cx));
    assert_eq!(PollRecv::Ready(2), Pin::new(&mut find).poll_recv(&mut cx));
    assert_eq!(PollRecv::Closed, Pin::new(&mut find).poll_recv(&mut cx));
}

It seems that whenever `MergePoll::First` returns `Pending` (or `Closed`), `MergeStream` simply falls back to the result of `MergePoll::Second`. As a result, the whole `MergeStream` returns `Closed` whenever `Second` does, even if `First` was only `Pending`. This violates the documented behavior (emphasis mine):

Merges two streams, returning values from both at once, **until both are closed**.
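For illustration, here is a rough sketch of the combining rule I would expect from that sentence. It is not the crate's actual code: `PollRecv` below is a local stand-in for postage's enum so the snippet compiles on its own, and `combine` is a hypothetical helper name. The second stream is polled lazily (via a closure) so a ready value is never dropped.

```rust
// Local stand-in for postage's `PollRecv`, defined here only so the sketch is self-contained.
#[derive(Debug, PartialEq, Clone, Copy)]
enum PollRecv<T> {
    Ready(T),
    Pending,
    Closed,
}

/// Hypothetical helper: combines the result of the stream polled first
/// with the (lazily polled) result of the other stream.
fn combine<T>(first: PollRecv<T>, second: impl FnOnce() -> PollRecv<T>) -> PollRecv<T> {
    match first {
        // A value from the first stream wins; the second stream is not polled.
        PollRecv::Ready(value) => PollRecv::Ready(value),
        // The first stream is Pending or Closed, so consult the second one.
        first_state => match second() {
            PollRecv::Ready(value) => PollRecv::Ready(value),
            // Report Closed only when *both* streams are closed.
            PollRecv::Closed if matches!(first_state, PollRecv::Closed) => PollRecv::Closed,
            // Otherwise at least one stream may still produce a value later.
            _ => PollRecv::Pending,
        },
    }
}
```

Under this rule, the second assertion in the test above would see `Pending` (one stream pending, one closed), and `Closed` would only surface once both sides report it.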