google / marl

A hybrid thread / fiber task scheduler written in C++11
Apache License 2.0
1.88k stars 194 forks

Multiple tasks queued to the same worker while other workers are idle #232

Closed: natepaynefb closed this issue 11 months ago

natepaynefb commented 1 year ago

Great library. Thanks to all the contributors! Here's the situation...

I sometimes subdivide a larger body of work into multiple tasks, in this case exactly 4. Those tasks are enqueued in rapid succession (in a for loop). I have many free workers on my Threadripper CPU, and the vast majority of the cores are waiting for work (not spinning). In this situation, I often observe that two of the tasks are assigned to the same worker. That worker performs both tasks one after the other, taking twice as long as the other workers, which each had only one task to perform. The caller has to wait for all of the tasks to complete, so the end-to-end time is twice as long as it would have been if each task had been assigned to a different worker. Each task takes 5+ milliseconds, so there should be plenty of time for other workers to steal the task, if they were attempting to do so. I would like to avoid this situation, and I would appreciate your advice.

The obvious options include:
1) Modify Scheduler::enqueue so that it keeps iterating over workers until it finds one that is ready to begin the work immediately, if such a worker exists. This would increase the cost of enqueueing a task, so it might have a detrimental effect when the tasks outnumber the workers. For that reason, I have considered adding a flag which would opt in to this behavior (to be used for tasks that are known to be long enough in duration to be worth the extra effort up front).
2) As a variation of option 1, keep a list of idle workers, although the bookkeeping overhead might be significant.
3) Allow the tasks to be doubled up on the same worker, and then wake up one of the other workers so that it has the opportunity to steal work.
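Option 1 can be sketched roughly as follows. This is a minimal illustration, not marl's code: the helper name pickIdleWorker and the per-worker queued counts are hypothetical stand-ins for whatever state the scheduler would actually consult.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper (not part of marl): picks a worker for a new task.
// queued[i] is the number of tasks already waiting on worker i.
// Scans each worker once, beginning at `start`, and returns the first one
// with nothing queued. If every worker is busy, falls back to `start`.
std::size_t pickIdleWorker(const std::vector<std::size_t>& queued,
                           std::size_t start) {
  assert(!queued.empty());
  const std::size_t n = queued.size();
  for (std::size_t step = 0; step < n; ++step) {
    const std::size_t idx = (start + step) % n;
    if (queued[idx] == 0) {
      return idx;  // this worker can begin the task immediately
    }
  }
  return start % n;  // no idle worker: plain round-robin choice
}
```

The scan is linear in the number of workers, which is the extra enqueue cost mentioned in option 1. An opt-in flag would let short tasks skip it.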

What do you think? Is there a recommended solution for this problem? I know that subdividing the work into smaller tasks would help the scheduler but there is also a cost that increases with the number of tasks (combining the results). Thank you for taking the time to consider this question.

ben-clayton commented 1 year ago

Hi @natepaynefb

It's been a good while since I worked on marl, and looked at the scheduling logic.

Tasks should be scheduled in a basic round-robin fashion. Given that you say you have a number of threads idle, it seems like something has gone wonky here.

Looking over this with fresh eyes, I wonder if the following logic could end up scheduling tasks to the same thread:

https://github.com/google/marl/blob/a47a3a5c54435751d30e3154fdf17e3b29fc6796/src/scheduler.cpp#L174-L181

If you replace that block with just the single statement idx = nextEnqueueIndex++ % cfg.workerThread.count; does the problem improve?
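For readers unfamiliar with the pattern, here is a minimal standalone sketch of pure round-robin selection. The struct RoundRobin is hypothetical. Only the expression nextEnqueueIndex++ % count comes from the suggestion above.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for the scheduler's index selection (not marl's type).
struct RoundRobin {
  std::atomic<std::size_t> nextEnqueueIndex{0};
  std::size_t workerCount;

  explicit RoundRobin(std::size_t count) : workerCount(count) {
    assert(count > 0);
  }

  // Returns the worker index for the next task. The post-increment is atomic,
  // so concurrent callers each receive a distinct counter value.
  std::size_t pick() { return nextEnqueueIndex++ % workerCount; }
};
```

With 4 workers, successive calls to pick() yield 0, 1, 2, 3, 0, and so on, regardless of which workers are spinning or asleep.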

Cheers, Ben

natepaynefb commented 1 year ago

Ben, sorry it took so long for me to circle back on this issue. It continues to be a problem for us. In profile traces, I keep seeing examples where we wait for one worker to complete two or more tasks in a row, when there were plenty of idle workers that could have performed the work concurrently. This mainly occurs in the context of parallel-for-each loops where one thread subdivides the work, enqueues several tasks, then waits for all of them. The overall operation ends up taking twice as long as it should if a single worker has to do twice as much work as the others.

To demonstrate this problem, I wrote a very simple unit test. It adds 8 tasks to 8 worker threads, then waits for all of them, then repeats the process 8 times. Each task does 10ms of simulated work. Here's the code:

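// Enqueues one 10ms task per worker thread, waits for the batch, and repeats.
TEST(marl, Scheduler) {
  constexpr int threadCount = 8;
  constexpr int repeatCount = 8;
  marl::Scheduler::Config cfg;
  cfg.workerThread.count = threadCount;
  marl::Scheduler scheduler(cfg);
  for (int repeat = 0; repeat < repeatCount; ++repeat) {
    marl::WaitGroup wg;
    for (int i = 0; i < threadCount; ++i) {
      wg.add(1);
      // Copies of a WaitGroup share one counter, so capturing by value works.
      scheduler.enqueue(marl::Task{[wg]() {
        ZoneScopedN("DoTask");  // Tracy profiler zone
        std::this_thread::sleep_for(std::chrono::milliseconds(10));  // simulated work
        wg.done();
      }});
    }
    wg.wait();  // block until every task in this batch has finished
  }
}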

In theory, this test should take just over 80ms to complete (10ms per batch of tasks, repeated 8 times). In practice it takes much longer because workers sometimes go to sleep when they should be stealing work. Here is an example trace, in which the overall process took about 50ms longer than it should have because tasks got doubled up in 5 of the 8 batches.

[Trace image: 230509_marl_scheduler_trace_before]
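The timing arithmetic can be written out as a small helper. This is only a model: the function name totalMs is hypothetical, and it assumes a doubled-up batch costs exactly one extra task duration.

```cpp
#include <cassert>

// Hypothetical model of the test's wall-clock time in milliseconds.
// Every batch costs one task duration. A batch in which one worker runs two
// tasks back to back costs one additional task duration.
int totalMs(int batches, int taskMs, int doubledBatches) {
  assert(doubledBatches >= 0 && doubledBatches <= batches);
  return (batches + doubledBatches) * taskMs;
}
```

Under this model, totalMs(8, 10, 0) gives the ideal 80ms, and totalMs(8, 10, 5) gives 130ms, which matches the roughly 50ms overrun seen in the trace.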

I did some digging and was able to make some minor changes that significantly improve this behavior for our use cases. Here's the trace after my changes:

[Trace image: 230509_marl_scheduler_trace_after]

Here's what I think is happening:
1) A worker can stop spinning after it successfully steals a task. However, there is nothing to prevent another worker from stealing the same task immediately after. As a result the first worker stops spinning prematurely. It goes to sleep even though there is still work that it could successfully steal.
2) A worker can stop spinning when it observes that (work.num > 0), indicating that new work was assigned to it. However, it does not hold the mutex lock, so it is still possible for another worker to steal that work. If that happens, the result will be the same as in 1: the worker stops spinning prematurely and goes to sleep without performing any work.
3) The array marl::Scheduler::spinningWorkers has a hard-coded length of 8. If there are more than 8 spinning workers then information will be lost. As a result, a new task may be assigned to a worker that already has work even though some of the workers are still spinning.
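The interleaving in point 2 can be replayed deterministically in a single thread. The sketch below is a simplified model with hypothetical names (QueueModel, replayPrematureSleep). It does not use marl's types and only illustrates the order of events.

```cpp
#include <cassert>
#include <deque>

// Hypothetical, simplified model of one worker's pending-task queue.
struct QueueModel {
  std::deque<int> tasks;

  // Unlocked peek, analogous to observing (work.num > 0).
  bool hasWork() const { return !tasks.empty(); }

  // Another worker removing a task from this queue.
  bool steal(int& out) {
    if (tasks.empty()) return false;
    out = tasks.front();
    tasks.pop_front();
    return true;
  }
};

// Replays the problematic ordering for worker A. Returns true if A ends up
// having stopped spinning while holding no task.
bool replayPrematureSleep() {
  QueueModel a;
  a.tasks.push_back(42);                     // a task is assigned to A
  const bool stoppedSpinning = a.hasWork();  // A peeks and stops spinning
  int stolen = 0;
  const bool taken = a.steal(stolen);        // B steals before A takes its lock
  assert(taken && stolen == 42);
  return stoppedSpinning && !a.hasWork();    // A now finds nothing and sleeps
}
```

In this model replayPrematureSleep() returns true: the peek and the later dequeue are separate steps, so the task can disappear between them.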

My solution to the above:
1) I modified spinForWork so that it returns the stolen task via an out parameter. The calling function (waitForWork) then locks the mutex and enqueues the stolen task. The mutex is not unlocked again until after the task has been dequeued by runUntilIdle. This ordering ensures that another worker cannot steal the task that was already stolen by the first worker.
2) Similarly, spinForWork no longer simply checks (work.num > 0). Instead, it attempts to steal a task from its own queue. If successful, the task is returned via the out parameter. Again, this ensures that the worker will not stop spinning until it has a task that nobody else can steal.
3) I increased the size of spinningWorkers to MaxWorkerThreads so that it can contain the index of every worker. The modulus operation uses the number of actual workers instead of the size of the array. Thus, spinningWorkers is now a full list of the workers that currently have no work (they may or may not actually still be spinning). This helps Scheduler::enqueue to assign work to workers that have none (if any), rather than assigning work in arbitrary round-robin fashion.
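The idea behind points 1 and 2 is that a spinning worker only stops once it has actually removed a task from some queue. A minimal sketch, assuming a simplified worker with one mutex-protected deque: SketchWorker and spinOnce are hypothetical names, and this is not the code from the eventual pull request.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <vector>

// Hypothetical, simplified worker: a mutex-protected queue of task ids.
struct SketchWorker {
  std::mutex mutex;
  std::deque<int> tasks;

  // Removes one task under the lock and returns it via the out parameter.
  bool steal(int& out) {
    std::lock_guard<std::mutex> lock(mutex);
    if (tasks.empty()) return false;
    out = tasks.front();
    tasks.pop_front();
    return true;
  }
};

// One spin pass for workers[self]: try the worker's own queue first, then the
// others. Returns true only after a task has been removed from a queue, so no
// other worker can take that task afterwards.
bool spinOnce(std::vector<SketchWorker>& workers, std::size_t self, int& out) {
  assert(self < workers.size());
  if (workers[self].steal(out)) return true;
  for (std::size_t i = 0; i < workers.size(); ++i) {
    if (i != self && workers[i].steal(out)) return true;
  }
  return false;  // nothing claimed: keep spinning or go to sleep
}
```

Because the caller receives the task itself rather than a hint that work exists, the decision to stop spinning can no longer be invalidated by a concurrent steal.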

You suggested ignoring spinningWorkers so that tasks would always be assigned in round-robin fashion. I tried that. The problem was that it resulted in a large number of spinning workers, which I thought was somewhat wasteful (presumably increasing heat and power consumption).

What do you think? Does my analysis make sense to you? Do my solutions sound reasonable? If so, I would be happy to prepare a pull request. If not, then I would like to discuss it further. Thanks for reading this long post. I hope to hear from you soon.

ben-clayton commented 1 year ago

Hi @natepaynefb ,

This is a fantastic investigation, and your results look compelling.

I'd love to see a PR, along with your new unit test.

Before accepting, I'll have to run this against a number of benchmarks to check that there are no major performance regressions, but given that this all relates to spinForWork, I'm optimistic that it should be fine. We mostly care about the performance of continual streams of work.

Many thanks, Ben

natepaynefb commented 1 year ago

Great. I'll work on preparing that PR, but I have a busy week ahead so it might take a bit. Also, I've been scrutinizing traces of our actual code and I can see that a problem still exists in some cases. I think the remaining problems have a different cause, however.

Here's what I observe (with or without the improvements mentioned above):

I don't see any way that all 3 tasks could have been assigned to the same worker originally. I think it is more likely that the work was assigned to multiple workers, but some of those workers were asleep and failed to wake up. I think the active worker probably finished the first task, then stole the second, then stole the third. I can't think of any other explanation for the behavior that I'm seeing.

I'll try to dig deeper when I have time. Does my hypothesis sound plausible to you? Can you provide any pointers regarding possible points of failure in the code?

ben-clayton commented 1 year ago

It sounds plausible, but it seems more likely that you've uncovered a bug, or at least some logic that behaves badly in certain situations.

Disclaimer: It's been a few years since I last properly worked on the scheduler, and so my memory is a little hazy on this. Given your awesome investigations, don't assume I have vastly more in-depth knowledge than you :)

Looking over the code again, it seems the spinningWorkers logic is still a likely candidate for tasks getting scheduled on the same worker. onBeginSpinning() adds the worker ID to the spinningWorkers circular buffer, and an entry is only ever removed when a new task is enqueued.

Consider the following:

The end result is that you'll have a whole bunch of entries in spinningWorkers for worker A. The next bunch of future tasks will take the worker A entries in spinningWorkers, and all the tasks get assigned to worker A, without waking other workers.

Could some variation on this be what you are experiencing?
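The buffer behavior described above can be modeled in a few lines. This sketch is hypothetical throughout: SpinningRing and its members are illustrative names, and it assumes (as one possible route) that the same worker records itself as spinning several times before any task arrives.

```cpp
#include <array>
#include <cassert>
#include <cstddef>

// Hypothetical model of a "recently spinning workers" ring (not marl's code).
struct SpinningRing {
  std::array<int, 8> slots;    // worker ids, -1 means empty
  std::size_t next = 0;        // next slot to write
  std::size_t roundRobin = 0;  // fallback counter
  std::size_t workerCount;

  explicit SpinningRing(std::size_t count) : workerCount(count) {
    assert(count > 0);
    slots.fill(-1);
  }

  // Records that a worker started spinning. Nothing removes the entry until a
  // task is enqueued, so repeated calls leave duplicate ids behind.
  void onBeginSpinning(int workerId) {
    slots[next] = workerId;
    next = (next + 1) % slots.size();
  }

  // Chooses a worker for a new task: the most recent ring entry if there is
  // one, otherwise plain round-robin.
  int pickWorker() {
    const std::size_t i = (next + slots.size() - 1) % slots.size();
    const int id = slots[i];
    if (id >= 0) {
      slots[i] = -1;
      next = i;
      return id;
    }
    return static_cast<int>(roundRobin++ % workerCount);
  }
};
```

If worker 2 records itself three times, the next three calls to pickWorker() all return 2, and only the fourth falls back to round-robin. That is the pile-up on a single worker described above.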

natepaynefb commented 11 months ago

@ben-clayton I finally got around to posting a PR for this issue. See: https://github.com/google/marl/pull/261

ben-clayton commented 11 months ago

Thank you @natepaynefb - I'll take a look when I get a chance. Apologies for the delay, this is a very busy couple of weeks.

ben-clayton commented 11 months ago

#261 has been merged. Thank you!