yoshuawuyts / futures-concurrency

Structured concurrency operations for async Rust
https://docs.rs/futures-concurrency
Apache License 2.0
413 stars 33 forks source link

`TryJoin` on tuple trying to free a null pointer #155

Closed fogodev closed 1 year ago

fogodev commented 1 year ago

I've been using futures_concurrency crate as much as I can at Spacedrive, I prefer its functions and traits approach better than the macros approach of futures crate. But I was having a weird crash problem with a TryJoin. I was able to reproduce a minimal example as follows.

use futures_concurrency::future::{Join, TryJoin};
use std::future::ready;
use tokio::time::{sleep, Duration};

async fn process(iter: impl IntoIterator<Item = &i32>, fail: bool) -> Result<Vec<i32>, Vec<()>> {
    if fail {
        return Err(vec![]);
    } else {
        sleep(Duration::from_secs(5)).await;
    }

    Ok(iter
        .into_iter()
        .map(|i| ready(*i))
        .collect::<Vec<_>>()
        .join()
        .await)
}

#[tokio::main]
async fn main() -> Result<(), Vec<()>> {
    let v = (0..10).collect::<Vec<_>>();

    (
        process(v.iter().take(5), true),
        process(v.iter().take(0), false),
    )
        .try_join()
        .await?;

    Ok(())
}
image

On MacOS it fires a SIGABRT and on Linux a SIGSEGV (saying that the faulty address is 0x0)

matheus-consoli commented 1 year ago

even stranger, if you use a slight modified version, you trigger an infinite recursion:

use futures_concurrency::future::{Join, TryJoin};
use std::future::ready;
use tokio::time::{sleep, Duration};

async fn process(
    iter: impl IntoIterator<Item = &LoudDrop<i32>>,
    fail: bool,
) -> Result<Vec<LoudDrop<i32>>, Vec<()>> {
    if fail {
        return Err(vec![]);
    } else {
        sleep(Duration::from_secs(5)).await;
    }

    Ok(iter
        .into_iter()
        .map(|i| ready(i.clone()))
        .collect::<Vec<_>>()
        .join()
        .await)
}

#[derive(Clone)]
struct LoudDrop<T>(T);
impl<T> Drop for LoudDrop<T> {
    fn drop(&mut self) {
        println!("DROP");
    }
}

#[tokio::main]
async fn main() -> Result<(), Vec<()>> {
    let v = (0..4).map(LoudDrop).collect::<Vec<_>>();

    // 
    // COMMENT THIS LINE AND YOU GET A SEGFAULT
    // UNCOMMENT THIS LINE AND YOU GET AN INFINITE RECURSION
    //
    println!("{}", v.len());

    (
        process(v.iter().take(2), true),
        process(v.iter().take(0), false),
    )
        .try_join()
        .await?;

    Ok(())
}
matheus-consoli commented 1 year ago

miri is detecting UB in the drop_initialized_values, here: https://github.com/yoshuawuyts/futures-concurrency/blob/a2a7f6a17ada682e01a0096e0b2a223c886eba39/src/future/try_join/tuple.rs#L72

miri output ```rust error: Undefined Behavior: using uninitialized data, but this operation requires initialized memory --> /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/raw_vec.rs:223:9 | 223 | self.ptr.as_ptr() | ^^^^^^^^ using uninitialized data, but this operation requires initialized memory | = help: this indicates a bug in the program: it performed an invalid operation, and caused Undefined Behavior = help: see https://doc.rust-lang.org/nightly/reference/behavior-considered-undefined.html for further information = note: BACKTRACE: = note: inside `alloc::raw_vec::RawVec::>::ptr` at /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/raw_vec.rs:223:9: 223:17 = note: inside `std::vec::Vec::>::as_mut_ptr` at /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/vec/mod.rs:1324:9: 1324:23 = note: inside `> as std::ops::Drop>::drop` at /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/vec/mod.rs:3064:62: 3064:79 = note: inside `std::ptr::drop_in_place::>> - shim(Some(std::vec::Vec>))` at /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:497:1: 497:56 = note: inside `std::mem::MaybeUninit::>>::assume_init_drop` at /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/mem/maybe_uninit.rs:728:18: 728:55 = note: inside ` as pin_project::__private::PinnedDrop>::drop::__drop_inner::<[async fn body@src/main.rs:8:42: 21:2], std::vec::Vec>, [async fn body@src/main.rs:8:42: 21:2], std::vec::Vec>, std::vec::Vec<()>>` at /home/consoli/projects/rust/contrib/futures-concurrency/src/future/try_join/tuple.rs:72:22: 72:48 = note: inside `>, [async fn body@src/main.rs:8:42: 21:2], std::vec::Vec>, std::vec::Vec<()>> as pin_project::__private::PinnedDrop>::drop` at /home/consoli/projects/rust/contrib/futures-concurrency/src/future/try_join/tuple.rs:263:9: 263:23 = note: inside `futures_concurrency::future::try_join::tuple::_::>, [async fn body@src/main.rs:8:42: 21:2], std::vec::Vec>, std::vec::Vec<()>>>::drop` at /home/consoli/projects/rust/contrib/futures-concurrency/src/future/try_join/tuple.rs:174:23: 174:33 = note: inside `std::ptr::drop_in_place::>, [async fn body@src/main.rs:8:42: 21:2], std::vec::Vec>, std::vec::Vec<()>>> - shim(Some(futures_concurrency::future::try_join::tuple::TryJoin2<[async fn body@src/main.rs:8:42: 21:2], std::vec::Vec>, [async fn body@src/main.rs:8:42: 21:2], std::vec::Vec>, std::vec::Vec<()>>))` at /home/consoli/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:497:1: 497:56 note: inside closure --> src/main.rs:42:14 | 42 | .await?; | ^ = note: inside closure at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/park.rs:282:63: 282:87 = note: inside `tokio::runtime::coop::with_budget::>>, [closure@tokio::runtime::park::CachedParkThread::block_on<[async block@src/main.rs:31:1: 31:15]>::{closure#0}]>` at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/coop.rs:107:5: 107:8 = note: inside `tokio::runtime::coop::budget::>>, [closure@tokio::runtime::park::CachedParkThread::block_on<[async block@src/main.rs:31:1: 31:15]>::{closure#0}]>` at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/coop.rs:73:5: 73:38 = note: inside `tokio::runtime::park::CachedParkThread::block_on::<[async block@src/main.rs:31:1: 31:15]>` at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/park.rs:282:31: 282:88 = note: inside `tokio::runtime::context::blocking::BlockingRegionGuard::block_on::<[async block@src/main.rs:31:1: 31:15]>` at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/context/blocking.rs:66:9: 66:25 = note: inside closure at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/multi_thread/mod.rs:87:13: 87:38 = note: inside `tokio::runtime::scheduler::multi_thread::MultiThread::block_on::<[async block@src/main.rs:31:1: 31:15]>` at /home/consoli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/multi_thread/mod.rs:86:9: 88:11 note: inside `main` --> src/main.rs:44:5 | 44 | Ok(()) | ^^^^^^ note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace error: aborting due to previous error```
fogodev commented 1 year ago

Also, worth to note that these errors are only happening with Tokio for some reason, just tested my example with async_std and worked just fine.

Edit: Also tested with smol and futures crate executor, also crashes with them. The unsafe usage that @matheus-consoli mentioned gives that the outputs are only being properly initialized with async_std crate, with the other executors it's trying to drop invalid data.

yoshuawuyts commented 1 year ago

Hey, thanks for reporting this!

Im not behind a computer right now, but from reading the code it seems we may be setting the wrong state on line 36 of try_join.rs. We're tagging the entry as "ready" rather than "completed", and then dropping the data. This leads to a double-drop, which is what I believe Miri is finding.

We do want to drop in-line, so I think what we should do is to instead mark it as "completed" and the drop it inline. If anyone wants to try that out and see if it still fails that would be helpful. Otherwise I can give this a try when I'm back at work next week.

fogodev commented 1 year ago

Tested here, calling $this.state[$fut_idx].set_none(); just after the ManuallyDrop::drop line solved this issue. Setting the PollState to none was the closest thing I could find to mark it as "completed" instead of "ready". Introduced this line on both join and try_join for tuples.

I can help it further, just need some guidance on how to proceed. I'm putting here the tests output. Most of them were just expecting a "ready" state and should be simple to fix for a "none" state or for a new "completed" state, the ones that worried me more are the ones for leaking stuff, namely the assertion at https://github.com/yoshuawuyts/futures-concurrency/blob/a2a7f6a17ada682e01a0096e0b2a223c886eba39/src/future/try_join/tuple.rs#L395-L397

Unit tests ```rust running 59 tests test future::join::array::test::debug ... ok test future::join::array::test::empty ... ok test future::join::vec::test::debug ... ok test future::join::array::test::smoke ... ok test future::join::tuple::test::join_0 ... ok test future::future_group::test::smoke ... ok test future::join::vec::test::empty ... ok test future::join::vec::test::smoke ... ok test future::race::array::test::no_fairness ... ok test future::join::tuple::test::does_not_leak_memory ... FAILED test future::race::tuple::test::race_1 ... ok test future::join::tuple::test::join_3 ... FAILED test future::join::tuple::test::join_1 ... FAILED test future::join::tuple::test::join_2 ... FAILED test future::race::tuple::test::race_2 ... ok test future::race::tuple::test::race_3 ... ok test future::race::vec::test::no_fairness ... ok test future::race_ok::array::test::all_err ... ok test future::race_ok::array::test::all_ok ... ok test future::race_ok::array::test::one_err ... ok test future::race_ok::tuple::test::race_ok_1 ... ok test future::race_ok::tuple::test::race_ok_2 ... ok test future::race_ok::tuple::test::race_ok_3 ... ok test future::race_ok::tuple::test::race_ok_err ... ok test future::race_ok::vec::test::all_err ... ok test future::race_ok::vec::test::all_ok ... ok test future::race_ok::vec::test::one_err ... ok test future::try_join::array::test::all_ok ... ok test future::try_join::array::test::empty ... ok test future::try_join::array::test::one_err ... ok test future::try_join::tuple::test::all_ok ... FAILED test future::try_join::tuple::test::does_not_leak_memory ... FAILED test future::try_join::tuple::test::issue_135_resume_after_completion ... FAILED test future::try_join::tuple::test::one_err ... ok test future::try_join::vec::test::all_ok ... ok test future::try_join::vec::test::empty ... ok test future::try_join::vec::test::one_err ... ok test stream::chain::array::tests::chain_3 ... ok test stream::chain::tuple::tests::chain_3 ... ok test stream::chain::vec::tests::chain_3 ... ok test stream::merge::array::tests::merge_array_2x2 ... ok test stream::merge::array::tests::merge_array_4 ... ok test stream::merge::tuple::tests::merge_tuple_0 ... ok test stream::merge::array::tests::merge_channels ... FAILED test stream::merge::tuple::tests::merge_channels ... FAILED test stream::merge::tuple::tests::merge_tuple_1 ... ok test stream::merge::tuple::tests::merge_tuple_2 ... ok test stream::merge::tuple::tests::merge_tuple_3 ... ok test stream::merge::tuple::tests::merge_tuple_4 ... ok test stream::merge::vec::tests::merge_vec_2x2 ... ok test stream::merge::vec::tests::merge_vec_4 ... ok test stream::stream_group::test::smoke ... ok test stream::merge::vec::tests::merge_channels ... FAILED test stream::zip::array::tests::zip_array_3 ... ok test stream::zip::tuple::tests::zip_tuple_3 ... ok test utils::poll_state::vec::tests::boxed_does_not_allocate_twice ... ok test stream::zip::vec::tests::zip_array_3 ... ok test utils::poll_state::vec::tests::type_size ... ok test utils::wakers::vec::readiness_vec::test::resize ... ok failures: ---- future::join::tuple::test::does_not_leak_memory stdout ---- thread 'future::join::tuple::test::does_not_leak_memory' panicked at 'assertion failed: *flag.borrow()', src/future/join/tuple.rs:367:13 note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace ---- future::join::tuple::test::join_3 stdout ---- thread 'future::join::tuple::test::join_3' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13 ---- future::join::tuple::test::join_1 stdout ---- thread 'future::join::tuple::test::join_1' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13 ---- future::join::tuple::test::join_2 stdout ---- thread 'future::join::tuple::test::join_2' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13 ---- future::try_join::tuple::test::all_ok stdout ---- thread 'future::try_join::tuple::test::all_ok' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13 ---- future::try_join::tuple::test::does_not_leak_memory stdout ---- thread 'future::try_join::tuple::test::does_not_leak_memory' panicked at 'assertion failed: *flag.borrow()', src/future/try_join/tuple.rs:398:13 ---- future::try_join::tuple::test::issue_135_resume_after_completion stdout ---- thread 'future::try_join::tuple::test::issue_135_resume_after_completion' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13 ---- stream::merge::array::tests::merge_channels stdout ---- thread 'stream::merge::array::tests::merge_channels' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13 ---- stream::merge::tuple::tests::merge_channels stdout ---- thread 'stream::merge::tuple::tests::merge_channels' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13 ---- stream::merge::vec::tests::merge_channels stdout ---- thread 'stream::merge::vec::tests::merge_channels' panicked at 'Future should have reached a `Ready` state', src/utils/poll_state/array.rs:29:13 failures: future::join::tuple::test::does_not_leak_memory future::join::tuple::test::join_1 future::join::tuple::test::join_2 future::join::tuple::test::join_3 future::try_join::tuple::test::all_ok future::try_join::tuple::test::does_not_leak_memory future::try_join::tuple::test::issue_135_resume_after_completion stream::merge::array::tests::merge_channels stream::merge::tuple::tests::merge_channels stream::merge::vec::tests::merge_channels test result: FAILED. 49 passed; 10 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.01s error: test failed, to rerun pass `--lib` ```
yoshuawuyts commented 1 year ago

I'm trying to reproduce this bug today. And I believe the bug may actually be in Join, not TryJoin. The following test case fails:

use futures_concurrency::future::{Join, TryJoin};
use std::future::ready;
use tokio::time::{sleep, Duration};

async fn process_not_fail() -> Result<Vec<i32>, ()> {
    sleep(Duration::from_millis(100)).await;
    Ok(vec![ready(1), ready(2)].join().await)
}

async fn process_fail() -> Result<Vec<i32>, ()> {
    Err(())
}

#[tokio::test]
async fn test() {
    let res = (process_fail(), process_not_fail()).try_join().await;
    assert!(res.is_err());
}

But if we replace Vec::join with Array::join, then the error stops triggering:

use futures_concurrency::future::{Join, TryJoin};
use std::future::ready;
use tokio::time::{sleep, Duration};

async fn process_not_fail() -> Result<[i32; 2], ()> {
    sleep(Duration::from_millis(100)).await;
    Ok([ready(1), ready(2)].join().await) // <- changed this line
}

async fn process_fail() -> Result<[i32; 2], ()> {
    Err(())
}

#[tokio::test]
async fn test() {
    let res = (process_fail(), process_not_fail()).try_join().await;
    assert!(res.is_err());
}

I want to see if I can further reduce this to see whether I can trigger it without using try_join later on.

yoshuawuyts commented 1 year ago

If we replace the try join of the tuple with a try join of an array, we trigger the same error. This should make this easier to debug since we're no longer needing to track down the source of the error through macro frames in the backtrace.

use futures_concurrency::future::{Join, TryJoin};
use futures_core::Future;
use std::{future::ready, pin::Pin};
use tokio::time::{sleep, Duration};

pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

async fn process_not_fail() -> Result<Vec<i32>, ()> {
    sleep(Duration::from_millis(100)).await;
    Ok(vec![ready(1), ready(2)].join().await)
}

async fn process_fail() -> Result<Vec<i32>, ()> {
    Err(())
}

#[tokio::test]
async fn test() {
    let a: BoxFuture<'static, _> = Box::pin(process_fail());
    let b: BoxFuture<'static, _> = Box::pin(process_not_fail());
    let res = [a, b].try_join().await;
    assert!(res.is_err());
}
yoshuawuyts commented 1 year ago

Ha, I found the bug. It turns out the error was in all of the implementations of TryJoin, not just the tuple one. Fixing it up for all impls and filing a patch now!