TimelyDataflow / timely-dataflow

A modular implementation of timely dataflow in Rust
MIT License

Primes example... slower with more workers? #479

Open davideger opened 2 years ago

davideger commented 2 years ago

Following along here:

https://timelydataflow.github.io/timely-dataflow/chapter_1/chapter_1_1.html

I've downloaded a fresh copy of timely and find that when I run the "parallel primes" program, timely gets much slower with more workers, the opposite of what I'd expect. Is there a recent regression in timely?

```
$ time cargo run --release --example primes -- -w16 > output16.txt
    Finished release [optimized + debuginfo] target(s) in 0.02s
     Running `target/release/examples/primes -w16`

real    1m9.235s
user    15m27.958s
sys     0m1.987s

$ time cargo run --release --example primes -- -w1 > output1.txt
    Finished release [optimized + debuginfo] target(s) in 0.02s
     Running `target/release/examples/primes -w1`

real    0m0.279s
user    0m0.204s
sys     0m0.076s
```

```rust
#![allow(unused_variables)]
extern crate timely;

use timely::dataflow::InputHandle;
use timely::dataflow::operators::{Input, Exchange, Inspect, Probe};

fn main() {
    // initializes and runs a timely dataflow.
    timely::execute_from_args(std::env::args(), |worker| {

        let index = worker.index();
        let mut input = InputHandle::new();

        // create a new input, exchange data, and inspect its output
        let probe = worker.dataflow(|scope| {
            scope.input_from(&mut input)
                 .exchange(|x| *x)
                 // alternative: .inspect(move |x| println!("worker {}:\thello {}", index, x))
                 .inspect(|x| {
                     let limit = (*x as f64).sqrt() as u64;
                     if *x > 1 && (2 .. limit + 1).all(|i| x % i > 0) {
                         // why can't i capture index?
                         println!("{} is prime", x);
                     }
                 })
                 .probe();
        });

        // introduce data and watch!
        for round in 0..200000 {
            if index == 0 {
                input.send(round);
            }
            input.advance_to(round + 1);
        }
    }).unwrap();
}
```

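On the inline question about capturing `index`: timely's operators such as `inspect` take closures that must be `'static`, so a closure that merely borrows the local `index` is rejected, while marking it `move` copies `index` (a `usize`, which is `Copy`) into the closure, as the commented-out `move |x| ...` alternative does. The std-only sketch below mimics that constraint with a hypothetical `register` function that, like an operator constructor, demands a `'static` closure; it is an illustration of the borrow rule, not timely's actual API.

```rust
// Hypothetical stand-in for a dataflow operator: like timely's operators, it only
// accepts closures that are `'static` (they may outlive the caller's stack frame).
fn register<F: Fn(u64) -> Option<String> + 'static>(f: F) -> Box<dyn Fn(u64) -> Option<String>> {
    Box::new(f)
}

// Trial-division primality test, as in the example above.
fn is_prime(x: u64) -> bool {
    let limit = (x as f64).sqrt() as u64;
    x > 1 && (2..limit + 1).all(|i| x % i > 0)
}

// Builds a per-worker inspector. Without `move`, the closure would borrow `index`,
// a local of this function, and fail the `'static` bound (error E0373).
fn make_inspector(index: usize) -> Box<dyn Fn(u64) -> Option<String>> {
    register(move |x| {
        if is_prime(x) {
            Some(format!("worker {}: {} is prime", index, x))
        } else {
            None
        }
    })
}
```

For example, `make_inspector(3)(7)` yields `Some("worker 3: 7 is prime")`, and `make_inspector(3)(8)` yields `None`.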
frankmcsherry commented 2 years ago

What's happening is that you are overwhelming the control plane with this line:

```rust
input.advance_to(round + 1);
```

For each individual record you put in, you advance the timestamp, which introduces coordination information. This can be valuable if you want to track the progress of the computation record by record, but doing so is expensive, and the cost grows quadratically with the number of workers.
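A back-of-the-envelope model shows why per-record advancing scales so badly. It is not a measurement of timely: it assumes, as a simplification, that every worker advances its input once per batch and that each advance must be broadcast to every worker, so one round of advances costs about `workers * workers` progress messages. Timely actually batches and consolidates progress updates, so treat the numbers as a rough upper bound; the function name `progress_messages` is hypothetical.

```rust
/// Simplified cost model (an assumption, not timely's exact protocol):
/// each of `workers` workers advances its input once per batch of
/// `records_per_advance` records, and each advance reaches all `workers` workers.
fn progress_messages(records: u64, records_per_advance: u64, workers: u64) -> u64 {
    assert!(records_per_advance > 0);
    // Number of timestamp advances, rounding up for a partial final batch.
    let advances = (records + records_per_advance - 1) / records_per_advance;
    advances * workers * workers
}
```

Under this model, the reported 16-worker run with one advance per record generates `progress_messages(200_000, 1, 16)` = 51,200,000 updates versus 200,000 for one worker, whereas advancing once per 1M records would need only 256.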

If you comment it out and crank the rounds up to 2M, I see at least a modest speedup:

```
cargo run --release --example primes -- -w1  0.82s user 0.13s system 86% cpu 1.091 total
cargo run --release --example primes -- -w4  0.65s user 0.05s system 130% cpu 0.539 total
```

If you go up again to 20M, it looks like this:

```
cargo run --release --example primes -- -w1  15.24s user 0.83s system 98% cpu 16.271 total
cargo run --release --example primes -- -w4  13.62s user 0.17s system 198% cpu 6.935 total
```
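The wall-clock ("total") figures above translate into speedup and parallel efficiency with simple arithmetic. The helpers below are just that calculation (the names are illustrative), applied to the numbers quoted in this thread.

```rust
/// Speedup of a parallel run relative to the single-worker run (wall-clock seconds).
fn speedup(t_one_worker: f64, t_parallel: f64) -> f64 {
    t_one_worker / t_parallel
}

/// Parallel efficiency: speedup divided by the number of workers (1.0 = perfect scaling).
fn efficiency(t_one_worker: f64, t_parallel: f64, workers: u32) -> f64 {
    speedup(t_one_worker, t_parallel) / workers as f64
}
```

With four workers, `speedup(1.091, 0.539)` is about 2.02 at 2M records and `speedup(16.271, 6.935)` is about 2.35 at 20M, i.e. roughly 50 to 59 percent efficiency. The reported CPU utilization (130% and 198%) likewise sits well below the 400% that four fully busy workers would show.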

At this point, you would probably go faster with some synchronization: the code puts all 20M records into the queue at once, which probably introduces a backlog. Instead, introduce only, say, 1M records at a time and then coordinate the workers, not introducing more records until the probe has advanced to the next time.
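The batching idea can be sketched without timely as a std-only analogue. A producer routes each number to one of `workers` threads by `x % workers` (mirroring `.exchange(|x| *x)`), sends one batch, and then waits until every worker has acknowledged that batch before sending the next, which plays the role of waiting for the probe. In timely itself, the book's examples express that wait as `while probe.less_than(input.time()) { worker.step(); }` after an `advance_to`; the threads, channels, and the names `Msg` and `count_primes_batched` below are hypothetical illustration, not timely's API.

```rust
use std::sync::mpsc;
use std::thread;

// Messages from the producer to a worker: a record, or an end-of-batch marker.
enum Msg {
    Record(u64),
    EndOfBatch,
}

// Trial-division primality test, as in the primes example.
fn is_prime(x: u64) -> bool {
    let limit = (x as f64).sqrt() as u64;
    x > 1 && (2..limit + 1).all(|i| x % i > 0)
}

/// Counts primes in `0..total`, feeding `workers` threads `batch` records at a time
/// and waiting for every worker to finish a batch before introducing the next one.
fn count_primes_batched(total: u64, batch: u64, workers: usize) -> u64 {
    assert!(batch > 0 && workers > 0);
    let (ack_tx, ack_rx) = mpsc::channel::<u64>(); // per-batch prime counts from workers
    let mut senders = Vec::new();
    let mut handles = Vec::new();
    for _ in 0..workers {
        let (tx, rx) = mpsc::channel::<Msg>();
        let ack_tx = ack_tx.clone();
        senders.push(tx);
        handles.push(thread::spawn(move || {
            let mut found = 0;
            // The loop ends once the producer drops this worker's sender.
            for msg in rx {
                match msg {
                    Msg::Record(x) => {
                        if is_prime(x) {
                            found += 1;
                        }
                    }
                    Msg::EndOfBatch => {
                        ack_tx.send(found).unwrap();
                        found = 0;
                    }
                }
            }
        }));
    }
    drop(ack_tx); // the producer only needs the receiving end

    let mut primes = 0;
    let mut start = 0;
    while start < total {
        let end = (start + batch).min(total);
        for x in start..end {
            // Route each record to a worker, like `.exchange(|x| *x)`.
            senders[(x % workers as u64) as usize].send(Msg::Record(x)).unwrap();
        }
        for tx in &senders {
            tx.send(Msg::EndOfBatch).unwrap();
        }
        // Synchronize: introduce no more records until every worker has caught up.
        for _ in 0..workers {
            primes += ack_rx.recv().unwrap();
        }
        start = end;
    }
    drop(senders); // closes the channels so the worker loops exit
    for h in handles {
        h.join().unwrap();
    }
    primes
}
```

For example, `count_primes_batched(1000, 100, 4)` returns 168, the number of primes below 1000. The batch size bounds how far the producer can run ahead of the slowest worker, which is the backlog the comment above warns about.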