scverse / anndata

Annotated data.
http://anndata.readthedocs.io
BSD 3-Clause "New" or "Revised" License
569 stars 152 forks source link

Better indexing performance for backed sparse arrays via slices #1224

Open ivirshup opened 11 months ago

ivirshup commented 11 months ago

Please describe your wishes and possible alternatives to achieve the desired result.

The problem

Indexing statements for backed anndata objects often look like:

subset = backed_adata[
    (backed_adata.obs["self_reported_ethnicity"] == "European") & \
    (backed_adata.obs["sex"] == "female")
]

Notably, data is often ordered such that categories of interest are actually colocalized, e.g. data is sorted by patient or condition.

The indexing statement above generates a boolean mask or integer array indexer which can unfortunately be slow, especially with zarr backed matrices.

Zarr benchmarks

For example:

import numpy as np
import zarr
from anndata.experimental import read_elem, write_elem, sparse_dataset
from scipy import sparse

# Create data
rng = np.random.default_rng()

X = sparse.random(100_000, 10_000, density=0.01, random_state=rng, format="csr")
group = zarr.open("demo.zarr", mode="w")
write_elem(group, "X", X)

X_zarr = sparse_dataset(group["X"])
X_zarr

# Make a mask where contiguous chunks are True
inds = np.random.choice(X_zarr.shape[0], 100, replace=False)
inds.sort()

mask = np.zeros(X_zarr.shape[0], dtype=bool)
for i in range(0, len(inds) - 1, 2):
    mask[inds[i]:inds[i+1]] = True
%time as_mask = X_zarr[mask]
# CPU times: user 19.6 s, sys: 9.06 s, total: 28.7 s
# Wall time: 28.7 s
# converting to integer indices
%time as_indices = X_zarr[np.where(mask)]
# CPU times: user 19.8 s, sys: 8.68 s, total: 28.5 s
# Wall time: 28.5 s

Potential solution

However, if we simply translate the mask into contiguous slices:

# Converting to slices
def mask_to_slices(mask):
    return np.ma.extras._ezclump(mask)

%time as_slices = sparse.vstack([X_zarr[s] for s in mask_to_slices(mask)])
# CPU times: user 73.4 ms, sys: 32.5 ms, total: 106 ms
# Wall time: 105 ms

We see significant speed ups, even accounting for the overhead (and an admittedly inefficient quick implementation, we should be able to avoid constructing so many intermediate matrices). Of course, the results are all the same:

assert (as_indices != as_mask).nnz == 0
assert (as_slices != as_mask).nnz == 0

HDF5 benchmarks

HDF5 has similar behaviour, but is considerably faster with no optimization:

Demo using h5py + same matrices ```python import h5py h5_group = h5py.File("demo.h5", mode="w") # using compression for a fair-ish comparison write_elem(h5_group, "X", X, dataset_kwargs={"compression": "lzf"}) X_h5 = sparse_dataset(h5_group["X"]) X_h5 ``` ```python %time as_mask = X_h5[mask] # CPU times: user 1.77 s, sys: 11.8 ms, total: 1.78 s # Wall time: 1.79 s ``` ```python # converting to integer indices %time as_indices = X_h5[np.where(mask)] # CPU times: user 1.92 s, sys: 31 ms, total: 1.96 s # Wall time: 1.96 s ``` ```python # Converting to slices %time as_slices = sparse.vstack([X_h5[s] for s in mask_to_slices(mask)]) # CPU times: user 59 ms, sys: 15.7 ms, total: 74.7 ms # Wall time: 74.7 ms ```

Proposal

We should make some effort to optimize our indexing into stores. The solution above works well for boolean masks. The overhead of calling the function is also very low (~0.1% of the optimized operation).

%timeit mask_to_slices(mask)
# 39.4 µs ± 73.3 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

Complications

Heuristics are needed for choosing when to bail out of operations:

The above optimization can actually tank performance in worst case scenarios:

mask_alternating = np.zeros(X_zarr.shape[0], dtype=bool)
mask_alternating[::2] = True

%time X_h5[mask_alternating]
# CPU times: user 2.2 s, sys: 51.6 ms, total: 2.26 s
# Wall time: 2.26 s

%time sparse.vstack([X_h5[s] for s in mask_to_slices(mask_alternating)])
# CPU times: user 1min 21s, sys: 5.49 s, total: 1min 27s
# Wall time: 1min 27s

But the slices are very cheap to compute. So I would suggest we look at the slices generated and choose a path based on some threshold.

Integer based indexing is more complicated

For integer based indexing we will have multiple additional sorting steps. These are significantly more expensive than finding runs of bools.

Alternatives?

Upstreaming

Some performance enhancements could be upstreamed. Ideally enough to get zarr on par with hdf5. However, indexing into the sparse data structures is very much something we should be handling directly.

Alternative optimization strategies

There are likely even more effective optimization strategies we could pursue. These are not strictly alternatives, however. These include:

Demo using real data (TODO, make available)

Not as much of a performance boost, but does put zarr inline with h5py. Not totally sure why

mask = (
    (obs["self_reported_ethnicity"] == "European") & \
    (obs["sex"] == "female")
).values

%time X_zarr[mask]
# CPU times: user 36.6 s, sys: 7.29 s, total: 43.9 s
# Wall time: 43.9 s

%time sparse.vstack([X[s] for s in mask_to_slices(mask)])
# CPU times: user 7.19 s, sys: 804 ms, total: 7.99 s
# Wall time: 7.99 s

cc: @ilan-gold

ivirshup commented 10 months ago

Some benchmarking results using more fixed inputs:

Benchmarking code ```python from itertools import product import h5py, zarr, numpy as np, pandas as pd from anndata.experimental import read_elem, write_elem, sparse_dataset f = h5py.File("/mnt/workspace/data/cd19-carT-atlas-417k.h5ad") X = sparse_dataset(f["X"]) obs = read_elem(f["obs"]) mask = ( (obs["self_reported_ethnicity"] == "European") & \ (obs["sex"] == "female") ).values def write_hdf5(group: h5py.Group, name: str, x, chunks: int, compression: str): import hdf5plugin if compression == "lz4": compression = {**hdf5plugin.LZ4()} else: compression = {"compression": compression} write_elem(group, name, x, dataset_kwargs={"chunks": chunks, **compression}) def write_zarr(group: zarr.Group, name: str, x, chunks: int, compression: str): import numcodecs if compression == "lz4": compression = {"compressor": numcodecs.LZ4()} else: compression = {"compressor": compression} write_elem(group, name, x, dataset_kwargs={"chunks": chunks, **compression}) # Create datasets zarr_group = zarr.open("bench.zarr", mode="w") h5_group = h5py.File("bench.h5", mode="w") compression = [None, "lz4"] chunk_size = [100_000, 10_000] keys = [] for comp, chunking in product(compression, chunk_size): key = f"X_{comp}_{chunking}" keys.append(key) print(key) %time write_zarr(zarr_group, key, X, chunking, comp) %time write_hdf5(h5_group, key, X, chunking, comp) for k in keys: zarr_X = sparse_dataset(zarr_group[k]) h5_X = sparse_dataset(h5_group[k]) print(f"zarr {k} bool") %timeit zarr_X[mask] print(f"zarr {k} slices") %timeit sparse.vstack([zarr_X[s] for s in mask_to_slices(mask)]) print(f"h5 {k} bool") %timeit h5_X[mask] print(f"h5 {k} slices") %timeit sparse.vstack([h5_X[s] for s in mask_to_slices(mask)]) ```
fmt compression chunksize index_type time
h5 None 100000 slices 1.35
zarr None 100000 slices 1.42
h5 None 10000 slices 1.57
zarr lz4 100000 slices 1.6
zarr None 10000 slices 2.53
h5 lz4 100000 slices 3.15
zarr lz4 10000 slices 3.33
h5 lz4 10000 slices 4.54
h5 None 100000 bool 5.32
h5 None 10000 bool 5.47
h5 lz4 100000 bool 6.93
h5 lz4 10000 bool 8.4
zarr None 10000 bool 15.8
zarr None 100000 bool 21.3
zarr lz4 10000 bool 21.4
zarr lz4 100000 bool 41.5
ilan-gold commented 10 months ago

I just want to give a reason as to why this is slowing down in general when using mask vs. slice vs. integer (i.e., for both zarr and h5):

Internally, scipy intercepts our mask here and converts it to an integer index (or at least that is what seems to happen) before actually indexing. So this explains why the performance of those two (boolean and integer index) are the same. Something else I noticed is that we even do our own internal conversion here but this actually is not called in this demo due to scipy doing the same thing as noted.

Now I'll try to speak as to why the performance of integer indices is so bad since this is actually what is going on under the hood. The problem appears to be on our end from calling get_compressed_vectors where the following occurs, operating on integer indices (as noted):

slices = [slice(*(x.indptr[i : i + 2])) for i in row_idxs]

So of course this is going to be very slow. Creating very small slices for every entry in row_idxs and then accessing the data/indices every time is not efficient.

I'm looking into a solution and a heuristic now (for the slices). I think that we should either

  1. Intercept here: https://github.com/scverse/anndata/blob/af7a5b74bdab8cf4b640730bca0f99ac4e0b36b3/anndata/_core/sparse_dataset.py#L343-L352 so that the boolean mask is not converted to integers, and we instead call mask_to_slices, or
  2. Solve the problem inclusive of integers as an indexer. So this would mean creating slices for integer indexes, as we have now for masks, and doing the conversion later e.g., in _get_arrayXslice which will work for both boolean and integer indices.

I will also need to benchmark to find the heuristic so that we don't tank performance as you mentioned.

ilan-gold commented 10 months ago

Something I forgot to mention/do is to see how this performs without our backing class. This could reveal a potential upstreamed-fix but I doubt it, given the problematic methods we use are ones that we overrode.

ivirshup commented 10 months ago

I'm looking into a solution and a heuristic now

I did have an idea about the heuristic, but also a bigger brain moment.

Is there any difference between the proposed solution and the current behaviour in the worst case scenario? Either way a long list of slices is generated then accessed.

So unless we also develop something more optimal for our worst case scenario, maybe performance won't be tanked by using this approach everywhere?


I believe a lot of the performance is lost based on us needing to decompress the whole chunk every time we do any indexing. So if we could have an LRU cache for the decompressed chunks that could be quite useful. But maybe separate PR since it's a bit orthogonal though would also help.

ilan-gold commented 10 months ago

I think you showed there was a difference so I believe that something is going on. I tried to find out what but only have some suspicions.

I think the difference is the the number of calls to csr_matrix - there's a non-trivial time-overhead to creating these and the functions get_compressed_vectors and get_compressed_vector seem to have different speed profiles (i.e, get_compressed_vectors != n * get_compressed_vector for n accesses via get_compressed_vectors).

So when we do the the proposed solution, we are making many small csr_matrix calls. For example, when I do one X_h5[mask_to_slices(mask_alternating)[0]] call, I timed the underlying get_compressed_vector and subsequent ss.csr_matrix call in the code - the ss.csr_matrix takes on ~1.5x as long. Whereas in the current case, there is only one ss.csr_matrix call (which can take longer, but not on the scale of the number of calls we make).

There also seems to be some advantage to doing the list comprehension over the arrays themselves as opposed to on the top-level i.e., the proposed solution. But I can't reproduce this. I suspect there is some sort of hot-start/cold-start thing going on since some random access where much worse than others (perhaps what you were saying about caching?). What I can say for sure is that csr_matrix in the best case scenario doubles/triples the needed time, and that the list comprehension within get_compressed_vectors is also likely giving performance speedups (but I don't know why).

TL;DR There is definitely a difference.

ilan-gold commented 10 months ago

Just to pull back a bit (for myself):

  1. We know that internally, these boolean arrays are being converted to integer indices. This is why the performance of the two is identical.
  2. Slices are of course better than a bunch of individual integer accesses, but there is a "tipping point" where integer accesses are faster than slices.
  3. Relatedly, there is a worst-case scenario where performance tanks, potentially because of the way the list-comprehension is performed and at what point. But also potentially because of slices vs. integers.

I have a little prototype demo locally that does not completely tank performance in the worst case (but is still worse). Maybe it's a starting point.

In terms of the heuristic for when to stop using our improvement, performance stops completely tanking pretty quickly:

for max_cont in range(2, 100):
    mask_alternating = np.zeros(X_zarr.shape[0], dtype=bool)
    for i in range(max_cont - 1):
        mask_alternating[i::max_cont] = True
    print('solution with slices of size ', max_cont - 1, ' contiguous chunks')
    %time l = [X_zarr[s] for s in mask_to_slices(mask_alternating)]
    print('current with slices of size ', max_cont - 1, ' contiguous chunks')
    %time l = X_zarr[mask_alternating]

So perhaps an average size of like 5 or 10 for contiguous chunks? Or maybe some other summary statistic? In any case this should give a decent guide. I'll push my local branch soon

ivirshup commented 10 months ago

Is there any difference between the proposed solution and the current behaviour in the worst case scenario? Either way a long list of slices is generated then accessed.

So when we do the the proposed solution, we are making many small csr_matrix calls.

We could probably write the proposed solution to put together a matrix from the underlying arrays and not doing all the intermediate construction of sparse matrices. Then these cases should be equivalent?