cubed-dev / cubed

Bounded-memory serverless distributed N-dimensional array processing
https://cubed-dev.github.io/cubed/
Apache License 2.0

Cubed for larger-than-memory workloads on a single machine #492

Open TomNicholas opened 1 month ago

TomNicholas commented 1 month ago

One thing that's really obvious at SciPy this year is that people absolutely love DuckDB. The reason is that although DuckDB doesn't offer multi-machine parallelism, and so can never scale horizontally, it can nevertheless operate on larger-than-memory workloads, and is extremely reliable at just chugging through them until they complete. This confidence is all people really need for 90% of real-world workloads. It seems like a lot of users are happy to give up horizontal scaling in exchange for the simplicity and reliability of no longer trying to run a distributed system.

This has got me thinking about whether we could write an executor for Cubed that is single-machine but can handle larger-than-memory workloads. It would parallelize to fully utilize that machine but only that machine.


I've also been reading this blog post on Memory Management in DuckDB, and basically all they seem to do is a) have a preset memory usage limit (defaulting to 80% of machine RAM), b) spill to disk when necessary, c) try to minimize spilling to disk by efficient caching, d) fully utilize the parallelism available to them on that one machine to get vertical scaling.

I feel like in Cubed's model we have all the information we need to take a similar approach. We have parallel single-machine executors already (synchronous and asynchronous), but (please correct me if I've misunderstood @tomwhite) neither of these executors would work on larger-than-memory workloads, because they would currently naively try to launch more Cubed tasks than could fit in memory (because Cubed's allowed_mem is a per-task limit).

(I'm assuming this because I notice neither single-machine executor actually uses the cubed.Spec object for anything, so whilst they would raise if you tried to run a workload where a single task would use > allowed_mem (because cubed wouldn't even let you execute in that case), they can't possibly have a model of exactly how many tasks they can safely run in parallel simultaneously before running out of RAM.)

So what if we wrote an executor which:

Maybe I'm completely misunderstanding something about how sharing memory across processes works, but wouldn't this strategy basically have the same characteristics that the DuckDB approach has?

cc @rabernat @jakirkham

TomNicholas commented 1 month ago

Thinking more about how we might prototype this idea in Cubed today...

In https://github.com/cubed-dev/cubed/issues/187 @tomwhite added support for TensorStore as an alternative on-disk Zarr store for writing intermediate results to. That choice of storage layer is currently a global config flag, but what if we made it so that each stage in the plan could make its own choice about this flag, and so choose to write out to a different type of Zarr store?

Then we have the single machine executor described above write out to an on-disk Zarr store for stages where it knows some spilling to disk is required, but for other stages have it write to an in-memory Zarr store instead (or maybe even a RedisStore...). That way data is easily accessed by the correct task in the next stage without writing that data to disk.

EDIT: Actually we don't need this in order to just prototype the idea. Simply batching the tasks as above and writing an intermediate store to disk for every stage, as Cubed currently does, would be enough to test running on a larger-than-memory workload; it would just do a lot more IO than necessary when running on one machine.
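As a rough illustration of the batching idea, here is a minimal sketch that caps the number of concurrently running tasks so that their combined per-task limits stay inside a memory budget. The helper names `max_parallel_tasks` and `run_stage`, the `mem_fraction` parameter (borrowing DuckDB's 80% default), and the example sizes are all assumptions made for illustration, not part of Cubed. A thread pool is used only to keep the sketch easy to run; the executor discussed here would more likely use processes.

```python
from concurrent.futures import ThreadPoolExecutor


def max_parallel_tasks(total_mem, allowed_mem, cpu_count, mem_fraction=0.8):
    """Return how many tasks fit in the memory budget at once (hypothetical helper)."""
    budget = int(total_mem * mem_fraction)
    if allowed_mem > budget:
        raise ValueError("a single task would exceed the memory budget")
    # Never run more tasks than there are cores, or than the budget allows.
    return min(cpu_count, budget // allowed_mem)


def run_stage(func, task_inputs, max_workers):
    """Run one stage's tasks with at most `max_workers` executing at a time."""
    # Stand-in for a process pool; the concurrency cap works the same way.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(func, task_inputs))


GB = 10**9  # sizes are in bytes; these numbers are illustrative only
workers = max_parallel_tasks(total_mem=16 * GB, allowed_mem=2 * GB, cpu_count=8)
results = run_stage(lambda x: x * x, range(10), workers)
```

With these example numbers the budget is 12.8 GB, so the cap comes from memory rather than from the core count.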

tomwhite commented 1 month ago

Excited to see this! I agree this would be very attractive for some users and workloads.

I also think that we basically have what you describe today using the processes executor. As long as num_processes * allowed_mem <= total_mem things will just work. (The reason I didn't add that check in #411 was because it would need the psutil dependency - but that should be straightforward to add.)
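A minimal sketch of what such a check could look like is below. The function name `check_memory` and the example sizes are hypothetical; `total_mem` is passed in by hand here, and `psutil.virtual_memory().total` would be one way to look it up at runtime.

```python
def check_memory(num_processes, allowed_mem, total_mem):
    """Raise if running every worker at its per-task limit could exceed RAM."""
    required = num_processes * allowed_mem
    if required > total_mem:
        raise ValueError(
            f"{num_processes} processes x {allowed_mem} bytes = {required} bytes, "
            f"which exceeds total memory of {total_mem} bytes"
        )


GB = 10**9  # sizes are in bytes; these numbers are illustrative only
# 8 processes x 2 GB fits exactly in 16 GB, so this call does not raise.
check_memory(num_processes=8, allowed_mem=2 * GB, total_mem=16 * GB)
```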

The optimizer is pretty good at reducing the amount of intermediate data written, so it would be worth trying it out on some workloads as it stands. That may suggest more improvements to reduce IO that we could make of course.

alxmrs commented 1 month ago

Maybe I'm completely misunderstanding something about how sharing memory across processes works, but wouldn't this strategy basically have the same characteristics that the DuckDB approach has?

Inter-process communication is expensive. This approach is different from DuckDB, which is a single-process query engine. IIUC, it’s also embeddable within other applications (I don’t think that would be a good goal for Cubed). With multithreading, I believe DuckDB takes advantage of shared memory between threads.

For a prototype, I think single-machine, multiprocessing is OK, but to get the next level of performance, multithreading is necessary. To this end, I think exploring #497 is warranted.

rabernat commented 1 month ago

I also love DuckDB! 🦆

However, from a software architecture point of view, it is almost the complete opposite of Cubed. DuckDB was designed from the beginning as a high-performance, vertically scalable, standalone C++ application. Cubed was designed from the beginning as a fully distributed, horizontally scalable, serverless execution framework.

I support the idea of a local multiprocessing scheduler. But we shouldn't expect that the same algorithms and approaches that work well in the serverless, horizontally scaling context will automatically translate to good vertical scaling.

One great innovation of cubed has been the development of an Intermediate Representation (the Cubed Plan) for array computations. This is an important part of any high-performance database query engine and could help work towards the DuckDB-type idea. However, I'd argue this is closer to a Physical Plan than a Logical Plan. A very different Physical Plan may be needed for a local, vertically scaling executor.

rabernat commented 1 month ago

p.s. Python has a shared memory pool for IPC.
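Assuming this refers to the standard library's `multiprocessing.shared_memory` module, a minimal sketch of handing a chunk from a producer to a consumer through a named shared-memory block might look like the following. The helper names `write_chunk` and `read_chunk` are made up for illustration, and both sides run in one process here to keep it short; in practice the consumer would attach by name from another process.

```python
import struct
from multiprocessing import shared_memory


def write_chunk(values):
    """Copy a list of floats into a new shared-memory block and return the block."""
    fmt = f"{len(values)}d"
    shm = shared_memory.SharedMemory(create=True, size=struct.calcsize(fmt))
    struct.pack_into(fmt, shm.buf, 0, *values)
    return shm


def read_chunk(name, count):
    """Attach to an existing block by name and read `count` floats from it."""
    shm = shared_memory.SharedMemory(name=name)
    try:
        return list(struct.unpack_from(f"{count}d", shm.buf, 0))
    finally:
        shm.close()  # detach this handle; the block itself stays alive


producer = write_chunk([1.0, 2.5, 4.0])
try:
    # Only the block's name needs to be sent to the consumer.
    result = read_chunk(producer.name, 3)
finally:
    producer.close()
    producer.unlink()  # free the block once every consumer is done
```

The point of the design is that only the short block name crosses the process boundary, not the data itself.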

TomNicholas commented 1 month ago

Cubed was designed from the beginning as a fully distributed, horizontally scalable, serverless execution framework.

Sure, you could do better by designing something for the vertical case from the ground up, but I think we still have a good chance of making something really useful through a clever implementation of the stages that cubed's model breaks everything up into.

One great innovation of cubed has been the development of an Intermediate Representation (the Cubed Plan) for array computations.

I think the real power of cubed's model is that it allows us to clearly reason about the shuffle in advance of starting it, and categorize it into the in-memory and out-of-memory case (see #502). We could get a very long way just by optimizing implementation details of that shuffle.

p.s. Python has a shared memory pool for IPC.

@applio and I discussed this today during the sprint, and he is literally the person who maintains this part of the Python standard library (🤯), so I'm hoping he can give us some pointers on how to optimise the in-memory shuffle by sharing memory intelligently between threads/processes!

tomwhite commented 1 month ago

it can nevertheless operate on larger-than-memory workloads, and is extremely reliable at just chugging through them until they complete

As an experiment I tried running the Cubed benchmarks for Quadratic Means on my local machine with 8 cores and 16GB of memory using the processes executor. The CPUs were fully utilised, and there were no memory problems. The 5000 case below is 150GB of input data.

┌────────────────────────────────────────────────────────┬────────────────────────────┬────────────────────┐
│                          name                          │           start            │      duration      │
│                        varchar                         │         timestamp          │       double       │
├────────────────────────────────────────────────────────┼────────────────────────────┼────────────────────┤
│ test_quadratic_means_xarray[50-new-optimizer]          │ 2024-07-18 15:25:50.892829 │ 3.7533631324768066 │
│ test_quadratic_means_xarray[500-new-optimizer]         │ 2024-07-18 15:28:18.81189  │ 30.925524950027466 │
│ test_quadratic_means_xarray[5000-new-optimizer]        │ 2024-07-18 15:34:14.146469 │ 294.28265595436096 │
└────────────────────────────────────────────────────────┴────────────────────────────┴────────────────────┘

No doubt there's room for improvement, but this is a good starting point.

tomwhite commented 1 month ago

I'd like to emphasise the single machine case more prominently in the examples (#505), since it's an easy way to get started with Cubed.

rabernat commented 1 month ago

That's a great result Tom!

rabernat commented 1 month ago

150 GB / 295 s = 508 MB/s

That's not bad throughput and might be getting close to the I/O limit. I assume you've got an SSD. Some SSDs can go 5x faster, but for others this is consistent with published benchmarks.
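For reference, the same arithmetic can be written as a tiny helper. The function name `throughput_mb_per_s` is made up for illustration, megabytes are taken as decimal (10^6 bytes), and the unrounded duration of the 5000 case is used, so the result differs slightly from the rounded figure above.

```python
def throughput_mb_per_s(num_bytes, seconds):
    """Average throughput in decimal megabytes per second."""
    return num_bytes / seconds / 1e6


# 150 GB of input and the measured duration of the 5000 case.
rate = throughput_mb_per_s(150e9, 294.28265595436096)
print(f"{rate:.0f} MB/s")
```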

tomwhite commented 1 month ago

Thanks Ryan. Yes, it's using an SSD on a Mac mini M1.

tomwhite commented 1 month ago

I've opened #514 to track the work on this