cubed-dev / cubed

Bounded-memory serverless distributed N-dimensional array processing
https://cubed-dev.github.io/cubed/
Apache License 2.0

Executor feature comparison #276

Open tomwhite opened 11 months ago

tomwhite commented 11 months ago

(I wrote this to help track what work needs to be done on the executors, but it might be useful to add to the user docs at some point.)

This table shows the features that each executor supports.

| Feature | Python | Python Async | Lithops | Modal | Beam (Dataflow) | Dask |
| --- | --- | --- | --- | --- | --- | --- |
| Callbacks | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: |
| Task concurrency | :x: | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: |
| Retries | :x: | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: ¹ | :white_check_mark: ² |
| Timeouts | :x: | :x: | :white_check_mark: | :white_check_mark: | ? | :x: ³ |
| Straggler mitigation | :x: | :x: | :white_check_mark: | :white_check_mark: | ? | :white_check_mark: |
| Input batching | :white_check_mark: ⁴ | :white_check_mark: | :x: | :white_check_mark: | N/A ⁵ | :white_check_mark: |
| Resume | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: | :x: | :white_check_mark: |
| Compute arrays in parallel | :x: | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: |
| Supported clouds | N/A | N/A | AWS, GCP, and others | AWS, GCP | GCP | AWS, GCP |
| Runtime memory check | :x: | :x: | :white_check_mark: | :white_check_mark: | :x: | :white_check_mark: |

Executors

The Python executor is a very simple executor used for running tests; it is deliberately limited to the most basic features.

The Python async executor is also for testing, but it has a few more features, mainly as a way to test async functionality locally without having to use Modal, which is the only other async executor.

The other executors are all designed for real workloads running at scale, so all of the features are desirable. Some features are provided by the platform itself, while others are implemented in Cubed. For example, both Lithops and Modal provide timeouts as part of the platform, but of the two only Modal provides retries as a built-in feature (for Lithops we implement retries in Cubed). Neither platform provides anything for mitigating stragglers, so Cubed provides a backup tasks implementation for both.
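The essence of a backup task is to launch a duplicate of a straggling task and take whichever copy finishes first. Here is a minimal asyncio sketch of that idea; `run_with_backup` and the fixed `backup_after` threshold are illustrative stand-ins, not Cubed's actual implementation:

```python
import asyncio

async def run_with_backup(func, *args, backup_after=10.0):
    # Start the primary invocation and give it `backup_after` seconds.
    primary = asyncio.ensure_future(func(*args))
    done, _ = await asyncio.wait({primary}, timeout=backup_after)
    if done:
        return primary.result()
    # The primary is straggling: launch a duplicate and race the two.
    backup = asyncio.ensure_future(func(*args))
    done, pending = await asyncio.wait(
        {primary, backup}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()  # discard the loser's result
    return done.pop().result()
```

Note that this pattern assumes tasks are idempotent, since both copies may run to completion.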

Features

Task concurrency - can the executor run multiple tasks at once?

Input batching - for very large computations it's important that not all inputs for a given array are materialized at once, as that could cause the client to run out of memory. The remedy is to submit the inputs in batches, or in a streaming fashion if the platform supports it. See #239
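As a rough illustration of the batching idea (a sketch only; `map_in_batches`, `max_in_flight`, and the use of `ThreadPoolExecutor` are stand-ins, not Cubed's actual code):

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from itertools import islice

def map_in_batches(func, inputs, max_in_flight=100):
    it = iter(inputs)
    with ThreadPoolExecutor() as executor:
        # Submit at most `max_in_flight` tasks up front...
        pending = {executor.submit(func, x) for x in islice(it, max_in_flight)}
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                yield future.result()  # results arrive out of order
            # ...and top back up as tasks complete, so the number of
            # live futures held on the client stays bounded.
            pending |= {executor.submit(func, x) for x in islice(it, len(done))}
```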

Resume - can an executor resume a computation that didn't complete? (This requires that the computation is pickled so it can be restarted.)
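A hypothetical sketch of the resume idea; none of the names here (`plan`, `plan.tasks`, `task.output_exists`, `task.run`) are Cubed's real API:

```python
import pickle

# Persist the pickled computation before running it.
with open("plan.pickle", "wb") as f:
    pickle.dump(plan, f)

# After a failure, reload the plan in a fresh process and rerun only
# the tasks whose outputs haven't been written yet.
with open("plan.pickle", "rb") as f:
    plan = pickle.load(f)
for task in plan.tasks:
    if not task.output_exists():  # e.g. the output chunk is missing
        task.run()
```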

Compute arrays in parallel - are arrays computed one at a time, or in parallel? For small arrays, computing them in parallel can exploit spare parallelism, if it is available, and speed up the overall computation.
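For instance, with an async executor the difference is roughly the following (a sketch; `run_tasks_for` is a hypothetical per-array coroutine):

```python
import asyncio

async def compute_one_at_a_time(arrays):
    for array in arrays:
        await run_tasks_for(array)  # hypothetical per-array coroutine

async def compute_in_parallel(arrays):
    # Independent arrays share the available parallelism.
    await asyncio.gather(*(run_tasks_for(array) for array in arrays))
```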

Runtime memory check - does the executor make sure that your allowed_mem setting is no greater than what the runtime provides? #220
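A sketch of what such a check might look like, using psutil as a stand-in for however a given runtime reports its memory (see #220 for the real discussion):

```python
import psutil

def check_runtime_memory(allowed_mem: int) -> None:
    total = psutil.virtual_memory().total  # memory visible to this worker
    if allowed_mem > total:
        raise ValueError(
            f"allowed_mem ({allowed_mem} bytes) exceeds the memory "
            f"available to the runtime ({total} bytes)"
        )
```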

Footnotes

  1. Google Cloud Dataflow has four retry attempts.
  2. Dask added retries in 2017. See also this SO thread. There is also a Reschedule exception that serves a similar purpose.
  3. Dask doesn't seem to have task timeouts. There's a discussion about timeouts, including how to work around very slow or hanging tasks.
  4. This is only true in the sense that one task is run at a time, so it's a batch size of one.
  5. For Beam, the client submits a DAG to the service, so there is no risk of the client running out of memory even for very large arrays, and hence no need to implement input batching.
TomNicholas commented 11 months ago

This is very helpful.

For the Coiled Functions executor #260 I think everything is the same as the Dask column, except that Callbacks haven't been implemented yet. Adding a runtime memory check should be straightforward too.

tomwhite commented 11 months ago

I've been looking at the Dask executor today, and I think using the distributed.Client.map API may make it a lot easier to implement the missing features in the table. (A very minor downside is that you can't use the Dask local scheduler, but we have the local Python executors for that.)

Here's a prototype AsyncDaskDistributedExecutor that does this. Since it uses asyncio, I was able to copy the Modal implementation for backups fairly easily. I think adding compute arrays in parallel and input batching would both be very similar to the Modal implementation too. The only missing feature would be timeouts, but with backups that's less important.
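The core of that approach looks roughly like this (a sketch of the pattern, not the actual prototype; the `run` function and its signature are illustrative):

```python
from distributed import Client, as_completed

async def run(func, inputs, retries=2):
    async with Client(asynchronous=True) as client:
        # Client.map gives us retries for free...
        futures = client.map(func, inputs, retries=retries)
        # ...and the async form of as_completed lets us act on each
        # result as it arrives (callbacks, backup tasks, etc.).
        async for future in as_completed(futures):
            result = await future
```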

As far as I can tell Coiled Functions don't have an asyncio version - but perhaps the futures that it returns can be used in an asyncio context, in which case we'd be able to share a lot of code.

TomNicholas commented 11 months ago

Nice! If we have an AsyncDaskDistributedExecutor is there any reason to keep the DaskDelayedExecutor?

tomwhite commented 11 months ago

Probably not.

tomwhite commented 11 months ago

> As far as I can tell Coiled Functions don't have an asyncio version - but perhaps the futures that it returns can be used in an asyncio context, in which case we'd be able to share a lot of code.

Reading the docs, it says that .submit() will return a Dask Future - so we should be able to use everything from AsyncDaskDistributedExecutor.

tomwhite commented 10 months ago

#291 added batching for Python Async, Modal, and Dask.