dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.56k stars 715 forks source link

Scheduler Experiments #4881

Open quasiben opened 3 years ago

quasiben commented 3 years ago
  1. Is tornado usage causing significant performance degradations ? a. Should we replace tornado with asyncio/custom library (uvloop friendly) b. Test UCX (TCP only) this will not use tornado for comms and give us some idea of the performance benefits if we replaced tornado

  2. Are we sending messages too frequently ? a. We can batch writes from worker -> scheduler by increasing worker batch sizes (2ms -> 20ms)

cc @gjoseph92

fjetter commented 3 years ago

Are we sending messages too frequently ?

I'd be interested in some statistics for this, e.g. how large are our batches we send on average (# msgs not necessarily byte size)

mrocklin commented 3 years ago

It would be easy to add some basic running statistics on the BatchedSend object to start. That's something that I could imagine putting into main.

On Fri, Jun 4, 2021 at 10:23 AM Florian Jetter @.***> wrote:

Are we sending messages too frequently ?

I'd be interested in some statistics for this, e.g. how large are our batches we send on average (# msgs not necessarily byte size)

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/dask/distributed/issues/4881#issuecomment-854811673, or unsubscribe https://github.com/notifications/unsubscribe-auth/AACKZTBNTZQMF77Y2PN4LGDTRDVXNANCNFSM46C6PKPQ .

quasiben commented 3 years ago

UCX Experiment (Linux Only):

Build UCX/UCX-Py from latest:

conda create -n ucx -c conda-forge    automake make libtool pkg-config cython setuptools libhwloc psutil dask distributed
git clone https://github.com/openucx/ucx
conda activate ucx
cd ucx
./autogen
mkdir build
../contrib/configure-release --prefix=$CONDA_PREFIX --enable-debug --enable-mt --disable-cma --disable-numa
make -j install

cd -
git clone git@github.com:rapidsai/ucx-py
cd ucx-py
python -m pip install .

Configure scheduler:

UCX_TCP_TX_SEG_SIZE=8M UCX_TCP_RX_SEG_SIZE=8M dask-scheduler --protocol ucx

Worker:

UCX_TCP_TX_SEG_SIZE=8M UCX_TCP_RX_SEG_SIZE=8M dask-worker ucx://192.168.1.43:8786 --protocol ucx

Client

UCX_TCP_TX_SEG_SIZE=8M UCX_TCP_RX_SEG_SIZE=8M python foo.py

The UCX_TCP_TX/RX_SEG_SIZE env var is something which can be tuned. We've found 8M to be a good size (large requires fewer copies) for setting the send/receive buffer size but have not investigated deeply here

cc @gjoseph92

mrocklin commented 3 years ago

Gabe had decent results with batching messages, which shows that there are certainly some benefits to be gained. I do think it would be useful to do the UCX experiment as well to see if it's more about Tornado IOStreams or more about the concurrency system, but in general we have good signal that there is room for optimization here.

On Wed, Jun 9, 2021 at 3:11 PM Benjamin Zaitlen @.***> wrote:

UCX Experiment (Linux Only):

Build UCX/UCX-Py from latest:

conda create -n ucx -c conda-forge automake make libtool pkg-config cython setuptools libhwloc psutil dask distributed git clone https://github.com/openucx/ucx conda activate ucx cd ucx ./autogen mkdir build ../contrib/configure-release --prefix=$CONDA_PREFIX --enable-debug --enable-mt --disable-cma --disable-numa make -j install

cd - git clone @.***:rapidsai/ucx-py cd ucx-py python -m pip install .

Configure scheduler:

UCX_TCP_TX_SEG_SIZE=8M UCX_TCP_RX_SEG_SIZE=8M dask-scheduler --protocol ucx

Worker:

UCX_TCP_TX_SEG_SIZE=8M UCX_TCP_RX_SEG_SIZE=8M dask-worker ucx:// 192.168.1.43:8786 --protocol ucx

Client

UCX_TCP_TX_SEG_SIZE=8M UCX_TCP_RX_SEG_SIZE=8M python foo.py

The UCX_TCP_TX/RX_SEG_SIZE https://ucx-py.readthedocs.io/en/latest/configuration.html#ucx-tcp-rx-seg-size env var is something which can be tuned. We've found 8M to be a good size for setting the send/receive buffer size but have not investigated deeply here

cc @gjoseph92 https://github.com/gjoseph92

— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/dask/distributed/issues/4881#issuecomment-858068836, or unsubscribe https://github.com/notifications/unsubscribe-auth/AACKZTGXMJDLO67QMD6Y7N3TR7DIFANCNFSM46C6PKPQ .

gjoseph92 commented 3 years ago

Here are some initial batched send results.

I made the worker batched send interval configurable in my distributed fork. Here are profile results:

Note that when looking at speedscope profiles, make sure MainThread is selected via the thread selector at the top or the n key

GC active (normal)

GC disabled (better for comparing speedscope profiles, so GC pauses don't skew runtimes)

Notable observations:

So I'm not seeing a strong conclusion in this yet. We know read is slow / has high overhead, and here we managed to call it a lot less—great! And it made things a bit better, though not a ton better. Perhaps it made them better by around how much we thought it would? I think having BatchedSend statistics and trying out more intervals would help here.

I also want to note again: turning off GC is still giving us a 30% reduction in runtime (gc -> no gc)! Also, in the system tab, the scheduler memory usage is roughly similar (would need to test on much longer-running clusters to be sure about that though). I still think this is worth more investigation.

mrocklin commented 3 years ago

This is exciting. I see two possible actions:

  1. Improve our comm infrastructure to use some lower-level asyncio protocol rather than Tornado
  2. Have the scheduler identify that it is under strain and ask workers to slow down on their batch rates. We do something similar in two ways today

    1. Heartbeat frequencies are sent from scheduler to workers on every heartbeat response. Based on the number of current workers the scheduler tells the worker to check back in in longer and longer intervals. This shows a mechanism by which we can convey information about desired timings to the worker
    2. The reevaluate_occupancy periodic callback regulates itself by tracking CPU usage.

    I suggest that we track CPU usage on the scheduler (we already do this with the SystemMonitor) and if that usage gets above a certain amount, and if we have received many messages, then we start asking workers to slow down. How do we check to see if we've received many messages? Maybe we keep an exponentially weighted moving average of the time between the last hundred messages. We can do this by keeping a bit of state on the scheduler and adding an every_cycle method that does an EMWA on every received message.

We should consider both. The second option is easier. The first option is more general.

fjetter commented 3 years ago

I am currently a bit surprised about the 2ms default interval. that's incredibly small (I cannot even ping my own router in that time but that's probably my crappy home network 😁 ). I would argue this should be at the very least a bit larger than our network latencies, especially considering that we're performing serialization and stuff as well. While our latency measurements are a bit crude, that could also be used for a minimal bound. otoh, I'm not a network expert and this comment may be nonsense 🤷

mrocklin commented 3 years ago

I remember originally tuning this number by running something like a rapid ping-pong computation and bringing this number down until it no longer had an effect. I agree that we could probably bump it up to something like 5ms by default and probably be fine. We could also make it configurable.

I like the idea of it being auto-tunable though. I think that this is probably pretty doable. It worked well for heartbeats. No one has complained about those being too frequent or too costly since we set up the auto-tuning behavior.

fjetter commented 3 years ago

We could also make it configurable.

I like the idea of it being auto-tunable though.

I agree with you and I would prefer the auto tune over introducing another expert level parameter no actual user can reason about. That also feels like the only sensible way to handle scaling well, even if it is as simple as the heartbeat scaling.

jakirkham commented 3 years ago

Gabe, have we been able to trackdown what is being GC'd? Is it particular *State objects, Task* objects, builtin Python objects, or something else? Also does it show up at a particular time like serialization, task submission, task completion, etc.?

gjoseph92 commented 3 years ago

@jakirkham I haven't looked into GC further. And I haven't noticed much pattern around when it shows up, though you can see on the profile it seems to be at a relatively regular interval.

gjoseph92 commented 3 years ago

Update: I've also tried a 60ms, 120ms, 500ms interval (just to push to the extreme). The amount of time spent in reads does decrease with each of these, but interestingly the overall runtime doesn't seem to change much.

GC active

GC off

Performance reports
Profiles

Notice how as the batched send interval increases, read becomes dominated by msgpack deserialization (unpackb) because Tornado/SSL IO gets smaller:

2ms (main) 20ms 60ms 120ms 500ms

mrocklin commented 3 years ago

Cool. As you warned, variation in total runtime is high. If the next coding step here is to choose a new interval (or interval range, if we're being adaptive) then we're going to want to know what min/max/default values of that range should be, which means that we might want to get more precise measurements.

FWIW, I expect us to focus more in the 2ms-20ms range than out in the 100ms range. I could be wrong though.