risingwavelabs / risingwave


Tracking: online scaling in compute node #3750

Status: Open · BugenZhao opened this issue 2 years ago

BugenZhao commented 2 years ago

To support scaling in our system, we've decided to generally follow the design in Re-Introduce Configuration Change based on Pause Barrier. Now that consistent hashing has been adopted in most of the critical places in our system (#3543), it's high time we started working on this!

This task can be roughly divided into the steps below.

jon-chuang commented 2 years ago

> Support connecting or disconnecting an actor from the upstream and downstream of the graph.

Isn't this already covered by `AddOutput` and `Stop`?

BugenZhao commented 2 years ago

> > Support connecting or disconnecting an actor from the upstream and downstream of the graph.
>
> Isn't this already covered by `AddOutput` and `Stop`?

Exactly. However, they're currently only used for creating and dropping materialized views. We should reuse them to support creating and dropping parallel units.
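
For reference, a minimal sketch of what these two mutations might look like; the shapes below are simplified assumptions, not the actual `Mutation` definition in the stream crates:

```rust
use std::collections::{HashMap, HashSet};

type ActorId = u32;

/// Hypothetical, simplified shape of the two barrier mutations.
enum Mutation {
    /// Connect newly built actors as outputs of existing upstream
    /// dispatchers; today issued when creating a materialized view,
    /// and reusable when adding parallel units to a fragment.
    AddOutput(HashMap<ActorId, Vec<ActorId>>), // upstream -> new downstreams
    /// Stop and drop a set of actors; today issued when dropping a
    /// materialized view, and reusable when removing parallel units.
    Stop(HashSet<ActorId>),
}

fn main() {
    // Example: upstream actor 7 gains new downstream actors 101 and 102,
    // while actors 3 and 4 are stopped. Mutations ride on barriers.
    let add = Mutation::AddOutput(HashMap::from([(7, vec![101, 102])]));
    let stop = Mutation::Stop(HashSet::from([3, 4]));
    let _ = (add, stop);
}
```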

jon-chuang commented 2 years ago

What should the interface be for this? I guess you are referring to having a high-level interface in meta like:

```rust
// in meta/..
// Built from the primitives: create/delete actors, add outputs to the
// upstream, stop the previous outputs from the upstream.
fragment.add_parallel_units(par_unit_ids /* &[usize] */);
fragment.remove_parallel_units(par_unit_ids /* &[usize] */);
```

Currently, do we need to wait until the new actors are created before we can send `AddOutput` barrier messages? Or is there some form of synchronization for this in the `LocalStreamManager`?

I guess for the time being, whenever new compute nodes are added, we can scale up to the new parallel units. We can introduce more fine-grained per-fragment control of parallel units in the future, once we decide on a scaling and placement policy.


Furthermore, what should the behaviour be for stateful operators? Should it always replace all of the current actors? Or should we allow existing actors to continue?

In the latter case, we need an `UpdateVnodes` barrier message which, if it matches the executor's actor ID, triggers an update of the state table's vnodes. The state table will need to flush its previous state first, ensuring `!state_table.is_dirty()`. Some of the keys (for scale-up) may no longer be relevant to the actor. We could rely on LRU to evict these unused keys, or explicitly evict them by iterating through all the keys in the application cache and evicting the ones whose vnodes are in the `removed_vnodes` set. After the resume barrier, we can make use of the new vnodes in the state table.
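
A hedged sketch of that flow, with toy stand-ins for `StateTable`, the vnode bitmap, and the vnode mapping (none of these are the real RisingWave APIs):

```rust
use std::collections::HashMap;

/// Toy vnode bitmap: `vnodes[v]` is true iff this actor owns vnode `v`.
type Bitmap = Vec<bool>;

/// Toy stand-in for the state table's in-memory side.
struct StateTable {
    vnodes: Bitmap,
    cache: HashMap<Vec<u8>, Vec<u8>>, // application cache keyed by row key
    dirty: bool,
}

impl StateTable {
    fn flush(&mut self) {
        // Persist pending writes to the shared state store (elided).
        self.dirty = false;
    }
    fn is_dirty(&self) -> bool {
        self.dirty
    }
}

/// Toy vnode mapping; the real one hashes the distribution key.
fn vnode_of(key: &[u8], vnode_count: usize) -> usize {
    key.iter().map(|&b| b as usize).sum::<usize>() % vnode_count
}

/// What handling an `UpdateVnodes` barrier could look like in the actor
/// whose ID matches the mutation.
fn apply_update_vnodes(table: &mut StateTable, new_vnodes: Bitmap) {
    // 1. Flush previous state so the table is clean before repartitioning.
    table.flush();
    assert!(!table.is_dirty());

    // 2. Explicitly evict cached keys whose vnodes were removed
    //    (alternatively, just rely on LRU to age them out).
    let removed: Vec<bool> = table
        .vnodes
        .iter()
        .zip(&new_vnodes)
        .map(|(&had, &has)| had && !has)
        .collect();
    table
        .cache
        .retain(|key, _| !removed[vnode_of(key, removed.len())]);

    // 3. Install the new bitmap; after the resume barrier the actor
    //    reads and writes exactly these vnodes.
    table.vnodes = new_vnodes;
}

fn main() {
    let mut table = StateTable {
        vnodes: vec![true; 4],
        cache: HashMap::from([(b"k1".to_vec(), b"v1".to_vec())]),
        dirty: true,
    };
    apply_update_vnodes(&mut table, vec![true, true, false, false]);
    assert!(!table.is_dirty());
}
```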

I guess for the time being we can stop all actors and create new ones for the entire fragment. It's unproven whether it's necessary to reuse existing actors. Actor startup should be fast and the cache can be repopulated easily from the block cache.

BugenZhao commented 2 years ago

> I guess you are referring to having a high-level interface in meta like:

Exactly.

> Currently, do we need to wait until the new actors are created before we can send `AddOutput` barrier messages?

Yep. This is very similar to creating materialized views. Currently, we first build the actors on the worker nodes and then issue a command to the global barrier manager, so consistency is preserved.
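
A minimal sketch of that ordering with hypothetical names (not the real meta APIs): the barrier command is only issued after the build RPCs succeed, so a barrier can never reference an actor that does not exist yet.

```rust
type ActorId = u32;

/// Phase 1: RPC each compute node's stream manager to build the new
/// actors. They stay idle until a barrier connects them to the graph.
fn build_actors_on_workers(actors: &[ActorId]) -> Result<(), String> {
    println!("built idle actors {actors:?}");
    Ok(())
}

/// Phase 2: ask the global barrier manager to inject an `AddOutput`
/// mutation that wires the new actors into the graph.
fn inject_add_output(actors: &[ActorId]) {
    println!("injecting AddOutput barrier for {actors:?}");
}

fn scale_out(new_actors: &[ActorId]) -> Result<(), String> {
    build_actors_on_workers(new_actors)?; // must succeed first
    inject_add_output(new_actors); // only then is the barrier issued
    Ok(())
}

fn main() {
    scale_out(&[101, 102]).unwrap();
}
```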

> whenever new compute nodes are added, we can scale up to the new parallel units

Yes. This will be included in the step "Utilize parallel units of newly joined compute nodes."

> Should it always replace all of the current actors? Or should we allow existing actors to continue?

In the original design by @fuyufjh, we drop all of the current actors for simplicity. However, now that we've unified the state interface with `StateTable`, updating the partition info (vnodes) can also be simple, so I think we can reuse the existing actors.

> Actor startup should be fast and the cache can be repopulated easily from the block cache.

I'm not sure how much that will cost, whereas keeping the cache in the original actors is always better.

fuyufjh commented 2 years ago

> > Should it always replace all of the current actors? Or should we allow existing actors to continue?
>
> In the original design by @fuyufjh, we drop all of the current actors for simplicity. However, now that we've unified the state interface with `StateTable`, updating the partition info (vnodes) can also be simple, so I think we can reuse the existing actors.

Agree. Actually, my initial design was to reuse the existing actors and simply ignore the data they no longer own, which will not affect anything and will, in theory, be evicted sooner or later. But please feel free to simplify the design.
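
A tiny sketch of that idea with toy types (assumed names, not real APIs): reads simply skip rows in vnodes the actor no longer owns, so the stale entries are never touched again and eventually fall out of the LRU cache.

```rust
use std::collections::HashMap;

/// Toy vnode mapping, as in the sketches above.
fn vnode_of(key: &[u8], vnode_count: usize) -> usize {
    key.iter().map(|&b| b as usize).sum::<usize>() % vnode_count
}

/// Iterate only over cached rows whose vnodes this actor still owns;
/// rows in removed vnodes are ignored and left for the LRU to evict.
fn owned_rows<'a>(
    cache: &'a HashMap<Vec<u8>, Vec<u8>>,
    vnodes: &'a [bool], // vnodes[v] == true iff this actor owns vnode v
) -> impl Iterator<Item = (&'a Vec<u8>, &'a Vec<u8>)> {
    cache
        .iter()
        .filter(move |(key, _)| vnodes[vnode_of(key, vnodes.len())])
}

fn main() {
    let cache = HashMap::from([
        (b"a".to_vec(), b"1".to_vec()), // vnode 1: no longer owned
        (b"b".to_vec(), b"2".to_vec()), // vnode 0: still owned
    ]);
    let vnodes = vec![true, false];
    for (k, v) in owned_rows(&cache, &vnodes) {
        println!("{k:?} => {v:?}"); // only the vnode-0 row is printed
    }
}
```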