dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.58k stars 718 forks source link

Distributed scheduler does not obey dask.order.order for num_workers=1, num_threads=1 #5555

Open davidhao3300 opened 2 years ago

davidhao3300 commented 2 years ago

What happened:

Distributed scheduler appears to be ignoring the priority set by dask.order.order and instead does a bread-first execution order. This problem is particularly prominent when doing simple per-partition operation chains, where we are able to read one input partition, do some work, then write the results to a partition.

The end result for this particular usecase is that we load ALL parquet partitions first, apply the function to each one, then write each one to file.

While the issue is about num_workers=1, num_threads=1, I do think the suboptimal scheduling occurs in situations involving concurrency as well, and hopefully addressing the serial case will help with the concurrent case.

dask.order.order execution order:

Screen Shot 2021-12-01 at 6 47 14 PM

See how we process the left chain as much as possible before moving to the right chain, as expected

Actual execution:

Screen Shot 2021-12-01 at 6 46 36 PM

I made the function pause in the middle of execution on the top green node. Note how both chains are in-memory. I would've expected to see the bottom chain either completely grayed out (not started), or that the bottom-right node be the only thing in memory.

What you expected to happen:

We instead should be doing depth-first execution order like dask.order.order suggests.

Minimal Complete Verifiable Example:

import pandas as pd
import numpy as np
import dask.dataframe as dd
import dask
from dask.distributed import Client

# Write data to parquet as a setup, to demonstrate unnecessary memory usage
df = pd.DataFrame(np.random.randn(100,100), columns=[str(x) for x in range(100)])

# Can change npartitions to 10 to show that this problem becomes severe at higher partition count
ddf = dd.from_pandas(df, npartitions=2)
ddf.to_parquet("input")

# You may need to set host and dashboard_address to expose dashboard
# We set n_workers=1 and threads_per_worker=1 to get rid of concurrency for simplicity
client=Client(n_workers=1, threads_per_worker=1)

ddf = dd.read_parquet("input")

def f(df, partition_info=None):
    # Pause execution on second partition
    if partition_info["number"] == 1:
        import time
        while True:
            time.sleep(5)
    return df

ddf = ddf.map_partitions(f, meta=ddf)
ddf = ddf.to_parquet('test', compute=False)

# Run this to show that the dask.order.order output is reasonable
# ddf.visualize(color="order")

# This will hang (on purpose). Go to dashboard graph view to see the graph screenshotted in this issue.
dask.compute(ddf, scheduler=client)

Anything else we need to know?:

Environment:

fjetter commented 2 years ago

Thank you for your report @davidhao3300. This is indeed a known problem that is being introduced by us assigning tasks to workers only once we know for sure that they can be computed. That introduces a problem we typically refer to as "root task overproduction" and causes a non-optimal

We've had reports in various contexts already and they typically all require us to expand our scheduler task assignment logic. See also https://github.com/dask/distributed/issues/3974 for a lower level description and links to other tickets.

Another problem I can see in your example is that I would expect these computation graphs to be fused (i.e. linear chains should be merged such that there is only a single task). cc @gjoseph92 you've been looking into something like that recently, haven't you?

davidhao3300 commented 2 years ago

Thanks for the explanation, the root cause makes sense to me, and I can look into manually fusing as a workaround for now

gjoseph92 commented 2 years ago

Thanks @davidhao3300. I really appreciate having such a well-written issue open for this, because this is a known, widespread, and significant problem as Florian has said. "dask.order is not obeyed" is a good way to put it—ordering is actually mostly not considered by the scheduler, which is quite confusing and misleading to users. I suggest reading https://github.com/dask/distributed/issues/5223 as well for some more background on the problem (https://github.com/dask/distributed/issues/3974 is one possible solution, though not the only one).

Florian is correct that you'd expect this graph to be fused. Linear fusion is really the only bulwark we have against this problem right now. Interestingly, your graph isn't getting fused because you're using partition_info: https://github.com/dask/dask/issues/8309. If you removed that argument from f (and instead used a random number to control the sleep or something), or used https://github.com/dask/dask/pull/8310, this example code wouldn't trigger the underlying problem anymore.

davidhao3300 commented 2 years ago

It's good to know I accidentally uncovered a separate issue with partition_info haha; the actual code doesn't use partition_info. Our actual graph is more complicated, but it's good to know that linear fusion should happen automatically.

For automatic graph optimization, is this calling fuse_linear() or fuse()? We have more complicated operations chains in the actual code, and those don't appear to fuse together from what I can tell, need to play around some more

gjoseph92 commented 2 years ago

@davidhao3300 if you're using DataFrames, only Blockwise fusion happens by default now (https://github.com/dask/dask/pull/7620), which is neither fuse_linear() nor fuse()—those are both "low-level" fusion. optimize_blockwise is the main one happening here.

This change was made because most (but still not all) DataFrame optimizations use Blockwise/HighLevelGraphs. The upside of it is that HighLevelGraphs get sent to the scheduler, which are much smaller and more efficient. But, if there are any layers in the HighLevelGraph that aren't Blockwise, then those won't get fused anymore. That's exactly what you saw in your example: map_partitions with partition_info=True currently generates a low-level layer, but read_parquet and to_parquet are both Blockwise. So you have a Blockwise -> low-level -> Blockwise sandwich, which should be fuseable, but optimize_blockwise doesn't support.

Short story is if you do with dask.config.set({"optimization.fuse.active": True}): around your compute/persist/etc., low-level fusion will be turned back on, and these "sandwich" cases will again be fused properly (at the slight expense of a heavier graph to send to the scheduler).

You say your graph is more complicated—this is also especially important if you're using any non-DataFrame collections, such as Delayed. If you're going in and out of Delayed, but ending up with a DataFrame, you'll definitely want to set that "optimization.fuse.active": True, because none of the Delayed stuff is Blockwise.

davidhao3300 commented 2 years ago

Thank you for the detailed response, I think we have enough info to proceed forwards with optimizing our own graphs, I'll let you know if anything else weird pops up!

mrocklin commented 2 years ago

I was looking into this briefly. I think that it's a bit simpler than what folks are describing.

So, it's true that no parallel dask scheduler completely obeys dask.order. Parallelism gets in the way. However, it does come pretty close.

In this particular case what happens is that ...

  1. A worker gets a bunch of work that it can do
  2. The worker finishes a task, and sends back a message to the scheduler that that task is done
  3. The worker starts working on the next task, and so there are two result-sized things in memory at once
  4. The worker hears back from the scheduler a few milliseconds later that the first task can be deleted, so it goes ahead and does so, there is now only one result in memory

And so when you look at this problem with only two partitions it looks bad, but if you zoom out to a more realistic situation then it looks fine. Here is such a case. I've modified the example above to have fifty rather than two partitions:

import pandas as pd
import numpy as np
import dask.dataframe as dd
import dask
from dask.distributed import Client

# Write data to parquet as a setup, to demonstrate unnecessary memory usage
df = pd.DataFrame(np.random.randn(100,100), columns=[str(x) for x in range(100)])

# Can change npartitions to 10 to show that this problem becomes severe at higher partition count
ddf = dd.from_pandas(df, npartitions=50)
ddf.to_parquet("input")

# You may need to set host and dashboard_address to expose dashboard
# We set n_workers=1 and threads_per_worker=1 to get rid of concurrency for simplicity
client=Client(n_workers=1, threads_per_worker=1)

ddf = dd.read_parquet("input")

def f(df, partition_info=None):
    # Pause execution on second partition
    import time
    time.sleep(1)
    return df

ddf = ddf.map_partitions(f, meta=ddf)
ddf = ddf.to_parquet('test', compute=False)

# Run this to show that the dask.order.order output is reasonable
# ddf.visualize(color="order")

# This will hang (on purpose). Go to dashboard graph view to see the graph screenshotted in this issue.
dask.compute(ddf, scheduler=client)

Screen Shot 2022-01-11 at 4 25 38 PM

We can see that, at least for the instant that this image was taken, there were no lingering pieces of data from the first column. Probably there was one for a few milliseconds though and I just didn't catch it.

Anyway, at least for this example Dask seems to be working as expected. You should make partitions small enough so that you can fit around 2 * nthreads in memory at once. This is described at https://docs.dask.org/en/stable/best-practices.html#avoid-very-large-partitions

Dask scheduling can certainly be improved. The issue where we have an extra partition in memory can be significant when people are running with partitions that are large relative to the size of their memory. Topics like more aggressive fusion, or speculative task assignment can resolve these problems in the medium term. Short term following the rule of "make partitions small enough so that you can fit 2*nthreads of them in memory at once" is a good rule of thumb.

davidhao3300 commented 2 years ago

Thank you, based on the conversation we had, it's possible that "longer" chains exacerbate the suboptimal scheduling issue described earlier, or that this problem has been mostly alleviated since we first had the issue 1-2 years ago. I will report back with findings later this week.

mrocklin commented 2 years ago

That could be true, yes. Mostly I think I'm acknowledging the problem you've raised, but rejecting the specific example (while commending you for taking the time to make a minimal example). If you can help to construct a better example then that would be welcome.

gjoseph92 commented 2 years ago

FYI, in the latest release of distributed (2022.11.0), this problem should be mostly alleviated. Thanks to the new "queuing" scheduling policy that's on by default, you should no longer see root task overproduction like this.

@davidhao3300 I've updated your reproducer slightly, since https://github.com/dask/dask/issues/8309 has been fixed (so partition_info no longer has the side effect of preventing task fusion). I've just turned off graph optimization, so we can purely see what the scheduler's doing.

```python import pandas as pd import numpy as np import dask.dataframe as dd import dask from dask.distributed import Client if __name__ == "__main__": # Write data to parquet as a setup, to demonstrate unnecessary memory usage df = pd.DataFrame(np.random.randn(100, 100), columns=[str(x) for x in range(100)]) # Can change npartitions to 10 to show that this problem becomes severe at higher partition count ddf = dd.from_pandas(df, npartitions=10) ddf.to_parquet("input") dask.config.set({"distributed.scheduler.worker-saturation": 1.0}) # You may need to set host and dashboard_address to expose dashboard # We set n_workers=1 and threads_per_worker=1 to get rid of concurrency for simplicity client = Client(n_workers=1, threads_per_worker=1) ddf = dd.read_parquet("input") def f(df, partition_info=None): # Pause execution on second partition if partition_info["number"] == 1: import time while True: time.sleep(5) return df ddf = ddf.map_partitions(f, meta=ddf) ddf = ddf.to_parquet("test", compute=False) # Run this to show that the dask.order.order output is reasonable # ddf.visualize(color="order") # This will hang (on purpose). Go to dashboard graph view to see the graph screenshotted in this issue. input("Press enter to start") dask.compute(ddf, scheduler=client, optimize_graph=False) ```

Before (worker-saturation: inf):

Notice the "execution width" is 3 inf

Current (worker-saturation: 1.1):

Execution width is 2 1 1

worker-saturation: 1.0:

Execution width is 1 (we entirely completed one chain before moving onto the next) 1 0

So we can see that 1.1 isn't quite ideal (there's still a little overproduction, aka a little unnecessary memory use), but it's better than before. In practice, this sets a very low cap on the extra memory use.

I think for your real-world workloads, if you upgrade to the latest version, it'll feel like the scheduler is obeying dask.order as you'd expect, without having to do any custom graph optimization.

gjoseph92 commented 2 years ago

One interesting little thing is your exact reproducer in this issue isn't fixed because of odd edge cases in the scheduling heuristic right now:

With only 2 partitions at worker-saturation: 1.0, you still get the overproduction you originally described in the issue (execution breadth of 2, even though we have 1 thread): 2p-1 0

But if you add any more partitions, suddenly it behaves as you'd expect (execution breadth of 1): 3p-1 0

The entire lower chain is blue (complete). We completed that chain before moving onto the one above. Whereas in the first figure, we're trying to do both chains at once and swapping between them.

This hysteresis happens because the 'new and improved' scheduling mode only kicks in when you have > 2*nthreads tasks. With 2 partitions, you're not over that threshold; with 3, you are: https://github.com/dask/distributed/blob/176a84cb2bb86d6c9fc1507fbceccd63178be260/distributed/scheduler.py#L3034-L3036

Though this edge case hopefully wouldn't come up too much in normal use (usually you'd have a lot more tasks than threads?), it could still be confusing, especially if trying to diagnose things with small examples like this.