dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Large time delay/white space between subsequent dask.compute() calls #7066

Open scottshambaugh opened 2 years ago

scottshambaugh commented 2 years ago

What happened: I use dask as a way to do parallel processing of single functions, and the repo where I use it essentially runs three different single-item task graphs one after the other. However, I am seeing huge time delays between consecutive runs of the task graphs.

I have tried to boil down the issue to a minimal example which should have essentially no delay due to serialization or surrounding code. The example below mirrors the rough structure of how I use dask in my code and reproduces the issue I am seeing. Attached is the html performance report, but in short, the large delay between the two groups of tasks is what I'm looking at:

[Image: performance report showing the large gap between the two groups of tasks]

I'm happy to do some more digging into what could be happening, but I have no idea how to interrogate what is happening in that gap.

What you expected to happen: There should not be such a large delay between the two groups of tasks.

Minimal Complete Verifiable Example:

import dask
from dask.distributed import Client, performance_report
from numpy.random import default_rng

def case_runner(case):
    return round(case)

class Sim:
    def __init__(self, ncases=int(1e3)):
        self.ncases = ncases
        self.rng = default_rng(seed=12345)
        self.cases = self.rng.random(self.ncases)
        self.runcases = []

        self.client = Client()
        self.cluster = self.client.cluster
        print(f'Dask dashboard link: {self.cluster.dashboard_link}')

    def run(self, n):
        with performance_report():
            for _ in range(n):
                runcases = []
                for case in self.cases:
                    # one delayed task per case
                    case_delayed = dask.delayed(case_runner)(case)
                    runcases.append(case_delayed)
                # blocks until this whole group of tasks has finished
                self.runcases = dask.compute(*runcases)

def main():
    sim = Sim(ncases=int(1e3))
    sim.run(n=2)
    return

if __name__ == '__main__':
    main()

Anything else we need to know?:

Environment: Note that this behavior is not new; it has been present since I started using dask at the start of this year.

Cluster Dump State:

dask-report.zip

hayesgb commented 2 years ago

The reason for the long delay between the two groups of tasks is the dask.compute(*runcases) call inside the outer loop, which blocks until that group of tasks completes. Moving it outside the loop, as below, alleviates that.

    def run(self, n):
        with performance_report():
            for _ in range(n):
                runcases = []
                for case in self.cases:
                    case_delayed = dask.delayed(case_runner)(case)
                    runcases.append(case_delayed)
            self.runcases = dask.compute(*runcases)

You can trigger computation without blocking with dask.persist(), then gather all of the results from the cluster, or maybe take a look here if you need to interleave submissions and computations.
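For illustration, here is a minimal sketch of that persist-then-gather pattern; the client setup and toy values are my own, not from the example above:

import dask
from dask.distributed import Client

client = Client()

# Build the delayed tasks up front (stand-in for case_runner over self.cases).
tasks = [dask.delayed(round)(x) for x in (0.2, 1.7, 3.4)]

# persist() starts the work on the cluster but returns immediately,
# so the client is free to do other things while the tasks run.
persisted = dask.persist(*tasks)

# ... other client-side work could happen here ...

# Pull the concrete values back only when they are actually needed.
results = dask.compute(*persisted)
print(results)  # (0, 2, 3)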

scottshambaugh commented 2 years ago

I'm still confused: even if dask.compute() is blocking, that inner loop should be complete by the 49-second mark in the plot above, right? Should it really take 5 whole seconds of wall time to gather all the results here when it's just a thousand ints? This is just an example, and the delay scales up in my real use case.

Though playing around, it looks like self.runcases = self.client.map(case_runner, self.cases) is a lot faster.
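For completeness, a small sketch of that client.map approach; note that map returns futures rather than results, so a gather call is needed to pull the values back (that detail is an addition here, not part of the comment above):

from dask.distributed import Client

def case_runner(case):
    return round(case)

client = Client()
cases = [0.2, 1.7, 3.4]

# client.map submits one task per element and returns a list of futures immediately.
futures = client.map(case_runner, cases)

# gather blocks until the futures finish and returns their results as plain values.
results = client.gather(futures)
print(results)  # [0, 2, 3]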

gjoseph92 commented 2 years ago

@scottshambaugh I'd suggest profiling this whole thing with py-spy: py-spy record --idle --subprocesses --format=speedscope -o profile.json -- python repro.py. That should give you a more definitive look at what's going on in the white space.

@hayesgb is correct that if you have a lot of things to compute, it's better to submit them all at once. But I also agree with you that this does feel slow.

Should it really take 5 whole seconds of wall time to gather all the results here when it's just a thousand ints?

Profiling should help show what's going on.

You might also be interested in https://github.com/dask/distributed/issues/4754, which I have a feeling this might end up being a duplicate of.

The way compute currently works is that first, all the results have to be aggregated onto one worker. Then, that aggregated result is sent to the client. This means:

  1. There's no overlap of communication and computation. Results don't start to move until the last result has been computed. Ideally, you'd want each result to start moving to the client as soon as it's done, so that when the last result completes, most of the others would have already gotten to the client or be on their way (see the sketch after this list).
  2. There's an extra hop of going to a worker first, before the client.
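As a point of comparison, one way to get results flowing back as soon as each task finishes, rather than in a single aggregated hop at the end, is to work with futures and as_completed. This is a sketch of that workaround with toy values, not a description of what compute() itself does today:

from dask.distributed import Client, as_completed

client = Client()

# Submit tasks as futures; each result is sent to the client as soon as it is done,
# overlapping transfer with the computation of the remaining tasks.
futures = client.map(round, [0.2, 1.7, 3.4])

for future, result in as_completed(futures, with_results=True):
    print(result)
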
mrocklin commented 2 years ago

The delay could also be due to the new compute call and graph submission time. In my experience that is a more likely culprit than data transfer time.
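A rough way to sanity-check that hypothesis (a timing sketch with illustrative values, not from this thread) is to split the compute call into an explicit submit step and a gather step and time each:

import time
import dask
from dask.distributed import Client

client = Client()

tasks = [dask.delayed(round)(i / 7) for i in range(1000)]

t0 = time.perf_counter()
futures = client.compute(tasks)   # submits the graph and returns futures immediately
t1 = time.perf_counter()
results = client.gather(futures)  # waits for computation and transfers results back
t2 = time.perf_counter()

print(f"submit: {t1 - t0:.2f}s, compute + transfer: {t2 - t1:.2f}s")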

scottshambaugh commented 1 year ago

Thank you all for the pointers!

@gjoseph92 Attached is the speedscope profile of my original code, which you can unzip to JSON and load with https://www.speedscope.app/ to examine. It does seem that there is a lot of time spent in each gather, as well as in _get_computation_code on the first loop. Unfortunately there doesn't seem to be more detail showing why the gathers take so long.

[Image: speedscope profile showing time spent in gather and _get_computation_code]

For my purposes I've moved to chaining things into a single task graph, which has sped things up enough that I don't think I need resolution on this issue. I'll leave the issue open until someone reviews this and can confirm it's all expected behavior, at which point it can be closed.

profile.zip
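
For readers who land here, a rough sketch of what chaining stages into a single task graph can look like; the stage functions below are invented for illustration and are not from the original code:

import dask

@dask.delayed
def stage_one(x):
    return x + 1

@dask.delayed
def stage_two(x):
    return x * 2

@dask.delayed
def stage_three(x):
    return x - 3

# Each chain of stages becomes one branch of a single graph, so there is
# only one compute() call and one scheduler round trip for all the work.
chained = [stage_three(stage_two(stage_one(i))) for i in range(1000)]
results = dask.compute(*chained)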