carllerche / syncbox

Concurrency utilities for Rust
131 stars 14 forks source link

Implement `collect()` #13

Closed Hoverbear closed 9 years ago

Hoverbear commented 9 years ago

Implement fn collect(self) -> Future<Vec<T>, E>.

The new reduce() test currently fails due to a bug.

#[test]
pub fn test_stream_reduce_fail() {
    let (tx, rx) = Stream::pair();
    tx.send(1)
        .and_then(|tx| tx.send(2))
        .and_then(|tx| tx.send(3))
        .and_then(|tx| tx.fail(()));

    let reduced = rx.reduce(0, move |sum, v| sum + v);
    assert_eq!(Err(ExecutionError(())), reduced.await());
}
test util::async::test_stream_reduce::test_stream_reduce_async ... ok
stack backtrace:
   1:        0x101e3741d - sys::backtrace::write::h285c537ffe0ac504uFA
   2:        0x101e4b0fc - panicking::on_panic::ha47b1a2e51240934wuJ
   3:        0x101e1ce29 - rt::unwind::begin_unwind_inner::h406ea3a6d9701b4c2cJ
   4:        0x101e1d2ae - rt::unwind::begin_unwind_fmt::hb4b4902ebedc094dDbJ
   5:        0x101cca45e - util::async::core::CoreInner<T, E>::cancel::h1884414932182747558
   6:        0x101cc9986 - util::async::core::Core<T, E>::cancel::h3934921991654831371
   7:        0x101cc989e - util::async::stream::Stream<T, E>.Drop::drop::h4838054013572503276
   8:        0x101cc9808 - syncbox..util..async..stream..Stream<i32, ()>::glue_drop.9100::hd67f0980cb9770a9
   9:        0x101cd3dfb - core..result..Result<core..result..Result<core..option..Option<(i32, syncbox..util..async..stream..Stream<i32, ()>)>, syncbox..util..async..AsyncError<()>>, syncbox..util..async..stream..Stream<i32, ()>>::glue_drop.9147::h7e41735952cabbfb
  10:        0x101d9e515 - util::async::Async::receive::closure.11435
  11:        0x101d9e18c - util::async::stream::Stream<T, E>.Async::ready::closure.11430
  12:        0x101d9e2eb - util::async::F.BoxedReceive<T>::receive_boxed::h11875229223624517796
  13:        0x101cce7a1 - util::async::core::CoreInner<T, E>::notify_consumer_loop::h16532454760153589256
  14:        0x101d9df62 - util::async::core::CoreInner<T, E>::consumer_ready::h17417467204453872860
  15:        0x101d9d74b - util::async::core::Core<T, E>::consumer_ready::h1755553782673583562
  16:        0x101d9d56f - util::async::stream::Stream<T, E>.Async::ready::h3694238093183683983
  17:        0x101d9d48f - util::async::Async::receive::h18258753405855367321
  18:        0x101d9d34e - util::async::Async::handle::closure.11420
  19:        0x101d9d193 - util::async::Async::receive::closure.11417
  20:        0x101d9cebc - util::async::future::Complete<T, E>::ready::closure.11412
  21:        0x101d9d01b - util::async::F.BoxedReceive<T>::receive_boxed::h16717628162605047191
  22:        0x101b99251 - util::async::core::CoreInner<T, E>::notify_producer_loop::h2939863308080537748
  23:        0x101b988ef - util::async::core::CoreInner<T, E>::notify_producer::h4856555928893511681
  24:        0x101bb92ec - util::async::core::CoreInner<T, E>::consumer_wait::h2144405134075819597
  25:        0x101bbe112 - util::async::core::CoreInner<T, E>::consumer_ready::h11541565166904975099
  26:        0x101bbd748 - util::async::core::Core<T, E>::consumer_await::h18066904289756898668
  27:        0x101bbd374 - util::async::future::Future<T, E>.Async::await::h8799936698841210199
  28:        0x101d86ad0 - util::async::test_stream_reduce::test_stream_reduce_fail::h6850e2bc91f4aa3e9md
  29:        0x101df05b3 - thunk::F.Invoke<A, R>::invoke::h3379517438723955871
  30:        0x101dfa4e1 - thunk::F.Invoke<A, R>::invoke::h12145351053418619277
  31:        0x101df185b - thunk::F.Invoke<A, R>::invoke::h8214283204092294449
  32:        0x101df1d0f - rt::unwind::try::try_fn::h5942731328424611608
  33:        0x101e4cfd8 - rust_try_inner
  34:        0x101e4cfc5 - rust_try
  35:        0x101df252f - thunk::F.Invoke<A, R>::invoke::h3998061840641417267
  36:        0x101e3a312 - sys::thread::thread_start::h46962d4438d4c5b2N4E
  37:     0x7fff90710267 - _pthread_body
  38:     0x7fff907101e4 - _pthread_start
thread panicked while panicking. aborting.
Process didn't exit successfully: `/Users/hoverbear/git/syncbox/target/debug/test-9963656f2619ca26` (signal: 4)
carllerche commented 9 years ago

Thanks! I got this merged: 7948fd9f0783c849d7c2d77b7847a72da6edb90c