AnyarInc / Ascent

A fast and flexible C++ simulation engine and differential equation solver.
Apache License 2.0

wait() exiting too early #8

Closed mwalcott3 closed 4 years ago

mwalcott3 commented 4 years ago

Wait appears to have some issues

asc::Pool pool;
pool.n_threads(std::thread::hardware_concurrency());
std::atomic<int> test = 0;
for (int i(0); i < 10; ++i) {
   for (int j(0); j < 10; ++j)
      pool.emplace_back([&]() {test++;});
   pool.wait();
   std::cout << test << std::endl;
}

Resulted in:

6
7
7
7
7
47
47
67
67
67

Just to confirm that the issue is wait() not waiting long enough for the jobs to finish, here are the results with a sleep added:

asc::Pool pool;
pool.n_threads(std::thread::hardware_concurrency());
std::atomic<int> test = 0;
for (int i(0); i < 10; ++i) {
   for (int j(0); j < 10; ++j)
      pool.emplace_back([&]() {test++;});
   pool.wait();
   std::this_thread::sleep_for(std::chrono::milliseconds(20));
   std::cout << test << std::endl;
}

Results in:

10
20
30
40
50
60
70
80
90
100

I'll look into this, but I'm not fully familiar with C++ multithreading.

mwalcott3 commented 4 years ago

Checking the queue's computing function doesn't seem to be a reliable enough test for deciding to skip the wait on the queue's work_done condition variable.

The changes below are not thread safe, but they fix the above issue.

Queue.h:

void wait()
{
   if (!running_jobs && jobs.empty() && !adding) {
      return;
   }
   std::unique_lock<std::mutex> lock(mtx_wait);
   work_done.wait(lock);
}

Pool.h:

void wait()
{
   for (auto& q : pool)
   {
      q->wait();
   }
}

mwalcott3 commented 4 years ago

The problem is caused by the sleep in the pool_balancer. It sleeps for 1 ms after running_jobs is set to false. If a job shows up during that window, running_jobs is false but the queue isn't empty, so wait() returns early.

Regardless, I feel like the pool_balancer could be removed by having a single work queue with multiple workers.

I have a new pool implementation that doesn't need balancing, but it isn't entirely backwards compatible.

mwalcott3 commented 4 years ago

Another thing to note is that we should never wait without a condition, since spurious wake-ups can occur. https://www.modernescpp.com/index.php/c-core-guidelines-be-aware-of-the-traps-of-condition-variables

Perhaps the more problematic issue is that a mutex needs to be locked before changing running_jobs and sending the notification through the condition variable. Otherwise we can miss a notification and hang forever, because the notification arrived between the time we checked whether things were running and the time we set up the wait.
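
To make both points concrete, here is a minimal sketch of the usual pattern: the shared state is changed and the notification sent while holding the mutex, and the waiter uses a predicate so a spurious wake-up just re-checks the state. The names DoneSignal and run_handshakes are made up for illustration and are not part of Ascent.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper for illustration only; not part of Ascent.
struct DoneSignal
{
   std::mutex m;
   std::condition_variable cv;
   bool done = false;

   void signal()
   {
      // Change the state and notify while holding the mutex, so the waiter
      // cannot check `done` and then miss the notification.
      std::lock_guard<std::mutex> lock(m);
      done = true;
      cv.notify_all();
   }

   void wait()
   {
      std::unique_lock<std::mutex> lock(m);
      // The predicate overload re-checks `done` after every wake-up, which
      // covers spurious wake-ups and a signal sent before we started waiting.
      cv.wait(lock, [this] { return done; });
   }
};

// Runs `rounds` signal/wait handshakes and returns how many completed.
int run_handshakes(int rounds)
{
   int completed = 0;
   for (int i = 0; i < rounds; ++i) {
      DoneSignal s;
      std::thread worker([&s] { s.signal(); });
      s.wait();
      worker.join(); // join before `s` goes out of scope
      ++completed;
   }
   return completed;
}
```

What matters is that `done` is only modified under the same mutex the waiter holds while checking it. Notifying inside the lock, as here, is the simplest way to keep that easy to reason about.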

The current threading system rarely runs into this because the sleep generally causes early exits, but the only way to make sure the system won't hang forever is to do the cv notification while holding a mutex lock. I have been able to get it to hang indefinitely by disabling the load balancing or fixing the early exit problem and then running it with tons of batched jobs with waits, like below.

   asc::Pool pool;
   pool.n_threads(std::thread::hardware_concurrency());
   std::atomic<int> test = 0;

   auto t1 = std::chrono::high_resolution_clock::now();
   for (int i(0); i < 1000000; ++i) {
      for (int j(0); j < 10; ++j)
         pool.emplace_back([&]() {test++;});
      pool.wait();
      if (test % 10 != 0)
         std::cout << "Failed" << std::endl;
   }
   auto t2 = std::chrono::high_resolution_clock::now();

   if (test != 10000000)
      std::cout << "Failed" << std::endl;
   std::cout << "Result: " << test << '\n';
   std::cout << "Time(ms): " << std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count() << '\n';

I don't know if it is problem free, since there seem to be various gotchas with thread synchronization, but an example pool with a working wait that passes the test is below.

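#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

class Pool
{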
public:
   Pool(const unsigned int n = std::thread::hardware_concurrency())
   {
      n_threads(n);
   }
   void n_threads(const unsigned int n)
   {
      for (size_t i = threads.size(); i < n; ++i)
         threads.emplace_back(std::thread(&Pool::worker,this));
   }
   void emplace_back(std::function<void()>&& task)
   {
      std::unique_lock<std::mutex> lock(m);
      queue.emplace_back(std::move(task));
      work_cv.notify_one();
   }
   void wait() {
      std::unique_lock<std::mutex> lock(m);
      //The predicate is checked before blocking, so this returns immediately when idle
      done_cv.wait(lock, [&]() { return queue.empty() && (working == 0); });
   }
   ~Pool()
   {
      //Close the queue and finish all the remaining work
      std::unique_lock<std::mutex> lock(m);
      closed = true;
      work_cv.notify_all();
      lock.unlock();

      for (auto& t : threads)
         if (t.joinable())
            t.join();
   }

private:
   std::vector< std::thread > threads;
   std::deque< std::function<void()> > queue;
   unsigned int working = 0;
   bool closed = false;
   std::mutex m;
   std::condition_variable work_cv;
   std::condition_variable done_cv;

   void worker()
   {
      while (true)
      {
         //Wait for work
         std::unique_lock<std::mutex> lock(m);
         work_cv.wait(lock, [this]() { return closed || !queue.empty(); });
         if (queue.empty()) {
            if (closed) {
               return;
            }
            continue;
         }

         //Grab work
         ++working;
         auto work = std::move(queue.front()); //Move instead of copying the std::function
         queue.pop_front();
         lock.unlock();

         work();

         //Notify that work is finished
         lock.lock();
         --working;
         done_cv.notify_all();
      }
   }
};

This is what I'm currently using, but I don't recommend this particular implementation because there are 3 mutex locks per task (adding it to the queue, removing it from the queue, and notifying when done). Having multiple queues, like the current system, allows for fewer locks since you only have to lock for notification when an individual queue is empty. To allow work stealing like the current load balancing system, you still need a lock when removing items from each individual queue.

All in all, though, 3 versus slightly over 2 mutex locks per task probably doesn't matter too much, and having a single queue does simplify the system.

mwalcott3 commented 4 years ago

I don't like the overhead at all. The Pool adds an extra 0.3 microseconds per task compared to vanilla. That is 3 seconds for 1 million batches of 10 jobs.

Using someone else's lock-free queue implementation could get rid of 2 locks per task. https://github.com/cameron314/concurrentqueue

And we can put a timeout on the wait and put it in a loop to get rid of the third. (The third is there to deal with missed signals, which are pretty rare but do show up reliably when testing 1000000 waits.) Not the most efficient way, but it should be fine.
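
A rough sketch of the timeout-in-a-loop idea, assuming the number of outstanding tasks is tracked in an atomic counter so workers never take the mutex. PendingCounter, run_batch and the 1 ms timeout are placeholders picked for illustration, not anything from Ascent or from pull #9.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper for illustration only; not part of Ascent.
struct PendingCounter
{
   std::atomic<int> pending{0};
   std::mutex m;
   std::condition_variable cv;

   void add(int n) { pending.fetch_add(n); }

   // Called by a worker after finishing a task. No mutex is taken here.
   void finish_one()
   {
      if (pending.fetch_sub(1) == 1) // this was the last outstanding task
         cv.notify_all();
   }

   void wait()
   {
      std::unique_lock<std::mutex> lock(m);
      // A notification that slips in between the check and the wait is not
      // fatal: the timeout expires and the counter is checked again.
      while (pending.load() != 0)
         cv.wait_for(lock, std::chrono::milliseconds(1)); // 1 ms is an assumed value
   }
};

// Runs `tasks` trivial jobs on their own threads and returns how many had
// run by the time wait() returned.
int run_batch(int tasks)
{
   PendingCounter counter;
   std::atomic<int> result{0};
   counter.add(tasks);

   std::vector<std::thread> workers;
   for (int i = 0; i < tasks; ++i)
      workers.emplace_back([&] { ++result; counter.finish_one(); });

   counter.wait();
   const int seen = result.load();
   for (auto& t : workers)
      t.join();
   return seen;
}
```

The trade-off is that a missed notification costs up to one timeout period of extra latency instead of a hang, in exchange for not locking on every task completion.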

Or is the overhead acceptable?

mwalcott3 commented 4 years ago

Fixed by pull #9. Might revisit in the future if the performance hit is notable.