rayon-rs / rayon

Rayon: A data parallelism library for Rust
Apache License 2.0

Terribly inefficient design and possible solution #1178

Open lyphyser opened 4 months ago

lyphyser commented 4 months ago

Unless I misunderstand something, it looks like the Rayon design is deeply flawed, and will result in threads sitting idle for no reason in pretty much all situations.

Example 1

For example, let's assume 2 threads, and that thread B is running a 20-second-long task.

Thread A runs for_each on a parallel iterator, resulting in two splits that each take 15 seconds.

After 20 seconds, thread B ends and WASTES 10 seconds doing nothing, since thread A is now running the second split, and the terrible Splitter design no longer splits the iterator (since the job wasn't stolen).

A: [15 first half] [15 second half]
B: [20 other job]  [10 WASTED TIME]

Example 2

Now let's still assume 2 threads, and that both are idle when a for_each is run.

Now we have 2 splits, and thread B steals the second half and splits it in two again.

Now let's assume that the first half takes 20 seconds and the two quarters composing the second half take 15 seconds each, which results in no further split and again 10 seconds of WASTED TIME.

A: [20 first half]  [10 WASTED TIME]
B: [15 3rd quarter] [15 4th quarter]
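
For reference, here is a rough repro of the Example 2 shape, scaled down to milliseconds (the pool size and per-item costs are made up). Whether the wasted time actually shows up depends on how rayon happens to split and steal on a given run, so treat it as illustrative only.

use rayon::prelude::*;
use std::time::{Duration, Instant};

fn main() {
    // 2 worker threads; the "first half" costs 2 x 1000 ms, and the two
    // quarters of the second half cost 1500 ms each.
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(2)
        .build()
        .unwrap();
    let costs_ms: Vec<u64> = vec![1000, 1000, 1500, 1500];

    let start = Instant::now();
    pool.install(|| {
        costs_ms.par_iter().for_each(|&ms| {
            std::thread::sleep(Duration::from_millis(ms)); // stand-in for real work
        });
    });
    // Ideal 2-thread schedule: ~2500 ms; the schedule described above: ~3000 ms.
    println!("elapsed: {:?}", start.elapsed());
}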

Solution

I think the best solution is (in the simplest implementation) to not split at all in the beginning, but to register the running task as something that can be stolen.

When the task is stolen while it's running, the stealing task splits it (this requires interior mutability and either atomics or mutexes) and, if the split is non-empty, registers it as something that can be stolen and runs it.

This guarantees that there can be no wasted time since all running tasks that contain more than one item can be stolen and split.

An optimization would be to detect at the beginning whether other threads are idle and to split the task up front, giving a piece to each idle thread rather than having them steal it.
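
To make the idea concrete, here is a sketch of the kind of shared state this needs; nothing below is existing rayon API, and SharedRange and its method names are invented for illustration. The owner claims items one at a time through an atomic counter, and a thief can atomically split off roughly half of the still-unclaimed items at any moment.

use std::sync::atomic::{AtomicUsize, Ordering};

// Invariant: indices [0, remaining) are still unclaimed.
struct SharedRange {
    remaining: AtomicUsize,
}

impl SharedRange {
    fn new(len: usize) -> Self {
        Self { remaining: AtomicUsize::new(len) }
    }

    /// Owner side: claim the topmost unclaimed index, or None once everything is taken.
    fn claim_one(&self) -> Option<usize> {
        self.remaining
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |r| r.checked_sub(1))
            .ok()
            .map(|prev| prev - 1)
    }

    /// Thief side: atomically claim about half of the unclaimed items;
    /// returns the stolen index range (empty if there was nothing left to steal).
    fn steal_half(&self) -> std::ops::Range<usize> {
        let mut cur = self.remaining.load(Ordering::Acquire);
        loop {
            let take = cur / 2;
            if take == 0 {
                return 0..0;
            }
            match self.remaining.compare_exchange_weak(
                cur,
                cur - take,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return (cur - take)..cur,
                Err(actual) => cur = actual,
            }
        }
    }
}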

Implementation

This seems to require significant changes to Rayon. In particular:

cuviper commented 4 months ago

You are using rather inflammatory language, and your proposed solution leaves a whole lot of devils in the details. If nothing else, I suspect that such invasive stealing will end up pessimizing a lot of the "happy" path, where jobs that could be streaming rapidly and independently on each thread now have to step more carefully around atomics and synchronization. You are welcome to experiment with changes, of course!

One minor point is that the current splitter will actually create 4 total jobs for a 2 thread pool with no stealing. It starts with splits: current_num_threads(), then divides by two until it reaches 0. The comments are not really clear about this, but none of that is promised in the public API anyway. Your worst case can still occur when one thread goes idle after the other thread has already started its last job.
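
A simplified model of that behavior (this is not rayon's actual splitter code, just the rule described above): the split budget starts at the thread count and is halved on every split until it reaches 0.

// Toy model: "start with splits = current_num_threads(), halve until 0".
fn count_jobs(splits: usize) -> usize {
    if splits == 0 {
        1 // no budget left: this piece runs as a single job
    } else {
        2 * count_jobs(splits / 2) // split in two, each side keeps half the budget
    }
}

fn main() {
    assert_eq!(count_jobs(2), 4); // 2-thread pool, no stealing: 4 total jobs
    assert_eq!(count_jobs(4), 8); // 4-thread pool: 8 total jobs
}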

In general, I think jobs taking multiple seconds is too long for rayon to be effective, but we have no idea about that up front. Maybe a more intelligent splitter could time its first split and adjust accordingly. As a user, you can also give with_max_len hints in your code if you know your work takes a long time and shouldn't be grouped so much.
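
For example, with_max_len on an indexed parallel iterator caps how many items may be grouped into one job (the sleep below is just a stand-in for long-running work):

use rayon::prelude::*;
use std::time::Duration;

fn main() {
    (0..8u64)
        .into_par_iter()
        .with_max_len(1) // never group items: each item is its own stealable job
        .for_each(|i| {
            // stand-in for a task that takes a long time
            std::thread::sleep(Duration::from_millis(100 * (i + 1)));
        });
}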

lyphyser commented 4 months ago

I think the current behavior is highly unexpected (and thus reasonably described as "completely broken") as it seems to me that the basic goal of any work scheduling algorithm is to never let any processors be idle when there is work that can be done.

At least one atomic operation per iterator item seems to be absolutely necessary (unless min_len is set to > 1, or exotic direct thread manipulation like sending signals to or stopping specific threads is used). If Rayon ever commits to executing two items in a row with no atomic operation in between, the other worker threads could suddenly complete all remaining work before the first item is done, with no way for the first thread to notice; that leaves one thread running two (or more) items while one or more threads have zero items, resulting in wasted time (this is why the current design is completely broken).

However, this should not be a big deal as long as the cache line that the atomic operation acts upon stays on the current CPU, since atomic operations are generally fast on current CPUs when the memory they touch stays in the local cache, which is the case in my proposed solution (if there is no stealing).

cuviper commented 4 months ago

The current behavior is a trade-off between maximizing granularity (which would minimize idleness) and minimizing overhead. It's possible that we could find a better balance, but these goals will be competing in any case.

wagnerf42 commented 4 months ago

hi, we tried that a few years ago (https://arxiv.org/pdf/2009.05504, see section 3.6).

in my opinion the best way to solve it would be to make the producer type an associated type in the parallel iterator trait (once the current problem with GATs is solved).

it would allow implementing such a scheduler in user space.

lyphyser commented 4 months ago

I'm trying to come up with a design to solve this. I think the best approach is to keep the current "static splitting" interface, which is still useful for implementing skip() and take() and for when many threads are idle, and to add a new interface.

This new interface would allow turning a Producer into a double-ended-iterator-like construct with a shared state and two split ends referencing it, where both ends can be used. Both ends are Iterators, support folding with a folder, and support splitting, which is equivalent to reading a block at a time; the semantics would of course be that items taken from the right end are not returned by the left end and vice versa, like normal double-ended iterators.

For example, for a 0..n iterator the shared state would be a CachePadded<AtomicT> with the length, and the left end would contain the start plus a reference to the length, while the right end would contain the end plus a reference to the length. Items are generated via an atomic decrement of the length and a check that it was positive (on x86, this is a "lock dec" and an SF check), while splitting can happen with a compare-and-exchange operation.

Something like:

use std::ops::Range;
use rayon::iter::plumbing::Folder;

trait Producer {
  // ... existing Producer items (type Item: Send, etc.) ...
  type Shared: Sync;
  type Left: ProducerEnd<Self>;
  type Right: ProducerEnd<Self>;

  fn into_ends(self) -> (Self::Shared, Self::Left, Self::Right);
}

trait ProducerEnd<P: Producer>: Send
{
  // not ExactSizeIterator of course because the size is dynamic
  // not DoubleEndedIterator because the other end is separate
  type IntoIter<'a>: Iterator<Item = P::Item>;

  /// if index is None, split into halves, otherwise split at index; return the split index
  fn split_at(&mut self, state: &P::Shared, index: Option<usize>) -> (Range<usize>, P);
  fn into_iter<'a>(self, state: &'a P::Shared) -> Self::IntoIter<'a>;

  fn fold_with<F: Folder<P::Item>>(self, state: &P::Shared, folder: F) -> F
  where
    Self: Sized,
  {
    folder.consume_iter(self.into_iter(state))
  }
}

Unfortunately it seems impossible to make this change backwards-compatible due to the lack of associated type defaults, unless new variants of Producer traits are made.

The bridge code would then store the shared state in a local variable and move the left and right ends into the first and second closures of a join_context. The first job would fold the left end; the second job would do nothing unless migrated, in which case it would split the right end, fold the new producer, and offer the original and the new right ends as stealable jobs.
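
Here is a minimal, runnable sketch of that join_context shape, simplified in that it does not re-offer further stealable jobs: a bare AtomicUsize stands in for the shared producer state, a plain closure stands in for the folder, and bridge_sketch and everything inside it is made up for illustration.

use rayon::FnContext;
use std::sync::atomic::{AtomicUsize, Ordering};

fn bridge_sketch(n: usize, work: impl Fn(usize) + Sync) {
    let remaining = AtomicUsize::new(n);

    // Claim the next unprocessed item index, or None once everything is taken.
    let claim = || {
        remaining
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |r| r.checked_sub(1))
            .ok()
            .map(|prev| prev - 1)
    };

    rayon::join_context(
        // First job: fold the "left end" by claiming items until none remain.
        |_ctx: FnContext| {
            while let Some(i) = claim() {
                work(i);
            }
        },
        // Second job: do nothing unless it was actually migrated (stolen);
        // only then does it start consuming from the shared state as well.
        |ctx: FnContext| {
            if ctx.migrated() {
                while let Some(i) = claim() {
                    work(i);
                }
            }
        },
    );
}

fn main() {
    bridge_sketch(16, |i| {
        std::thread::sleep(std::time::Duration::from_millis(5)); // fake per-item work
        println!("item {i} on {:?}", std::thread::current().id());
    });
}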