vmware-archive / database-stream-processor

Streaming and Incremental Computation Framework

RFC: Parallelizing circuits. #23

Open ryzhyk opened 2 years ago

ryzhyk commented 2 years ago

We discuss strategies for parallelizing the execution of DBSP circuits.

NOTE: This is orthogonal to the question of speeding up individual operators using SIMD instructions or GPUs.

The high-level goal is to shard processing across multiple worker threads while minimizing bottlenecks due to thread communication and synchronization. As a starting point for the design, the DD architecture where each thread runs its own copy of the circuit seems suitable. These copies happen to have identical topologies, i.e., contain the same operators and subcircuits connected in the same way (not strictly required, but I have a hard time imagining why we would want something else).

Fragments of the circuit may execute locally without any inter-thread communication, e.g., assuming input tables have been sharded, a sequence of linear operators, including group_by and linear aggregates, can execute locally with results from multiple workers assembled at the end of the pipeline.
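For illustration, a minimal standalone sketch (plain Rust, not DBSP code) of this fully local case: a filter/map pipeline followed by a linear aggregate (a sum) runs independently on each worker's shard, and the per-worker partial results are simply added at the end of the pipeline.

```rust
// Sketch only: each worker runs the same pipeline on its own shard, and
// because the aggregate is linear, assembling the result is just a sum.
fn local_pipeline(shard: &[(u64, i64)]) -> i64 {
    // Filter, map, and the linear aggregate all commute with sharding, so
    // this runs on one worker's shard without talking to peers.
    shard
        .iter()
        .filter(|&&(key, _)| key % 2 == 0)
        .map(|&(_, weight)| weight * 10)
        .sum()
}

fn main() {
    // Two workers, each holding a shard of the input table.
    let shards: Vec<Vec<(u64, i64)>> = vec![
        vec![(0, 1), (1, 5), (2, 3)],
        vec![(4, 2), (7, 9)],
    ];
    // Partial results are computed locally and combined at the end.
    let total: i64 = shards.iter().map(|s| local_pipeline(s)).sum();
    println!("total = {total}"); // (0,1)->10, (2,3)->30, (4,2)->20 => 60
}
```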

Other operators, most notably joins, may require communication, but even that can sometimes be avoided: e.g., the join R1(x,y), R2(y,z) can be evaluated locally, assuming R1 and R2 are sharded so that records sharing the same value of y are produced by the same worker. With DDlog we've seen workloads that in theory could have been partitioned on some attribute, with the bulk of the processing occurring without cross-worker communication. But of course we couldn't take advantage of this when running on top of DD.
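To make the co-sharding argument concrete, here is a minimal standalone sketch (plain Rust, not DBSP code; `shard_of` is a hypothetical helper): when both relations are sharded by the hash of the join attribute `y`, every matching pair of records lands on the same worker, so the join runs without any cross-worker exchange.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// Hypothetical hash-based sharding function: maps a key to one of `n` workers.
fn shard_of<K: Hash>(key: &K, n: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() as usize) % n
}

fn main() {
    let n = 4;
    let r1: Vec<(u32, u32)> = vec![(1, 10), (2, 20), (3, 10)]; // R1(x, y)
    let r2: Vec<(u32, u32)> = vec![(10, 100), (20, 200)];      // R2(y, z)

    // Shard both relations on the join attribute `y`.
    let mut r1_shards = vec![Vec::new(); n];
    let mut r2_shards = vec![Vec::new(); n];
    for (x, y) in r1 {
        r1_shards[shard_of(&y, n)].push((x, y));
    }
    for (y, z) in r2 {
        r2_shards[shard_of(&y, n)].push((y, z));
    }

    // Each worker joins only its local shards; no records cross workers.
    for w in 0..n {
        let mut index: HashMap<u32, Vec<u32>> = HashMap::new();
        for &(y, z) in &r2_shards[w] {
            index.entry(y).or_default().push(z);
        }
        for &(x, y) in &r1_shards[w] {
            if let Some(zs) = index.get(&y) {
                for &z in zs {
                    println!("worker {w}: ({x}, {y}, {z})");
                }
            }
        }
    }
}
```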

We therefore do not want to automatically shard all operators or all streams. Instead, sharding decisions should be left to the compiler or programmer. A modular way to achieve this is to encapsulate sharding inside special Exchange operators. Such an operator reads locally computed values from its input stream, distributes them across its peers in other workers based on a sharding function, and outputs the set of values received from peers:

                                                 Exchange
                                                 ┌───────┐
            ┌───────┐                            │       │
            │ op 1  ├──────────────┐             │       │
            └───────┘              ▲             │       │
                               ┌───────┐         │       │          ┌───────┐
                               │  op 3 ├─────────┼─┬──┬──┼─────────►│ op 4  │
                               └───────┘         │ │  │  │          └───────┘
            ┌───────┐              ▲             │ │  │  │
            │ op 2  ├──────────────┘             │ │  │  │
            └───────┘                            │ │  │  │
WORKER 1                                         │ │  │  │
─────────────────────────────────────────────────┼─┼──┼──┼──────────────────────────
WORKER 2                                         │ │  │  │
            ┌───────┐                            │ │  │  │
            │ op 1  ├──────────────┐             │ │  │  │
            └───────┘              ▼             │ │  │  │
                               ┌───────┐         │ │  │  │          ┌───────┐
                               │  op 3 ├─────────┼─┴──┴──┼─────────►│ op 4  │
                               └───────┘         │       │          └───────┘
            ┌───────┐              ▲             │       │
            │ op 2  ├──────────────┘             │       │
            └───────┘                            └───────┘

An interesting question is: how are Exchange operators scheduled? The simplest approach is to stick to static scheduling. When an Exchange operator is evaluated, it distributes its inputs across peer mailboxes and then blocks waiting for all peers to put values in its incoming mailboxes. This means that the whole pipeline will run at the speed of the slowest worker.
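A minimal standalone sketch of this blocking Exchange, assuming simple hash sharding and per-peer mailboxes modeled as channels (hypothetical types, not DBSP's actual operator API):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Hypothetical hash-based sharding function: maps a value to one of `n` workers.
fn shard_of<T: Hash>(value: &T, n: usize) -> usize {
    let mut h = DefaultHasher::new();
    value.hash(&mut h);
    (h.finish() as usize) % n
}

/// One worker's view of the exchange: a sender to every peer's mailbox and a
/// receiver for every peer's contribution (including its own).
struct Exchange<T> {
    worker: usize,
    senders: Vec<Sender<Vec<T>>>,
    receivers: Vec<Receiver<Vec<T>>>,
}

impl<T: Hash> Exchange<T> {
    /// Shard the locally computed batch across peer mailboxes, then block
    /// until every peer has delivered its contribution (static scheduling:
    /// the step completes at the speed of the slowest worker).
    fn eval(&self, local: Vec<T>) -> Vec<T> {
        let n = self.senders.len();
        let mut shards: Vec<Vec<T>> = (0..n).map(|_| Vec::new()).collect();
        for v in local {
            shards[shard_of(&v, n)].push(v);
        }
        for (peer, shard) in shards.into_iter().enumerate() {
            self.senders[peer].send(shard).unwrap();
        }
        self.receivers.iter().flat_map(|rx| rx.recv().unwrap()).collect()
    }
}

/// Build the full mesh of mailboxes for `n` workers.
fn make_workers<T>(n: usize) -> Vec<Exchange<T>> {
    let mut txs: Vec<Vec<Sender<Vec<T>>>> = (0..n).map(|_| Vec::new()).collect();
    let mut rxs: Vec<Vec<Receiver<Vec<T>>>> = (0..n).map(|_| Vec::new()).collect();
    for from in 0..n {
        for to in 0..n {
            let (tx, rx) = channel();
            txs[from].push(tx); // `from` sends to `to` through this channel
            rxs[to].push(rx);
        }
    }
    txs.into_iter()
        .zip(rxs)
        .enumerate()
        .map(|(worker, (senders, receivers))| Exchange { worker, senders, receivers })
        .collect()
}

fn main() {
    let workers = make_workers::<u64>(3);
    let handles: Vec<_> = workers
        .into_iter()
        .map(|ex| {
            thread::spawn(move || {
                // Values this worker computed locally in the current step.
                let local: Vec<u64> = (0..10).map(|i| (ex.worker * 10 + i) as u64).collect();
                let received = ex.eval(local);
                println!("worker {} received {} values", ex.worker, received.len());
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
}
```

Because every worker first pushes all of its shards and only then blocks on its mailboxes, the exchange cannot deadlock, but a single slow worker still stalls everyone at the `recv` calls.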

As an optimization, we can implement communication-aware static scheduling: split each Exchange operator into an input half that writes data to peer mailboxes and an output half that collects data from peers, then compute a static schedule that evaluates all input halves as early as possible and all output halves as late as possible, giving stragglers time to catch up and reducing waiting times.

Finding an optimal static schedule can be hard or impossible. Therefore, the next option is to implement a dynamic scheduler that schedules the output halves of Exchange operators as they become ready. This will require a new mechanism for an operator to inform the scheduler about its status. Even with this design, synchronization will be a bottleneck for some workloads.
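A standalone sketch of the split described in the previous two paragraphs (hypothetical types, not the DBSP scheduler API): the input half shards and pushes data without ever blocking, while the output half exposes a non-blocking `ready()` poll that a dynamic scheduler can use as the status notification mentioned above.

```rust
use std::sync::mpsc::{Receiver, Sender, TryRecvError};

/// Input half: scheduled as early as possible; never blocks.
struct ExchangeSender<T> {
    senders: Vec<Sender<Vec<T>>>,   // one mailbox per peer worker
    shard: fn(&T, usize) -> usize,  // sharding function, e.g. hash-based
}

impl<T> ExchangeSender<T> {
    fn eval(&self, local: Vec<T>) {
        let n = self.senders.len();
        let mut shards: Vec<Vec<T>> = (0..n).map(|_| Vec::new()).collect();
        for v in local {
            let peer = (self.shard)(&v, n);
            shards[peer].push(v);
        }
        for (peer, shard) in shards.into_iter().enumerate() {
            // Disconnected peers are ignored in this sketch.
            let _ = self.senders[peer].send(shard);
        }
    }
}

/// Output half: scheduled as late as possible; under dynamic scheduling it
/// is only evaluated once `ready()` reports that all peers have delivered.
struct ExchangeReceiver<T> {
    receivers: Vec<Receiver<Vec<T>>>, // one mailbox per peer worker
    pending: Vec<Option<Vec<T>>>,     // batches already received this step
}

impl<T> ExchangeReceiver<T> {
    /// Non-blocking poll the scheduler can use to decide whether this node
    /// can run without waiting (the status notification mentioned above).
    fn ready(&mut self) -> bool {
        for (peer, rx) in self.receivers.iter().enumerate() {
            if self.pending[peer].is_none() {
                match rx.try_recv() {
                    Ok(batch) => self.pending[peer] = Some(batch),
                    // A disconnected peer counts as an empty contribution.
                    Err(TryRecvError::Disconnected) => self.pending[peer] = Some(Vec::new()),
                    Err(TryRecvError::Empty) => {}
                }
            }
        }
        self.pending.iter().all(|p| p.is_some())
    }

    /// Assemble the full batch; only called after `ready()` returned true.
    fn eval(&mut self) -> Vec<T> {
        debug_assert!(self.pending.iter().all(|p| p.is_some()));
        self.pending.iter_mut().flat_map(|p| p.take().unwrap()).collect()
    }
}
```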

We therefore consider a more radical solution that takes advantage of incremental computation to reduce data dependencies between workers. Rather than waiting for inputs from all peers to become available, Exchange operators can be placed in an iterative scope. At each nested clock cycle, an Exchange operator yields new inputs received from its peers. The circuit processes these inputs before reading more inputs at the next iteration. Iteration continues until all data has been sent and received.
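A standalone sketch of the iterative variant, with a peer's end-of-step modeled as a channel disconnect purely for simplicity (a real design would use an explicit marker); `process_delta` stands for whatever incremental computation the enclosing scope performs on each partial input:

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Drain all batches currently sitting in the peer mailboxes; `done[i]` is
/// flipped once peer `i` has sent everything for this step (modeled here as
/// a channel disconnect).
fn drain_available<T>(receivers: &[Receiver<Vec<T>>], done: &mut [bool]) -> Vec<T> {
    let mut delta = Vec::new();
    for (i, rx) in receivers.iter().enumerate() {
        if done[i] {
            continue;
        }
        loop {
            match rx.try_recv() {
                Ok(batch) => delta.extend(batch),
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Disconnected) => {
                    done[i] = true;
                    break;
                }
            }
        }
    }
    delta
}

/// One step of the outer clock: iterate nested clock cycles until every peer
/// has delivered all of its data, processing each partial delta as it arrives
/// instead of waiting for the full batch.
fn iterative_exchange_step<T>(
    receivers: &[Receiver<Vec<T>>],
    mut process_delta: impl FnMut(Vec<T>),
) {
    let mut done = vec![false; receivers.len()];
    while !done.iter().all(|d| *d) {
        let delta = drain_available(receivers, &mut done);
        if !delta.is_empty() {
            // Nested clock cycle: incrementally fold this partial input into
            // the circuit's state.
            process_delta(delta);
        } else {
            std::thread::yield_now(); // nothing new yet; let peers run
        }
    }
}
```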

We don't need to choose just one of the above options; there's enough modularity in the design of the framework to support all or some of them. We can start by implementing a simple (non-iterative) Exchange operator and using it with a static scheduler. As the next step, we add dynamic scheduling and see how much difference it makes in benchmarks. We can experiment with iterative Exchange operators if we still see bottlenecks.

ryzhyk commented 2 years ago

CC: @mbudiu-vmw, @Kixiron

mihaibudiu commented 2 years ago

The functionality of the exchange operator can be implemented by the partitioning z-set operator. So what you need is actually a new discipline of scheduling, which uses multiple cores.

There is another detail, which is about clocks: currently DBSP assumes that clocks are completely synchronized: an input delta triggers exactly one computation in each operator (modulo cycles). You are proposing to implement circuits with different and perhaps independent clocks. This is something that we should model theoretically first. But it may be possible to encapsulate this very cleanly.

My hope is that we can have a completely separate data-plane (e.g., exchange) and control-plane (scheduler) and mix and match choices. Similar to operators that create cycles, we could have operators that create new subclocks.

Kixiron commented 2 years ago

As a note, prioritizing the front of the dataflow graph isn't always a good thing, as shown in Faucet: it can force intermediate buffers to become overloaded and can thrash latency.

On exchanging, what are the problems with using hash-sharding like timely currently does, i.e., exchanging data based on its hash? I'm not really sure what you mean by "scheduling exchange operators"; wouldn't they just be another node in the dataflow?

ryzhyk commented 2 years ago

> The functionality of the exchange operator can be implemented by the partitioning z-set operator. So what you need is actually a new discipline of scheduling, which uses multiple cores.

Yep, this is what I was trying to do in this RFC.

> There is another detail, which is about clocks: currently DBSP assumes that clocks are completely synchronized: an input delta triggers exactly one computation in each operator (modulo cycles). You are proposing to implement circuits with different and perhaps independent clocks. This is something that we should model theoretically first. But it may be possible to encapsulate this very cleanly.

They are not independent clocks, just nested clocks. Yes, the idea is that we can apply the incrementalization theory to make this work. For example, I am hoping that we won't need doubly nested circuits; instead, we can fold the inner incrementalization (driven by changes received from peers) and recursion into one flat circuit. Sure, we need to work out the theory of this.

Also, it is not completely obvious that we even need this form of incrementality. Waiting for all remote inputs may work well in practice.

> My hope is that we can have a completely separate data-plane (e.g., exchange) and control-plane (scheduler) and mix and match choices. Similar to operators that create cycles, we could have operators that create new subclocks.

I agree.

ryzhyk commented 2 years ago

> As a note, prioritizing the front of the dataflow graph isn't always a good thing, as shown in Faucet: it can force intermediate buffers to become overloaded and can thrash latency.

Can you clarify? What do you mean by prioritizing the front of the graph?

Overloaded buffers should not be a thing here, since all buffers have size exactly one in synchronous circuits.

> On exchanging, what are the problems with using hash-sharding like timely currently does, i.e., exchanging data based on its hash?

That's probably how we'll do it. My point was that we don't need to rehash the data after each operator. A smart compiler (possibly driven by user annotations) can arrange things so that most of the work is done purely locally while only occasionally exchanging data with peers.

> I'm not really sure what you mean by "scheduling exchange operators"; wouldn't they just be another node in the dataflow?

It will probably be two nodes. The input node consumes locally produced data and sends it to peers. The output node outputs data received from peers. Plus a scheduling constraint, expressed as a virtual stream connecting the input and output nodes, that the input must be scheduled first. Given this setup, they are indeed just regular operators, but unlike most other operators they can block (or, if we don't want blocking operators, we could say that they may not be ready to produce data). E.g., the output operator is not ready until it has received inputs from all its peers.

All this means that we want a smarter scheduler that (a) schedules the input halves early to make data available to remote peers, (b) does as much local work as possible before scheduling the output halves to give peers more time to produce results, (c) dynamically schedules outputs that are ready to produce data (hence we need to add some kind of notification from the node to the scheduler).
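A standalone sketch of (c) and the virtual input-to-output constraint (hypothetical trait and scheduler, not the DBSP API): the virtual stream is just another dependency edge, and exchange outputs gate their own evaluation via `ready()`; the prioritization in (a) and (b) would be a heuristic over the sweep order, which this sketch does not attempt.

```rust
/// Minimal operator interface for this sketch.
trait Node {
    /// Can this node be evaluated right now? Local operators and exchange
    /// input halves are always ready; exchange output halves become ready
    /// only after all peers have delivered (the notification to the
    /// scheduler mentioned above).
    fn ready(&self) -> bool {
        true
    }
    fn eval(&mut self);
}

/// One step of a simple dynamic scheduler: repeatedly sweep the node list
/// and evaluate every node whose predecessors (including any virtual
/// input-to-output edge of an Exchange) have run and which reports ready.
fn step(nodes: &mut [Box<dyn Node>], deps: &[Vec<usize>]) {
    let mut evaluated = vec![false; nodes.len()];
    while evaluated.iter().any(|e| !*e) {
        let mut progress = false;
        for i in 0..nodes.len() {
            let deps_done = deps[i].iter().all(|&d| evaluated[d]);
            if !evaluated[i] && deps_done && nodes[i].ready() {
                nodes[i].eval();
                evaluated[i] = true;
                progress = true;
            }
        }
        if !progress {
            // Everything runnable is waiting on remote peers; yield instead
            // of spinning.
            std::thread::yield_now();
        }
    }
}
```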