rust-threadpool / rust-threadpool

A very simple thread pool for parallel task execution
https://crates.io/crates/threadpool
Apache License 2.0

Correct way to wait until all threads are finished #19

Closed cetra3 closed 7 years ago

cetra3 commented 8 years ago

If you have a fixed-size queue of work, what's the correct way to block until the queue is finished? I am currently using a channel, but I'm not sure that's the right approach.

For instance, from the dining philosophers problem:

let handles: Vec<_> = philosophers.into_iter().map(|p| {
    thread::spawn(move || {
        p.eat();
    })
}).collect();

// Blocks until all work is done
for h in handles {
    h.join().unwrap();
}

The way I have currently accomplished this with threadpools is using mpsc::channel:

use std::sync::mpsc::channel;
use threadpool::ThreadPool;

let pool = ThreadPool::new(4);

let num_philosophers = philosophers.len();

let (tx, rx) = channel();

for p in philosophers {
    let tx = tx.clone();
    pool.execute(move || {
        p.eat();
        // Signal that this job has finished
        tx.send(()).unwrap();
    });
}

// Blocks until all work is done
for _ in 0..num_philosophers {
    rx.recv().unwrap();
}

Is there a cleaner way to wait?
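
One possible simplification of the channel approach, sketched here with plain `std::thread::spawn` standing in for `pool.execute` (an assumption made so the snippet needs only the standard library): drop the original sender after dispatching, then iterate over the receiver. The iteration ends once every clone of `tx` has been dropped, so no job count has to be tracked. The function name `run_and_wait` is hypothetical.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Starts `n_jobs` jobs and blocks until every sender has been dropped.
/// Returns the number of completion messages received.
fn run_and_wait(n_jobs: usize) -> usize {
    let (tx, rx) = channel();

    for id in 0..n_jobs {
        let tx = tx.clone();
        // Stand-in for `pool.execute(...)` (assumption, see lead-in).
        thread::spawn(move || {
            // ... do the work for job `id` here ...
            tx.send(id).unwrap();
            // This clone of `tx` is dropped when the closure finishes.
        });
    }

    // Drop the original sender so that only the jobs keep the channel open.
    drop(tx);

    // `iter()` blocks for each message and ends once all senders are gone.
    rx.iter().count()
}
```

For example, `run_and_wait(5)` returns 5. In this sketch a sender is dropped even when its job panics, so the loop still terminates; counting `recv()` calls while the original `tx` is still alive would block forever in that case.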

dns2utf8 commented 8 years ago

I had the same problem with my latest project.

Since the pool uses a channel for internal communication, my starter thread dispatches new jobs faster than the pool can start the first one and increase its active counter.

So I had to wait for the pool to start, either with an additional variable or with a collecting channel.

I am thinking about creating a pull request to increase the active_count in the execute function. What do you think?
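
The idea of counting a job when it is submitted, rather than when a worker picks it up, can be sketched with a tiny hand-rolled pool. `TinyPool`, its `pending` counter, `join`, and `count_completed` are hypothetical names for illustration; this is not the threadpool crate's implementation. It only shows why incrementing inside `execute` closes the race where the waiter sees a zero count before any job has started.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical minimal pool, for illustration only.
struct TinyPool {
    sender: Sender<Job>,
    // Number of jobs queued or running, plus a condvar signalled at zero.
    pending: Arc<(Mutex<usize>, Condvar)>,
}

impl TinyPool {
    fn new(n_workers: usize) -> TinyPool {
        let (sender, receiver) = channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));
        let pending = Arc::new((Mutex::new(0usize), Condvar::new()));
        for _ in 0..n_workers {
            let receiver = Arc::clone(&receiver);
            let pending = Arc::clone(&pending);
            thread::spawn(move || loop {
                // The receiver lock is held only while fetching the next job.
                let job = match receiver.lock().unwrap().recv() {
                    Ok(job) => job,
                    Err(_) => break, // pool dropped, no more jobs will arrive
                };
                job();
                let (count, all_done) = &*pending;
                let mut count = count.lock().unwrap();
                *count -= 1;
                if *count == 0 {
                    all_done.notify_all();
                }
            });
        }
        TinyPool { sender, pending }
    }

    fn execute<F: FnOnce() + Send + 'static>(&self, job: F) {
        // Count the job before queueing it, so a waiter can never observe
        // zero while submitted work has not started yet.
        *self.pending.0.lock().unwrap() += 1;
        self.sender.send(Box::new(job)).unwrap();
    }

    /// Blocks until every submitted job has finished.
    fn join(&self) {
        let (count, all_done) = &*self.pending;
        let mut count = count.lock().unwrap();
        while *count > 0 {
            count = all_done.wait(count).unwrap();
        }
    }
}

/// Runs `n_jobs` trivial jobs and returns how many had completed at `join`.
fn count_completed(n_workers: usize, n_jobs: usize) -> usize {
    let pool = TinyPool::new(n_workers);
    let done = Arc::new(AtomicUsize::new(0));
    for _ in 0..n_jobs {
        let done = Arc::clone(&done);
        pool.execute(move || {
            done.fetch_add(1, Ordering::SeqCst);
        });
    }
    pool.join();
    done.load(Ordering::SeqCst)
}
```

With this counting scheme `count_completed(2, 10)` returns 10 even though there are fewer workers than jobs. A job that panics would leave the counter above zero in this sketch, so a production version would need a guard that decrements during unwinding.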

dns2utf8 commented 8 years ago

You could use a barrier: https://github.com/frewsxcv/rust-threadpool/pull/35

use threadpool::ThreadPool;
use std::sync::{Arc, Barrier};
use std::sync::atomic::{AtomicUsize, Ordering};

// create at least as many workers as jobs or you will deadlock yourself
let n_workers = 42;
let n_jobs = 23;
let pool = ThreadPool::new(n_workers);
let an_atomic = Arc::new(AtomicUsize::new(0));

// create a barrier that waits for all jobs plus the starter thread
let barrier = Arc::new(Barrier::new(n_jobs + 1));
for i in 0..n_jobs {
  let barrier = barrier.clone();
  let an_atomic = an_atomic.clone();

  pool.execute(move|| {
    // do the heavy work
    an_atomic.fetch_add(1, Ordering::Relaxed);

    // then wait for the other threads
    barrier.wait();
  });
}

// wait for the threads to finish the work
barrier.wait();
assert_eq!(an_atomic.load(Ordering::SeqCst), 23);

xpe commented 7 years ago

Any other ways you'd recommend? Is this the simplest/cleanest way?

frewsxcv commented 7 years ago

This is now completed with https://github.com/frewsxcv/rust-threadpool/pull/63 by @dns2utf8 🎊