cubed-dev / cubed

Bounded-memory serverless distributed N-dimensional array processing
https://cubed-dev.github.io/cubed/
Apache License 2.0

In-memory rechunk #502

Open TomNicholas opened 3 months ago

TomNicholas commented 3 months ago

On some systems there are scenarios where we know we can perform a rechunk entirely in memory without writing to disk. For example, if I locally run this test, which performs a single rechunk stage:

import cubed
import cubed.random

spec = cubed.Spec(work_dir="tmp", allowed_mem="400MB")

# 80MB chunks
a = cubed.random.random((10000, 10000), chunks=(10000, 1000), spec=spec)
b = a.rechunk((1000, 10000))
b.visualize(optimize_graph=True)
b.compute()
array([[0.49041066, ..., 0.98331677],
       ...,
       [0.17032274, ..., 0.77605032]])

[plan visualization from b.visualize(optimize_graph=True)]

then it writes an intermediate Zarr store to disk (along with the initial and final arrays),

ls tmp/cubed-20240714T181729-c3b4f32b-ccdc-4a8b-8d13-173f3b49e412 
array-003.zarr      array-004-int.zarr  array-004.zarr

even though I know that my laptop has more than enough RAM to materialize all ten of the 80MB chunks needed for this rechunk at once, which would have avoided writing that intermediate array to disk.

This scenario of unnecessarily writing to disk occurs any time that: a) there is some way that workers could potentially communicate data other than by writing to persistent storage, and b) the whole rechunk stage is small enough that n_tasks * allowed_mem_per_task < total_system_memory.

(a) will never happen in a serverless cloud context, because AFAIK there is no way for two lambda functions to communicate other than by writing to S3. But this is not true on other systems: HPC nodes can alternatively communicate via their interconnect (see https://github.com/cubed-dev/cubed/issues/467), and threads running on a single machine can potentially communicate via shared memory (see #492 and https://github.com/cubed-dev/cubed/issues/497).

(b) requires an additional piece of information: the total memory available across the entire system. On an HPC cluster this is mem_per_node * n_nodes, and on a single machine it is just the system RAM.
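
Spelled out, the check for (b) is just a comparison. The names below are illustrative rather than existing Cubed API, and psutil is just one way of getting at single-machine RAM:

import psutil

# Illustrative sketch only - these names are not existing Cubed API.
def total_system_mem_bytes(n_nodes: int = 1, mem_per_node: int | None = None) -> int:
    """Total memory available across the whole system."""
    if mem_per_node is not None:
        # HPC cluster: memory per node times number of nodes.
        return n_nodes * mem_per_node
    # Single machine: just the system RAM.
    return psutil.virtual_memory().total

def stage_fits_in_memory(n_tasks: int, allowed_mem_per_task: int, total_system_mem: int) -> bool:
    # Condition (b): every task in the rechunk stage could hold its
    # allowed_mem simultaneously without exceeding total system memory.
    return n_tasks * allowed_mem_per_task < total_system_mem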

Obviously we don't want to try to do an in-memory rechunk every time, so I propose that we add an additional parameter, total_system_mem, to the Spec, and allow executors the option to use a different rechunking strategy when condition (b) is fulfilled for a particular rechunk stage in the plan. That way we can use optimized in-memory shuffle algorithms for rechunks that don't need to be written to disk, whilst still writing out to disk for the rechunks that are definitely too big to do in memory.
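
Sketching how that might look (total_system_mem and the strategy selection below are the proposal, not anything that exists in Cubed today):

# Proposed, not existing API: Spec grows a total_system_mem field.
spec = cubed.Spec(
    work_dir="tmp",
    allowed_mem="400MB",
    total_system_mem="16GB",  # e.g. laptop RAM, or mem_per_node * n_nodes on HPC
)

# An executor could then pick a strategy per rechunk stage, roughly:
# if stage.n_tasks * spec.allowed_mem < spec.total_system_mem:
#     use an in-memory shuffle (shared memory, interconnect, ...)
# else:
#     fall back to the on-disk rechunk algorithm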

You might say "but Tom, I thought the whole point of Cubed was to avoid a complicated centralized shuffle?" But I think what I'm proposing is okay, because we have still broken up the shuffle problem into two clearly separate cases: the larger-than-system-memory case and the smaller-than-system-memory case. We can reason about each of these individually, we know in advance which one we will need, and we can still control memory usage because the size of the data to be passed around in the shuffle is known. Crucially, we don't end up with a shuffle algorithm that has to awkwardly deal with both cases by spilling some data to disk when it suddenly realises half-way through that it's about to run out of RAM.

cc @ayushnag @applio

TomNicholas commented 3 months ago

To demonstrate this point, I was thinking I could get the above rechunking task done using a different strategy: if I use an in-memory Zarr store as the "storage" layer, I'm effectively doing my shuffle in memory, and could complete the calculation without writing to disk. (I was going to try this at the sprint but ran out of time.)
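
Roughly what I had in mind (untested; this assumes work_dir goes through fsspec, so a memory:// URL would land the intermediate Zarr stores on fsspec's in-memory filesystem):

import cubed
import cubed.random

# Untested sketch: point work_dir at an in-memory filesystem instead of local disk.
# Assumes work_dir accepts an fsspec-style URL; "memory://" is fsspec's in-memory protocol.
spec = cubed.Spec(work_dir="memory://rechunk-tmp", allowed_mem="400MB")

a = cubed.random.random((10000, 10000), chunks=(10000, 1000), spec=spec)
b = a.rechunk((1000, 10000))
b.compute()  # the intermediate array for the rechunk never touches local disk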

TomNicholas commented 3 months ago

broken up the shuffle problem into two clearly separate cases

I guess one downside of this binary separation is that it would create a performance cliff with respect to problem size for the user, between a rechunk problem that requires slightly less than system memory and one that requires slightly more.

TomNicholas commented 3 months ago

On some systems you could actually have three different options for the rechunk. If you ran Cubed on a single fat machine in the cloud, you could do small rechunks in RAM, larger ones by spilling to the machine's disk, and the largest rechunks only by writing to and reading from S3.

Imagine doing a big reduction operation down to a small result on such a machine. Each round of the reduction requires a rechunk, and these get logarithmically smaller at each stage. So you might imagine that the first round uses S3, the second uses local disk, and the last uses RAM.
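
As a rough illustration of that tiering (names and thresholds here are made up for the example):

# Illustrative only: pick a target for each rechunk's intermediate data
# based on how big that rechunk stage is.
def choose_rechunk_target(stage_mem: int, machine_ram: int, local_disk: int) -> str:
    if stage_mem < machine_ram:
        return "RAM"         # small rechunks: in-memory shuffle
    elif stage_mem < local_disk:
        return "local disk"  # medium rechunks: spill to the machine's disk
    else:
        return "s3"          # biggest rechunks: write to and read from object storage

# A reduction whose rechunks shrink each round might then go s3 -> local disk -> RAM.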

Of course the downside of running on that one machine is that the horizontal scale of the parallelism is limited. There is a fundamental distinction between executors which provision a fixed amount of resources for use (e.g. single-machine and HPC) and truly serverless executors which can provision arbitrarily large resources at each step, but don't have an alternative method of communication to use as a fast path.

tomwhite commented 3 months ago

This reminds me of a memory cache hierarchy (level 1, level 2, level 3, main memory).

It would be a legitimate optimization, but I'm not sure how significant the performance improvement would be - reduction workloads tend to spend most of their time in the initial round of reduction, with later rounds (that would potentially fit in memory) being very fast by comparison.

TomNicholas commented 3 months ago

It would be a legitimate optimization, but I'm not sure how significant the performance improvement would be - reduction workloads tend to spend most of their time in the initial round of reduction, with later rounds (that would potentially fit in memory) being very fast by comparison.

I see what you mean for reductions, but for other workloads (that aren't front-loaded like reductions are) surely the saving could still be significant? If the user's computation is small enough to fit in memory (perhaps because they aggressively sliced the data down at the start) but still requires some rechunks, then this could be the difference between doing some IO and doing no IO. But then again I don't actually know if there are any operations like that - your operations dependency graph implies that there aren't...

I guess optimization work like this should be motivated by benchmarking rather than by guesswork. It would be quite easy to look at existing workloads and estimate what the savings might be (e.g. by looking at the timings and imagining removing the time taken for all rechunk steps smaller than the cluster's total memory).
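
Something like this back-of-the-envelope estimate, assuming we had per-step timings and memory requirements from a benchmark run (the record format here is made up):

# Hypothetical: estimate the time saved if every rechunk step that fits in
# system memory became (approximately) free.
def estimate_rechunk_savings(steps, total_system_mem):
    # steps: iterable of (op_name, duration_seconds, required_mem_bytes) records
    return sum(
        duration
        for op_name, duration, required_mem in steps
        if "rechunk" in op_name and required_mem < total_system_mem
    )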

TomNicholas commented 3 months ago

A tangential thought: it's interesting to think about the dask executor using a rechunk strategy that isn't always the full on-disk rechunker algorithm. If the dask executor could somehow use dask's newer P2P rechunking algorithm, then the difference between dask-on-dask and cubed-on-dask would become a lot smaller...