dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.58k stars, 718 forks

deserialization seems much slower than expected #3925

Closed h3jia closed 4 years ago

h3jia commented 4 years ago

What happened & What you expected to happen:

See the notebook in the repo below. I was trying to parallelize a function with dask but ran into a large overhead. The dask dashboard reports that most of the time (~3 sec) is spent deserializing the function. Manually serializing and deserializing the function with cloudpickle, however, takes only a few ms. I also tried parallelizing with multiprocess instead, which takes about 30 ms, roughly 100 times faster than dask.

Minimal Complete Verifiable Example:

See this repo: https://github.com/HerculesJack/test-dask. I suppose it would work on Linux and Mac, but maybe not on Windows for now (because of the cython extensions). This example uses bayesfast (https://github.com/HerculesJack/bayesfast/tree/dev), a package I'm currently developing. I also encountered similar issues with another package (cosmosis, https://bitbucket.org/joezuntz/cosmosis/wiki/Home, which is more difficult to install, so I did not post the cosmosis examples in that repo).

Anything else we need to know?:

  1. Screenshot of the dask dashboard:

[image] https://user-images.githubusercontent.com/34429853/85745095-01889000-b738-11ea-91a3-f509eca718cd.png

  2. I guess it's because dask tries to use a combination of serializers & deserializers, but the optimization fails for the function here, so maybe it's worth trying to force dask to use cloudpickle only? However, simply setting serializers=['cloudpickle'], deserializers=['cloudpickle'] for the Client does not work on my side (it raises a long error and does not return the correct result).

Environment:

  • Dask version: 2.19.0
  • Python version: 3.6.10
  • Operating System: Ubuntu 18.04 Subsystem on Windows 10
  • Install method (conda, pip, source): conda-forge

mrocklin commented 4 years ago

My first guess is that that deserialization time is actually the time to import your library. Is it possible that it takes a long time to import the first time?

h3jia commented 4 years ago

@mrocklin Thanks for the quick reply. So I just tried this at the end of the notebook:

import cloudpickle

with open('foo.p', "wb") as f:
    cloudpickle.dump(den, f)

# restart the kernel

import cloudpickle

with open('foo.p', "rb") as f:
    %time foo = cloudpickle.load(f)

The timing is about 1.9 sec, consistent with the time it takes to import the bayesfast library. So yes, a substantial part of the deserialization time (~3-4 sec) is spent importing the library, but there is still a 1-2 sec overhead from some other cause.
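One way to see why loading a pickle can cost as much as an import: an object defined in an importable module is pickled by reference (module name plus attribute name), so the payload is tiny and loading it triggers the import in the receiving process. A minimal sketch, using the standard-library pickle and json.dumps as stand-ins for cloudpickle and the bayesfast object (both stand-ins are assumptions, not what the notebook does):

```python
import json
import pickle

# A module-level function is serialized as "module name + attribute name",
# not as its code, so the payload stays very small.
payload = pickle.dumps(json.dumps)


def pickled_by_reference(blob: bytes, module: str, name: str) -> bool:
    """Return True if the pickle contains the module and attribute names."""
    return module.encode() in blob and name.encode() in blob


print(len(payload), pickled_by_reference(payload, "json", "dumps"))

# Loading resolves the reference, which imports the module if the
# receiving process has not imported it yet.
restored = pickle.loads(payload)
print(restored is json.dumps)
```

In a fresh process the cost of pickle.loads here is dominated by importing the referenced module, which would match the 1.9 sec measured above.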

For the other cosmosis example (which I haven't uploaded to the repo), I found that the dask parallelization overhead (12 jobs, 12 workers) can be ~20-30 sec, while the timing of the similar cloudpickle.load test is only ~300 ms. It seems that here the majority of the overhead is not caused by importing the library.

TomAugspurger commented 4 years ago

> but still there's a 1-2 sec overhead that is caused by some other reason.

Can you time running a function that imports the library on the cluster? Depending on the filesystem your workers are loading the module from, it might take longer than on your local file system.
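A sketch of the kind of function that could be timed on the workers. `timed_import` is a hypothetical helper and `json` stands in for the real library; the dask call shown in the trailing comment assumes an existing `distributed.Client` named `client`.

```python
import importlib
import time


def timed_import(module_name: str) -> float:
    """Import module_name in the current process and return the elapsed seconds.

    If the module is already in sys.modules the import is a cache hit and
    the result is close to zero, so run this on freshly started workers.
    """
    start = time.perf_counter()
    importlib.import_module(module_name)
    return time.perf_counter() - start


# Local baseline for comparison.
print(f"local: {timed_import('json'):.4f} s")

# On a cluster (assumes `client` is a distributed.Client):
#   client.run(timed_import, "bayesfast")
# Client.run executes the function once on every worker and returns a dict
# mapping each worker address to the function's return value.
```

Comparing the per-worker numbers with the local baseline shows whether the import itself is slower on the workers.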

mrocklin commented 4 years ago

Or having many Python processes load the module at once might be slow on your file system.
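This can be checked without dask. The sketch below starts several fresh interpreters at once, each importing the same module, and reports the wall-clock time until all have finished. `concurrent_cold_import_seconds` is a hypothetical helper and `json` is a placeholder for the real library; the measured time includes interpreter startup.

```python
import subprocess
import sys
import time


def concurrent_cold_import_seconds(module_name: str, n_processes: int) -> float:
    """Start n_processes fresh interpreters that each import module_name,
    and return the wall-clock seconds until all of them have finished."""
    command = [sys.executable, "-c", f"import {module_name}"]
    start = time.perf_counter()
    processes = [subprocess.Popen(command) for _ in range(n_processes)]
    for process in processes:
        if process.wait() != 0:
            raise RuntimeError(f"import of {module_name} failed in a child process")
    return time.perf_counter() - start


# Compare a single import with several simultaneous ones.
for n in (1, 4):
    print(n, f"{concurrent_cold_import_seconds('json', n):.3f} s")
```

If the time grows markedly with the number of processes, contention on the file system during import is a likely contributor.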

mrocklin commented 4 years ago

You might try adding your module name as a preload for your workers. That way you can separate import time from deserialization time. https://docs.dask.org/en/latest/setup/custom-startup.html
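Besides passing a module name directly, a preload can also be a small script. The following is a minimal sketch of one that imports the library at worker startup and logs how long it took. The file name `preload_imports.py` and the `MODULES_TO_PRELOAD` list are assumptions for illustration; `dask_setup` is the hook name dask looks for in preload scripts.

```python
# preload_imports.py (hypothetical file name)
import importlib
import time

# Modules to import when the worker starts. "json" is a placeholder;
# in the case discussed here it would be "bayesfast".
MODULES_TO_PRELOAD = ["json"]


def dask_setup(worker):
    """Hook called by dask with the worker object when the preload is loaded."""
    for name in MODULES_TO_PRELOAD:
        start = time.perf_counter()
        importlib.import_module(name)
        print(f"preloaded {name} in {time.perf_counter() - start:.3f} s")
```

It could then be passed to a worker with something like `dask-worker <scheduler-address> --preload preload_imports.py`. With the import done at startup, whatever the dashboard still reports as deserialization is no longer import time.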

h3jia commented 4 years ago

> Can you time running a function that imports the library on the cluster? Depending on the filesystem your workers are loading the module from, it might take longer than on your local file system.

> Or having many Python processes load the module at once might be slow on your file system.

OK, this explains it. It's true that importing the module is slower on the workers, and on a 32-core machine it's slower if I have 20 instead of 4 workers.

Thanks for helping to debug this issue. Maybe for now I'll turn to multiprocess for single machine parallelization, since it uses fork and seems to not suffer from this slow import problem...
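A small sketch of why fork sidesteps the import cost: a forked child starts with a copy of the parent's memory, including modules the parent has already imported. It uses the standard-library multiprocessing as a stand-in for multiprocess, and `colorsys` as a placeholder for the real library; both helper names are hypothetical. The fork start method is only available on POSIX systems.

```python
import importlib
import multiprocessing as mp
import sys


def _report_module_loaded(module_name: str) -> None:
    # Runs in the child: exit code 0 if the module is already loaded, else 1.
    sys.exit(0 if module_name in sys.modules else 1)


def inherited_by_fork(module_name: str) -> bool:
    """Import module_name in the parent, fork a child, and report whether
    the child sees the module without importing it again."""
    importlib.import_module(module_name)
    ctx = mp.get_context("fork")  # POSIX only; not available on Windows
    child = ctx.Process(target=_report_module_loaded, args=(module_name,))
    child.start()
    child.join()
    return child.exitcode == 0


if __name__ == "__main__":
    print(inherited_by_fork("colorsys"))
```

A process created with the spawn method starts from a fresh interpreter instead, so it has to import the library again.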

mrocklin commented 4 years ago

Sounds good. You might also want to try setting the dask config value distributed.worker.multiprocessing-method: fork

h3jia commented 4 years ago

> Sounds good. You might also want to try setting the dask config value distributed.worker.multiprocessing-method: fork

Thanks for the suggestion, though I just tried this and still saw the overhead:

import dask
dask.config.set({'distributed.worker.multiprocessing-method': 'fork'})
from distributed import Client, LocalCluster

Btw, I also tried another package called loky, which claims to use fork on POSIX by default, and found a similar level of overhead as with dask. For now it seems that multiprocess is the only package that does not suffer from the overhead on my side. I guess it's because the internal fork mechanisms differ among these packages?

mrocklin commented 4 years ago

They all use the multiprocessing library
