gjoseph92 opened 2 years ago
> I know I tend to blame things on root task overproduction, so I'm trying hard to come up with some other explanation here.
Root task overproduction is an observation, not a cause. As you are telling us already, `repartition-merge` is not a root task, so this is rather a general "data producer overproduction".

What is causing this overproduction? Why are the ready `sub`s not computed asap? There always seem to be a couple of `sub` tasks ready, i.e. grayed out in the bar. IIUC this is the `processing` state on the scheduler side, i.e. there is already a message queued up to the worker. Do we really believe that scheduler latency is causing this?

Especially after a disk write, I would expect the workers to immediately pick up `sub` tasks. If not `sub`, they should pick up repartition tasks. What I see more often than not is that they are executing yet another `make-timeseries`. It's really hard to tell, though, with all the green shades. I think the only worker that executes stuff as I would expect is the last worker at the bottom: there is a lot of green and barely any memory in RAM. That's what my intuition says near-perfect scheduling should look like in this situation.
I was under the impression that the queuing PR was supposed to have no impact at all if you leave `worker-saturation: .inf`? If you take away the new code paths that are exclusively triggered when `worker-saturation` is a finite number, what changed?
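For reference, this is the setting in question; a minimal sketch of pinning it explicitly (in the YAML config it is spelled `.inf`, in Python `float("inf")`):

```python
import dask

# The queuing feature is gated on this value; infinity is supposed to
# disable the new code paths entirely.
dask.config.set({"distributed.scheduler.worker-saturation": float("inf")})
```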
> I was under the impression that the queuing PR was supposed to have no impact at all if you leave `worker-saturation: .inf`? If you take away the new code paths that are exclusively triggered when `worker-saturation` is a finite number, what changed?
First of all, a bit of magic: after running the computation once,

```python
def get_stim_logs(dask_worker):
    # Pull the full stimulus log off the worker's state machine
    return list(dask_worker.state.stimulus_log)

stim_logs = client.run(get_stim_logs)
```

then reconstruct the worker state at a given point in time and have fun!
```python
from distributed.worker_state_machine import WorkerState

addr = 'tls://10.0.1.197:37555'

def reconstruct_state(addr, ix_max=0):
    """
    Reconstruct the worker state at a given time ``ix_max``
    """
    ws = WorkerState(threads={}, address=addr, nthreads=2)
    ws.validate = False
    instructions = []
    try:
        log = stim_logs[addr]
        # print(f"Available stimuli: {len(log)}")
        for ix, stim in enumerate(log):
            if ix_max and ix > ix_max:
                break
            instructions.append(ws.handle_stimulus(stim))
    except Exception:
        # There is some assertion error during forgetting;
        # the story looks as if we tried to forget twice
        pass
    return ws

ws = reconstruct_state(addr)
```
I reconstructed the worker state for two of the workers and ran the following. It returns the top-priority task on the worker's ready heap immediately after a `sub` task was received.

Assuming task co-assignment works reasonably well, perfect scheduling would require this exact task to be on top of our heap, since it is very deep in our graph and should run asap. The only exceptions are things like `dataframe-sum-chunk`, which are even further down the graph. If it is not scheduled asap, this means either the priority is off or it needs to fetch dependencies.
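To make "scheduled asap" concrete in heap terms, here is a minimal sketch (using the reconstructed `ws` from above; `sub_outranks_top` is a hypothetical helper): worker priorities are tuples where lexicographically smaller means "run earlier", so a perfectly placed `sub` task should compare at or ahead of the current heap top.

```python
def sub_outranks_top(ws, key):
    # Hypothetical check: would the just-received task beat the current
    # top of the ready heap? Smaller priority tuples run earlier.
    sub_ts = ws.tasks[key]
    top = ws.ready.peek()  # assumes the ready heap is non-empty
    return sub_ts.priority <= top.priority
```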
The following function inspects the worker state for all `ComputeTask` messages instructing the worker to compute a `sub` task. It collects a snapshot of the top of the heap and counts the states of the `sub` task's dependencies:
```python
from collections import Counter
from distributed.worker_state_machine import ComputeTaskEvent

def get_top_ready_heap(addr):
    log = stim_logs[addr]
    # Indices of all stimuli that told this worker to compute a `sub` task
    sub_stimuli = []
    for ix, stim in enumerate(log):
        if isinstance(stim, ComputeTaskEvent):
            if "sub" in stim.key:
                sub_stimuli.append(ix)
    top_ready = []
    for sub_stim_ix in sub_stimuli:
        ws = reconstruct_state(addr, ix_max=sub_stim_ix)
        if ws.ready:
            stimulus = log[sub_stim_ix]
            sub_ts = ws.tasks[stimulus.key]
            ts = ws.ready.peek()
            # Count dependency states; do NOT collapse them into a set first
            dep_states = Counter(dep.state for dep in sub_ts.dependencies)
            top_ready.append((ts, dep_states))
    return top_ready
```
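For example, for one of the workers (using the `addr` defined above):

```python
top_ready = get_top_ready_heap(addr)
```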
I'm inspecting two workers. The first one is basically the "doesn't use any memory" worker of the above gif. The other one is just a random other worker.

At the very least, the first couple of tasks are scheduled perfectly. There are a couple of bad apples, but overall this worker is getting great task assignments:
```
[(<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 0)" ready>, Counter({'memory': 2})),
(<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 331)" ready>, Counter({'memory': 2})),
(<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 330)" ready>, Counter({'memory': 2})),
(<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 332)" ready>, Counter({'memory': 2})),
(<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 333)" ready>, Counter({'memory': 2})),
(<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 334)" ready>, Counter({'memory': 2})),
(<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 335)" ready>, Counter({'memory': 2})),
(<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 336)" ready>, Counter({'memory': 2})),
(<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 337)" ready>, Counter({'memory': 2})),
(<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 338)" ready>, Counter({'memory': 2})),
(<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 339)" ready>, Counter({'memory': 2})),
(<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 340)" ready>, Counter({'memory': 2})),
(<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 341)" ready>, Counter({'memory': 2})),
(<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 342)" ready>, Counter({'memory': 2})),
(<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 343)" ready>, Counter({'memory': 2})),
(<TaskState "('sub-6f76eb3f7f376678bde6c2c7084c9674', 344)" ready>, Counter({'memory': 2})),
(<TaskState "('dataframe-count-chunk-9fe16c8cf9f65c347b1a644e2de9a32f-4eabd21b86af5c4984f612322e91d13c', 343)" ready>, Counter({'memory': 2})),
...
# There are also a couple of "bad apples". Another task is scheduled because dependencies need to be fetched
(<TaskState "('repartition-split-0e00ebd15540325c87d2f05e09f6e5dd', 370)" ready>,
Counter({'memory': 1, 'flight': 1})),
(<TaskState "('repartition-split-0e00ebd15540325c87d2f05e09f6e5dd', 370)" ready>, Counter({'fetch': 1, 'memory': 1})),
(<TaskState "('repartition-split-0e00ebd15540325c87d2f05e09f6e5dd', 375)" ready>, Counter({'memory': 1, 'fetch': 1})),
(<TaskState "('repartition-split-0e00ebd15540325c87d2f05e09f6e5dd', 375)" ready>, Counter({'fetch': 1, 'memory': 1})),
(<TaskState "('repartition-split-0e00ebd15540325c87d2f05e09f6e5dd', 380)" ready>, Counter({'fetch': 1, 'memory': 1})),
(<TaskState "('repartition-split-0e00ebd15540325c87d2f05e09f6e5dd', 384)" ready>, Counter({'fetch': 1, 'memory': 1})),
(<TaskState "('repartition-split-0e00ebd15540325c87d2f05e09f6e5dd', 386)" ready>, Counter({'fetch': 1, 'memory': 1})),
...
```
Right from the start, this worker is having a bad time. Every `sub` computation needs to fetch data, and the worker schedules another task instead.
```
[(<TaskState "('repartition-split-0e00ebd15540325c87d2f05e09f6e5dd', 501)" ready>, Counter({'flight': 1, 'memory': 1})),
(<TaskState "('repartition-split-0e00ebd15540325c87d2f05e09f6e5dd', 501)" ready>, Counter({'fetch': 1, 'memory': 1})),
(<TaskState "('repartition-split-0e00ebd15540325c87d2f05e09f6e5dd', 501)" ready>, Counter({'memory': 1, 'fetch': 1})),
(<TaskState "('repartition-merge-0e00ebd15540325c87d2f05e09f6e5dd', 405)" ready>, Counter({'fetch': 1, 'memory': 1})),
(<TaskState "('repartition-merge-0e00ebd15540325c87d2f05e09f6e5dd', 488)" ready>, Counter({'fetch': 1, 'memory': 1})),
(<TaskState "('repartition-merge-0e00ebd15540325c87d2f05e09f6e5dd', 488)" ready>, Counter({'fetch': 1, 'memory': 1})),
(<TaskState "('repartition-merge-0e00ebd15540325c87d2f05e09f6e5dd', 535)" ready>, Counter({'memory': 1, 'fetch': 1})),
(<TaskState "('repartition-merge-0e00ebd15540325c87d2f05e09f6e5dd', 535)" ready>, Counter({'memory': 1, 'fetch': 1})),
(<TaskState "('repartition-merge-0e00ebd15540325c87d2f05e09f6e5dd', 535)" ready>, Counter({'fetch': 1, 'memory': 1})),
(<TaskState "('repartition-merge-0e00ebd15540325c87d2f05e09f6e5dd', 568)" ready>, Counter({'flight': 1, 'memory': 1})),
```
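As a quick way to quantify the difference between the two workers, one could tally these snapshots by whether all of the incoming `sub`'s dependencies were already in memory (a sketch; `summarize` is a hypothetical helper):

```python
def summarize(top_ready):
    # Count snapshots where every dependency of the incoming `sub` task
    # was already in memory, vs. those that would need a transfer.
    all_in_memory = sum(1 for _, deps in top_ready if set(deps) == {"memory"})
    return all_in_memory, len(top_ready) - all_in_memory
```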
Edit: There was a bug in my script showing a dependency count of only one. I accidentally used a set on the states. It's corrected now.
If that's not interesting enough: the worker that is doing really well has the "smallest" IP address if we sort them all lexicographically. The workers dict on the scheduler is a `SortedDict`

https://github.com/dask/distributed/blob/bfc5cfea80450954dba5b87a5858cb2e3bac1833/distributed/scheduler.py#L3327

so somewhere in our decision-making logic we prefer the "first" worker. We are apparently doing a great job of initial task placement for that worker, but this quickly breaks down for the other workers.
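A minimal sketch of that bias, using `sortedcontainers` directly (the second and third addresses are made up for illustration): iteration over a `SortedDict` is lexicographic by key, so any tie-break that takes the first candidate consistently favors the same worker.

```python
from sortedcontainers import SortedDict

workers = SortedDict({
    "tls://10.0.1.197:37555": "a",
    "tls://10.0.1.23:40221": "b",
    "tls://10.0.1.5:33797": "c",
})
# Lexicographic, not numeric: "...1.197..." sorts before "...1.23..."
# and "...1.5...", so iteration always yields the same worker first.
print(next(iter(workers)))  # tls://10.0.1.197:37555
```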
> Why are the ready `sub`s not computed asap? There always seem to be a couple of `sub` tasks ready, i.e. grayed out in the bar. IIUC this is the `processing` state on the scheduler side, i.e. there is already a message queued up to the worker. Do we really believe that scheduler latency is causing this?

This is the main thing I'm curious about.
Here's a screenshot from 2s before the side-by-side I posted. Both versions are at ~740 `make-timeseries` tasks complete.

Notice how in "before", 200 `sub`s had already completed; in "after", only 31 have, and no more are in processing yet. I think this is the difference. Even though ~200 fewer `repartition-merge`s are complete, more of them are still in memory. And fewer of their downstream `sub` tasks have even been scheduled yet, even though the scheduler clearly knows they're in memory, since it's displaying them on the dashboard.
Replaying worker state like this is amazing. We should do this for the commit before and compare, though. The difference in `sub`s needing to fetch dependencies vs. having them in memory is interesting, but see above: I think the problem may be more that we're traversing the graph in a slightly different order, so that `repartition-merge`s are piling up which (somehow?) aren't inputs to `sub`s.

I'm not sure it's only that `sub`s are being scheduled on the "wrong" workers, requiring transfer; maybe they're not even being scheduled until too late?
https://github.com/dask/distributed/pull/6614 causes a significant performance improvement to `test_vorticity`, and a significant performance regression to `test_dataframe_align`. I think the key change is fixing https://github.com/dask/distributed/issues/6597.
- `test_vorticity` comes from https://github.com/dask/distributed/issues/6571. This workload is the one where we discovered the co-assignment bug that's now fixed. So it's encouraging (and not surprising) that improving co-assignment significantly reduces transfers and improves performance.
- `test_dataframe_align` is a bit surprising. You'd think that assigning matching partitions of the two input dataframes to the same worker would reduce downstream transfers, which it indeed does.

Worth noting: in my original benchmarks, `test_dataframe_align` was probably the most affected by root task overproduction out of everything I tested.

I ran it manually a few times, both before and after the queuing PR. I also tried turning work stealing off, but it didn't make a difference.
Before
After
If you stare at these GIFs for a while, you can notice that:
Here's a comparison between the two dashboards at the same point through the graph (785 `make-timeseries` tasks complete). This is also a critical point, because it's when we start spilling to disk in the "after" case. The "before" case never spills to disk.
You can see that:

- There are many more `repartition-merge` tasks in memory (top progress bar) in the "after" case. Nearly every executed task is in memory. Compare that to the "before" case, where half have already been released from memory.
- Fewer `sub`, `dataframe-count`, and `dataframe-sum` tasks have executed. These are the data-consuming tasks.
- `sub`s have just been scheduled. A moment before, none of those were in processing. So sadly, they arrive just a moment too late to prevent the workers from spilling to disk.

The simplest hypothesis is that the `repartition-merge`s are completing way faster now, since they don't have to transfer data. Maybe that increase in speed gives them the chance to run further ahead of the scheduler before it can submit the `sub` tasks? This pushes the workers over the spill threshold, so then everything slows down.

I know I tend to blame things on root task overproduction, so I'm trying hard to come up with some other explanation here. But it does feel a bit like, because there's no data transfer holding the tasks back anymore, they are able to complete faster than the scheduler is able to schedule data-consuming tasks.
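One way to probe that hypothesis with the stimulus logs (a sketch; the name prefixes and the `assignment_order` helper are assumptions based on this workload's task names): replay the `ComputeTaskEvent`s in arrival order and see how long a worker is handed data producers before consumers start to appear.

```python
from distributed.worker_state_machine import ComputeTaskEvent

PRODUCERS = ("make-timeseries", "repartition")
CONSUMERS = ("sub", "dataframe-count", "dataframe-sum")

def assignment_order(addr):
    # Classify each compute assignment this worker received, in order.
    order = []
    for stim in stim_logs[addr]:
        if not isinstance(stim, ComputeTaskEvent):
            continue
        if any(name in stim.key for name in CONSUMERS):
            order.append("consumer")
        elif any(name in stim.key for name in PRODUCERS):
            order.append("producer")
    return order
```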
What's confusing is just that `repartition-merge` isn't a root task, and we see relatively few `make-timeseries` or `repartition-split`s hanging around in memory. So why is it that scheduling `sub`s lags behind more?