pydata / xarray

N-D labeled arrays and datasets in Python
https://xarray.dev
Apache License 2.0
3.6k stars 1.08k forks source link

Performance issues using map_blocks with cftime indexes. #5701

Closed aulemahal closed 2 months ago

aulemahal commented 3 years ago

What happened: When using map_blocks on an object that is dask-backed and has a CFTimeIndex coordinate, the construction step (not computation done) is very slow. I've seen up to 100x slower than an equivalent object with a numpy datetime index.

What you expected to happen: I would understand a performance difference since numpy/pandas objects are usually more optimized than cftime/xarray objects, but the difference is quite large here.

Minimal Complete Verifiable Example:

Here is a MCVE that I ran in a jupyter notebook. Performance is basically measured by execution time (wall time). I included the current workaround I have for my usecase.

import numpy as np
import pandas as pd
import xarray as xr
import dask.array as da
from dask.distributed import Client

c = Client(n_workers=1, threads_per_worker=8)

# Test Data
Nt = 10_000
Nx = Ny = 100
chks = (Nt, 10, 10)

A = xr.DataArray(
    da.zeros((Nt, Ny, Nx), chunks=chks),
    dims=('time', 'y', 'x'),
    coords={'time': pd.date_range('1900-01-01', freq='D', periods=Nt),
            'x': np.arange(Nx),
            'y': np.arange(Ny)
           },
    name='data'
)

# Copy of a, but with a cftime coordinate
B = A.copy()
B['time'] = xr.cftime_range('1900-01-01', freq='D', periods=Nt, calendar='noleap')

#  A dumb function to apply
def func(data):
    return data + data

# Test 1 : numpy-backed time coordinate
%time outA = A.map_blocks(func, template=A)  # 
%time outA.load();
# Res on my machine:
# CPU times: user 130 ms, sys: 6.87 ms, total: 136 ms
# Wall time: 127 ms
# CPU times: user 3.01 s, sys: 8.09 s, total: 11.1 s
# Wall time: 13.4 s

# Test 2 : cftime-backed time coordinate
%time outB = B.map_blocks(func, template=B)
%time outB.load();
# Res on my machine
# CPU times: user 4.42 s, sys: 219 ms, total: 4.64 s
# Wall time: 4.48 s
# CPU times: user 13.2 s, sys: 3.1 s, total: 16.3 s
# Wall time: 26 s

# Workaround in my code
def func_cf(data):
    data['time'] = xr.decode_cf(data.coords.to_dataset()).time
    return data + data

def map_blocks_cf(func, data):
    data2 = data.copy()
    data2['time'] = xr.conventions.encode_cf_variable(data.time)
    return data2.map_blocks(func, template=data)

# Test 3 : cftime time coordinate with encoding-decoding
%time outB2 = map_blocks_cf(func_cf, B)
%time outB2.load();
# Res
# CPU times: user 536 ms, sys: 10.5 ms, total: 546 ms
# Wall time: 528 ms
# CPU times: user 9.57 s, sys: 2.23 s, total: 11.8 s
# Wall time: 21.7 s

Anything else we need to know?: After exploration I found 2 culprits for this slowness. I used %%prun to profile the construction phase of map_blocks and found that in the second case (cftime time coordinate):

  1. In map_blocks calls to dask.base.tokenize take the most time. Precisely, tokenizing a numpy ndarray of O dtype goes through the pickling process of the array. This is already quite slow and cftime objects take even more time to pickle. See Unidata/cftime#253 for the corresponding issue. Most of the construction phase execution time is spent pickling the same datetime array at least once per chunk.
  2. Second, but only significant when the time coordinate is very large (55000 in my use case). CFTimeIndex.__new__ is called more than twice as many times as there are chunks. And within the object creation there is this line : https://github.com/pydata/xarray/blob/3956b73a7792f41e4410349f2c40b9a9a80decd2/xarray/coding/cftimeindex.py#L228 The larger the array, the more time is spent in this iteration. Changing the example above to use Nt = 50_000, the code spent a total of 25 s in dask.base.tokenize calls and 5 s in CFTimeIndex.__new__ calls.

My workaround is not the best, but it was easy to code without touching xarray. The encoding of the time coordinate changes it to an integer array, which is super fast to tokenize. And the speed up of the construction phase is because there is only one call to encode_cf_variable compared to N_chunks calls to the pickling, As shown above, I have not seen a slowdown in the computation phase. I think this is mostly because the added decode_cf calls are done in parallel, but there might be other reason I do not understand.

I do not know for sure how/why this tokenization works, but I guess the best improvment in xarray could be to:

I have no idea if that would work, but if it does that would be the best speed-up I think.

Environment:

Output of xr.show_versions() INSTALLED VERSIONS ------------------ commit: None python: 3.9.6 | packaged by conda-forge | (default, Jul 11 2021, 03:39:48) [GCC 9.3.0] python-bits: 64 OS: Linux OS-release: 3.10.0-514.2.2.el7.x86_64 machine: x86_64 processor: x86_64 byteorder: little LC_ALL: None LANG: en_CA.UTF-8 LOCALE: ('en_CA', 'UTF-8') libhdf5: 1.10.6 libnetcdf: 4.7.4 xarray: 0.19.1.dev18+g4bb9d9c.d20210810 pandas: 1.3.1 numpy: 1.21.1 scipy: 1.7.0 netCDF4: 1.5.6 pydap: None h5netcdf: None h5py: None Nio: None zarr: 2.8.3 cftime: 1.5.0 nc_time_axis: None PseudoNetCDF: None rasterio: None cfgrib: None iris: None bottleneck: 1.3.2 dask: 2021.07.1 distributed: 2021.07.1 matplotlib: 3.4.2 cartopy: 0.19.0 seaborn: None numbagg: None pint: 0.17 setuptools: 49.6.0.post20210108 pip: 21.2.1 conda: None pytest: None IPython: 7.25.0 sphinx: None
dcherian commented 2 years ago

I think this could be fixed by adding __dask_tokenize__ to CFTimeIndex.

For IndexVariables we pass self._data.array to normalize_token https://github.com/pydata/xarray/blob/ea994450b225324ae51c0ed47c80e9801c4ff2e3/xarray/core/variable.py#L2688-L2692

_data.array turns out to be CFTimeIndex on main (so that comment above is wrong). Dask knows how to handle pandas indexes so this isn't usually a problem.

phofl commented 2 months ago

The overhead seems to be gone now, the map_blocks call builds in a few hundred milliseconds for me instead of 5 seconds

aulemahal commented 2 months ago

I confirm the same! I don't know what was changed, but with xarray 2024.07.0 and 2024.08.0, the CFTime and Pandas cases yield the same performance.

dcherian commented 2 months ago

presumably dask is now better at tokenizing object arrays?

phofl commented 2 months ago

Yeah we changed a bunch of things there, I haven't dug into the release that actually made it faster though