apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Add `coop_budget` to `datafusion_common::config::ExecutionOptions` #11451

Open thinkharderdev opened 2 months ago

thinkharderdev commented 2 months ago

Is your feature request related to a problem or challenge?

Certain operators (e.g. AggregateExec, SortExec) are "greedy" in that they will continue processing batches without yielding for as long as their input produces them. This can effectively create a hot loop that monopolizes worker threads and starves other tasks in the runtime.

Describe the solution you'd like

Add an optional coop_budget to the ExecutionOptions. When set, greedy operators would wrap their base record batch stream in a stream which ensures it yields back to the scheduler after every coop_budget record batches. This would only kick in after coop_budget batches are processed without yielding. If the underlying stream yields, the coop budget gets reset.
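
To make the proposed behavior concrete, here is a minimal sketch using only the standard library. Everything in it is an assumption for illustration: `BatchSource` is a hypothetical stand-in for a record batch stream (a real implementation would presumably wrap a `futures::Stream`), and `CoopStream`, `AlwaysReady`, `CountingWaker`, and `drain` are made-up names, not DataFusion APIs. A budget of zero is treated as one so the wrapper can always make progress.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a record batch stream.
trait BatchSource {
    type Item;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Forces a yield after `budget` consecutive batches that were ready.
struct CoopStream<S> {
    inner: S,
    budget: usize,
    remaining: usize,
}

impl<S> CoopStream<S> {
    fn new(inner: S, budget: usize) -> Self {
        let budget = budget.max(1); // assumption: 0 behaves like 1
        Self { inner, budget, remaining: budget }
    }
}

impl<S: BatchSource> BatchSource for CoopStream<S> {
    type Item = S::Item;

    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        if self.remaining == 0 {
            // Budget spent: reset it, ask to be polled again, and yield.
            self.remaining = self.budget;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        match self.inner.poll_next(cx) {
            Poll::Ready(Some(item)) => {
                self.remaining -= 1;
                Poll::Ready(Some(item))
            }
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => {
                // The input yielded on its own, so the budget starts over.
                self.remaining = self.budget;
                Poll::Pending
            }
        }
    }
}

/// Test input that never yields: `len` batches, all immediately ready.
struct AlwaysReady { next: u32, len: u32 }

impl BatchSource for AlwaysReady {
    type Item = u32;
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.next < self.len {
            self.next += 1;
            Poll::Ready(Some(self.next - 1))
        } else {
            Poll::Ready(None)
        }
    }
}

/// Waker that only counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls a greedy input to completion; returns (batches, forced yields).
fn drain(budget: usize, len: u32) -> (usize, usize) {
    let wakes = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(wakes.clone());
    let mut cx = Context::from_waker(&waker);
    let mut stream = CoopStream::new(AlwaysReady { next: 0, len }, budget);
    let (mut batches, mut yields) = (0, 0);
    loop {
        match stream.poll_next(&mut cx) {
            Poll::Ready(Some(_)) => batches += 1,
            Poll::Ready(None) => break,
            Poll::Pending => yields += 1,
        }
    }
    // Every forced yield must request a wake-up, or a real executor
    // would never poll the stream again.
    assert_eq!(yields, wakes.0.load(Ordering::SeqCst));
    (batches, yields)
}
```

With this sketch, `drain(3, 10)` sees all 10 batches and is forced to yield 3 times. The wake-up before returning `Poll::Pending` is the important detail: the stream is not waiting on anything, it is only giving the scheduler a chance to run other tasks.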

Describe alternatives you've considered

This can be done outside of DataFusion by inserting the cooperative stream at various places, but it would be nice if this were built into the engine.

Additional context

I think this is probably not a big issue if you are setting the partition parallelism to the number of CPU cores, since the IO is fairly well pipelined inside ParquetExec and other operators that do IO. However, we have found that in network-IO-heavy workloads (e.g. reading from object storage), scheduling one partition per core leaves the executors underutilized in most cases.

The goal of this feature would be to be able to oversubscribe the cores to effectively take advantage of IO parallelism while avoiding horrendous tail latencies in particularly CPU-intensive queries.

Rachelint commented 2 months ago

take

Rachelint commented 2 months ago

Seems to be a good supplement to the coop_budget in tokio!

Actually, we are encountering the tail latency problem in production: the heavy queries block the scheduler and make the light ones time out... This feature may help a lot.

But I still don't quite understand the statement above, "I think this is probably not a big issue if you are setting the partition parallelism to the number...". Mind explaining it in more detail?

thinkharderdev commented 2 months ago

> But I still don't quite understand the statement above, "I think this is probably not a big issue if you are setting the partition parallelism to the number...". Mind explaining it in more detail?

Honestly that was just a guess on my part so it may very well be that even with one partition per core you would see the same issue.

But I was thinking that with one partition per core the IO and CPU work are pipelined reasonably well. The table scan will do some IO, then decode the data and process it through the rest of the pipeline. By the time the CPU work is required, there is no more IO in flight to block. With any repartitioning, though, that gets complicated, so I'm not really sure.

Rachelint commented 2 months ago

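Thanks, got it. As I understand it, the executors are underutilized because, when the CPU work is needed, the IO is still in flight and not ready?

So we try to spawn more IO tasks to improve the IO parallelism. However, the scheduling is carried out by tokio, so maybe we can't reach our goal of improving IO parallelism?

For example, assume we spawn:

  • 8 CPU tasks
  • and 16 IO tasks to improve IO parallelism

The ideal scenario is that:

  • 16 IO tasks are scheduled and executed, fetch 16 batches, start fetching the next 16, and yield
  • 8 CPU tasks are scheduled, consume the current 16 batches, and yield
  • 16 IO tasks are scheduled, find the next 16 batches ready, take them, start fetching the next 16, and yield
  • 8 CPU tasks are scheduled, consume, yield ...

However, tokio may not schedule the tasks as we expect:

  • The first 8 IO tasks are scheduled and executed, fetch 8 batches, start fetching the next 8, and yield
  • 8 CPU tasks are scheduled, consume the current 8 batches, and yield
  • The first 8 IO tasks are scheduled again but are not ready.
  • The second 8 IO tasks are scheduled but are not ready yet. ...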

thinkharderdev commented 2 months ago

> Thanks, got it. As I understand it, the executors are underutilized because, when the CPU work is needed, the IO is still in flight and not ready?
>
> So we try to spawn more IO tasks to improve the IO parallelism. However, the scheduling is carried out by tokio, so maybe we can't reach our goal of improving IO parallelism?
>
> For example, assume we spawn:
>
>   • 8 CPU tasks
>   • and 16 IO tasks to improve IO parallelism
>
> The ideal scenario is that:
>
>   • 16 IO tasks are scheduled and executed, fetch 16 batches, start fetching the next 16, and yield
>   • 8 CPU tasks are scheduled, consume the current 16 batches, and yield
>   • 16 IO tasks are scheduled, find the next 16 batches ready, take them, start fetching the next 16, and yield
>   • 8 CPU tasks are scheduled, consume, yield ...
>
> However, tokio may not schedule the tasks as we expect:
>
>   • The first 8 IO tasks are scheduled and executed, fetch 8 batches, start fetching the next 8, and yield
>   • 8 CPU tasks are scheduled, consume the current 8 batches, and yield
>   • The first 8 IO tasks are scheduled again but are not ready.
>   • The second 8 IO tasks are scheduled but are not ready yet. ...

Yeah, so my thinking here is that the underlying issue is that CPU work which does not yield back to the scheduler often enough will stall the tokio reactor, so in-flight IO work cannot make progress. By forcing CPU-heavy tasks to yield back to the scheduler more often, it becomes more likely that IO tasks can make progress even when CPU-heavy tasks are consuming a lot of resources. Latencies are always going to suffer if you have more parallel CPU work than cores to run it on, but ideally the forced yields reduce tail latencies so that you merely get a linear increase in tail latencies when you oversubscribe CPU-heavy work.
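
As a rough illustration of that argument, here is a toy model, not a measurement. It assumes a single worker thread, one time unit per batch, a CPU task that is always scheduled first, and an IO task that only needs one poll once its response has arrived. The function `io_wait` and its parameters are hypothetical names for this sketch.

```rust
/// Toy model: how long does a ready IO task wait for the worker thread?
///
/// `total` is the number of batches the CPU task must process,
/// `budget` is the optional coop budget (None = never yields early),
/// `io_ready_at` is the time at which the IO response arrives.
fn io_wait(total: u64, budget: Option<u64>, io_ready_at: u64) -> u64 {
    let mut now = 0u64;
    let mut done = 0u64;
    while done < total {
        // One poll of the CPU task: it runs until its budget is spent
        // or its work is finished, whichever comes first.
        let slice = budget.map_or(total - done, |b| b.max(1).min(total - done));
        now += slice;
        done += slice;
        // The CPU task has returned to the scheduler, so the IO task
        // can be polled if its response has arrived by now.
        if now >= io_ready_at {
            return now - io_ready_at;
        }
    }
    // The CPU task finished before the response arrived: no waiting.
    0
}
```

In this model a greedy task with 1000 batches makes an IO response that arrives at time 10 wait 990 units (`io_wait(1000, None, 10)`), while a budget of 8 cuts the wait to 6 units (`io_wait(1000, Some(8), 10)`). The wait is bounded by the budget rather than by the total amount of CPU work, which is the effect described above.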

Rachelint commented 2 months ago

It seems the key is to get closer to the ideal scenario: once any IO task completes, we respond immediately so that it can start the next IO.

Thanks for the answer. I think I can now picture the specific scenario. I will start on a PoC soon and try to run some benchmarks on it.