yoshuawuyts / futures-concurrency

Structured concurrency operations for async Rust
https://docs.rs/futures-concurrency
Apache License 2.0
414 stars, 33 forks

ConcurrentStream usage with tokio leads to ACCESS_VIOLATION #182

Closed: inklesspen1rus closed this issue 5 months ago

inklesspen1rus commented 7 months ago

I tried to use concurrent streams to sleep in parallel with tokio:

use core::time::Duration;
use futures_concurrency::prelude::*;
use tokio;
use futures::prelude::*;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let s1 = stream::iter([1, 2]);
    let s2 = stream::iter([1, 2]);
    let s3 = stream::iter([1, 2]);

    (s1, s2, s3)
        .chain()
        .co()
        .map(|x| async move {
            tokio::time::sleep(Duration::from_secs(x as _)).await;
            ()
        })
        .for_each(|_| async {})
        .await;
}

But sometimes it crashes:

> cargo run
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.15s
     Running `target\debug\rstestss.exe`
error: process didn't exit successfully: `target\debug\rstestss.exe` (exit code: 0xc0000005, STATUS_ACCESS_VIOLATION)

Without the "current_thread" flavor, the program just freezes.

Other runtimes work fine. async_std:

use core::time::Duration;
use async_std;
use futures_concurrency::prelude::*;
use futures::prelude::*;

#[async_std::main]
async fn main() {
    let s1 = stream::iter([1, 2]);
    let s2 = stream::iter([1, 2]);
    let s3 = stream::iter([1, 2]);

    (s1, s2, s3)
        .chain()
        .co()
        .map(|x| async move {
            async_std::task::sleep(Duration::from_secs(x)).await;
            ()
        })
        .for_each(|_| async {})
        .await;
}

smol:

use core::time::Duration;
use futures_concurrency::prelude::*;
use futures::prelude::*;

async fn main_async() {
    let s1 = stream::iter([1, 2]);
    let s2 = stream::iter([1, 2]);
    let s3 = stream::iter([1, 2]);

    (s1, s2, s3)
        .chain()
        .co()
        .map(|x| async move {
            smol::Timer::after(Duration::from_secs(x)).await;
            ()
        })
        .for_each(|_| async {})
        .await;
}

fn main() {
    smol::block_on(main_async());
}

The tokio runtime with smol's Timer also works fine:

use core::time::Duration;
use futures_concurrency::prelude::*;
use tokio;
use futures::prelude::*;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let s1 = stream::iter([1, 2]);
    let s2 = stream::iter([1, 2]);
    let s3 = stream::iter([1, 2]);

    (s1, s2, s3)
        .chain()
        .co()
        .map(|x| async move {
            // tokio::time::sleep(Duration::from_secs(x as _)).await;
            smol::Timer::after(Duration::from_secs(x)).await;
            ()
        })
        .for_each(|_| async {})
        .await;
}

Is this a Tokio issue?

matheus-consoli commented 7 months ago

Interesting, thanks for reporting it. It seems you're running on Windows, right? I only have Linux available, and I didn't hit the ACCESS_VIOLATION error, but it freezes on every run (while the other runtimes work, as you pointed out). @yoshuawuyts, you run on Windows, right? I'll investigate some more, but my initial guess is that the problem comes from tokio.

taiki-e commented 7 months ago

From https://github.com/rust-lang/futures-rs/issues/2851#issuecomment-2066826865:

I don't intend to review all of it, but it has a bunch of very suspicious unsafe code even at a cursory glance: one spot would be unsound when the vec, the storage of the slab, is reallocated; another probably has the same problem as tokio-rs/tokio#2612; and the heavy use of ManuallyDrop around the Pin API reminds me of async-rs/async-std#903; etc.

IIRC, the tokio timer is !Unpin and the others are Unpin, so the tokio timer is probably the only one affected by the first unsoundness listed there.
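The following illustrates the first point. A Vec-backed slab can hold !Unpin futures soundly only if growing the Vec never moves them. One common approach is to box each future so that a reallocation moves only the Box pointers. This is a minimal sketch that assumes one heap allocation per future is acceptable. `BoxedSlab`, `insert`, `addr_of`, `poll_at` and `noop_waker` are hypothetical names. This is not futures-concurrency's actual storage type, and not necessarily the fix the crate adopted.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical slab-like storage (not the crate's real type). Each future
/// lives in its own heap allocation, so when `slots` grows and reallocates,
/// only the `Box` pointers move; the futures themselves stay put.
struct BoxedSlab<F> {
    slots: Vec<Option<Pin<Box<F>>>>,
}

impl<F: Future> BoxedSlab<F> {
    fn new() -> Self {
        Self { slots: Vec::new() }
    }

    /// Stores a future and returns its key (an index into `slots`).
    fn insert(&mut self, fut: F) -> usize {
        self.slots.push(Some(Box::pin(fut)));
        self.slots.len() - 1
    }

    /// Heap address of the future behind `key`, or `None` once it completed.
    fn addr_of(&self, key: usize) -> Option<usize> {
        self.slots[key].as_ref().map(|f| &**f as *const F as usize)
    }

    /// Polls the future behind `key`; a completed future is dropped in place.
    fn poll_at(&mut self, key: usize, cx: &mut Context<'_>) -> Poll<F::Output> {
        let slot = &mut self.slots[key];
        let fut = slot.as_mut().expect("future already completed");
        let out = fut.as_mut().poll(cx);
        if out.is_ready() {
            *slot = None;
        }
        out
    }
}

/// A waker that does nothing, enough for polling futures by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWaker))
}

fn main() {
    let mut slab = BoxedSlab::new();
    let first = slab.insert(std::future::ready(1));
    let before = slab.addr_of(first);

    // Push many more futures so `slots` has to reallocate its buffer.
    for i in 0..64 {
        slab.insert(std::future::ready(i));
    }
    // The first future's heap address is unchanged by the reallocation.
    assert_eq!(slab.addr_of(first), before);

    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    assert_eq!(slab.poll_at(first, &mut cx), Poll::Ready(1));
}
```

The trade-off is one allocation per future. One alternative is an arena made of fixed-size chunks that are never reallocated. That avoids boxing each future but is more involved to implement.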

conradludgate commented 7 months ago

I also noticed the unsoundness when looking through the code just now. I have put together a minimal test that reproduces the pin unsoundness in the ConcurrentStream Vec collect impl.

use std::{future::Future, marker::PhantomPinned, pin::pin, task::Poll};

use futures_concurrency::{
    concurrent_stream::{ConcurrentStream, Consumer, ConsumerState},
    future::Race,
};
use futures_executor::block_on;
use pin_project::pin_project;

#[pin_project]
struct PinCheck {
    addr: usize,
    #[pin]
    _pinned: PhantomPinned,
}

impl PinCheck {
    fn new() -> Self {
        Self {
            addr: 0,
            _pinned: PhantomPinned,
        }
    }
}

impl Future for PinCheck {
    type Output = ();

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Self::Output> {
        let this = self.project();
        let addr = this.addr as *mut usize as usize;
        if *this.addr == 0 {
            cx.waker().wake_by_ref();
            *this.addr = addr;
            Poll::Pending
        } else {
            assert_eq!(*this.addr, addr, "pinned value was moved.");
            Poll::Ready(())
        }
    }
}

struct Tricky;

impl ConcurrentStream for Tricky {
    type Item = ();

    type Future = PinCheck;

    async fn drive<C>(self, consumer: C) -> C::Output
    where
        C: Consumer<Self::Item, Self::Future>,
    {
        let mut consumer = pin!(consumer);
        for _ in 0..64 {
            match consumer.as_mut().send(PinCheck::new()).await {
                ConsumerState::Break => return consumer.flush().await,
                ConsumerState::Continue => continue,
                ConsumerState::Empty => unreachable!(),
            }
        }

        let progress = async { Some(consumer.as_mut().progress().await) };
        let noop = async { None };

        // poll progress once.
        assert!((progress, noop).race().await.is_none());

        // push new entry, reallocs internal futures slab.
        // this moves the futures and violates pinning.
        consumer.as_mut().send(PinCheck::new()).await;

        consumer.flush().await
    }

    fn concurrency_limit(&self) -> Option<std::num::NonZeroUsize> {
        todo!()
    }
}

#[test]
fn it_works() {
    block_on(async { Tricky.collect::<Vec<()>>().await });
}

This currently outputs:

thread 'tests::it_works' panicked at src/lib.rs:49:17:
assertion `left == right` failed: pinned value was moved.
  left: 5830115848
 right: 5830117896
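The repro uses pin_project only to project the `addr` field. The same address check can also be written with std alone and driven by hand with a do-nothing waker. `NoopWaker` and `poll_twice` are illustrative helpers, not part of any library, and the `unsafe` projection is an assumption of this sketch in place of pin_project. When the future stays pinned, the second poll sees the same address and completes. The assertion fails only if something moves the value between polls, as the slab reallocation does in the test above.

```rust
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// std-only variant of `PinCheck`: records its own address on the first
/// poll and asserts on the second poll that it has not moved.
struct PinCheck {
    addr: usize,
    _pinned: PhantomPinned, // makes the type !Unpin
}

impl PinCheck {
    fn new() -> Self {
        Self { addr: 0, _pinned: PhantomPinned }
    }
}

impl Future for PinCheck {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // SAFETY: we only read our address and write a plain field; nothing is moved out.
        let this = unsafe { self.get_unchecked_mut() };
        let here = &*this as *const PinCheck as usize;
        if this.addr == 0 {
            this.addr = here;
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            assert_eq!(this.addr, here, "pinned value was moved.");
            Poll::Ready(())
        }
    }
}

/// A waker that does nothing, enough for polling by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a pinned future twice and reports whether it finished.
fn poll_twice<F: Future>(mut fut: Pin<&mut F>) -> bool {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let _ = fut.as_mut().poll(&mut cx);
    fut.as_mut().poll(&mut cx).is_ready()
}

fn main() {
    // Box::pin keeps the future at one heap address, so the check passes.
    let mut boxed = Box::pin(PinCheck::new());
    assert!(poll_twice(boxed.as_mut()));

    // std::pin::pin! pins on the stack; the value likewise never moves.
    let mut stacked = std::pin::pin!(PinCheck::new());
    assert!(poll_twice(stacked.as_mut()));
}
```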

yoshuawuyts commented 7 months ago

I've been out on sick leave for the past several days, but I wanted to quickly acknowledge that this is indeed an issue we should fix.

I want to thank @inklesspen1rus for reporting this, and everyone else in this thread for helping narrow the issue down.