dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Testing network performance #5258

Open gjoseph92 opened 3 years ago

gjoseph92 commented 3 years ago

I wrote a quick script to test network performance between workers.

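We try 3 approaches: transferring a DataFrame between workers through task dependencies (the scheduler forces a transfer), sending it directly through a comms handler, and measuring raw TCP with iperf3.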

This script was a quick hack (DataFrame size doesn't match iperf3 transfer size, for instance) but others might still find it interesting.

The script

```python
import time

from dask.utils import format_bytes
import distributed
from distributed.comm.addressing import parse_address, parse_host_port
from distributed.protocol import to_serialize
import numpy as np
import pandas as pd

import coiled


def test_tasks(client: distributed.Client):
    "Test network performance using tasks (scheduler forces a transfer)"
    client.wait_for_workers(2)
    client.restart()
    a, b, *workers = client.scheduler_info()["workers"]

    print(f"send: {a} recv: {b} - performance over task `get_data`")

    # Store data on a global variable so we don't have to recompute
    distributed.wait(
        client.submit(
            lambda: setattr(
                distributed, "GLOBAL_DF", pd.DataFrame(np.random.random((30_000, 1000)))
            ),
            workers=[a],
            pure=False,
        )
    )
    size = client.submit(
        lambda: distributed.GLOBAL_DF.memory_usage().sum(), workers=[a]
    ).result()

    for i in range(15):
        dff = client.submit(
            lambda: distributed.GLOBAL_DF,
            workers=[a],
            pure=False,
        )
        start = time.perf_counter()
        distributed.wait(client.submit(lambda df: None, dff, workers=[b]))
        elapsed = time.perf_counter() - start

        print(
            f"{format_bytes(size)}: {elapsed:.2f}sec, {format_bytes(size / elapsed)}/sec"
        )

    # Clean up the global variable
    distributed.wait(
        client.submit(
            lambda: delattr(distributed, "GLOBAL_DF"), workers=[a], pure=False
        )
    )


def test_handlers(client: distributed.Client):
    "Test network performance using pure comms handlers"
    client.wait_for_workers(2)
    client.restart()
    a, b = client.scheduler_info()["workers"]

    print(f"send: {a} recv: {b} - performance over comms handler")

    async def send(dask_worker: distributed.Worker):
        df = pd.DataFrame(np.random.random((30_000, 1000)))
        dask_worker._send_size = df.memory_usage().sum()
        s = to_serialize(df)
        dask_worker._send_times = []
        while True:
            start = time.perf_counter()
            await dask_worker.rpc(b).stuff_receive(data=s)
            elapsed = time.perf_counter() - start
            dask_worker._send_times.append(elapsed)

    def add_receiver(dask_worker: distributed.Worker):
        def receive(comm, data=None):
            pass

        dask_worker.handlers["stuff_receive"] = receive

    client.run(add_receiver, workers=[b])
    client.run(send, workers=[a], wait=False)

    def get_times(dask_worker: distributed.Worker):
        times = dask_worker._send_times
        dask_worker._send_times = []
        return dask_worker._send_size, times

    for i in range(8):
        time.sleep(2)
        size, times = client.run(get_times, workers=[a])[a]
        for t in times:
            print(f"{format_bytes(size)}: {t:.2f}sec, {format_bytes(size / t)}/sec")

    # TODO stop send coroutine and clean up handlers


def test_iperf(client: distributed.Client):
    "Install iperf on workers and test network and disk performance with it"
    import subprocess

    client.wait_for_workers(2)
    client.restart()

    try:
        client.run(
            subprocess.run,
            "iperf3 -v",
            shell=True,
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except subprocess.CalledProcessError:
        print("Installing iperf3 on workers from conda...")
        client.run(
            subprocess.run, "conda install -c conda-forge iperf", shell=True, check=True
        )

    a, b = client.scheduler_info()["workers"]
    a_ip = parse_host_port(parse_address(a)[1])[0]
    b_ip = parse_host_port(parse_address(b)[1])[0]

    print(f"A: {a} B: {b} - performance from iperf3")

    print("B memory -> A memory")
    # start iperf server (daemon & oneshot mode so `client.run` doesn't block)
    client.run(
        subprocess.run, "iperf3 -s -D -1 -p 5001", shell=True, check=True, workers=[a]
    )
    result = client.run(
        subprocess.run,
        f"iperf3 -c {a_ip} -p 5001 -f M -t 30",
        shell=True,
        capture_output=True,
        text=True,
        workers=[b],
    )
    proc = result[b]
    print(proc.stdout)
    if proc.returncode != 0:
        print(proc.stderr)
        return

    print("B memory -> A disk")
    # reference: https://fasterdata.es.net/performance-testing/network-troubleshooting-tools/iperf/disk-testing-using-iperf/
    client.run(
        subprocess.run,
        "iperf3 -s -D -1 -p 5001 -F iperf_out",
        shell=True,
        workers=[a],
    )
    time.sleep(0.5)
    result = client.run(
        subprocess.run,
        f"iperf3 -c {a_ip} -p 5001 -f M -t 15",
        shell=True,
        capture_output=True,
        text=True,
        workers=[b],
    )
    proc = result[b]
    print(proc.stdout)
    if proc.returncode != 0:
        print(proc.stderr)
        return

    print("A disk -> B disk")
    client.run(
        subprocess.run,
        "iperf3 -s -D -1 -p 5001 -F iperf_out",
        shell=True,
        workers=[b],
    )
    result = client.run(
        subprocess.run,
        f"iperf3 -c {b_ip} -p 5001 -f M -F iperf_out -t 30",
        shell=True,
        capture_output=True,
        text=True,
        workers=[a],
    )
    proc = result[a]
    print(proc.stdout)
    if proc.returncode != 0:
        print(proc.stderr)
        return

    client.run(
        subprocess.run,
        "rm iperf_out",
        shell=True,
        check=True,
    )


if __name__ == "__main__":
    with distributed.Client(
        memory_limit=None,
        n_workers=2,
        processes=True,
        worker_class=distributed.Worker,
        threads_per_worker=1,
        scheduler_port=8786,
    ) as client:
        # cluster = coiled.Cluster(
        #     name="perf",
        #     software="gjoseph92/shuffleservice",
        #     n_workers=2,
        #     worker_cpu=2,
        #     worker_memory="2GiB",
        #     scheduler_cpu=1,
        #     scheduler_memory="2GiB",
        # )
        # with distributed.Client(cluster) as client:
        test_tasks(client)
        test_handlers(client)
        test_iperf(client)
```

Initial results:

On a Coiled cluster (docker on AWS EC2 VMs; don't know the exact instance type, but I requested 2CPU and 2GiB memory, so something low-end): task dependencies 180-280MiB/sec, comms handler 290-330MiB/sec, iperf3 raw TCP 590 MBytes/sec.

So dask's networking is only half as fast as raw TCP here. That's better than I expected actually. Using comms handlers directly is faster, though not hugely. Also not surprising.

On these low-end EC2 nodes, networking is slow. And disk is very slow.

Full results ``` (env) gabe dask-playground/shuffle-service » python network.py Using existing cluster: 'perf' send: tls://10.6.20.175:33093 recv: tls://10.6.31.53:42065 - performance over task `get_data` 228.88 MiB: 1.12sec, 204.94 MiB/sec 228.88 MiB: 1.06sec, 216.87 MiB/sec 228.88 MiB: 1.14sec, 200.35 MiB/sec 228.88 MiB: 1.20sec, 190.22 MiB/sec 228.88 MiB: 1.23sec, 186.78 MiB/sec 228.88 MiB: 0.84sec, 272.11 MiB/sec 228.88 MiB: 0.87sec, 261.72 MiB/sec 228.88 MiB: 0.87sec, 264.40 MiB/sec 228.88 MiB: 0.82sec, 278.06 MiB/sec 228.88 MiB: 0.89sec, 256.02 MiB/sec send: tls://10.6.20.175:43659 recv: tls://10.6.31.53:46709 - performance over comms handler 228.88 MiB: 0.79sec, 290.32 MiB/sec 228.88 MiB: 0.68sec, 336.06 MiB/sec 228.88 MiB: 0.85sec, 268.17 MiB/sec 228.88 MiB: 0.93sec, 245.97 MiB/sec 228.88 MiB: 0.68sec, 334.61 MiB/sec 228.88 MiB: 0.74sec, 308.71 MiB/sec 228.88 MiB: 0.70sec, 328.58 MiB/sec 228.88 MiB: 0.75sec, 303.90 MiB/sec 228.88 MiB: 0.79sec, 288.88 MiB/sec 228.88 MiB: 0.75sec, 304.38 MiB/sec 228.88 MiB: 0.73sec, 315.54 MiB/sec 228.88 MiB: 0.75sec, 303.72 MiB/sec 228.88 MiB: 0.72sec, 319.23 MiB/sec 228.88 MiB: 1.12sec, 204.12 MiB/sec 228.88 MiB: 0.77sec, 298.89 MiB/sec 228.88 MiB: 0.74sec, 307.82 MiB/sec 228.88 MiB: 0.78sec, 292.28 MiB/sec 228.88 MiB: 0.72sec, 318.10 MiB/sec 228.88 MiB: 0.85sec, 268.82 MiB/sec 228.88 MiB: 0.82sec, 279.50 MiB/sec 228.88 MiB: 0.74sec, 310.35 MiB/sec 228.88 MiB: 0.78sec, 294.62 MiB/sec 228.88 MiB: 0.77sec, 295.47 MiB/sec 228.88 MiB: 0.70sec, 327.29 MiB/sec 228.88 MiB: 0.78sec, 294.05 MiB/sec 228.88 MiB: 0.68sec, 335.36 MiB/sec A: tls://10.6.20.175:45103 B: tls://10.6.31.53:39869 - performance from iperf3 B memory -> A memory Connecting to host 10.6.20.175, port 5001 [ 5] local 10.6.31.53 port 58252 connected to 10.6.20.175 port 5001 [ ID] Interval Transfer Bitrate Retr Cwnd [ 5] 0.00-1.00 sec 597 MBytes 597 MBytes/sec 0 1.89 MBytes [ 5] 1.00-2.00 sec 592 MBytes 592 MBytes/sec 0 2.44 MBytes [ 5] 2.00-3.00 sec 589 MBytes 589 
MBytes/sec 0 2.85 MBytes [ 5] 3.00-4.00 sec 588 MBytes 587 MBytes/sec 0 3.00 MBytes [ 5] 4.00-5.00 sec 590 MBytes 590 MBytes/sec 0 3.00 MBytes [ 5] 5.00-6.00 sec 580 MBytes 580 MBytes/sec 46 2.26 MBytes [ 5] 6.00-7.00 sec 594 MBytes 594 MBytes/sec 0 2.45 MBytes [ 5] 7.00-8.00 sec 590 MBytes 590 MBytes/sec 2 1.97 MBytes [ 5] 8.00-9.00 sec 591 MBytes 591 MBytes/sec 0 2.13 MBytes [ 5] 9.00-10.00 sec 591 MBytes 591 MBytes/sec 0 2.24 MBytes [ 5] 10.00-11.00 sec 588 MBytes 588 MBytes/sec 0 2.29 MBytes [ 5] 11.00-12.00 sec 585 MBytes 585 MBytes/sec 15 1.66 MBytes [ 5] 12.00-13.00 sec 588 MBytes 587 MBytes/sec 0 1.94 MBytes [ 5] 13.00-14.00 sec 590 MBytes 590 MBytes/sec 0 2.13 MBytes [ 5] 14.00-15.00 sec 592 MBytes 592 MBytes/sec 0 2.23 MBytes [ 5] 15.00-16.00 sec 592 MBytes 593 MBytes/sec 0 2.27 MBytes [ 5] 16.00-17.00 sec 592 MBytes 593 MBytes/sec 0 2.30 MBytes [ 5] 17.00-18.00 sec 592 MBytes 592 MBytes/sec 8 1.78 MBytes [ 5] 18.00-19.00 sec 592 MBytes 592 MBytes/sec 0 2.07 MBytes [ 5] 19.00-20.00 sec 592 MBytes 593 MBytes/sec 0 2.17 MBytes [ 5] 20.00-21.00 sec 590 MBytes 590 MBytes/sec 0 2.24 MBytes [ 5] 21.00-22.00 sec 594 MBytes 594 MBytes/sec 0 2.30 MBytes [ 5] 22.00-23.00 sec 586 MBytes 586 MBytes/sec 0 2.30 MBytes [ 5] 23.00-24.00 sec 592 MBytes 593 MBytes/sec 0 2.30 MBytes [ 5] 24.00-25.00 sec 594 MBytes 594 MBytes/sec 0 2.31 MBytes [ 5] 25.00-26.00 sec 581 MBytes 581 MBytes/sec 0 2.36 MBytes [ 5] 26.00-27.00 sec 592 MBytes 592 MBytes/sec 0 2.39 MBytes [ 5] 27.00-28.00 sec 592 MBytes 593 MBytes/sec 0 2.62 MBytes [ 5] 28.00-29.00 sec 592 MBytes 592 MBytes/sec 0 2.73 MBytes [ 5] 29.00-30.00 sec 594 MBytes 594 MBytes/sec 0 2.73 MBytes - - - - - - - - - - - - - - - - - - - - - - - - - [ ID] Interval Transfer Bitrate Retr [ 5] 0.00-30.00 sec 17.3 GBytes 591 MBytes/sec 71 sender [ 5] 0.00-30.00 sec 17.3 GBytes 590 MBytes/sec receiver iperf Done. 
B memory -> A disk Connecting to host 10.6.20.175, port 5001 [ 5] local 10.6.31.53 port 58258 connected to 10.6.20.175 port 5001 [ ID] Interval Transfer Bitrate Retr Cwnd [ 5] 0.00-1.00 sec 48.8 MBytes 48.8 MBytes/sec 0 288 KBytes [ 5] 1.00-2.00 sec 45.3 MBytes 45.3 MBytes/sec 0 297 KBytes [ 5] 2.00-3.00 sec 46.0 MBytes 46.0 MBytes/sec 0 297 KBytes [ 5] 3.00-4.00 sec 46.7 MBytes 46.7 MBytes/sec 0 315 KBytes [ 5] 4.00-5.00 sec 46.6 MBytes 46.6 MBytes/sec 0 350 KBytes [ 5] 5.00-6.00 sec 45.0 MBytes 45.0 MBytes/sec 0 350 KBytes [ 5] 6.00-7.00 sec 44.5 MBytes 44.5 MBytes/sec 0 350 KBytes [ 5] 7.00-8.00 sec 45.5 MBytes 45.5 MBytes/sec 0 350 KBytes [ 5] 8.00-9.00 sec 45.2 MBytes 45.2 MBytes/sec 0 350 KBytes [ 5] 9.00-10.00 sec 45.5 MBytes 45.5 MBytes/sec 0 350 KBytes [ 5] 10.00-11.00 sec 45.5 MBytes 45.5 MBytes/sec 0 350 KBytes [ 5] 11.00-12.00 sec 42.7 MBytes 42.7 MBytes/sec 0 350 KBytes [ 5] 12.00-13.00 sec 45.0 MBytes 45.0 MBytes/sec 0 350 KBytes [ 5] 13.00-14.00 sec 43.0 MBytes 43.0 MBytes/sec 0 350 KBytes [ 5] 14.00-15.00 sec 41.5 MBytes 41.5 MBytes/sec 0 350 KBytes [ 5] 15.00-16.00 sec 41.0 MBytes 41.0 MBytes/sec 0 350 KBytes [ 5] 16.00-17.00 sec 43.6 MBytes 43.5 MBytes/sec 0 350 KBytes [ 5] 17.00-18.00 sec 43.6 MBytes 43.6 MBytes/sec 0 350 KBytes [ 5] 18.00-19.00 sec 45.6 MBytes 45.6 MBytes/sec 0 350 KBytes [ 5] 19.00-20.00 sec 46.1 MBytes 46.1 MBytes/sec 0 350 KBytes [ 5] 20.00-21.00 sec 45.8 MBytes 45.8 MBytes/sec 0 350 KBytes [ 5] 21.00-22.00 sec 42.5 MBytes 42.5 MBytes/sec 0 350 KBytes [ 5] 22.00-23.00 sec 43.3 MBytes 43.3 MBytes/sec 0 367 KBytes [ 5] 23.00-24.00 sec 42.1 MBytes 42.1 MBytes/sec 0 367 KBytes [ 5] 24.00-25.00 sec 43.1 MBytes 43.1 MBytes/sec 0 367 KBytes [ 5] 25.00-26.00 sec 43.9 MBytes 43.9 MBytes/sec 0 385 KBytes [ 5] 26.00-27.00 sec 43.1 MBytes 43.1 MBytes/sec 0 385 KBytes [ 5] 27.00-28.00 sec 42.1 MBytes 42.1 MBytes/sec 0 385 KBytes [ 5] 28.00-29.00 sec 42.1 MBytes 42.1 MBytes/sec 0 385 KBytes [ 5] 29.00-30.00 sec 43.6 MBytes 43.6 MBytes/sec 
0 385 KBytes - - - - - - - - - - - - - - - - - - - - - - - - - [ ID] Interval Transfer Bitrate Retr [ 5] 0.00-30.00 sec 1.30 GBytes 44.3 MBytes/sec 0 sender [ 5] 0.00-30.00 sec 1.29 GBytes 44.2 MBytes/sec receiver iperf Done. A disk -> B disk Connecting to host 10.6.31.53, port 5001 [ 5] local 10.6.20.175 port 59674 connected to 10.6.31.53 port 5001 [ ID] Interval Transfer Bitrate Retr Cwnd [ 5] 0.00-1.00 sec 9.75 MBytes 9.75 MBytes/sec 11 271 KBytes [ 5] 1.00-2.00 sec 4.84 MBytes 4.84 MBytes/sec 11 271 KBytes [ 5] 2.00-3.00 sec 5.79 MBytes 5.79 MBytes/sec 12 271 KBytes [ 5] 3.00-4.00 sec 5.20 MBytes 5.20 MBytes/sec 11 271 KBytes [ 5] 4.00-5.00 sec 5.02 MBytes 5.02 MBytes/sec 9 271 KBytes [ 5] 5.00-6.00 sec 5.14 MBytes 5.14 MBytes/sec 11 271 KBytes [ 5] 6.00-7.00 sec 4.48 MBytes 4.48 MBytes/sec 9 271 KBytes [ 5] 7.00-8.00 sec 6.27 MBytes 6.27 MBytes/sec 13 271 KBytes [ 5] 8.00-9.00 sec 7.29 MBytes 7.29 MBytes/sec 12 271 KBytes [ 5] 9.00-10.00 sec 5.44 MBytes 5.44 MBytes/sec 12 271 KBytes [ 5] 10.00-11.00 sec 5.91 MBytes 5.91 MBytes/sec 11 271 KBytes [ 5] 11.00-12.00 sec 5.32 MBytes 5.32 MBytes/sec 11 271 KBytes [ 5] 12.00-13.00 sec 5.14 MBytes 5.14 MBytes/sec 10 271 KBytes [ 5] 13.00-14.00 sec 5.79 MBytes 5.79 MBytes/sec 12 271 KBytes [ 5] 14.00-15.00 sec 5.26 MBytes 5.25 MBytes/sec 10 271 KBytes [ 5] 15.00-16.00 sec 5.62 MBytes 5.62 MBytes/sec 13 271 KBytes [ 5] 16.00-17.00 sec 5.50 MBytes 5.50 MBytes/sec 11 271 KBytes [ 5] 17.00-18.00 sec 4.84 MBytes 4.84 MBytes/sec 10 271 KBytes [ 5] 18.00-19.00 sec 5.14 MBytes 5.14 MBytes/sec 10 271 KBytes [ 5] 19.00-20.00 sec 5.50 MBytes 5.50 MBytes/sec 10 271 KBytes [ 5] 20.00-21.00 sec 4.66 MBytes 4.66 MBytes/sec 10 271 KBytes [ 5] 21.00-22.00 sec 5.38 MBytes 5.37 MBytes/sec 10 271 KBytes [ 5] 22.00-23.00 sec 5.38 MBytes 5.38 MBytes/sec 12 271 KBytes [ 5] 23.00-24.00 sec 5.74 MBytes 5.74 MBytes/sec 11 271 KBytes [ 5] 24.00-25.00 sec 4.00 MBytes 4.00 MBytes/sec 8 271 KBytes [ 5] 25.00-26.00 sec 4.84 MBytes 4.84 MBytes/sec 10 
271 KBytes [ 5] 26.00-27.00 sec 5.02 MBytes 5.02 MBytes/sec 10 271 KBytes [ 5] 27.00-28.00 sec 4.90 MBytes 4.90 MBytes/sec 9 271 KBytes [ 5] 28.00-29.00 sec 5.74 MBytes 5.73 MBytes/sec 11 271 KBytes [ 5] 29.00-30.00 sec 4.96 MBytes 4.96 MBytes/sec 11 271 KBytes - - - - - - - - - - - - - - - - - - - - - - - - - [ ID] Interval Transfer Bitrate Retr [ 5] 0.00-30.00 sec 164 MBytes 5.46 MBytes/sec 321 sender Sent 164 MByte / 1.29 GByte (12%) of iperf_out [ 5] 0.00-30.03 sec 162 MBytes 5.38 MBytes/sec receiver iperf Done. ```

Locally on my mac:

Full results ```python (env) gabe dask-playground/shuffle-service » python network.py /Users/gabe/dev/dask-playground/env/lib/python3.9/site-packages/pandas/compat/__init__.py:124: UserWarning: Could not import the lzma module. Your installed Python is incomplete. Attempting to use lzma compression will result in a RuntimeError. warnings.warn(msg) /Users/gabe/dev/dask-playground/env/lib/python3.9/site-packages/setuptools/distutils_patch.py:25: UserWarning: Distutils was imported before Setuptools. This usage is discouraged and may exhibit undesirable behaviors or errors. Please use Setuptools' objects directly or at least import Setuptools first. warnings.warn( send: tcp://127.0.0.1:57650 recv: tcp://127.0.0.1:57652 - performance over task `get_data` 228.88 MiB: 0.22sec, 1.02 GiB/sec 228.88 MiB: 0.13sec, 1.74 GiB/sec 228.88 MiB: 0.15sec, 1.53 GiB/sec 228.88 MiB: 0.12sec, 1.89 GiB/sec 228.88 MiB: 0.12sec, 1.86 GiB/sec 228.88 MiB: 0.12sec, 1.81 GiB/sec 228.88 MiB: 0.12sec, 1.82 GiB/sec 228.88 MiB: 0.12sec, 1.83 GiB/sec 228.88 MiB: 0.15sec, 1.51 GiB/sec 228.88 MiB: 0.13sec, 1.76 GiB/sec 228.88 MiB: 0.12sec, 1.91 GiB/sec 228.88 MiB: 0.11sec, 1.98 GiB/sec 228.88 MiB: 0.12sec, 1.83 GiB/sec 228.88 MiB: 0.12sec, 1.87 GiB/sec 228.88 MiB: 0.14sec, 1.62 GiB/sec send: tcp://127.0.0.1:57650 recv: tcp://127.0.0.1:57652 - performance over comms handler 228.88 MiB: 0.17sec, 1.32 GiB/sec 228.88 MiB: 0.10sec, 2.33 GiB/sec 228.88 MiB: 0.10sec, 2.35 GiB/sec 228.88 MiB: 0.10sec, 2.35 GiB/sec 228.88 MiB: 0.09sec, 2.36 GiB/sec 228.88 MiB: 0.09sec, 2.39 GiB/sec 228.88 MiB: 0.10sec, 2.30 GiB/sec 228.88 MiB: 0.10sec, 2.28 GiB/sec 228.88 MiB: 0.09sec, 2.37 GiB/sec 228.88 MiB: 0.09sec, 2.37 GiB/sec 228.88 MiB: 0.09sec, 2.35 GiB/sec 228.88 MiB: 0.10sec, 2.35 GiB/sec 228.88 MiB: 0.10sec, 2.31 GiB/sec 228.88 MiB: 0.09sec, 2.37 GiB/sec 228.88 MiB: 0.10sec, 2.28 GiB/sec 228.88 MiB: 0.09sec, 2.38 GiB/sec 228.88 MiB: 0.10sec, 2.35 GiB/sec 228.88 MiB: 0.10sec, 2.29 GiB/sec 228.88 MiB: 0.09sec, 2.37 
GiB/sec 228.88 MiB: 0.10sec, 2.32 GiB/sec 228.88 MiB: 0.10sec, 2.13 GiB/sec 228.88 MiB: 0.10sec, 2.18 GiB/sec 228.88 MiB: 0.10sec, 2.32 GiB/sec 228.88 MiB: 0.09sec, 2.38 GiB/sec 228.88 MiB: 0.10sec, 2.25 GiB/sec 228.88 MiB: 0.09sec, 2.38 GiB/sec 228.88 MiB: 0.09sec, 2.37 GiB/sec 228.88 MiB: 0.10sec, 2.28 GiB/sec 228.88 MiB: 0.10sec, 2.34 GiB/sec 228.88 MiB: 0.09sec, 2.41 GiB/sec 228.88 MiB: 0.09sec, 2.36 GiB/sec 228.88 MiB: 0.09sec, 2.36 GiB/sec 228.88 MiB: 0.10sec, 2.31 GiB/sec 228.88 MiB: 0.10sec, 2.33 GiB/sec 228.88 MiB: 0.09sec, 2.39 GiB/sec 228.88 MiB: 0.09sec, 2.40 GiB/sec 228.88 MiB: 0.10sec, 2.35 GiB/sec 228.88 MiB: 0.10sec, 2.27 GiB/sec 228.88 MiB: 0.11sec, 2.11 GiB/sec 228.88 MiB: 0.11sec, 2.01 GiB/sec 228.88 MiB: 0.11sec, 2.09 GiB/sec 228.88 MiB: 0.11sec, 2.04 GiB/sec 228.88 MiB: 0.10sec, 2.24 GiB/sec 228.88 MiB: 0.10sec, 2.28 GiB/sec 228.88 MiB: 0.10sec, 2.34 GiB/sec 228.88 MiB: 0.10sec, 2.34 GiB/sec 228.88 MiB: 0.10sec, 2.27 GiB/sec 228.88 MiB: 0.10sec, 2.16 GiB/sec 228.88 MiB: 0.10sec, 2.19 GiB/sec 228.88 MiB: 0.10sec, 2.24 GiB/sec 228.88 MiB: 0.10sec, 2.22 GiB/sec 228.88 MiB: 0.10sec, 2.22 GiB/sec 228.88 MiB: 0.10sec, 2.19 GiB/sec 228.88 MiB: 0.10sec, 2.24 GiB/sec 228.88 MiB: 0.09sec, 2.36 GiB/sec 228.88 MiB: 0.09sec, 2.35 GiB/sec 228.88 MiB: 0.10sec, 2.25 GiB/sec 228.88 MiB: 0.10sec, 2.16 GiB/sec 228.88 MiB: 0.12sec, 1.91 GiB/sec 228.88 MiB: 0.10sec, 2.26 GiB/sec 228.88 MiB: 0.10sec, 2.27 GiB/sec 228.88 MiB: 0.11sec, 2.02 GiB/sec 228.88 MiB: 0.15sec, 1.48 GiB/sec 228.88 MiB: 0.10sec, 2.25 GiB/sec 228.88 MiB: 0.10sec, 2.16 GiB/sec 228.88 MiB: 0.10sec, 2.23 GiB/sec 228.88 MiB: 0.10sec, 2.15 GiB/sec 228.88 MiB: 0.11sec, 1.98 GiB/sec 228.88 MiB: 0.10sec, 2.20 GiB/sec 228.88 MiB: 0.10sec, 2.15 GiB/sec 228.88 MiB: 0.10sec, 2.25 GiB/sec 228.88 MiB: 0.10sec, 2.26 GiB/sec 228.88 MiB: 0.10sec, 2.28 GiB/sec 228.88 MiB: 0.09sec, 2.37 GiB/sec 228.88 MiB: 0.09sec, 2.37 GiB/sec 228.88 MiB: 0.10sec, 2.29 GiB/sec 228.88 MiB: 0.11sec, 2.00 GiB/sec 228.88 MiB: 
0.10sec, 2.29 GiB/sec 228.88 MiB: 0.10sec, 2.19 GiB/sec 228.88 MiB: 0.11sec, 2.09 GiB/sec 228.88 MiB: 0.10sec, 2.23 GiB/sec 228.88 MiB: 0.13sec, 1.72 GiB/sec 228.88 MiB: 0.11sec, 2.10 GiB/sec 228.88 MiB: 0.09sec, 2.37 GiB/sec 228.88 MiB: 0.09sec, 2.36 GiB/sec 228.88 MiB: 0.09sec, 2.36 GiB/sec 228.88 MiB: 0.10sec, 2.30 GiB/sec 228.88 MiB: 0.10sec, 2.13 GiB/sec 228.88 MiB: 0.10sec, 2.35 GiB/sec 228.88 MiB: 0.09sec, 2.36 GiB/sec 228.88 MiB: 0.09sec, 2.36 GiB/sec 228.88 MiB: 0.10sec, 2.16 GiB/sec 228.88 MiB: 0.12sec, 1.81 GiB/sec 228.88 MiB: 0.10sec, 2.32 GiB/sec 228.88 MiB: 0.09sec, 2.38 GiB/sec 228.88 MiB: 0.09sec, 2.38 GiB/sec 228.88 MiB: 0.10sec, 2.14 GiB/sec 228.88 MiB: 0.10sec, 2.21 GiB/sec 228.88 MiB: 0.10sec, 2.35 GiB/sec 228.88 MiB: 0.10sec, 2.32 GiB/sec 228.88 MiB: 0.10sec, 2.31 GiB/sec 228.88 MiB: 0.10sec, 2.21 GiB/sec 228.88 MiB: 0.12sec, 1.81 GiB/sec 228.88 MiB: 0.10sec, 2.32 GiB/sec 228.88 MiB: 0.09sec, 2.37 GiB/sec 228.88 MiB: 0.10sec, 2.32 GiB/sec 228.88 MiB: 0.10sec, 2.32 GiB/sec 228.88 MiB: 0.10sec, 2.23 GiB/sec 228.88 MiB: 0.09sec, 2.36 GiB/sec 228.88 MiB: 0.10sec, 2.34 GiB/sec 228.88 MiB: 0.10sec, 2.32 GiB/sec 228.88 MiB: 0.10sec, 2.32 GiB/sec 228.88 MiB: 0.10sec, 2.29 GiB/sec 228.88 MiB: 0.12sec, 1.92 GiB/sec 228.88 MiB: 0.10sec, 2.30 GiB/sec 228.88 MiB: 0.10sec, 2.35 GiB/sec 228.88 MiB: 0.10sec, 2.31 GiB/sec 228.88 MiB: 0.10sec, 2.30 GiB/sec 228.88 MiB: 0.10sec, 2.34 GiB/sec 228.88 MiB: 0.10sec, 2.35 GiB/sec 228.88 MiB: 0.09sec, 2.39 GiB/sec 228.88 MiB: 0.09sec, 2.36 GiB/sec 228.88 MiB: 0.10sec, 2.22 GiB/sec 228.88 MiB: 0.13sec, 1.77 GiB/sec 228.88 MiB: 0.10sec, 2.26 GiB/sec 228.88 MiB: 0.10sec, 2.34 GiB/sec 228.88 MiB: 0.10sec, 2.30 GiB/sec 228.88 MiB: 0.10sec, 2.23 GiB/sec 228.88 MiB: 0.10sec, 2.35 GiB/sec 228.88 MiB: 0.10sec, 2.34 GiB/sec 228.88 MiB: 0.10sec, 2.35 GiB/sec 228.88 MiB: 0.10sec, 2.35 GiB/sec 228.88 MiB: 0.10sec, 2.31 GiB/sec 228.88 MiB: 0.09sec, 2.40 GiB/sec 228.88 MiB: 0.12sec, 1.92 GiB/sec 228.88 MiB: 0.09sec, 2.36 GiB/sec 
228.88 MiB: 0.09sec, 2.37 GiB/sec 228.88 MiB: 0.10sec, 2.31 GiB/sec 228.88 MiB: 0.09sec, 2.36 GiB/sec 228.88 MiB: 0.09sec, 2.39 GiB/sec 228.88 MiB: 0.09sec, 2.38 GiB/sec 228.88 MiB: 0.10sec, 2.35 GiB/sec 228.88 MiB: 0.10sec, 2.28 GiB/sec 228.88 MiB: 0.10sec, 2.30 GiB/sec 228.88 MiB: 0.12sec, 1.87 GiB/sec 228.88 MiB: 0.10sec, 2.32 GiB/sec 228.88 MiB: 0.10sec, 2.35 GiB/sec 228.88 MiB: 0.10sec, 2.26 GiB/sec 228.88 MiB: 0.09sec, 2.36 GiB/sec 228.88 MiB: 0.09sec, 2.40 GiB/sec 228.88 MiB: 0.09sec, 2.37 GiB/sec 228.88 MiB: 0.09sec, 2.36 GiB/sec 228.88 MiB: 0.10sec, 2.32 GiB/sec 228.88 MiB: 0.10sec, 2.34 GiB/sec 228.88 MiB: 0.09sec, 2.36 GiB/sec 228.88 MiB: 0.09sec, 2.37 GiB/sec 228.88 MiB: 0.09sec, 2.36 GiB/sec 228.88 MiB: 0.10sec, 2.30 GiB/sec 228.88 MiB: 0.10sec, 2.29 GiB/sec 228.88 MiB: 0.10sec, 2.34 GiB/sec 228.88 MiB: 0.09sec, 2.39 GiB/sec 228.88 MiB: 0.09sec, 2.38 GiB/sec 228.88 MiB: 0.10sec, 2.35 GiB/sec 228.88 MiB: 0.10sec, 2.29 GiB/sec 228.88 MiB: 0.09sec, 2.36 GiB/sec A: tcp://127.0.0.1:57650 B: tcp://127.0.0.1:57652 - performance from iperf3 B memory -> A memory Connecting to host 127.0.0.1, port 5001 [ 5] local 127.0.0.1 port 57689 connected to 127.0.0.1 port 5001 [ ID] Interval Transfer Bitrate [ 5] 0.00-1.00 sec 7.39 GBytes 7566 MBytes/sec [ 5] 1.00-2.00 sec 6.35 GBytes 6507 MBytes/sec [ 5] 2.00-3.00 sec 7.18 GBytes 7355 MBytes/sec [ 5] 3.00-4.00 sec 7.30 GBytes 7480 MBytes/sec [ 5] 4.00-5.00 sec 6.87 GBytes 7039 MBytes/sec [ 5] 5.00-6.00 sec 7.92 GBytes 8112 MBytes/sec [ 5] 6.00-7.00 sec 7.86 GBytes 8054 MBytes/sec [ 5] 7.00-8.00 sec 7.88 GBytes 8065 MBytes/sec [ 5] 8.00-9.00 sec 7.61 GBytes 7795 MBytes/sec [ 5] 9.00-10.00 sec 6.83 GBytes 6996 MBytes/sec [ 5] 10.00-11.00 sec 7.15 GBytes 7324 MBytes/sec [ 5] 11.00-12.00 sec 7.79 GBytes 7974 MBytes/sec [ 5] 12.00-13.00 sec 7.80 GBytes 7989 MBytes/sec [ 5] 13.00-14.00 sec 7.84 GBytes 8026 MBytes/sec [ 5] 14.00-15.00 sec 7.86 GBytes 8044 MBytes/sec [ 5] 15.00-16.00 sec 7.79 GBytes 7979 MBytes/sec [ 5] 
16.00-17.00 sec 7.92 GBytes 8110 MBytes/sec [ 5] 17.00-18.00 sec 6.76 GBytes 6921 MBytes/sec [ 5] 18.00-19.00 sec 7.45 GBytes 7627 MBytes/sec [ 5] 19.00-20.00 sec 7.38 GBytes 7558 MBytes/sec [ 5] 20.00-21.00 sec 7.52 GBytes 7699 MBytes/sec [ 5] 21.00-22.00 sec 7.22 GBytes 7397 MBytes/sec [ 5] 22.00-23.00 sec 7.26 GBytes 7432 MBytes/sec [ 5] 23.00-24.00 sec 7.00 GBytes 7170 MBytes/sec [ 5] 24.00-25.00 sec 7.19 GBytes 7362 MBytes/sec [ 5] 25.00-26.00 sec 6.80 GBytes 6961 MBytes/sec [ 5] 26.00-27.00 sec 6.91 GBytes 7072 MBytes/sec [ 5] 27.00-28.00 sec 7.22 GBytes 7390 MBytes/sec [ 5] 28.00-29.00 sec 7.41 GBytes 7591 MBytes/sec [ 5] 29.00-30.00 sec 7.31 GBytes 7484 MBytes/sec - - - - - - - - - - - - - - - - - - - - - - - - - [ ID] Interval Transfer Bitrate [ 5] 0.00-30.00 sec 221 GBytes 7536 MBytes/sec sender [ 5] 0.00-30.00 sec 221 GBytes 7536 MBytes/sec receiver iperf Done. ```

Dask lags further behind a very fast network, but it's still much faster than a slow network! Does it need to be faster? How often are we actually bandwidth-constrained (versus being constrained by event loop/GIL/worker threads/things that can do something useful with that bandwidth)?

mrocklin commented 3 years ago

Thanks for doing this, @gjoseph92. This is interesting.

> - task dependencies: 180-280MiB/sec
> - comms handler: 290-330MiB/sec
> - iperf3 raw TCP: 590 MBytes/sec
>
> So dask's networking is only half as fast as raw TCP here. That's better than I expected actually. Using comms handlers directly is faster, though not hugely. Also not surprising.

I'm actually surprised by both of these.

For reference, my historical expectation is that on a full (non-virtual) machine I would expect the following:

So, combining those bandwidths by inverse summation (adding the reciprocals), I would expect us to get something like 700MB/s through Dask comms. This is obviously hardware-dependent though, and certainly VMs are not real "M"s.
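
As a rough sketch of that inverse-summation arithmetic: if every byte passes through several stages in series, the time per byte adds up, so the combined bandwidth is the reciprocal of the sum of reciprocals. The function name and the stage values in the example are hypothetical placeholders, not measurements from this thread.

```python
def combined_bandwidth(*stage_bandwidths: float) -> float:
    """Effective bandwidth of stages that each byte passes through in series.

    Time per byte adds across stages, so bandwidths combine as
    1 / B_total = 1 / B_1 + 1 / B_2 + ...
    """
    if not stage_bandwidths:
        raise ValueError("need at least one stage")
    return 1.0 / sum(1.0 / b for b in stage_bandwidths)


# Hypothetical example: two stages at 1000 MB/s each combine to about 500 MB/s.
print(combined_bandwidth(1000.0, 1000.0))
```

The combined figure is always lower than the slowest stage, which is why a fast network alone does not guarantee fast comms.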

cc @quasiben @jakirkham

mrocklin commented 3 years ago

@gjoseph92 for the purposes of tuning shuffle computations, I think that we should build up some intuition around the size of dataframes when using comms. I would be curious to know the expected bandwidth as we send dataframes of varying sizes. Do we stay at the 300MB/s level if we shift from 100MB dataframes to 20MB? to 5MB? to 1MB? to 100kB? Understanding that response curve is probably useful for us.
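
A minimal sketch of how such a response curve could be collected, assuming a hypothetical `send` callable that stands in for whatever actually moves the bytes (here it is just an in-memory copy, so the numbers say nothing about a network). The helper name `bandwidth_curve` and the payload sizes are illustrative only; a real sweep would cover the 100kB to 100MB range mentioned above.

```python
import statistics
import time
from typing import Callable, Dict, Iterable


def bandwidth_curve(
    send: Callable[[bytes], object],
    sizes: Iterable[int],
    repeats: int = 5,
) -> Dict[int, float]:
    """Median bytes/sec achieved by `send` for each payload size."""
    curve = {}
    for nbytes in sizes:
        payload = bytes(nbytes)
        times = []
        for _ in range(repeats):
            start = time.perf_counter()
            send(payload)
            times.append(time.perf_counter() - start)
        # Guard against a zero reading on clocks with coarse resolution.
        curve[nbytes] = nbytes / max(statistics.median(times), 1e-9)
    return curve


# Stand-in transport: copy the payload in memory. Swap in a real comm call
# to measure an actual network path.
for nbytes, rate in bandwidth_curve(bytearray, [1_000, 100_000, 1_000_000]).items():
    print(f"{nbytes} B: {rate / 1e6:.1f} MB/sec")
```

Using the median per size keeps one slow outlier from dominating the curve.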

mrocklin commented 3 years ago

> Dask lags further behind a very fast network, but it's still much faster than a slow network! Does it need to be faster? How often are we actually bandwidth-constrained (versus being constrained by event loop/GIL/worker threads/things that can do something useful with that bandwidth)?

Yeah, I'll admit that a 2 GB/s communication bandwidth feels fast to me for data processing workloads. I'm fine leaving this one alone for a while :)

gjoseph92 commented 3 years ago

For the curious, here are py-spy profiles of the sending and receiving workers running locally (go to MainThread on both, left-heavy view):

On the receive side, 30% is actual socket reading in Tornado, 25% is making the read buffer in TCP comms, 25% idle. On the send side, 30% is Tornado socket writing, 42% is idle.

jakirkham commented 3 years ago

Yeah, regarding the buffer allocation time: it is worth noting that builtin Python creation operations, like bytearray (though not limited to it), zero-initialize the memory. This takes additional time, which can make allocation quite slow. As these zeros end up being overwritten, it is also quite wasteful.

We could avoid this by using numpy.empty, which does not zero-initialize the memory and so skips that cost. We will still pay a small cost when first writing to those pages, but I expect this is smaller than Tornado's overhead.

In [1]: import numpy

In [2]: %timeit bytearray(1_000_000)
18.3 µs ± 125 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

In [3]: %timeit numpy.empty((1_000_000,), dtype="u1")
630 ns ± 7.44 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

I've been thinking we would benefit here from collecting the routines for memory allocation somewhere and then using the most efficient ones when available ( https://github.com/dask/distributed/issues/3970 ).
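
As a minimal sketch of the idea (not what distributed currently does), a receive buffer can be allocated with numpy.empty and handed to any API that fills a writable buffer. The helper name `uninitialized_buffer` is hypothetical, and `io.BytesIO` stands in for a socket here.

```python
import io

import numpy as np


def uninitialized_buffer(nbytes: int) -> memoryview:
    """Writable byte buffer whose contents are NOT zero-filled."""
    return memoryview(np.empty((nbytes,), dtype="u1"))


# BytesIO stands in for a socket; `readinto` fills the buffer in place.
source = io.BytesIO(b"x" * 16)
buf = uninitialized_buffer(16)
nread = source.readinto(buf)
assert nread == 16 and bytes(buf) == b"x" * 16
```

Because the buffer starts with arbitrary contents, the caller must only trust the first `nread` bytes after each read.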

gjoseph92 commented 3 years ago

Good to know. Not calloc-ing here definitely seems desirable.

I also just realized another discrepancy: iperf is using plain TCP, whereas my Coiled example was using TLS.

jakirkham commented 3 years ago

It goes a bit beyond not using calloc. NumPy will use Huge Pages on Linux if available at runtime ( https://github.com/numpy/numpy/pull/14216 ). Also NumPy has its own caching allocator. In the future NumPy will allow the underlying allocator to be configurable ( https://github.com/numpy/numpy/pull/17582 ). So I expect there will be other gains from using NumPy for memory allocations here.

gjoseph92 commented 3 years ago

Running our shuffles, I'm seeing something that looks to me like SSL reads within Tornado could be blocking the event loop. But I haven't dug into Tornado much before so I might be understanding this incorrectly.

Here's a py-spy profile of one worker: https://www.speedscope.app/#profileURL=https%3a%2f%2fgistcdn.githack.com%2fgjoseph92%2fa32ffb1d9cf6248ed4fb56897d2b9208%2fraw%2f51e5bad24d3125ef7a861a33a15c91e12392c02e%2ftls-10_3_17_192-38675.json (go to MainThread, left-heavy view)

The worker should constantly be both reading and writing about the same amount of data. But in left-heavy view, you see ssl.read takes up 58% of time, but ssl.send takes 11%.

In tracing through the code from Tornado BaseIOStream.read_into, I noticed this comment in _read_to_buffer_loop:

while not self.closed():
    # Read from the socket until we get EWOULDBLOCK or equivalent.
    # SSL sockets do some internal buffering, and if the data is
    # sitting in the SSL object's buffer select() and friends
    # can't see it; the only way to find out if it's there is to
    # try to read it.
    if self._read_to_buffer() == 0:
        break

To me that sounds like the only way to know reading from SSL is done is to try reading and see what happens. But what happens when there isn't data sitting in the SSL object's buffer?

It seems like that will eventually call ssl.read() with however many bytes of self._read_buffer are remaining.

Is it possible that if the SSL buffer was empty, the ssl.read() could block on the socket until more data arrived?

Again I'm not very Tornado- or SSL-literate and this was a pretty cursory read-through, so this might be off-base.

jakirkham commented 3 years ago

FWICT this is the underlying read used by Python's SSLSocket. Maybe it will be clearer what is going on after looking at that

gjoseph92 commented 3 years ago

I took a py-spy profile of the same workload as above running on an equivalent cluster, but using TCP instead of TLS.

Anecdotally I noticed tasks just completed a lot faster, even though the dashboard reported similar average worker bandwidths. In the profile, the worker event loop only gets 15% idle time with TLS, vs 40% idle time with TCP. With TLS, ~60% of the event loop is spent on Tornado's read_from_fd; with TCP it's ~20%.

gjoseph92 commented 3 years ago

I ran a version of my script above on identical clusters using TCP vs TLS. This particular trial showed TCP ~1.5x faster; on others it's been up to 1.9x faster.


Full data

```
(env) gabe dask-playground/shuffle-service » python network.py
Using existing cluster: 'tls'
AWS instance types: {'tls://10.6.25.185:37333': 't3a.small', 'tls://10.6.28.157:44541': 't3a.small'}
send: tls://10.6.25.185:42549 recv: tls://10.6.28.157:37465 - performance over comms handler
124 B: p5 0.00sec, 49.08 kiB/sec
124 B: p50 0.00sec, 87.26 kiB/sec
124 B: p95 0.00sec, 106.21 kiB/sec
send: tls://10.6.25.185:45677 recv: tls://10.6.28.157:37907 - performance over comms handler
7.94 kiB: p5 0.00sec, 3.05 MiB/sec
7.94 kiB: p50 0.00sec, 5.43 MiB/sec
7.94 kiB: p95 0.00sec, 6.56 MiB/sec
send: tls://10.6.25.185:46735 recv: tls://10.6.28.157:45483 - performance over comms handler
93.88 kiB: p5 0.00sec, 31.51 MiB/sec
93.88 kiB: p50 0.00sec, 54.78 MiB/sec
93.88 kiB: p95 0.00sec, 66.58 MiB/sec
send: tls://10.6.25.185:42835 recv: tls://10.6.28.157:41033 - performance over comms handler
1.00 MiB: p5 0.01sec, 162.17 MiB/sec
1.00 MiB: p50 0.00sec, 241.55 MiB/sec
1.00 MiB: p95 0.00sec, 278.84 MiB/sec
send: tls://10.6.25.185:36965 recv: tls://10.6.28.157:44421 - performance over comms handler
3.00 MiB: p5 0.01sec, 236.60 MiB/sec
3.00 MiB: p50 0.01sec, 332.83 MiB/sec
3.00 MiB: p95 0.01sec, 379.63 MiB/sec
send: tls://10.6.25.185:35735 recv: tls://10.6.28.157:34113 - performance over comms handler
9.99 MiB: p5 0.03sec, 298.00 MiB/sec
9.99 MiB: p50 0.02sec, 407.80 MiB/sec
9.99 MiB: p95 0.02sec, 458.55 MiB/sec
send: tls://10.6.25.185:44853 recv: tls://10.6.28.157:36471 - performance over comms handler
100.00 MiB: p5 0.30sec, 331.10 MiB/sec
100.00 MiB: p50 0.28sec, 356.10 MiB/sec
100.00 MiB: p95 0.25sec, 394.01 MiB/sec
send: tls://10.6.25.185:44913 recv: tls://10.6.28.157:36359 - performance over comms handler
500.00 MiB: p5 1.47sec, 339.68 MiB/sec
500.00 MiB: p50 1.36sec, 366.40 MiB/sec
500.00 MiB: p95 1.29sec, 387.31 MiB/sec

(env) gabe dask-playground/shuffle-service » python network.py
Using existing cluster: 'tcp'
AWS instance types: {'tcp://10.6.23.136:43093': 't3a.small', 'tcp://10.6.29.243:46371': 't3a.small'}
send: tcp://10.6.23.136:42379 recv: tcp://10.6.29.243:37361 - performance over comms handler
124 B: p5 0.00sec, 34.54 kiB/sec
124 B: p50 0.00sec, 69.10 kiB/sec
124 B: p95 0.00sec, 93.15 kiB/sec
send: tcp://10.6.23.136:42497 recv: tcp://10.6.29.243:36421 - performance over comms handler
7.94 kiB: p5 0.00sec, 2.42 MiB/sec
7.94 kiB: p50 0.00sec, 4.58 MiB/sec
7.94 kiB: p95 0.00sec, 5.94 MiB/sec
send: tcp://10.6.23.136:44721 recv: tcp://10.6.29.243:37403 - performance over comms handler
93.88 kiB: p5 0.00sec, 22.87 MiB/sec
93.88 kiB: p50 0.00sec, 45.35 MiB/sec
93.88 kiB: p95 0.00sec, 60.30 MiB/sec
send: tcp://10.6.23.136:42129 recv: tcp://10.6.29.243:35227 - performance over comms handler
1.00 MiB: p5 0.01sec, 155.52 MiB/sec
1.00 MiB: p50 0.00sec, 290.19 MiB/sec
1.00 MiB: p95 0.00sec, 392.86 MiB/sec
send: tcp://10.6.23.136:41833 recv: tcp://10.6.29.243:40797 - performance over comms handler
3.00 MiB: p5 0.01sec, 329.73 MiB/sec
3.00 MiB: p50 0.01sec, 466.97 MiB/sec
3.00 MiB: p95 0.01sec, 517.41 MiB/sec
send: tcp://10.6.23.136:43359 recv: tcp://10.6.29.243:33185 - performance over comms handler
9.99 MiB: p5 0.02sec, 461.39 MiB/sec
9.99 MiB: p50 0.02sec, 556.19 MiB/sec
9.99 MiB: p95 0.02sec, 568.35 MiB/sec
send: tcp://10.6.23.136:38937 recv: tcp://10.6.29.243:37997 - performance over comms handler
100.00 MiB: p5 0.24sec, 416.11 MiB/sec
100.00 MiB: p50 0.21sec, 467.91 MiB/sec
100.00 MiB: p95 0.21sec, 478.96 MiB/sec
send: tcp://10.6.23.136:45309 recv: tcp://10.6.29.243:46773 - performance over comms handler
500.00 MiB: p5 1.27sec, 395.25 MiB/sec
500.00 MiB: p50 1.09sec, 459.01 MiB/sec
500.00 MiB: p95 1.07sec, 465.86 MiB/sec
```
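
For anyone reproducing the p5/p50/p95 lines, here is a minimal sketch of one way to summarize per-transfer timings. It assumes the percentiles are taken over per-transfer bandwidths; the thread does not show how the modified script computed them, and the helper name and sample timings are hypothetical.

```python
import statistics
from typing import Dict, Sequence


def summarize_bandwidth(nbytes: int, times: Sequence[float]) -> Dict[str, float]:
    """p5/p50/p95 of per-transfer bandwidth (bytes/sec) for one payload size."""
    rates = [nbytes / t for t in times]
    # 99 cut points are returned; index k - 1 holds the k-th percentile.
    cuts = statistics.quantiles(rates, n=100, method="inclusive")
    return {"p5": cuts[4], "p50": cuts[49], "p95": cuts[94]}


# Hypothetical timings (seconds) for a 100 MiB payload.
summary = summarize_bandwidth(100 * 2**20, [0.30, 0.28, 0.27, 0.26, 0.25])
for name, rate in summary.items():
    print(f"{name}: {rate / 2**20:.2f} MiB/sec")
```

Taking percentiles of bandwidth rather than of time means p5 is the slow tail, which matches the ordering in the output above, where p5 has the longest time.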
jakirkham commented 3 years ago

Since we are comparing low-level sends, maybe it is worth running some benchmarks with ery as well

gjoseph92 commented 3 years ago

@quasiben I know you and others have looked at cloud network performance a bit already. Is there anyone who might be interested in looking at these performance issues so we don't repeat work?

@jakirkham from reading the SSLSocket read implementation and OpenSSL docs, I'm now not so sure of my theory. Assuming the socket is non-blocking (it is, right?), the SSL_read should return and set an SSL_ERROR_WANT_READ error, according to the docs. _ssl__SSLSocket_read_impl handles that, stops trying to read, and raises a Python exception with that code. Tornado's SSLIOStream catches that SSL_ERROR_WANT_READ error specifically, which ends the read_to_buffer_loop.

A different interesting thing from the OpenSSL docs though:

As at any time it's possible that non-application data needs to be sent, a read function can also cause write operations.

I don't know OpenSSL very well. Would an SSL write on the same socket through the same SSL object count as non-application data? Since this shuffle workload has each worker communicating with all others, there's a lot of concurrent sending and receiving. If reads could somehow be blocking on the writes, that might explain why we see so much event loop blocking in reads.
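
For anyone who wants to poke at the WANT_READ behaviour without a cluster, here is a minimal sketch using only the standard library `ssl` module with in-memory BIOs instead of a real socket. It is not Tornado's or distributed's code path; the function name `handshake_without_peer` and the hostname `example.invalid` are made up for illustration. It shows two things mentioned above: with no data available, the TLS object raises `ssl.SSLWantReadError` instead of blocking, and an operation that is waiting to read has already produced outgoing (non-application) bytes, here the start of the handshake.

```python
import ssl


def handshake_without_peer():
    """Start a TLS client handshake with no peer attached.

    Returns (wanted_read, pending_out): whether SSLWantReadError was raised,
    and how many bytes the TLS layer queued for sending while trying to read.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    incoming = ssl.MemoryBIO()  # bytes "received from the network" (left empty)
    outgoing = ssl.MemoryBIO()  # bytes the TLS layer wants to send
    # "example.invalid" is a placeholder hostname; nothing is ever connected.
    tls = context.wrap_bio(incoming, outgoing, server_hostname="example.invalid")
    try:
        tls.do_handshake()
    except ssl.SSLWantReadError:
        # Equivalent of SSL_ERROR_WANT_READ: nothing to read yet, try again later.
        wanted_read = True
    else:
        wanted_read = False
    return wanted_read, outgoing.pending


if __name__ == "__main__":
    wanted_read, pending_out = handshake_without_peer()
    print(f"want read: {wanted_read}, bytes queued for write: {pending_out}")
```

This only demonstrates handshake traffic; whether application-level writes on the same SSL object can hold up reads is a separate question that this sketch does not answer.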

Maybe a good next step would be to have this test script do all-to-all transfers, instead of A->B. Also, I don't think I was py-spying in native mode; I'll try that and see if we can get traces for what's happening inside of read.

gjoseph92 commented 3 years ago

Got a native profile from py-spy. First I just must say @benfred, so many thanks for this great tool—being able to get native profiles integrated with the Python callstack with a single flag and no mucking around in perf is amazing!

Worker profile (see MainThread in left-heavy view):

[Screenshot: py-spy worker profile]

So we can immediately see that 71% of the event loop's time is spent blocking on glibc pthread_cond_timedwait, invoked within CPython's SSL socket read implementation. This does not seem like what we want for non-blocking reads!

One odd thing is that py-spy's line numbers in _ssl.c seem to be off by one from what I'm seeing in the CPython 3.9.1 source on GitHub (@benfred, you might be interested in this; FYI, I'm running a conda build of Python on this cluster). If you look at the SSL_read call, py-spy says it's called on _ssl.c:2588, which is one line below the SSL_read on GitHub. If you look at the part we're really interested in (the condition wait), py-spy says _ssl.c:2590. That line's just a struct assignment, but the line one above is PySSL_END_ALLOW_THREADS. We can see that macro calls PyEval_RestoreThread, which calls take_gil.

So the problem is the classic https://bugs.python.org/issue7946: contention re-acquiring the GIL when returning from a socket.send in a multithreaded application. This has already been discussed in https://github.com/dask/distributed/issues/4443#issuecomment-764522201.

But that thread was about the scheduler. It's interesting to see how much this affects workers' performance transferring data. We have much more control to optimize the scheduler; when thinking about networking code, since workers are running user code, we should just assume there could always be high GIL contention.
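
To make the GIL contention effect concrete, here is a rough, self-contained sketch (not the benchmark script from this issue). It times tiny non-blocking `socket.send` calls on a local socket pair, first alone and then while another thread runs a pure-Python busy loop. The names `mean_send_seconds`, `spin`, and `compare` are made up for illustration. On a standard GIL-enabled CPython build I would expect the contended number to be much larger, roughly on the order of `sys.getswitchinterval()` per call, but the exact figures depend on the machine and Python version.

```python
import socket
import threading
import time


def mean_send_seconds(n_sends=100):
    """Average wall time of one tiny non-blocking send() on a local socket pair."""
    left, right = socket.socketpair()
    left.setblocking(False)
    try:
        start = time.perf_counter()
        for _ in range(n_sends):
            # CPython releases the GIL around the syscall and must retake it after.
            left.send(b"x")
        return (time.perf_counter() - start) / n_sends
    finally:
        left.close()
        right.close()


def spin(stop):
    """Pure-Python busy loop: holds the GIL whenever it is running."""
    total = 0
    while not stop.is_set():
        for i in range(1000):
            total += i


def compare(n_sends=100):
    """Return (baseline, contended) mean send times in seconds."""
    baseline = mean_send_seconds(n_sends)
    stop = threading.Event()
    worker = threading.Thread(target=spin, args=(stop,), daemon=True)
    worker.start()
    try:
        contended = mean_send_seconds(n_sends)
    finally:
        stop.set()
        worker.join()
    return baseline, contended


if __name__ == "__main__":
    baseline, contended = compare()
    print(f"no contention: {baseline * 1e6:.1f} us/send")
    print(f"busy thread:   {contended * 1e6:.1f} us/send")
```

The busy loop stands in for user code running in worker threads; the send itself does almost no work, so any extra time is spent waiting to get the GIL back.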

jakirkham commented 3 years ago

Yeah was looking at the GIL release/acquisition lines earlier and was wondering if that was involved somehow.

That said, how much data are we receiving here? Currently we are receiving at most 2GB per read ( due to an OpenSSL 1.0.2 bug; see https://bugs.python.org/issue42853 ). Are we filling more than 2GB and if so how much more?

gjoseph92 commented 3 years ago

Yeah I'm curious if releasing the GIL could actually be skipped for non-blocking sockets.

I think we're definitely getting less than 2GB per read, but I can confirm later. Our sends aim to be ~5 MB for memory reasons.

mrocklin commented 3 years ago

Yeah I'm curious if releasing the GIL could actually be skipped for non-blocking sockets.

How hard would this be to try? Is this a small change in CPython followed by a rebuild?

gjoseph92 commented 3 years ago

I think the issue is just that network IO is much more serial than we want it to be.

You can see in the same py-spy profile (https://speedscope.app/#profileURL=https%3a%2f%2fgistcdn.githack.com%2fgjoseph92%2f65b80849d5a7b3a39ee34c92b603f287%2fraw%2f8634c96a13c3026bc1c6dc4444741def1802d171%2ftls-10_6_23_69-41859.json) in Dask-DefaultThreads-62-4 and -2, these threads are blocked 80% of the time in our maybe_pack function acquiring a semaphore. maybe_pack is a function that processes some DataFrames, serializes them, and puts them on a queue for a pool of coroutines to send over the comm. When a coroutine has sent an item, it releases the semaphore. (So this is basically just a bounded queue, just with the blocking points shifted a little for memory reasons.)

This tells me that we can't write data out fast enough to the network. But looking at the event loop, we saw writes only take 7% of the time. The issue is that the sending coroutines don't get scheduled on the event loop often enough, because the read coroutines spend so much time blocking the event loop trying to re-acquire the GIL.
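
For readers who haven't seen the shuffle code, the semaphore-bounded queue pattern described above can be sketched roughly like this. This is not the real maybe_pack; `run_pipeline`, `produce`, and `sender` are hypothetical names, and `asyncio.sleep(0)` stands in for writing to a comm. A worker thread blocks on the semaphore when too many items are in flight, and a small pool of coroutines on the event loop releases it after each "send".

```python
import asyncio
import threading


async def run_pipeline(n_items=20, max_in_flight=4, n_senders=2):
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    in_flight = threading.BoundedSemaphore(max_in_flight)
    all_sent = asyncio.Event()
    sent = []

    def produce():
        # Runs in a worker thread, like the packing step described above.
        for i in range(n_items):
            in_flight.acquire()  # blocks this thread while too many items are pending
            payload = bytes(8)   # stand-in for a serialized DataFrame shard
            loop.call_soon_threadsafe(queue.put_nowait, (i, payload))

    async def sender():
        # Runs on the event loop; starves if the loop is blocked elsewhere.
        while True:
            index, payload = await queue.get()
            await asyncio.sleep(0)  # stand-in for an awaited comm write
            sent.append(index)
            in_flight.release()     # lets the producer thread pack another item
            if len(sent) == n_items:
                all_sent.set()

    senders = [asyncio.create_task(sender()) for _ in range(n_senders)]
    await loop.run_in_executor(None, produce)
    await all_sent.wait()
    for task in senders:
        task.cancel()
    await asyncio.gather(*senders, return_exceptions=True)
    return sorted(sent)


if __name__ == "__main__":
    print(asyncio.run(run_pipeline()))
```

The point of the sketch is the coupling: the producer thread can only make progress as fast as the sender coroutines get scheduled, so anything that blocks the event loop shows up as time spent waiting on the semaphore.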

mrocklin commented 3 years ago

Do we know why the GIL would be hard to reacquire here?


gjoseph92 commented 3 years ago

How hard would this be to try? Is this a small change in CPython followed by a rebuild?

I think so? Alternatively if we could get _ssl_locks_count (set here) to be 0 then it wouldn't release the GIL. I'm not sure of the consequences of letting SSL use threads but not running PyEval_SaveThread. That's probably bad. Maybe we could make versions of PyEval_SaveThread/RestoreThread that hold onto the GIL though.

Do we know why the GIL would be hard to reacquire here?

Probably just because our worker threads are mostly doing GIL-holding things when they're not blocked?

jakirkham commented 3 years ago

A much simpler test would be to turn SSL off. Have we already done this somewhere (guessing so)?

gjoseph92 commented 3 years ago

See the profile in https://github.com/dask/distributed/issues/5258#issuecomment-907388031.

I spent far too much time trying to get an equivalent profile with --native on TCP last night, but the workload kept running out of memory and crashing (for somewhat unrelated reasons https://github.com/dask/distributed/issues/5250#issuecomment-914913489).

Also TCP sockets still release and retake the GIL on every send, so is that the test we'd want?

jakirkham commented 3 years ago

Just to add to what Gabe said above on this.

Do we know why the GIL would be hard to reacquire here?

I think this is the same answer that Antoine gave related to socket.send ( https://github.com/dask/distributed/issues/4443#issuecomment-764735278 ), as the problem is the same. Quoting the relevant bit below:

acquire the GIL (slow if need to wait for some other thread to release it!)

So presumably something in Python picks up the slack while OpenSSL is doing work. However, once OpenSSL is ready to rejoin, it has to wait until there is a moment when it can reacquire the GIL.

jcrist commented 3 years ago

How hard would this be to try? Is this a small change in CPython followed by a rebuild?

My gut says this would be fairly quick to hack badly into cpython to see if things improve (just stop dropping the gil for all socket calls). A proper patch would interact more with the asyncio eventloop implementation (asyncio or uvloop) to only keep the gil for async socket operations.

gjoseph92 commented 3 years ago

@jcrist agreed. Building cpython (and getting it into a docker image if we wanted to test on our real shuffle workload) would be the slowest part. I would be very interested to see the results of this!

jakirkham commented 2 years ago

Have we looked at this again in light of the addition of asyncio comms ( https://github.com/dask/distributed/pull/5450 )?

Also put together PR ( https://github.com/dask/distributed/pull/5750 ), which optionally uses NumPy to allocate frames.

jakirkham commented 2 years ago

Saw PR ( https://github.com/python/cpython/pull/31492 ) recently, which looks potentially interesting in this context