
Distributed DBSP design

This document proposes a design for supporting the following features in DBSP:

The following features are out of scope:

We aim to satisfy the following base requirements:

Distribution of computation

DBSP implements computation in the form of a circuit. A circuit runs in a single thread, passing data from "input handles" through "operators" to "output handles":

[Figure 1]

         ┌────────────┐   ┌────────────┐   ┌────────────┐
input───>│ operator 1 │──>│ operator 2 │──>│ operator 3 │───>output
         └────────────┘   └────────────┘   └────────────┘

To run a computation across multiple worker threads, DBSP instantiates multiple copies of the circuit, all identical. Each copy performs the same computation in parallel across a disjoint part of the data. In a multi-worker computation, DBSP input handles automatically spread the input evenly across the workers, and DBSP output handles allow for merging the output from multiple workers for the most common kinds of data.

For multi-worker computation, operators require individual care. Some operators, such as filter and map, work unchanged with multiple workers. Others require data to be distributed appropriately across workers; for example, an equi-join requires records with equal keys to be processed in the same worker. These operators internally use a special "exchange" operator to "re-shard" data across workers, using a hash function to obtain an even distribution of keys (a minimal sketch of this hashing follows the figure below). The diagram below shows data being re-sharded before operators 2 and 3 execute:

[Figure 2]

         ┌────────────┐   ┌────────────┐   ┌────────────┐
      ┌─>│ operator 1 │──>│ operator 2 │──>│ operator 3 │───>output
      │  └────────────┘\ /└────────────┘\ /└────────────┘
input─┤                 X                X
      │  ┌────────────┐/ \┌────────────┐/ \┌────────────┐
      └─>│ operator 1 │──>│ operator 2 │──>│ operator 3 │───>output
         └────────────┘   └────────────┘   └────────────┘
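
As a minimal, hypothetical sketch of the hashing that "re-sharding" implies (not DBSP's actual exchange API), every worker can compute the same destination for a given key:

```
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical routing function for the exchange step: because every
/// worker hashes keys the same way, all records with equal keys end up
/// in the same destination worker.
///
/// NOTE: `DefaultHasher` is used only for illustration; a real deployment
/// needs a hash function that is stable across hosts and software versions.
fn destination_worker<K: Hash>(key: &K, num_workers: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % num_workers
}
```

This is what lets an equi-join run independently in each worker: both of its inputs route equal keys to the same place.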

To extend multi-worker computation to multi-host computation, we instantiate the circuit multiple times on different hosts. This simply requires extending the "exchange" operator to work across hosts using RPC:

[Figure 3]

         ╔══host 1═══════════════════════════════════╗
         ║   ┌────────────┐           ┌────────────┐ ║
      ┌─────>│ operator 1 │──┐     ┌─>│ operator 2 │───>output
      │  ║   └────────────┘  │     │  └────────────┘ ║
      │  ║                   ├─┐ ┌─┤                 ║
      │  ║   ┌────────────┐  │ │ │ │  ┌────────────┐ ║
      ├─────>│ operator 1 │──┘ │ │ └─>│ operator 2 │───>output
      │  ║   └────────────┘    │ │    └────────────┘ ║
      │  ╚═════════════════════│ │═══════════════════╝
input─┤                        ├─┤
      │  ╔══host 2═════════════│ │═══════════════════╗
      │  ║   ┌────────────┐    │ │    ┌────────────┐ ║
      ├─────>│ operator 1 │──┐ │ │ ┌─>│ operator 2 │───>output
      │  ║   └────────────┘  │ │ │ │  └────────────┘ ║
      │  ║                   ├─┘ └─┤                 ║
      │  ║   ┌────────────┐  │     │  ┌────────────┐ ║
      └─────>│ operator 1 │──┘     └─>│ operator 2 │───>output
         ║   └────────────┘           └────────────┘ ║
         ╚═══════════════════════════════════════════╝

Fault tolerance

For DBSP, fault tolerance means that the system can recover from the crash of one or more workers while providing "exactly once" semantics. We implement fault tolerance by storing state in a database that is periodically checkpointed. To recover, all of the workers load the most recent checkpoint and replay the input starting from that point, discarding any duplicate output that this produces. Graceful shutdown and restart is a special case of crash recovery where there is no output beyond the checkpoint and thus no input to be replayed (although there might be unprocessed input).
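
The recovery path can be summarized with a hedged sketch; the trait and method names below are hypothetical placeholders, not the actual implementation:

```
type Step = u64;

/// Hypothetical view of a worker, for illustrating recovery only.
trait RecoverableWorker {
    /// Step at which the most recent committed checkpoint resumes.
    fn latest_checkpoint_step(&self) -> Step;
    /// Logged input for `step`, if any remains to be replayed.
    fn logged_input(&self, step: Step) -> Option<Vec<u8>>;
    /// Re-execute the circuit for one step and return its output.
    fn replay_step(&mut self, input: Vec<u8>) -> Vec<u8>;
    /// Whether output for `step` was already delivered before the crash.
    fn output_already_produced(&self, step: Step) -> bool;
    /// Deliver output for `step` to downstream consumers.
    fn emit(&mut self, step: Step, output: Vec<u8>);
}

/// Reload the latest checkpoint, replay logged input, and suppress
/// duplicate output so consumers see each step's output exactly once.
fn recover<W: RecoverableWorker>(worker: &mut W) -> Step {
    let mut step = worker.latest_checkpoint_step();
    while let Some(input) = worker.logged_input(step) {
        let output = worker.replay_step(input);
        if !worker.output_already_produced(step) {
            worker.emit(step, output);
        }
        step += 1;
    }
    step // first step to execute normally after recovery
}
```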

This form of fault tolerance has the following requirements, each of which may be implemented in a variety of fashions:

Database

Each worker maintains its circuit's state in a database. We describe the database as per-worker. It could also be implemented as a single distributed database.

Running the circuit queries and updates each database. Occasionally, a coordinator tells all of the workers to commit their databases. If all of them succeed, this forms a checkpoint that becomes the new basis for recovery.

This section specifies the abstract interface that the database must implement. It is a key-value database that must support two versions. Let `Version` be a `u64` type representing a database version serial number:

A database implementation presents the following interface to workers:
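
The interface list itself is not reproduced here. As a purely illustrative stand-in, a versioned key-value store of the kind described might look roughly like this (all names hypothetical):

```
type Version = u64;

/// Hypothetical shape of the per-worker state database: a key-value
/// store that retains a small range of committed versions so that the
/// coordinator can form checkpoints and workers can recover from them.
trait StateDatabase {
    /// Committed versions currently available for recovery.
    fn versions(&self) -> std::ops::Range<Version>;
    /// Open a committed version as the basis for further updates.
    fn open(&mut self, version: Version);
    /// Read a value in the currently open working state.
    fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
    /// Write a value in the currently open working state.
    fn put(&mut self, key: &[u8], value: &[u8]);
    /// Atomically commit the working state as the next version.
    fn commit(&mut self) -> Version;
}
```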

Input

A step is one full execution of the distributed circuit. We number steps sequentially as `Step` (an alias for `u64`). A step includes the circuit's input and output during that step.

To execute a step in the distributed circuit, all of the workers need to obtain input for the step. To ensure that recovery always produces the same output as previous runs, the input for a given step must be fixed. That means that input must be logged durably.

The division of input between steps must also be durable (see Input/output synchronization).

Input needs to be durable only until its step has been committed to the earliest version in its local database.

Consider a worker to have input for steps `first..last`. This might be an empty range if no new input has arrived since the earliest version of the local database.

An input implementation presents the following interface to workers:
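
Again as an illustrative placeholder rather than the actual interface (only `read()` is referred to elsewhere in this document, and its refinement with a cancellation token appears under Coordinator/worker synchronization):

```
type Step = u64;

/// Hypothetical sketch of an input implementation.
trait InputLog {
    /// Steps `first..last` for which this worker holds logged input.
    fn steps(&self) -> std::ops::Range<Step>;
    /// Return the input for `step`: replay it if it was logged earlier,
    /// otherwise gather new input and log it durably first.
    fn read(&mut self, step: Step) -> Vec<u8>;
    /// Permit input before `step` to be discarded once that step is
    /// covered by the earliest checkpointed database version.
    fn truncate(&mut self, step: Step);
}
```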

Output

Each worker needs to produce output for each step. DBSP can reproduce output for steps since the earliest version in any of the workers' databases.

The need for output durability depends on the requirements of the systems that consume DBSP's output. Those systems can also apply their own expiration or deletion policies to output.

Consider a worker to have output for steps `first..last`, which might be an empty range.

An output implementation presents the following interface to workers:
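
An illustrative placeholder for such an interface (hypothetical names):

```
type Step = u64;

/// Hypothetical sketch of an output implementation.
trait OutputLog {
    /// Steps `first..last` for which output has already been produced.
    fn steps(&self) -> std::ops::Range<Step>;
    /// Deliver `output` for `step`; during replay, calls for steps whose
    /// output was already produced are discarded to preserve exactly-once
    /// semantics.
    fn write(&mut self, step: Step, output: Vec<u8>);
}
```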

If convenient, the output implementation can expose step numbering to the clients consuming the output.

Worker

A worker presents the following interface to the coordinator:
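
The interface list is not shown here; as a hypothetical placeholder consistent with the rest of this document (the synchronization details section later refines `stop()` into `pause()` plus `stop(step)`):

```
/// Hypothetical sketch of the worker interface seen by the coordinator.
trait Worker {
    /// Begin executing steps; from the coordinator's point of view this
    /// returns immediately and the worker runs asynchronously.
    fn start(&mut self);
    /// Finish the current step and stop; blocks until the worker has
    /// stopped.
    fn stop(&mut self);
    /// Commit the worker's local database, contributing to a checkpoint.
    fn commit(&mut self);
}
```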

Coordinator

In a loop:
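
The loop body is not listed here. A plausible sketch, reusing the hypothetical `Worker` trait from the previous section, is:

```
/// Hypothetical coordinator loop: let the workers run for a while, stop
/// them all, and commit every local database to form a new checkpoint.
fn coordinate<W: Worker>(workers: &mut [W]) {
    loop {
        for w in workers.iter_mut() {
            w.start();
        }
        // ... allow the circuit to execute steps for some period ...
        for w in workers.iter_mut() {
            w.stop();
        }
        // Once all workers have stopped at the same step (how that is
        // arranged is the subject of Coordinator/worker synchronization),
        // committing all of the local databases forms a new checkpoint.
        for w in workers.iter_mut() {
            w.commit();
        }
    }
}
```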

Synchronization

Input/output synchronization

The Input section mentioned that the division of input between steps must be durable. That is, a crash must not move input from one step to another, even though such a change would ordinarily not change the integral of the distributed circuit's output over the affected steps. The reason is that output may already have been produced by one worker (or more, or even all of them) for the steps being replayed, and moving input from one step to another could require some of that already-produced output to change, which cannot be done.

In practice, making the division of input into steps durable seems to require recording it. Furthermore, for a given step, the complete division of input must become durable before or at the same time as any part of the step's output (otherwise, we could have output that we don't know how to reproduce). There are a few places we could record the division:

Coordinator/worker synchronization

We need some synchronization to allow all the workers to pause at the same step without blocking. This isn't a big design consideration.

Coordinator/worker synchronization details

Here's one possible approach. Give each worker a cancellation token `token`, something like [`tokio_util::sync::CancellationToken`][1], plus a `state`:

```
enum State {
    Run,
    Pause,
    Paused(Step),
    Stop(Step),
    Stopped,
}
```

`start()` sets its local variable `state` to `Run` when it starts and to `Stopped` just before it returns. At the top of its internal loop, instead of just getting input, it consults `state`:

* If `state` is `Run`, which is the common case, gets input by calling `read(step, &token)`.
* If `state` is `Pause`, sets it to `Paused(step)` and blocks until it becomes `Stop(_)`.
* If `state` is `Stop(x)`:
  - If `step < x`, gets input by calling `read(step, &token)`.
  - If `step == x`, breaks out of the loop.
  - If `step > x`, fatal error.

Refine the worker interface with:

* `fn pause() -> Step`

  Sets `state` to `Pause`. Cancels `token`. Blocks until `state` becomes `Paused(step)` and returns `step`.

* `fn stop(step: Step)`

  Sets `state` to `Stop(step)`. Blocks until `state` becomes `Stopped`.

Refine the coordinator by, instead of just calling `stop()` on each worker:

- Call `pause()` on each worker and take `step` as the maximum return value.
- Call `stop(step)` on each worker.

Refine the input interface by making `read()` take the cancellation token and block until input arrives or `token` is cancelled, whichever comes first.

Scale in and out

The computational resources in our distributed system are hosts and workers. Distributed DBSP should support "scale out" to increase workers and hosts and "scale in" to decrease them. For performance, input and state should be divided more or less evenly across workers regardless of the current scale.

For input/output synchronization, scale must be fixed at a given checkpoint. That is, if a checkpoint requires recovery, then recovery must take place with the same scale that was initially used. Thus, the state database must include scale information, and changing scale requires committing a checkpoint then updating the scale data without processing any input and immediately committing again.

💡 The final design approach listed under input/output synchronization would allow recovery at a different scale.

Scaling input

Input provides input to each worker. As the number of workers increases or decreases, it must adapt to provide data to the new workers. The distribution of data among workers does not affect correctness, because DBSP does not assume anything about the distribution of input. For performance, data should be fairly well balanced among workers.

The distribution of data among workers only affects performance up to the first time it is re-sharded with the "exchange" operator. SQL programs are likely to re-shard data early on, so input scaling might affect performance in only a limited way. We do not want it to be a bottleneck, but it also seems unwise to focus much effort on it.

We currently expect DBSP input to arrive using the Kafka protocol. Kafka allows data to be divided into several partitions. The number of partitions is essentially fixed (the admin can increase it but not decrease it). Each worker reads data from some number of partitions. As scale changes, the assignment also changes to balance input across workers.
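
A hedged sketch of one simple assignment scheme under these constraints (not necessarily what DBSP will use): give worker `w` every partition whose index is congruent to `w` modulo the number of workers.

```
/// Hypothetical partition assignment: spread `num_partitions` Kafka
/// partitions round-robin across `num_workers` workers. Re-running this
/// with a new worker count yields the rebalanced assignment.
fn partitions_for_worker(num_partitions: u32, num_workers: u32, worker: u32) -> Vec<u32> {
    (0..num_partitions)
        .filter(|partition| partition % num_workers == worker)
        .collect()
}
```

For example, with 8 partitions and 3 workers, worker 0 reads partitions 0, 3, and 6.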

Scaling state

When the number of workers changes, DBSP must also change how the "exchange" operator (described in Distribution of computation) re-shards data among workers. Exchange occurs entirely inside DBSP, so the restrictions of, e.g., Kafka do not constrain it.

However, computations that use an exchange operator also maintain state associated with keys, which must reside in the same worker as the key. When scaling adjusts the exchange so that a given key is sent to a different worker, the state associated with that key must also be moved. To speed up scaling operations, we want to move as little state as possible.

💡 If DBSP maintains state in a distributed database, instead of a per-worker local database, then DBSP itself doesn't need to explicitly move state data around, but the database still needs to do it.

DBSP currently divides data in exchange by hashing each key and taking `hash % W`, where `W` is the number of workers, as a worker index. This is a worst case for moving state: when the number of workers increases from `W` to `W + 1`, `W/(W+1)` of the state moves.

Some better approaches

Let `S` be the number of "shards" for the exchange operator to divide data into. Then the following algorithms seem like reasonable ones:

* Fixed sharding: Choose a relatively large number `S`, e.g. 1024, of "shards", numbered `0..S`. Initially, for `W` workers, assign approximately `S/W` shards to each worker, e.g. `0..333`, `333..666`, and `666..1000` for `S = 1000` and `W = 3`. To add a new worker, assign it its proportional share of approximately `S/(W+1)` shards, taking them from the existing workers so that the new division is approximately even, e.g. a fourth worker might receive shards `0..83`, `333..416`, and `666..750`.

  This algorithm reassigns a minimum amount of traffic when `W` changes. As `W` approaches `S`, it divides data more and more unevenly. It requires an `S`-sized array to express the assignments. This is essentially the same as the Kafka partitioning algorithm, but we can afford for `S` to be larger than `P`.

* Dynamic sharding: Choose `0 < k < 1` as the maximum inaccuracy in sharding to tolerate, e.g. `k = 0.1` for 10% inaccuracy. Let the number of shards `S` be `(W/k).next_power_of_two()`. Assign approximately `S/W` shards to each worker. To increase or decrease the number of workers by 1, first recalculate `S`. If this doubles `S`, then double the bounds of every existing shard slice, so that, e.g., `333..416` becomes `666..832`. If this halves `S`, then halve the bounds (and adjust assignments, if necessary, for a minimum amount of disruption). After adjusting `S`, assign workers to shards such that the division is approximately even. With this algorithm, shard data using the shard assigned to the data hash's `S.ilog2()` most-significant bits.

  This algorithm reassigns a minimum amount of traffic when `W` changes. It requires an `S`-sized array to express the assignments (but the array size adapts appropriately for `W` and `k`).

Fixed partitioning requires one to choose `S` appropriately, which might be hard. Dynamic partitioning seems like a good approach. It might take some work to implement it exactly correctly (it is possibly a novel algorithm), so it might not be worth implementing initially.

The choice of algorithm only matters for adding or removing a few hosts at a time. If the number of workers changes by more than 2× up or down, then most of the data needs to move anyway, which means that the choice of algorithm does not matter.
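
A hedged sketch of the fixed-sharding idea in code (illustrative only; `S = 1024` and the names are assumptions, and the same caveat about needing a stable hash function applies here):

```
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Number of shards, fixed for the lifetime of the pipeline.
const S: u64 = 1024;

/// Hypothetical shard map: routing hashes a key into one of `S` shards,
/// and a table maps each shard to its current worker. Scaling in or out
/// only rewrites entries of the table, so adding one worker to `W`
/// existing workers moves roughly `S/(W+1)` shards of state.
struct ShardMap {
    worker_for_shard: Vec<u32>, // length S
}

impl ShardMap {
    fn destination<K: Hash>(&self, key: &K) -> u32 {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let shard = (hasher.finish() % S) as usize;
        self.worker_for_shard[shard]
    }
}
```

The dynamic-sharding variant changes only how `S` and the table are recomputed when the number of workers changes; the routing itself stays the same.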

Layout changes

We call the assignment of partitions and shards to workers, and of workers to hosts, a "layout". Scale-in and scale-out are both examples of layout changes.

Given fault tolerance, we can change the layout in a "cold" fashion by:

  1. Terminate DBSP workers for the old layout.
  2. Redistribute state as necessary.
  3. Start DBSP workers with the new layout.

We can add or remove hosts in a "hot" fashion by:

  1. Start the hosts to be added, if any.
  2. Start copying state as necessary, while the circuit continues executing.
  3. Stop the workers that are to be removed, initialize the workers that are to be added, and pause the others.
  4. Finish copying the state.
  5. Update the assignment of partitions and shards to workers.
  6. Start the workers to be added, and un-pause the others.
  7. Stop the hosts to be removed.

Rolling upgrade

It's desirable to be able to upgrade the DBSP software without disrupting an ongoing computation. This section describes two approaches.

Rolling upgrade does not support changing the computation or the dataflow graph that implements it. This procedure is for upgrades that, e.g., fix a security vulnerability.

Via scale-in and scale-out

Rolling upgrade of DBSP can leverage scale-out and scale-in as primitive operations. This approach works if a spare host is available for the upgrade:

  1. Add a new host to the computation, using the new version of DBSP.
  2. Remove a host using the old version of DBSP.
  3. If any hosts remain using the old version, start over from step 1.

If no spare host is available, swap steps 1 and 2.

Via layout change

Iterative rolling upgrade, as described in the previous section, moves all DBSP state at least twice: each iteration moves 1/n of the data to the added host in step 1 and 1/n of the data off the removed host in step 2, and the process is repeated n times, for about twice the total state overall.

However, the layout change process is more general. In a single step, one can add host A and remove host B and ensure that the data assigned to B is now assigned to A, so that only 1/n of the data needs to move. Over n iterations, only half as much data moves (about once the total state). One could also do a single step that removes all of the old hosts and adds all of the new ones.

Furthermore, if the hosts have enough memory and other resources, one could start the new DBSP processes on the same hosts as the old ones and use a layout change that avoids moving any data at all.