xarray-contrib / flox

Fast & furious GroupBy operations for dask.array
https://flox.readthedocs.io
Apache License 2.0
124 stars 18 forks source link

Handling sparse matrices #346

Open ivirshup opened 7 months ago

ivirshup commented 7 months ago

I'm getting unexpectedly high memory usage with flox. Here's what I've been doing:

import dask.distributed as dd
import dask.array as da
import numpy as np
import flox

cluster = dd.LocalCluster(n_workers=3)
client = dd.Client(cluster)

M, N = 1_000_000, 20_000

X = da.random.normal(size=(M, N), chunks=(10_000, N))
by = np.random.choice(5_000, size=M)

res, codes = flox.groupby_reduce(
    X.T,
    by,
    func="sum",
    fill_value=0,
    method="map-reduce",
    reindex=True,
)

res_comp = res.compute()

This always warns about memory usage then fails on my dev machine with 64 gb of memory. However, I'm able to do plenty of other operations with an array this size (e.g. PCA, simple reductions). To me, a tree reduction here should be more than capable of handling this size of array.

Is this just me and my compute being odd, or do I have an incorrect expectation here?

cc: @ilan-gold

dcherian commented 7 months ago

You're starting with 1.5GiB chunk sizes on X. I would reduce that to the 200MB range. The bottleneck is usually numpy_groupies here. You should see input_validation prominently in the dask flamegraph

So I would also try installing numbagg. It'll be a bit slow to compile but should be faster and make less memory copies.

dcherian commented 7 months ago

Running this locally, I also spot a dask scheduling bug where it doesn't treat normal as a data generating task and runs way too many of them initially before reducing that data. Can you open a dask issue please?

dcherian commented 7 months ago

Ah I keep forgetting this numbagg only helps with nan-skipping aggregations, so it won't really help here.

I think this is a dask scheduling issue.

ivirshup commented 7 months ago

I also spot a dask scheduling bug where it doesn't treat normal as a data generating task and runs way too many of them initially before reducing that data.

In my real world use case, I get this just loading data from a zarr store.

I think this is a dask scheduling issue.

Me too, but I'm not sure why flox seems to be triggering it. In the dask issue I show that other tree aggregations with this array (X.sum(axis=0)) seem fine.

dcherian commented 7 months ago

Your last comment is important context! (the zarr bit in particular). I would add that to the other issue

dcherian commented 5 months ago

Reproducer here: https://gist.github.com/ivirshup/eb4f5beb1bb33724b8c11bd0eacf03a6 from https://github.com/dask/dask/issues/11026#issuecomment-2151141540


This works on my laptop with 32GB RAM with some spilling

res, codes = flox.groupby_reduce(
    X_dense.T,
    by,
    func="sum",
    fill_value=0,
)

If you turn on logging with

import logging
logger = logging.getLogger("flox")
logger.setLevel("DEBUG")
console_handler = logging.StreamHandler()
logger.addHandler(console_handler)

you'll see it automatically chooses method="cohorts" and it just works albeit with spilling. There seems be a densification of each sparse block that could be improved. I'll have to look in to it.

EDIT: your choice of reindex=True was using way more memory than necessary to run this reduction. You have 2390 unique groups, but there are O(10) groups in the output chunks with cohorts. That means reindex=True was inflating each block to ~ 2390/~10 = 200X larger than it needed to be.

plt.hist(res.chunks[-1])

image

EDIT2: I guess you're densifying on your own...

EDIT3: Memory issues can be controlled by using numbagg and nansum. It's also 5X faster (approx). I guess we should look in to how to make flox just handle sparse matrices directly.

res, codes = flox.groupby_reduce(
    X_dense.T,
    by,
    func="nansum",
    fill_value=0,
    engine="numbagg",
)
dcherian commented 5 months ago

How would flox handle sparse matrices directly? graphblas sparse matrix product? We can construct the sparse matrix from by really quickly (we know where the ones are already).

ivirshup commented 5 months ago

graphblas can be nice, since we can pretty easily define a semi-ring for many tasks here (though, not so sure about median) and it's very fast. But it's always going to be only 2d, and is GPL.

In scanpy right now we are just doing matrix multiplication (code) for in-memory aggregation, but I suspect will end up using some numba kernels since we can get better performance and numerical stability.

@Intron7 may have some thoughts here as he wrote some CUDA kernels for these aggregations in memory.

dcherian commented 5 months ago

graphblas can be nice, since we can pretty easily define a semi-ring for many tasks here (though, not so sure about median) and it's very fast. But it's always going to be only 2d, and is GPL.

I was thinking of using it through python-graphblas which is Apache-2.

n scanpy right now we are just doing matrix multiplication (code) for in-memory aggregation, but I suspect will end up using some numba kernels since we can get better performance and numerical stability.

Oh nice! We could move your code into flox as an "engine" or alternatively tie in to scanpy with engine="scanpy". I prefer the former, I think, it'd be nice to not need an optional dependency for fairly simple code.

For reference, "engine"s handle the in-memory part of the aggregation. And we already have the codes ready.

ivirshup commented 5 months ago

I was thinking of using it through python-graphblas which is Apache-2.

The wrapper is apache-2, but it pulls in graphblas which is GPL. So effectively same distribution issues is my understanding.

We could move your code into flox as an "engine"

That's how I've been thinking this sort of thing would work.

Would it be weird to have a "sparse" engine that only works with sparse chunks? Or would you want to add it to another engine?


your choice of reindex=True was using way more memory than necessary to run this reduction. You have 2390 unique groups, but there are O(10) groups in the output chunks with cohorts. That means reindex=True was inflating each block to ~ 2390/~10 = 200X larger than it needed to be.

I am trying to remember why I was using reindex=True. I think it may have just been for a memory usage I could estimate as I had a lot of trouble getting this to work without running out of memory at all.

FWIW, if there is sparse support, then we can increase the chunk sizes, which means we end up hitting more groups.

Also, we don't really have set expectations about the distribution of groups per chunk, so I'd like to be sure this works in the worst case where the labels are well shuffled.

dcherian commented 5 months ago

Would it be weird to have a "sparse" engine that only works with sparse chunks?

Don't think so. In general, I'd like users to not set it at all (the default is None now). Constructing a sparse "weights" matrix is another way to do a groupby anyway (Ken Iversion stuck it in his "notation as a tool of thought" paper too!) and flox has scipy as a dependency already.

Also, we don't really have set expectations about the distribution of groups per chunk, so I'd like to be sure this works in the worst case where the labels are well shuffled.

Yeah that would be nice. But it would also be good to get a real example. We might instead look to speed up the reindex=False case to reduce memory usage, and have that be chosen by default.