cubed-dev / cubed

Bounded-memory serverless distributed N-dimensional array processing
https://cubed-dev.github.io/cubed/
Apache License 2.0

Docs on how to write a new executor #498

Open TomNicholas opened 1 month ago

TomNicholas commented 1 month ago

There are a lot of parallel frameworks that Cubed plans could be converted to (e.g. #499). We have executors for Dask and Beam, but instead of trying to write more executors ourselves, it would be nice to document more clearly how to create a new executor so that others can contribute.

tomwhite commented 1 month ago

That's a great suggestion. I'll sketch out something here to start with.

A Cubed executor is a subclass of DagExecutor:

https://github.com/cubed-dev/cubed/blob/e7ff3651467f6a7c5ab2f87a211d3f4581e5f370/cubed/runtime/types.py#L9-L15

The execute_dag method is responsible for taking a Cubed plan (a DAG of operations and arrays) and turning it into sets of parallel tasks, which it runs in stages. It is also responsible for issuing callbacks whenever an operation starts or a task finishes; the client uses these to update progress bars and record statistics.
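Here is a minimal sketch of the client side of that callback contract: an object that records how many tasks of each operation have finished, which is enough to drive a progress bar. The event classes and method names (OperationStartEvent, TaskEndEvent, on_operation_start, on_task_end) are assumptions defined locally for illustration; check cubed/runtime/types.py for the real interface.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class OperationStartEvent:
    """Hypothetical event: emitted once when an operation begins."""
    name: str
    num_tasks: int


@dataclass
class TaskEndEvent:
    """Hypothetical event: emitted each time a task of an operation finishes."""
    name: str
    num_tasks: int = 1


class ProgressCallback:
    """Tracks how many tasks of each operation have completed."""

    def __init__(self):
        self.total = {}               # operation name -> expected number of tasks
        self.done = defaultdict(int)  # operation name -> tasks finished so far

    def on_operation_start(self, event):
        self.total[event.name] = event.num_tasks

    def on_task_end(self, event):
        self.done[event.name] += event.num_tasks

    def fraction_done(self, name):
        return self.done[name] / self.total[name]


# Simulate the calls an executor would make for an operation with two tasks.
progress = ProgressCallback()
progress.on_operation_start(OperationStartEvent(name="op-001", num_tasks=2))
progress.on_task_end(TaskEndEvent(name="op-001"))
print(progress.fraction_done("op-001"))  # 0.5
```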

The simplest example of a Cubed executor is the SingleThreadedExecutor, which runs tasks sequentially:

https://github.com/cubed-dev/cubed/blob/e7ff3651467f6a7c5ab2f87a211d3f4581e5f370/cubed/runtime/executors/local.py#L29-L58
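The sketch below mirrors that structure without depending on Cubed: operations are visited in dependency order, and each element of the pipeline's mappable is passed to the pipeline's function as one task. Pipeline, DagExecutor, the dict-based DAG and the plain-function callbacks are simplified stand-ins (assumptions), not Cubed's actual classes; the real SingleThreadedExecutor is at the link above.

```python
from dataclasses import dataclass
from graphlib import TopologicalSorter
from typing import Any, Callable, Iterable


@dataclass
class Pipeline:
    """Hypothetical stand-in for an operation's pipeline."""
    function: Callable[..., Any]  # the task function
    mappable: Iterable[Any]       # one element per task
    config: Any = None            # extra data passed to every task


class DagExecutor:
    """Stand-in for the DagExecutor base class; not Cubed's real code."""

    def execute_dag(self, dag, callbacks=None, **kwargs):
        raise NotImplementedError


class SequentialExecutor(DagExecutor):
    """Runs operations in dependency order and their tasks one at a time."""

    def execute_dag(self, dag, callbacks=None, **kwargs):
        callbacks = callbacks or []
        # dag maps operation name -> (pipeline, set of upstream operation names).
        sorter = TopologicalSorter({name: deps for name, (_, deps) in dag.items()})
        for name in sorter.static_order():
            pipeline, _ = dag[name]
            for callback in callbacks:
                callback("operation_start", name)
            for task_input in pipeline.mappable:
                pipeline.function(task_input, config=pipeline.config)
                for callback in callbacks:
                    callback("task_end", name)


# Usage: two operations, where "square" depends on "create".
log = []
dag = {
    "create": (Pipeline(lambda x, config: log.append(("create", x)), range(2)), set()),
    "square": (Pipeline(lambda x, config: log.append(("square", x * x)), range(3)), {"create"}),
}
events = []
SequentialExecutor().execute_dag(dag, callbacks=[lambda kind, name: events.append((kind, name))])
print(log)  # [('create', 0), ('create', 1), ('square', 0), ('square', 1), ('square', 4)]
```

A parallel executor keeps the same outer loop over operations and only changes how the inner loop over task inputs is run.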

In practice, backends that make suitable targets for Cubed have the following characteristics:

  1. Parallel. The ability to efficiently run multiple tasks in parallel. (The task inputs are generated by the pipeline.mappable iterator.)
  2. Code distribution. The ability to run arbitrary code in the remote process via some distribution mechanism. (The task function is pipeline.function.)
  3. Memory guarantees. The backend should guarantee that the task gets a certain (configurable) amount of memory.
  4. Retries. The backend should have some way of retrying a task if it fails.
  5. Timeouts. Tasks should fail after a certain amount of time, so they can be retried.
  6. (Optional) Straggler mitigation. Very slow tasks are detected and retried with a backup task so that they do not slow down the entire computation. Most backends do not support this, but it can be implemented as part of the executor.
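As a rough illustration of how an executor can add retries and timeouts (items 4 and 5) on top of a backend that only provides parallelism (item 1), here is a sketch that uses local threads as a stand-in backend. run_task and run_stage are hypothetical names, not Cubed functions. One caveat of using threads: a Python thread cannot be killed, so a timed-out attempt keeps running in the background, whereas a real backend would terminate the remote worker.

```python
import asyncio


async def run_task(function, task_input, retries=2, timeout=5.0):
    """Run one task in a worker thread, retrying if it raises or times out."""
    for attempt in range(retries + 1):
        try:
            # wait_for raises asyncio.TimeoutError if the attempt takes too long.
            # The worker thread itself cannot be killed and keeps running.
            return await asyncio.wait_for(asyncio.to_thread(function, task_input), timeout)
        except Exception:
            if attempt == retries:
                raise  # out of retries: fail the whole stage


async def run_stage(function, inputs, retries=2, timeout=5.0):
    """Run all tasks of one operation in parallel, returning results in input order."""
    return await asyncio.gather(
        *(run_task(function, x, retries, timeout) for x in inputs)
    )


calls = {}


def flaky_square(x):
    """Fails on the first attempt for each input and succeeds on the retry."""
    calls[x] = calls.get(x, 0) + 1
    if calls[x] == 1:
        raise RuntimeError("transient failure")
    return x * x


print(asyncio.run(run_stage(flaky_square, range(4))))  # [0, 1, 4, 9]
```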

These features (and a few more) are discussed in #276, which also has a table showing which executor has each feature.

Some of the executors in Cubed use asyncio APIs to call the backend (e.g. Modal, Dask, local threads and processes), which makes some of these features (e.g. backup tasks) easier to implement, since the code can be shared between executors. However, backends do not have to offer an asyncio API to be integrated with Cubed (e.g. Lithops).
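To show why asyncio helps with straggler mitigation, here is a sketch of a backup task: if a task is still running after a threshold, a duplicate is launched and the first result wins. run_with_backup and the fixed backup_after threshold are assumptions for illustration; a real executor would more likely derive the threshold from how long other tasks in the same operation took, and would run tasks on the backend rather than in local threads.

```python
import asyncio
import time


async def run_with_backup(function, task_input, backup_after):
    """Run a task; if it has not finished after backup_after seconds, start a
    duplicate (backup) task and return the result of whichever finishes first."""
    primary = asyncio.create_task(asyncio.to_thread(function, task_input))
    done, _ = await asyncio.wait({primary}, timeout=backup_after)
    if done:
        return primary.result()  # finished in time: no backup needed
    backup = asyncio.create_task(asyncio.to_thread(function, task_input))
    done, pending = await asyncio.wait(
        {primary, backup}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()  # stop waiting for the straggler (its thread still runs to completion)
    return done.pop().result()


attempts = []


def sometimes_slow(x):
    """The first call straggles; later calls return immediately."""
    attempts.append(x)
    if len(attempts) == 1:
        time.sleep(1.0)
    return x + 1


result = asyncio.run(run_with_backup(sometimes_slow, 41, backup_after=0.1))
print(result)  # 42
```

Backup tasks are only safe if running the same task twice has the same effect as running it once, since both copies may complete.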

All the executor implementations can be found in cubed.runtime.executors.

For testing, a new executor can be added to the ALL_EXECUTORS list conditionally, so that it is only included if the backend dependency is installed.
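The conditional registration might look like the following sketch. The executor class and the my_backend_executor module are hypothetical placeholders, and the real ALL_EXECUTORS list lives in Cubed's test code, which may be structured differently.

```python
class SingleThreadedExecutor:
    """Stand-in for an executor with no extra dependencies."""
    name = "single-threaded"


ALL_EXECUTORS = [SingleThreadedExecutor()]

try:
    # Hypothetical module and class for the new backend; substitute the real ones.
    from my_backend_executor import MyBackendExecutor
except ImportError:
    pass  # backend not installed: the new executor is simply not tested
else:
    ALL_EXECUTORS.append(MyBackendExecutor())

print([executor.name for executor in ALL_EXECUTORS])  # ['single-threaded']
```

Tests can then be parametrized over ALL_EXECUTORS (for example with pytest.mark.parametrize), so the new executor is exercised only where its dependency is available.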