sgkit-dev / sgkit

Scalable genetics toolkit
https://sgkit-dev.github.io/sgkit
Apache License 2.0

Genetics data IO performance stats/doc #437

Open ravwojdyla opened 3 years ago

ravwojdyla commented 3 years ago

This is a dump of some of the performance experiments. It's part of a larger issue of performance setup and best practices for dask/sgkit and genetic data. The goal is to share the findings and continue the discussion.

Unless stated otherwise, the test machine is a GCE VM with 16 cores, 64GB of memory, and 400 SPD storage. The Dask cluster is a single-node, process-based cluster. Where data is read from GCS, the bucket is in the same region as the VM:

Specs/libs ```bash ➜ ~ uname -a Linux rav-dev 4.19.0-13-cloud-amd64 #1 SMP Debian 4.19.160-2 (2020-11-28) x86_64 GNU/Linux ➜ ~ conda list # Name Version Build Channel _libgcc_mutex 0.1 conda_forge conda-forge _openmp_mutex 4.5 1_gnu conda-forge aiohttp 3.7.2 py38h1e0a361_0 conda-forge appdirs 1.4.4 pypi_0 pypi argon2-cffi 20.1.0 py38h1e0a361_2 conda-forge asciitree 0.3.3 pypi_0 pypi async-timeout 3.0.1 py_1000 conda-forge async_generator 1.10 py_0 conda-forge atk 2.36.0 ha770c72_4 conda-forge atk-1.0 2.36.0 h0d5b62e_4 conda-forge attrs 20.2.0 pyh9f0ad1d_0 conda-forge backcall 0.2.0 pyh9f0ad1d_0 conda-forge backports 1.0 py_2 conda-forge backports.functools_lru_cache 1.6.1 py_0 conda-forge bleach 3.2.1 pyh9f0ad1d_0 conda-forge blinker 1.4 py_1 conda-forge bokeh 2.2.3 py38h578d9bd_0 conda-forge brotlipy 0.7.0 py38h8df0ef7_1001 conda-forge ca-certificates 2020.11.8 ha878542_0 conda-forge cachetools 4.1.1 py_0 conda-forge cairo 1.16.0 h488836b_1006 conda-forge cbgen 0.1.6 pypi_0 pypi certifi 2020.11.8 py38h578d9bd_0 conda-forge cffi 1.14.3 py38h1bdcb99_1 conda-forge chardet 3.0.4 py38h924ce5b_1008 conda-forge click 7.1.2 pyh9f0ad1d_0 conda-forge cloudpickle 1.6.0 py_0 conda-forge cryptography 3.2.1 py38h7699a38_0 conda-forge cython 0.29.21 pypi_0 pypi cytoolz 0.11.0 py38h25fe258_1 conda-forge dask 2.30.0 py_0 conda-forge dask-core 2.30.0 py_0 conda-forge dask-glm 0.2.0 pypi_0 pypi dask-ml 1.7.0 pypi_0 pypi decorator 4.4.2 py_0 conda-forge defusedxml 0.6.0 py_0 conda-forge distributed 2.30.0 pypi_0 pypi entrypoints 0.3 py38h32f6830_1002 conda-forge expat 2.2.9 he1b5a44_2 conda-forge fasteners 0.15 pypi_0 pypi font-ttf-dejavu-sans-mono 2.37 hab24e00_0 conda-forge font-ttf-inconsolata 2.001 hab24e00_0 conda-forge font-ttf-source-code-pro 2.030 hab24e00_0 conda-forge font-ttf-ubuntu 0.83 hab24e00_0 conda-forge fontconfig 2.13.1 h7e3eb15_1002 conda-forge fonts-conda-ecosystem 1 0 conda-forge fonts-conda-forge 1 0 conda-forge freetype 2.10.4 h7ca028e_0 conda-forge fribidi 1.0.10 h36c2ea0_0 conda-forge fsspec 0.8.4 py_0 conda-forge gcsfs 0.7.1 py_0 conda-forge gdk-pixbuf 2.42.0 h0536704_0 conda-forge gettext 0.19.8.1 hf34092f_1004 conda-forge giflib 5.2.1 h36c2ea0_2 conda-forge gil-load 0.4.0 pypi_0 pypi glib 2.66.3 h58526e2_0 conda-forge gobject-introspection 1.66.1 py38h4eacb9c_3 conda-forge google-auth 1.23.0 pyhd8ed1ab_0 conda-forge google-auth-oauthlib 0.4.2 pyhd8ed1ab_0 conda-forge graphite2 1.3.13 h58526e2_1001 conda-forge graphviz 2.42.3 h6939c30_2 conda-forge gtk2 2.24.32 h194ddfc_3 conda-forge gts 0.7.6 h17b2bb4_1 conda-forge harfbuzz 2.7.2 ha5b49bf_1 conda-forge heapdict 1.0.1 py_0 conda-forge icu 67.1 he1b5a44_0 conda-forge idna 2.10 pyh9f0ad1d_0 conda-forge importlib-metadata 2.0.0 py_1 conda-forge importlib_metadata 2.0.0 1 conda-forge iniconfig 1.1.1 pypi_0 pypi ipykernel 5.3.4 py38h1cdfbd6_1 conda-forge ipython 7.19.0 py38h81c977d_0 conda-forge ipython_genutils 0.2.0 py_1 conda-forge ipywidgets 7.5.1 pypi_0 pypi jedi 0.17.2 py38h32f6830_1 conda-forge jinja2 2.11.2 pyh9f0ad1d_0 conda-forge joblib 0.17.0 pypi_0 pypi jpeg 9d h36c2ea0_0 conda-forge json5 0.9.5 pyh9f0ad1d_0 conda-forge jsonschema 3.2.0 py_2 conda-forge jupyter-server-proxy 1.5.0 py_0 conda-forge jupyter_client 6.1.7 py_0 conda-forge jupyter_core 4.6.3 py38h32f6830_2 conda-forge jupyterlab 2.2.9 py_0 conda-forge jupyterlab_pygments 0.1.2 pyh9f0ad1d_0 conda-forge jupyterlab_server 1.2.0 py_0 conda-forge lcms2 2.11 hcbb858e_1 conda-forge ld_impl_linux-64 2.35 h769bd43_9 conda-forge libblas 3.9.0 3_openblas conda-forge 
libcblas 3.9.0 3_openblas conda-forge libffi 3.2.1 he1b5a44_1007 conda-forge libgcc-ng 9.3.0 h5dbcf3e_17 conda-forge libgfortran-ng 9.3.0 he4bcb1c_17 conda-forge libgfortran5 9.3.0 he4bcb1c_17 conda-forge libglib 2.66.3 hbe7bbb4_0 conda-forge libgomp 9.3.0 h5dbcf3e_17 conda-forge libiconv 1.16 h516909a_0 conda-forge liblapack 3.9.0 3_openblas conda-forge libopenblas 0.3.12 pthreads_h4812303_1 conda-forge libpng 1.6.37 h21135ba_2 conda-forge libsodium 1.0.18 h516909a_1 conda-forge libstdcxx-ng 9.3.0 h2ae2ef3_17 conda-forge libtiff 4.1.0 h4f3a223_6 conda-forge libtool 2.4.6 h58526e2_1007 conda-forge libuuid 2.32.1 h14c3975_1000 conda-forge libwebp 1.1.0 h76fa15c_4 conda-forge libwebp-base 1.1.0 h36c2ea0_3 conda-forge libxcb 1.13 h14c3975_1002 conda-forge libxml2 2.9.10 h68273f3_2 conda-forge llvmlite 0.34.0 pypi_0 pypi locket 0.2.0 py_2 conda-forge lz4-c 1.9.2 he1b5a44_3 conda-forge markupsafe 1.1.1 py38h8df0ef7_2 conda-forge mistune 0.8.4 py38h1e0a361_1002 conda-forge monotonic 1.5 pypi_0 pypi msgpack-python 1.0.0 py38h82cb98a_2 conda-forge multidict 4.7.5 py38h1e0a361_2 conda-forge multipledispatch 0.6.0 pypi_0 pypi nbclient 0.5.1 py_0 conda-forge nbconvert 6.0.7 py38h32f6830_2 conda-forge nbformat 5.0.8 py_0 conda-forge ncurses 6.2 he1b5a44_2 conda-forge nest-asyncio 1.4.1 py_0 conda-forge notebook 6.1.4 py38h32f6830_1 conda-forge numba 0.51.2 pypi_0 pypi numcodecs 0.7.2 pypi_0 pypi numpy 1.19.3 pypi_0 pypi oauthlib 3.0.1 py_0 conda-forge olefile 0.46 pyh9f0ad1d_1 conda-forge openssl 1.1.1h h516909a_0 conda-forge packaging 20.4 pyh9f0ad1d_0 conda-forge pandas 1.1.4 pypi_0 pypi pandoc 2.11.0.4 hd18ef5c_0 conda-forge pandocfilters 1.4.2 py_1 conda-forge pango 1.42.4 h69149e4_5 conda-forge parso 0.7.1 pyh9f0ad1d_0 conda-forge partd 1.1.0 py_0 conda-forge pcre 8.44 he1b5a44_0 conda-forge pexpect 4.8.0 pyh9f0ad1d_2 conda-forge pickleshare 0.7.5 py_1003 conda-forge pillow 8.0.1 py38h70fbd49_0 conda-forge pip 20.2.4 py_0 conda-forge pixman 0.38.0 h516909a_1003 conda-forge pluggy 0.13.1 pypi_0 pypi pooch 1.2.0 pypi_0 pypi prometheus_client 0.8.0 pyh9f0ad1d_0 conda-forge prompt-toolkit 3.0.8 py_0 conda-forge psutil 5.7.3 py38h8df0ef7_0 conda-forge pthread-stubs 0.4 h14c3975_1001 conda-forge ptyprocess 0.6.0 py_1001 conda-forge py 1.9.0 pypi_0 pypi py-spy 0.3.3 pypi_0 pypi pyasn1 0.4.8 py_0 conda-forge pyasn1-modules 0.2.7 py_0 conda-forge pycparser 2.20 pyh9f0ad1d_2 conda-forge pygments 2.7.2 py_0 conda-forge pyjwt 1.7.1 py_0 conda-forge pyopenssl 19.1.0 py_1 conda-forge pyparsing 2.4.7 pyh9f0ad1d_0 conda-forge pyrsistent 0.17.3 py38h1e0a361_1 conda-forge pysocks 1.7.1 py38h924ce5b_2 conda-forge pytest 6.1.2 pypi_0 pypi python 3.8.6 h852b56e_0_cpython conda-forge python-dateutil 2.8.1 py_0 conda-forge python-graphviz 0.15 pypi_0 pypi python_abi 3.8 1_cp38 conda-forge pytz 2020.1 pypi_0 pypi pyyaml 5.3.1 pypi_0 pypi pyzmq 19.0.2 py38ha71036d_2 conda-forge readline 8.0 he28a2e2_2 conda-forge rechunker 0.3.1 pypi_0 pypi requests 2.24.0 pyh9f0ad1d_0 conda-forge requests-oauthlib 1.3.0 pyh9f0ad1d_0 conda-forge rsa 4.6 pyh9f0ad1d_0 conda-forge scikit-learn 0.23.2 pypi_0 pypi scipy 1.5.3 pypi_0 pypi send2trash 1.5.0 py_0 conda-forge setuptools 49.6.0 py38h924ce5b_2 conda-forge sgkit 0.1.dev290+gb81de07 pypi_0 pypi simpervisor 0.3 py_1 conda-forge six 1.15.0 pyh9f0ad1d_0 conda-forge sortedcontainers 2.2.2 pypi_0 pypi sqlite 3.33.0 h4cf870e_1 conda-forge tblib 1.7.0 pypi_0 pypi terminado 0.9.1 py38h32f6830_1 conda-forge testpath 0.4.4 py_0 conda-forge threadpoolctl 2.1.0 pypi_0 pypi tk 8.6.10 hed695b0_1 
conda-forge toml 0.10.2 pypi_0 pypi toolz 0.11.1 py_0 conda-forge tornado 6.1 py38h25fe258_0 conda-forge traitlets 5.0.5 py_0 conda-forge typing-extensions 3.7.4.3 0 conda-forge typing_extensions 3.7.4.3 py_0 conda-forge urllib3 1.25.11 py_0 conda-forge wcwidth 0.2.5 pyh9f0ad1d_2 conda-forge webencodings 0.5.1 py_1 conda-forge wheel 0.35.1 pyh9f0ad1d_0 conda-forge widgetsnbextension 3.5.1 pypi_0 pypi xarray 0.16.1 pypi_0 pypi xorg-kbproto 1.0.7 h14c3975_1002 conda-forge xorg-libice 1.0.10 h516909a_0 conda-forge xorg-libsm 1.2.3 h84519dc_1000 conda-forge xorg-libx11 1.6.12 h516909a_0 conda-forge xorg-libxau 1.0.9 h14c3975_0 conda-forge xorg-libxdmcp 1.1.3 h516909a_0 conda-forge xorg-libxext 1.3.4 h516909a_0 conda-forge xorg-libxpm 3.5.13 h516909a_0 conda-forge xorg-libxrender 0.9.10 h516909a_1002 conda-forge xorg-libxt 1.1.5 h516909a_1003 conda-forge xorg-renderproto 0.11.1 h14c3975_1002 conda-forge xorg-xextproto 7.3.0 h14c3975_1002 conda-forge xorg-xproto 7.0.31 h14c3975_1007 conda-forge xz 5.2.5 h516909a_1 conda-forge yaml 0.2.5 h516909a_0 conda-forge yarl 1.6.2 py38h1e0a361_0 conda-forge zarr 2.5.0 pypi_0 pypi zeromq 4.3.3 he1b5a44_2 conda-forge zict 2.0.0 pypi_0 pypi zipp 3.4.0 py_0 conda-forge zlib 1.2.11 h516909a_1010 conda-forge ```

The issue with suboptimal saturation was originally reported for this code:

```python
import fsspec
import xarray as xr
from sgkit.io.bgen.bgen_reader import unpack_variables
from dask.diagnostics import ProgressBar, ResourceProfiler, Profiler

path = "gs://foobar/data.zarr"
store = fsspec.mapping.get_mapper(path, check=False, create=False)
ds = xr.open_zarr(store, concat_characters=False, consolidated=False)
ds = unpack_variables(ds, dtype='float16')

ds["variant_dosage_std"] = ds["call_dosage"].astype("float32").std(dim="samples")
with ProgressBar(), Profiler() as prof, ResourceProfiler() as rprof:
    ds['variant_dosage_std'] = ds['variant_dosage_std'].compute()
```

With local input, performance graph:

[bokeh performance plot: local input]

It's pretty clear the cores are well saturated. I also measured the GIL: it was held for 13% of the time and waited on for 2.1%, with each of the 16 worker threads holding it for about 0.7% and waiting for about 0.1% of the time.

For GCS input (via fsspec):

[bokeh performance plot: GCS input via fsspec]

GIL summary: the GIL was held for 18% of the time and waited on for 3.8%, with each of the 16 worker threads holding it for about 0.6% and waiting for about 0.2% of the time, and one thread holding it for 6.5% and waiting for 1.6% of the time.

``` held: 0.186 (0.191, 0.187, 0.186) wait: 0.038 (0.046, 0.041, 0.039) <140287451305792> held: 0.015 (0.029, 0.017, 0.015) wait: 0.002 (0.002, 0.002, 0.002) <140284185433856> held: 0.065 (0.061, 0.064, 0.065) wait: 0.016 (0.015, 0.017, 0.016) <140284540389120> held: 0.0 (0.0, 0.0, 0.0) wait: 0.0 (0.0, 0.0, 0.0) <140284590728960> held: 0.006 (0.006, 0.006, 0.006) wait: 0.002 (0.002, 0.002, 0.002) <140284599121664> held: 0.006 (0.006, 0.006, 0.006) wait: 0.002 (0.002, 0.002, 0.001) <140284759570176> held: 0.006 (0.008, 0.007, 0.007) wait: 0.002 (0.001, 0.001, 0.002) <140284751177472> held: 0.006 (0.006, 0.006, 0.006) wait: 0.002 (0.001, 0.001, 0.002) <140283956950784> held: 0.006 (0.006, 0.006, 0.006) wait: 0.001 (0.001, 0.001, 0.001) <140283948558080> held: 0.006 (0.006, 0.006, 0.006) wait: 0.001 (0.001, 0.001, 0.001) <140283940165376> held: 0.006 (0.006, 0.006, 0.006) wait: 0.002 (0.002, 0.002, 0.002) <140283931772672> held: 0.006 (0.006, 0.006, 0.006) wait: 0.002 (0.001, 0.002, 0.002) <140283923379968> held: 0.006 (0.006, 0.006, 0.006) wait: 0.002 (0.001, 0.002, 0.002) <140283914987264> held: 0.006 (0.007, 0.007, 0.007) wait: 0.002 (0.001, 0.002, 0.002) <140283295561472> held: 0.006 (0.006, 0.006, 0.006) wait: 0.002 (0.002, 0.002, 0.002) <140283287168768> held: 0.006 (0.006, 0.006, 0.006) wait: 0.002 (0.002, 0.002, 0.002) <140283278776064> held: 0.006 (0.006, 0.007, 0.006) wait: 0.002 (0.002, 0.002, 0.002) <140283270383360> held: 0.006 (0.006, 0.006, 0.006) wait: 0.001 (0.001, 0.001, 0.001) <140283261990656> held: 0.006 (0.006, 0.006, 0.006) wait: 0.001 (0.002, 0.002, 0.001) <140283253597952> held: 0.006 (0.006, 0.006, 0.006) wait: 0.002 (0.001, 0.001, 0.001) <140283245205248> held: 0.0 (0.0, 0.0, 0.0) wait: 0.0 (0.0, 0.0, 0.0) <140282691581696> held: 0.001 (0.0, 0.001, 0.001) wait: 0.001 (0.001, 0.001, 0.001) <140282683188992> held: 0.002 (0.002, 0.002, 0.002) wait: 0.001 (0.001, 0.001, 0.001) <140282674796288> held: 0.001 (0.001, 0.001, 0.001) wait: 0.003 (0.012, 0.004, 0.003) ```

It's clear that the CPU usage is lower and not fully saturated, and GIL wait time is slightly up (with a concerning spike in one thread). With remote/fsspec input we have the overhead of data decryption and potential network IO overhead (though it doesn't seem like we hit network limits).
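For reference, the GIL statistics and per-thread dump above look like the output of the gil_load package from the environment listing. A minimal sketch of such a measurement, assuming gil_load's init/start/stop/get API; `run_workload` is a hypothetical stand-in for the dosage computation:

```python
import gil_load  # GIL sampling package from the environment listing above

gil_load.init()   # set up sampling before the worker threads start
gil_load.start()

run_workload()    # hypothetical stand-in for the variant_dosage_std computation

gil_load.stop()
print(gil_load.get())  # overall and per-thread held/wait fractions
```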

ravwojdyla commented 3 years ago

Some doc for fsspec:

This is a partial output of the method profile on the code above (https://github.com/pystatgen/sgkit/issues/437#issue-784385179):

  %Own   %Total  OwnTime  TotalTime  Function (filename:line)
 40.00%  40.00%    3811s     3811s   npy_floatbits_to_halfbits (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
 20.00%  20.00%    2593s     5033s   npy_half_to_float (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
  0.00%   0.00%    2269s     2269s   npy_halfbits_to_floatbits (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
 20.00%  20.00%    1825s     5854s   HALF_add (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
 40.00%  80.00%    1770s     3848s   _aligned_contig_cast_float_to_half (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
  0.00%  20.00%    1720s     5337s   HALF_multiply (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
 20.00%  20.00%    1393s     1413s   PyArray_Where (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
 20.00%  20.00%   847.4s     1521s   _aligned_contig_cast_half_to_float (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
  0.00%   0.00%   844.4s    844.4s   0x7fe616f5fdf0 (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
  0.00%   0.00%   757.2s    757.2s   BOOL_logical_or (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
 20.00%  20.00%   752.4s    752.4s   DOUBLE_subtract (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)
  0.00%   0.00%   687.0s    687.0s   _aligned_contig_cast_ubyte_to_float (numpy/core/_multiarray_umath.cpython-38-x86_64-linux-gnu.so)

Unsurprisingly, the top entries are numpy methods. What is interesting, and can't be seen here, are the sporadic spikes in CPU usage. Looking at the network IO:

11:42:17        IFACE   rxpck/s   txpck/s    rxkB/s    txkB/s   rxcmp/s   txcmp/s  rxmcst/s   %ifutil
11:42:18         ens4     64.00     64.00     11.58     20.74      0.00      0.00      0.00      0.00
11:42:19         ens4    397.00     70.00   4581.27     14.38      0.00      0.00      0.00      0.00
11:42:20         ens4    146.00     67.00   1501.01     15.61      0.00      0.00      0.00      0.00
11:42:21         ens4    276.00    103.00   2982.42     27.17      0.00      0.00      0.00      0.00
11:42:22         ens4    235.00     91.00   4396.02     19.31      0.00      0.00      0.00      0.00
11:42:23         ens4     43.00     43.00      6.50     12.35      0.00      0.00      0.00      0.00

There are also spikes in network IO, which could suggest that we should look into smoothing out the network buffering. This reminded me of some stats I once stumbled upon in the Beam codebase (here):

# This is the size of each partial-file read operation from GCS.  This
# parameter was chosen to give good throughput while keeping memory usage at
# a reasonable level; the following table shows throughput reached when
# reading files of a given size with a chosen buffer size and informed the
# choice of the value, as of 11/2016:
#
# +---------------+------------+-------------+-------------+-------------+
# |               | 50 MB file | 100 MB file | 200 MB file | 400 MB file |
# +---------------+------------+-------------+-------------+-------------+
# | 8 MB buffer   | 17.12 MB/s | 22.67 MB/s  | 23.81 MB/s  | 26.05 MB/s  |
# | 16 MB buffer  | 24.21 MB/s | 42.70 MB/s  | 42.89 MB/s  | 46.92 MB/s  |
# | 32 MB buffer  | 28.53 MB/s | 48.08 MB/s  | 54.30 MB/s  | 54.65 MB/s  |
# | 400 MB buffer | 34.72 MB/s | 71.13 MB/s  | 79.13 MB/s  | 85.39 MB/s  |
# +---------------+------------+-------------+-------------+-------------+
DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024

So Beam uses a 16MiB read buffer. But Beam also uses Google's storage API client (python-storage), whereas fsspec calls the GCS API directly via its GCS implementation, gcsfs. There are definitely implementation differences between the two, so the consequences of the buffer size might be different.
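For comparison, the read buffer on the gcsfs side is controlled by the filesystem's block_size argument. A minimal sketch of opening the same (placeholder) store with a 16MiB block size instead of the default:

```python
import gcsfs
import xarray as xr

# use 16MiB read blocks instead of the gcsfs default
gcs = gcsfs.GCSFileSystem(block_size=16 * 1024 * 1024)
store = gcs.get_mapper("gs://foobar/data.zarr")
ds = xr.open_zarr(store, consolidated=False)
```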

ravwojdyla commented 3 years ago

In the original issue I tried an fsspec block size of 16MiB, but the results looked very similar to the 5MiB block size, which was somewhat suspicious. It turns out that the zarr data ("gs://foobar/data.zarr") is saved as a large number of small files: for example, call_genotype_probability is stored as 20570 chunks/objects with an average object size of about 2MiB. This might be suboptimal for sequential reads.
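A quick way to confirm that kind of object layout is to list the array's objects directly; a hedged sketch using gcsfs (placeholder path from above; the metadata filter is simplistic):

```python
import gcsfs

fs = gcsfs.GCSFileSystem()
objs = fs.ls("gs://foobar/data.zarr/call_genotype_probability", detail=True)
# keep only chunk objects, skipping the zarr metadata files
chunks = [o for o in objs if not o["name"].endswith((".zarray", ".zattrs"))]
avg_mib = sum(o["size"] for o in chunks) / len(chunks) / 2**20
print(f"{len(chunks)} objects, {avg_mib:.1f} MiB average size")
```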

As a general rule, when you download large objects within a Region from Amazon S3 to Amazon EC2, we suggest making concurrent requests for byte ranges of an object at the granularity of 8–16 MB.

From S3 best practices.

The Pangeo project has found that chunk sizes in the range of 10–200MB work well with Dask reading from Zarr data in cloud storage

From cloud performant reading of netcdf4 hdf5 data using the zarr library.

Interesting/related issue "File Chunk Store" zarr-python#556.

To validate the impact of chunk size, I rechunked the call_genotype arrays, specifically the call_genotype_probability array, from 20570 chunks (chunk shape (5216, 5792, 2), avg. GCS object size 2MiB, average in-memory chunk size 230MiB) to 2349 chunks (chunk shape (15648, 17376, 2), avg. GCS object size 17MiB, average in-memory chunk size 2GiB); see the rechunking sketch below.
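The rechunking step itself can be expressed with the rechunker package from the environment listing; a sketch under assumed target/temp paths and an assumed per-task memory budget (not the exact command that was used):

```python
import fsspec
import zarr
from rechunker import rechunk

source = zarr.open(
    fsspec.get_mapper("gs://foobar/data.zarr/call_genotype_probability"), mode="r"
)
plan = rechunk(
    source,
    target_chunks=(15648, 17376, 2),  # the larger chunk shape used above
    max_mem="4GB",                    # assumed per-task memory budget
    target_store=fsspec.get_mapper("gs://foobar/data_rechunked.zarr"),
    temp_store=fsspec.get_mapper("gs://foobar/data_tmp.zarr"),
)
plan.execute()
```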

Performance graph:

[bokeh performance plot: rechunked data (larger chunks)]

There is a cost to this though, visible in the memory usage: a compressed chunk of 17MiB translates to 2GiB in memory (vs 2MiB -> 230MiB), which bumps the memory usage roughly from 6GB to 40GB.

We could also investigate the cost of reading zarr data in smaller chunks than it was written with. This is a profile of the computation with zarr chunks of (15648, 17376, 2) read as (5216, 5792, 2) via xarray.open_zarr:

[bokeh performance plot: large zarr chunks read as smaller chunks]

Notice:

Say we have a random float zarr array of shape (10_000, 100_000) saved on local disk with chunk shape (10_000, 10_000), i.e. 10 chunks of ~660MiB each on disk (compressed).

We retrieve a single element:

```python
import zarr

zs = zarr.open("/tmp/zarr", mode="r")
print(zs.foo[0, 0])
```

Zarr will:

  1. locate the single chunk that contains element [0, 0],
  2. read that entire compressed chunk (~660MiB) from disk,
  3. decompress the whole chunk into memory,
  4. and only then slice out the single requested value.

So overall the lesson from this exercise is that slicing zarr in a way that doesn't align with the native chunks is inefficient and shouldn't be a common step in pipelines.
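A scaled-down sketch of that effect (much smaller array so it runs quickly; the path is hypothetical):

```python
import numpy as np
import zarr

z = zarr.open(
    "/tmp/zarr_align_demo", mode="w",
    shape=(1_000, 10_000), chunks=(1_000, 1_000), dtype="f8",
)
z[:] = np.random.random((1_000, 10_000))

# chunk-aligned slice: exactly one stored chunk is read and decompressed
aligned = z[:, 0:1_000]

# misaligned slice: two chunks are read and fully decompressed,
# even though only half of each is returned
misaligned = z[:, 500:1_500]
```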

Zarr issues for partial chunk reads:

Highlights:

some compressors also support partial decompression which would allow for extracting out part of a compressed chunk (e.g. the blosc_getitem method in Blosc)

ravwojdyla commented 3 years ago

Some experiments that incorporate xarray + zarr + dask:

TLDR

Long story

Say we have a dataset:

```python
import fsspec
import numpy as np
import xarray as xr

fsmap = fsspec.get_mapper("/tmp/zarr")

ar = np.random.random((10_000, 100_000))
xar = xr.Dataset(data_vars={"foo": (("x", "y"), ar)})
xar.foo.encoding["chunks"] = (10_000, 10_000)

xar.to_zarr(fsmap)
```

This gives us a zarr store with one 2d array stored as 10 chunks (10_000, 10_000). Now, say we read this back via:

xar_back = xr.open_zarr(fsmap)

Dask graph looks like this:

[Screenshot: Dask task graph for xr.open_zarr(fsmap)]

and an example of one zarr task definition:

('zarr-9c3a2bfde447a3cde69b39f8c28181f9', 0, 0):
   (<function dask.array.core.getter(a, b, asarray=True, lock=None)>,
   'zarr-9c3a2bfde447a3cde69b39f8c28181f9',
   (slice(0, 10000, None), slice(0, 10000, None)))

Notes:

The question now is: how can we force async zarr multiget via dask?

  1. If we follow up xr.open_zarr(fsmap) with a rechunk, like this:
xar_back = xr.open_zarr(fsmap).chunk({"y": 20_000, "x": 10_000})

This won't work:

[Screenshot: Dask task graph after .chunk() rechunking]

Notes:

  2. But if we specify chunks at read time, like this:
xar_back = xr.open_zarr(fsmap, chunks={"y": 20_000, "x": 10_000})

we get:

[Screenshot: Dask task graph with chunks specified at read time]

and an example of one zarr task definition:

('zarr-351e7d43530c734ce1fd649b79f9fc59', 0, 0):
   (<function dask.array.core.getter(a, b, asarray=True, lock=None)>,
   'zarr-351e7d43530c734ce1fd649b79f9fc59',
   (slice(0, 10000, None), slice(0, 20000, None)))

Notes:

Another interesting question is: how would dask react to chunking that doesn't align with zarr native chunks:

xar_back = xr.open_zarr(fsmap, chunks={"y": 12_500, "x": 10_000})

Notice that y now has 2_500 more elements than the zarr chunk, so based on the comments above, dask will likely use slicing to request the data, which in turn delegates to zarr slicing, and zarr will have to read 2 chunks for each dask task.
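A quick way to check such alignment up front is to verify that each requested dask chunk size is a multiple of the corresponding zarr chunk size; a hypothetical helper sketch:

```python
import zarr

def aligned(zarr_chunks, dask_chunks):
    # each requested dask chunk size must be a multiple of the zarr chunk size
    return all(d % z == 0 for z, d in zip(zarr_chunks, dask_chunks))

zg = zarr.open("/tmp/zarr", mode="r")
print(aligned(zg["foo"].chunks, (10_000, 20_000)))  # True  -> whole zarr chunks per task
print(aligned(zg["foo"].chunks, (10_000, 12_500)))  # False -> expect the warning below
```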

When we run the code above, xarray tells us:

/Users/rav/miniconda3/envs/tr-dev/lib/python3.8/site-packages/xarray/backends/zarr.py:695: UserWarning: Specified Dask chunks 12500 would separate Zarr chunk shape 10000 for dimension 'y'. This significantly degrades performance. Consider rechunking after loading instead. chunk_spec = get_chunk(name, var, chunks)

Let's see the graph:

[Screenshot: Dask task graph with non-aligned chunks]

and an example of two zarr task definitions:

('zarr-c5cbd628b3db23f6ff80870f5d0d52ec', 0, 0):
   (<function dask.array.core.getter(a, b, asarray=True, lock=None)>,
   'zarr-c5cbd628b3db23f6ff80870f5d0d52ec',
   (slice(0, 10000, None), slice(0, 12500, None))),

  ('zarr-c5cbd628b3db23f6ff80870f5d0d52ec', 0, 1):
   (<function dask.array.core.getter(a, b, asarray=True, lock=None)>,
   'zarr-c5cbd628b3db23f6ff80870f5d0d52ec',
   (slice(0, 10000, None), slice(12500, 25000, None)))

Notes:

ravwojdyla commented 3 years ago

Experiments with simplified computation and random data.

We generate a reasonably large float array (1e5 x 1e5) and save it in both large chunks (400MB) and small chunks (4MB). Then we run 3 experiments (each repeated many times, 30 - 100 runs):

  1. read the large chunks using native zarr chunks; this reads one 400MB chunk per task (aka 400/400/4)
  2. read the small chunks using native zarr chunks; this reads one 4MB chunk per task (aka 4/4/4)
  3. read the small chunks using large read chunks; this reads 100 small chunks concurrently (using zarr -> fsspec getitems), which accumulate to 400MB per task (aka 4/400/4)

In the X/Y/Z notation, X stands for the chunk size at rest (storage), Y is the read chunk size (the size zarr will use to read data from storage; after the read, data of this size is already in memory), and Z stands for the chunk size used for computation.

In each experiment we multiply the array by a scalar and use 4MB chunking for this computation (this imitates a problem that requires a certain chunk size which doesn't necessarily correspond to the zarr chunk size). We run the computations many times and measure time/memory/network. All data is stored on GCS (in the same region as the VM).

Dask cluster setup:

We use a distributed, process-based cluster on a single 16-core VM, with 16 worker processes of 1 thread each. This avoids any GIL issues, and since the computation is embarrassingly parallel there is very little communication between workers unless some work stealing happens. We run 2 rounds of computation to warm up the cluster. I also tried a thread-pool-based cluster but saw GIL impact.
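A minimal sketch of that cluster setup, assuming dask.distributed's LocalCluster:

```python
from dask.distributed import Client, LocalCluster

# 16 single-threaded worker processes on the 16-core VM (process-based to
# sidestep the GIL for this embarrassingly parallel workload)
cluster = LocalCluster(n_workers=16, threads_per_worker=1, processes=True)
client = Client(cluster)
```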

Results

| Storage chunks | IO chunks | Computation chunks | Min time (s) | Mean ± stdev (s) | Mean GCS BW |
|---|---|---|---|---|---|
| 400MB | 400MB | 4MB | 37.06 | 47.68 ± 17.35 | 774.45 MB/s |
| 4MB | 4MB | 4MB | 112.83 | 126.8 ± 10.19 | 287.62 MB/s |
| 4MB | 400MB | 4MB | 52.89 | 62.05 ± 15.13 | 572.71 MB/s |

Notes:

[Plot: network bandwidth histogram]

And next, a macro view (much less accurate than the data used for the network histogram above) of the CPU/IO (network)/memory usage for the 400/400/4 and 4/400/4 cases. First we perform many 400/400/4 runs from 3pm until around 4:30pm, and then switch to 4/400/4 until the end.

[Screenshot: CPU/network/memory usage over time (400/400/4 runs, then 4/400/4)]

Notes:

Conclusion: this validates that larger chunks result in better throughput, even compared with concurrent reads of many small chunks. But reading large chunks (or many small chunks at once) comes at the cost of memory (as mentioned in the previous comments).

Code

Code: generate the data

```python
def create_data(path, chunks):
    fs_map = gcs.get_mapper(path)
    ar_dsk = da.random.random(size=(100_000, 100_000), chunks=chunks).astype(np.float32)
    ar_xr = xr.Dataset(data_vars=(dict(foo=(("x", "y"), ar_dsk))))
    with ProgressBar(), ResourceProfiler() as rprof:
        ar_xr.to_zarr(fs_map, mode="w")
```
Code: computations

```python
def large_chunks(path):
    # 16MB block size
    gcs = gcsfs.GCSFileSystem(block_size=16 * 1024 * 1024)
    fs_map = gcs.get_mapper(path)
    # use zarr chunking
    ar_xr = xr.open_zarr(fs_map)
    # rechunk/split to smaller chunks
    ar_xr["foo"] = ar_xr.foo.chunk(dict(x=1_000, y=1_000))
    # Simple computation on part of the matrix
    dat = (ar_xr.foo * 2.0).data.persist()
    wait(dat)
    del dat


def small_chunks(path):
    # default block size of 5MB
    gcs = gcsfs.GCSFileSystem()
    fs_map = gcs.get_mapper(path)
    # read in many zarr chunks at the same time
    ar_xr = xr.open_zarr(fs_map, chunks=dict(x=10_000, y=10_000))
    # rechunk/split to smaller chunks
    ar_xr["foo"] = ar_xr.foo.chunk(dict(x=1_000, y=1_000))
    # Simple computation on part of the matrix
    dat = (ar_xr.foo * 2.0).data.persist()
    wait(dat)
    del dat


def native_chunks(path):
    # default block size of 5MB
    gcs = gcsfs.GCSFileSystem()
    fs_map = gcs.get_mapper(path)
    # use zarr chunking
    ar_xr = xr.open_zarr(fs_map)
    # Simple computation on part of the matrix
    dat = (ar_xr.foo * 2.0).data.persist()
    wait(dat)
    del dat
```
ravwojdyla commented 3 years ago

GCS stats:

ravwojdyla commented 3 years ago

TLDR thus far:

Another discussion that came up is the impact of memory on performance, given that larger chunks require more memory. As far as I understand, by default in a dask cluster:

In GCP, default VMs are divided into classes of machines like "standard" or "highmem"; within each class the memory-to-CPU ratio is constant: 4GB/CPU for the standard class and 8GB/CPU for highmem. So in theory, increasing the size of the VM within a class doesn't affect the M/WT (memory per worker thread) ratio. In the context of Spark or MR/YARN, a user would often set the executor/container memory and core requests per pipeline (each pipeline having different requirements); in the context of Dask the same thing can be achieved, but it is arguably less explicit. There are worker resources and, more recently, layer annotations (https://github.com/dask/dask/issues/6701), but we can also use the cluster configuration to manipulate the M/WT ratio by setting the number of worker threads (plus making sure the worker gets the full memory, and potentially adjusting the number of threads available for numpy ops), as in the sketch below.
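A hedged sketch of that last option, with illustrative (not recommended) values for a 16-core/64GB VM:

```python
import os

# cap the threads numpy/BLAS can use inside each task (inherited by the workers)
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(
    n_workers=8,            # fewer worker threads overall ...
    threads_per_worker=1,
    memory_limit="8GB",     # ... so each worker thread gets a larger memory budget
)
client = Client(cluster)
```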

jeromekelleher commented 3 years ago

A lot to digest here, thanks for the great work @ravwojdyla!

ravwojdyla commented 3 years ago

And as a way to connect these findings with #390, I personally would be very curious to see the #390 GWAS benchmark with:

ravwojdyla commented 3 years ago

Based on the performance tests above, here are some high-level guidelines for dask performance experiments (this is a starting point; we might find a better home for this later, and potentially have someone from Dask review them):

TODO: