yoshuawuyts / futures-concurrency

Structured concurrency operations for async Rust
https://docs.rs/futures-concurrency
Apache License 2.0

Fix pin violation in `FutureGroup` #188

Open yoshuawuyts opened 4 weeks ago

yoshuawuyts commented 4 weeks ago

Following up on #187, this problem still persists in FutureGroup itself. I see two options for a permanent fix:

  1. We keep the existing architecture for FutureGroup based on a slab structure, but replace it with a more efficient implementation such as the beton crate.
  2. Change the internal architecture of FutureGroup to more closely match FuturesUnordered. I haven't fully paged in how that works yet, but we know it doesn't have the same issues we have, so that might be a lower-risk solution.

cc/ @conradludgate Given you've worked on futures-buffered, and probably know the FuturesUnordered internals better than I do, I'd be interested in your thoughts here.

conradludgate commented 4 weeks ago

The slab implementation in beton would still exhibit the same unsoundness that FutureGroup currently does. The core of the issue is that when you inevitably need to realloc the slab, it will move all items that were allocated - this will violate any Pin requirements. You can overcome this by requiring F: Unpin, but that's quite limiting.

I'll try to give a simplified overview of the internals of futures and futures-buffered and how they overcome this constraint.

futures::future::FuturesUnordered

This implementation uses an intrusive linked list construction. Each future spawned in the group is allocated in its own Arc, and linked into the ready list. When the future is polled and not ready, it is unlinked and linked into the idle list. The task doubles as its own waker through ArcWake. When woken, the task unlinks itself from the idle list and links into the ready list. It then wakes the waker that is stored in the FuturesUnordered.

This implementation is not unlike how most executors work, although it's not the most efficient. Tokio, for instance, has an additional fixed-length ring buffer that it keeps a batch of ready tasks in - since popping from a ring buffer is cheaper than removing from a cache-unfriendly linked list.
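To make the wake path described above for futures::future::FuturesUnordered concrete, here is a minimal sketch using only std. It is not the crate's code: the names Shared, Task and CountingWaker are made up for illustration, std::task::Wake stands in for ArcWake, and a Mutex<VecDeque<usize>> of task ids stands in for the intrusive ready list. The real task also owns the future; this one only carries an id.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Hypothetical group state: the ready queue plus the waker of whoever
/// is polling the group.
struct Shared {
    ready: Mutex<VecDeque<usize>>,
    parent: Mutex<Option<Waker>>,
}

/// Hypothetical per-future task. Each task lives in its own Arc, so its
/// address never changes, and the same Arc doubles as the waker.
struct Task {
    id: usize,
    shared: Arc<Shared>,
}

impl Wake for Task {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        // "Link into the ready list": here, push the id onto a queue.
        self.shared.ready.lock().unwrap().push_back(self.id);
        // Then wake the waker stored in the group. Clone it out first so
        // the lock is not held while waking.
        let parent = self.shared.parent.lock().unwrap().clone();
        if let Some(waker) = parent {
            waker.wake();
        }
    }
}

/// Stand-in for the outer executor's waker: it only counts wake-ups.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Wake tasks 2 and 0, then return the ready queue contents and the
/// number of times the outer waker was woken.
fn demo() -> (Vec<usize>, usize) {
    let outer = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let shared = Arc::new(Shared {
        ready: Mutex::new(VecDeque::new()),
        parent: Mutex::new(Some(Waker::from(outer.clone()))),
    });
    let wakers: Vec<Waker> = (0..3)
        .map(|id| Waker::from(Arc::new(Task { id, shared: shared.clone() })))
        .collect();

    wakers[2].wake_by_ref();
    wakers[0].wake_by_ref();

    let ready: Vec<usize> = shared.ready.lock().unwrap().drain(..).collect();
    (ready, outer.0.load(Ordering::SeqCst))
}
```

Calling demo() yields the ids in wake order together with the outer wake count. The point of the sketch is only that one Arc per task gives both a stable address and a waker without any extra allocation.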

futures_buffered::FuturesUnordered

This implementation uses a "triangular array" to amortise some of the allocation cost and potentially allow for allocation re-use. (Similar properties as a Vec, except it is non-contiguous and never re-allocates, which preserves pin properties). This is also what the thread_local crate uses. Simply put, it's a Vec<Box<[T]>> where each subsequent box-slice is double the length of the slice before it:

[[1], [2, 3], [4, 5, 6, 7], [8, 9, 10, 11, 12, 13, 14, 15]]

Although, more accurately we use Vec<FuturesUnorderedBounded<T>> as there is some additional metadata to store. Notably, we use a SlotMap construction for the box-slice to keep a free-list of slots that are available to insert into.

Because futures_buffered does not have an Arc<Task<F>> that it can use as a waker for each task, I created a novel concurrent data structure for the purpose - although a much simpler construction like your WakerVec works, it is just less allocation-efficient.

The data structure that I dubbed the "arc slice" works by allocating an arc for each of the FuturesUnorderedBounded<T>. This arc looks something like
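As a rough illustration of the plain Vec<Box<[T]>> form (not the FuturesUnorderedBounded version, and without the slot-map free-list), the index math and the "never moves" property could look like the sketch below. Triangular, locate and addresses_stay_stable are hypothetical names, and the structure is append-only to keep it short.

```rust
/// Map a 0-based index to (bucket, offset), where bucket k holds 2^k slots.
fn locate(index: usize) -> (usize, usize) {
    let n = index + 1; // 1-based position, matching the diagram above
    let bucket = (usize::BITS - 1 - n.leading_zeros()) as usize; // floor(log2(n))
    (bucket, n - (1 << bucket))
}

/// Hypothetical append-only triangular array.
struct Triangular<T> {
    buckets: Vec<Box<[Option<T>]>>,
    len: usize,
}

impl<T> Triangular<T> {
    fn new() -> Self {
        Self { buckets: Vec::new(), len: 0 }
    }

    /// Append a value and return its index.
    fn push(&mut self, value: T) -> usize {
        let index = self.len;
        let (bucket, offset) = locate(index);
        if bucket == self.buckets.len() {
            // Allocate a fresh bucket of 2^bucket empty slots. The outer Vec
            // may reallocate, but that only moves the Box pointers; the
            // slices they point to stay where they are.
            let slots: Vec<Option<T>> = (0..(1usize << bucket)).map(|_| None).collect();
            self.buckets.push(slots.into_boxed_slice());
        }
        self.buckets[bucket][offset] = Some(value);
        self.len += 1;
        index
    }

    fn get(&self, index: usize) -> Option<&T> {
        if index >= self.len {
            return None;
        }
        let (bucket, offset) = locate(index);
        self.buckets[bucket][offset].as_ref()
    }
}

/// Check that the first element keeps its address while the array grows.
fn addresses_stay_stable() -> bool {
    let mut arr = Triangular::new();
    arr.push(0u64);
    let before = arr.get(0).unwrap() as *const u64;
    for i in 1..1000u64 {
        arr.push(i);
    }
    let after = arr.get(0).unwrap() as *const u64;
    before == after
}
```

Because growth only ever adds a new box-slice, an element that has been written is never moved, which is the property a pinned future needs.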

Arc<(Waker, mpsc::Sender<usize>, [0, 1, 2, 3, 4, 5, 6, 7])>

The waker for future 2 in the bounded group is a pointer to the number 2 in that arc. When wake is called, it derefs to read the number 2, then walks back 2 usize lengths to find the start of the arc allocation. It then pushes the number 2 into the mpsc channel and calls wake on the parent waker.

When polling the FuturesUnordered, we update the waker in the "arc slice", then receive the values from the corresponding mpsc::Receiver<usize> for the indexes to wake.
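The wake protocol above (push the slot index into the channel, wake the parent, drain the channel on the next poll) can be sketched in safe Rust as follows. This is deliberately not the arc slice itself: it allocates one Arc per slot, which is closer to the WakerVec approach, and skips the pointer arithmetic entirely. Shared, SlotWaker, Noop, make_wakers and begin_poll are hypothetical names, and the Sender is wrapped in a Mutex only so the sketch does not depend on Sender being Sync.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Hypothetical per-group state: the parent waker and the sending half
/// of the index channel.
struct Shared {
    parent: Mutex<Option<Waker>>,
    tx: Mutex<Sender<usize>>,
}

/// One waker per slot. The real arc slice avoids this per-slot allocation
/// by pointing each waker at its index inside a single shared Arc.
struct SlotWaker {
    index: usize,
    shared: Arc<Shared>,
}

impl Wake for SlotWaker {
    fn wake(self: Arc<Self>) {
        // A send error means the receiver (the group) is gone; ignore it.
        let _ = self.shared.tx.lock().unwrap().send(self.index);
        let parent = self.shared.parent.lock().unwrap().clone();
        if let Some(waker) = parent {
            waker.wake();
        }
    }
}

/// Stand-in for the waker from the polling context; does nothing.
struct Noop;

impl Wake for Noop {
    fn wake(self: Arc<Self>) {}
}

/// Build the shared state, the receiver, and one waker per slot.
fn make_wakers(n: usize) -> (Arc<Shared>, Receiver<usize>, Vec<Waker>) {
    let (tx, rx) = mpsc::channel();
    let shared = Arc::new(Shared {
        parent: Mutex::new(None),
        tx: Mutex::new(tx),
    });
    let wakers = (0..n)
        .map(|index| Waker::from(Arc::new(SlotWaker { index, shared: shared.clone() })))
        .collect();
    (shared, rx, wakers)
}

/// What the group would do at the top of each poll: store the current
/// waker, then collect the indexes that were woken since the last poll.
fn begin_poll(shared: &Shared, rx: &Receiver<usize>, cx_waker: &Waker) -> Vec<usize> {
    *shared.parent.lock().unwrap() = Some(cx_waker.clone());
    rx.try_iter().collect()
}
```

With eight slots, waking slots 2 and 5 and then calling begin_poll returns those two indexes in wake order, so only those futures need to be polled.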

Recommendation

I would probably suggest starting somewhere in the middle - a triangular array is a simple construction - and paired with your WakerVec it still allows some amount of allocation re-use without adding an overly complex unsafe data structure. The triangular slot-map construction should still support a generational key for removals like FutureGroup has, although this is not something I've experimented with yet.