scverse / anndata

Annotated data.
http://anndata.readthedocs.io
BSD 3-Clause "New" or "Revised" License

dask.array.store but with anndata schema #1403

Open ivirshup opened 7 months ago

ivirshup commented 7 months ago

Please describe your wishes and possible alternatives to achieve the desired result.

We should have some way of persisting intermediate results held in dask objects to non-volatile storage (disk or an object store).

It should probably follow an API similar to da.store; the dask documentation for da.store is the relevant reference here.
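For reference, this is roughly the existing dask pattern the proposal would mirror (the path, shape, and chunking below are made up for illustration):

import dask.array as da
import zarr

x = da.random.random((10_000, 1_000), chunks=(1_000, 1_000))

# Pre-allocate an on-disk target with matching shape/dtype, then let dask
# write each chunk into it; return_stored=True hands back a lazy array
# that reads from the target instead of recomputing x
z = zarr.open(
    "example.zarr", mode="w",
    shape=x.shape, chunks=(1_000, 1_000), dtype=x.dtype,
)
(x_stored,) = da.store([x], [z], return_stored=True)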

Use cases I'm thinking of include:

import scanpy as sc
import zarr
from scipy import sparse

# Proposed class: comes up with unique key names so the user doesn't have
# to think about overwriting existing entries
from anndata.io import TemporaryStorage

# Setup: adata.X is a dask.array.Array
temp_store = TemporaryStorage(zarr.open("s3:..."))

adata.layers["counts"] = adata.X.copy()
sc.pp.normalize_total(adata)
sc.pp.log1p(adata)
sc.pp.pca(adata)

adata.obsm["X_pca"] = temp_store.store(adata.obsm["X_pca"])
adata.X = temp_store.store(adata.X)
adata.layers["normalized_csc"] = temp_store.store(
    adata.X.map_blocks(sparse.csc_matrix).rechunk((100, -1)),
)

sc.pl.umap(adata, [...], layer="normalized_csc")
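For illustration only, here is a minimal sketch of what such a TemporaryStorage helper could look like. The class and its key scheme are hypothetical (not existing anndata API); it assumes dense, regularly chunked arrays and simply delegates to da.store against a zarr group:

import uuid

import dask.array as da
import zarr

class TemporaryStorage:
    # Hypothetical sketch: writes dask arrays into a zarr group under
    # auto-generated keys and returns lazy arrays backed by the store
    def __init__(self, group: zarr.Group):
        self.group = group

    def store(self, arr: da.Array) -> da.Array:
        # Unique key, so the user never has to think about name collisions
        key = f"tmp-{uuid.uuid4().hex}"
        target = self.group.require_dataset(
            key, shape=arr.shape, chunks=arr.chunksize, dtype=arr.dtype
        )
        (stored,) = da.store([arr], [target], return_stored=True)
        return stored

A real implementation would also need to handle sparse chunks and clean up temporary keys once they are no longer referenced.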

cc: @Intron7 @ilan-gold

It could be good to look into how xarray handles this.
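For comparison, xarray's version of this pattern is to write a dask-backed Dataset out with Dataset.to_zarr and reopen it lazily with open_zarr (synthetic data below):

import numpy as np
import xarray as xr

ds = xr.Dataset({"X": (("obs", "var"), np.random.rand(1_000, 100))}).chunk({"obs": 100})
# Write the dask-backed dataset to disk...
ds.to_zarr("intermediate.zarr", mode="w")
# ...then reopen it lazily; downstream work reads from the store rather
# than recomputing the graph that produced it
ds = xr.open_zarr("intermediate.zarr")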

ivirshup commented 6 months ago

While working on:

We saw some performance benefits from this approach. For example:

# adata_dask: an AnnData whose .X is a dask.array.Array
import scanpy as sc
import zarr

sc.pp.normalize_total(adata_dask, target_sum=1e4)
sc.pp.log1p(adata_dask)
# Persist the normalized, log-transformed matrix to zarr and swap the
# stored (still lazy) array back into .X
adata_dask.X = adata_dask.X.to_zarr(
    "cell_atlas_normalized_X.zarr",
    storage_options={"consolidated": True, "compressor": zarr.Zstd()},
    return_stored=True,
    overwrite=True,
)
sc.pp.highly_variable_genes(adata_dask, min_mean=0.0125, max_mean=3, min_disp=0.5)
sc.pp.pca(adata_dask)
# Materialize the (small) PCA embedding in memory
adata_dask.obsm["X_pca"] = adata_dask.obsm["X_pca"].compute()

The to_zarr call seems to provide a significant benefit by caching the normalization and log transformation, which would otherwise be recomputed three times: when calculating highly variable genes, when fitting the PCA, and finally when computing the observation-level embeddings.
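A synthetic illustration of that effect, with a hand-rolled normalization standing in for the scanpy steps above:

import dask.array as da

x = da.random.random((10_000, 1_000), chunks=(1_000, 1_000))
y = da.log1p(x / x.sum(axis=1, keepdims=True))

# Without persisting, each downstream computation re-executes y's full graph:
y.mean().compute()
y.std().compute()

# After a round trip through zarr, downstream work reads the stored result
# instead of re-running the normalization:
y = y.to_zarr("normalized.zarr", overwrite=True, return_stored=True)
y.mean().compute()
y.std().compute()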