risingwavelabs / risingwave


Support configuration change in a single barrier #18312

Open wenym1 opened 2 weeks ago

wenym1 commented 2 weeks ago

Motivation

Currently, a configuration change such as scaling or replace table (including sink into table) follows these steps:

  1. Inject a pause barrier, say at epoch1. After the barrier is collected and committed, explicitly call try_wait_epoch(epoch1) on each CN.
  2. Inject a configuration change barrier, say at epoch2, to apply the configuration. Newly created stateful executors call StateTable::init on epoch2, and existing stateful executors call update_vnode_bitmap.
  3. Inject a resume barrier to resume the stream.

This design exists mainly for data synchronization: every newly created actor, and every actor that is assigned new vnodes, must see the latest data of the vnodes it owns. The mechanism works because we call try_wait_epoch(epoch1) on each CN, so all existing actors and any actor created later can read at least all data as of epoch1. Since the stream is paused after epoch1, the data at epoch1 and epoch2 is identical, so when the configuration change happens at epoch2, all actors can read the latest data.
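The invariant the pause relies on can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyStream`, `BarrierKind`, `ingest`, `barrier`, and `snapshot_at` are hypothetical names invented for illustration and are not RisingWave APIs. The model commits a snapshot at every barrier and ingests nothing while paused, which is exactly the behavior the real mechanism needs every executor to get right.

```rust
use std::collections::BTreeMap;

/// Hypothetical barrier kinds for the three-barrier flow (not RisingWave types).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BarrierKind {
    Pause,
    ConfigChange,
    Resume,
}

/// vnode -> value, a stand-in for the state owned by the stream.
type Snapshot = BTreeMap<u32, i64>;

/// Toy stream: applies writes unless paused, commits a snapshot at each barrier.
#[derive(Default)]
pub struct ToyStream {
    paused: bool,
    state: Snapshot,
    committed: BTreeMap<u64, Snapshot>,
}

impl ToyStream {
    /// Applies a write unless the stream is paused; returns whether it was applied.
    pub fn ingest(&mut self, vnode: u32, delta: i64) -> bool {
        if self.paused {
            return false;
        }
        *self.state.entry(vnode).or_insert(0) += delta;
        true
    }

    /// Handles a barrier: updates the pause flag, then commits the current state.
    pub fn barrier(&mut self, epoch: u64, kind: BarrierKind) {
        match kind {
            BarrierKind::Pause => self.paused = true,
            BarrierKind::Resume => self.paused = false,
            BarrierKind::ConfigChange => {}
        }
        self.committed.insert(epoch, self.state.clone());
    }

    /// Returns the snapshot committed at `epoch`, if any.
    pub fn snapshot_at(&self, epoch: u64) -> Option<&Snapshot> {
        self.committed.get(&epoch)
    }
}

/// Runs pause (epoch 1), config change (epoch 2), resume (epoch 3) and checks
/// that the snapshots committed at epoch 1 and epoch 2 are identical.
pub fn legacy_flow_snapshots_match() -> bool {
    let mut stream = ToyStream::default();
    stream.ingest(0, 10);
    stream.ingest(1, 5);
    stream.barrier(1, BarrierKind::Pause);
    let held_back = !stream.ingest(0, 99); // nothing is applied while paused
    stream.barrier(2, BarrierKind::ConfigChange);
    stream.barrier(3, BarrierKind::Resume);
    let e1 = stream.snapshot_at(1);
    let e2 = stream.snapshot_at(2);
    held_back && e1.is_some() && e1 == e2
}
```

In this model, a reader that has waited for epoch 1 already sees everything that will be visible at epoch 2. The equality breaks as soon as any write slips through during the pause, which is the fragility described in the drawbacks.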

Though the current mechanism works, it has many drawbacks:

  1. We pause the stream explicitly, and concurrent barriers are useless during the pause.
  2. The correctness depends on too large a scope. First, it depends on all executors handling the pause barrier correctly, so that there is actually no data between epoch1 and epoch2. Second, it depends on calling try_wait_epoch correctly. Sometimes, for example in https://github.com/risingwavelabs/risingwave/issues/17602, the correctness implicitly depends on calling try_wait_epoch but we didn't call it, and the error is either hard to reproduce or surfaces somewhere far downstream, which costs a lot of effort to debug.

Since the main purpose of this mechanism is data synchronization, we'd better narrow the scope that the correctness depends on. Solely within the implementation of StateTable, we can ensure that data is always ready when StateTable::init and StateTable::update_vnode_bitmap return, without any implicit dependency on the correct behavior of the higher-level streaming executors or the streaming runtime.

Proposal

As discussed offline with @BugenZhao and @st1page , we have the following rough proposal.

When we call StateTable::init or StateTable::update_vnode_bitmap, the state table itself waits for the committed epoch of its table id to bump up before returning. Once the method returns, the state table is ready to serve read requests.
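A minimal sketch of this idea, using blocking std primitives purely for illustration. `CommittedEpoch`, `ToyStateTable`, and `init_after_commit` are hypothetical names, not the real StateTable or state store interfaces, and an actual implementation would presumably be async rather than thread-blocking.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the committed epoch of one table id.
pub struct CommittedEpoch {
    epoch: Mutex<u64>,
    bumped: Condvar,
}

impl CommittedEpoch {
    pub fn new(initial: u64) -> Self {
        Self { epoch: Mutex::new(initial), bumped: Condvar::new() }
    }

    /// Called by the committing side; the epoch never moves backwards.
    pub fn bump(&self, epoch: u64) {
        let mut current = self.epoch.lock().unwrap();
        if epoch > *current {
            *current = epoch;
            self.bumped.notify_all();
        }
    }

    /// Blocks until the committed epoch is at least `epoch`, then returns it.
    pub fn wait_for(&self, epoch: u64) -> u64 {
        let guard = self.epoch.lock().unwrap();
        let guard = self
            .bumped
            .wait_while(guard, |current| *current < epoch)
            .unwrap();
        *guard
    }
}

/// Toy state table whose `init` returns only once `epoch` is committed.
pub struct ToyStateTable {
    committed: Arc<CommittedEpoch>,
}

impl ToyStateTable {
    pub fn new(committed: Arc<CommittedEpoch>) -> Self {
        Self { committed }
    }

    /// Waits inside the table itself, so callers need no separate wait call.
    pub fn init(&self, epoch: u64) -> u64 {
        self.committed.wait_for(epoch)
    }
}

/// Demo: a background thread commits `target` shortly after `init` starts waiting.
pub fn init_after_commit(target: u64) -> u64 {
    let committed = Arc::new(CommittedEpoch::new(0));
    let committer = {
        let committed = Arc::clone(&committed);
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(20));
            committed.bump(target);
        })
    };
    let table = ToyStateTable::new(Arc::clone(&committed));
    let seen = table.init(target);
    committer.join().unwrap();
    seen
}
```

The point of putting the wait inside `init` is that the caller cannot forget it: readiness becomes a property of the state table rather than of the surrounding executor or runtime.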

In earlier development of configuration change, @BugenZhao followed an implementation similar to this proposal but encountered a deadlock. The deadlock happened because we started waiting for the committed epoch to bump up before yielding the barrier, while bumping the committed epoch depends on collecting all barriers.

Similar deadlocks are very likely to happen, so the order of waiting for the epoch and yielding the barrier requires careful handling. In general, we need to follow this order:

  1. receive barrier
  2. commit state table if not first barrier
  3. yield barrier
  4. wait epoch in either init or update_vnode_bitmap.

Following this order, a similar deadlock won't happen. The correctness depends on one assumption: from the perspective of a single executor, after it yields a checkpoint barrier, the committed epoch must be able to bump up without depending on the executor doing anything else, so the executor can safely wait for the committed epoch to bump up eventually.
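The difference between the two orderings can be checked with a small deterministic model. This is a sketch under simplifying assumptions, not the actual runtime: `Order`, `Step`, and `all_executors_finish` are hypothetical names, the state table commit step is omitted, and the committed epoch is assumed to bump as soon as every executor has yielded the barrier.

```rust
/// The two orderings discussed above.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Order {
    /// Wait for the committed epoch first, then yield the barrier.
    WaitThenYield,
    /// Yield the barrier first, then wait for the committed epoch.
    YieldThenWait,
}

/// Progress of one executor on a single barrier.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Step {
    Received,
    Yielded,
    Done,
}

/// Returns true if every executor finishes, false if the system gets stuck.
pub fn all_executors_finish(order: Order, executors: usize) -> bool {
    let mut steps = vec![Step::Received; executors];
    let mut committed = false;
    loop {
        let mut progressed = false;
        for step in steps.iter_mut() {
            let next = match (*step, order) {
                // Proposed order: yielding never blocks.
                (Step::Received, Order::YieldThenWait) => Some(Step::Yielded),
                // Old order: yielding is gated on the committed epoch.
                (Step::Received, Order::WaitThenYield) if committed => Some(Step::Yielded),
                // Proposed order: the wait happens after the barrier is yielded.
                (Step::Yielded, Order::YieldThenWait) if committed => Some(Step::Done),
                // Old order: the wait already happened, nothing left to do.
                (Step::Yielded, Order::WaitThenYield) => Some(Step::Done),
                _ => None,
            };
            if let Some(next) = next {
                *step = next;
                progressed = true;
            }
        }
        // The committed epoch bumps only after all barriers have been collected.
        if !committed && steps.iter().all(|s| *s != Step::Received) {
            committed = true;
            progressed = true;
        }
        if steps.iter().all(|s| *s == Step::Done) {
            return true;
        }
        if !progressed {
            return false; // nobody can move: deadlock
        }
    }
}
```

In this model, `all_executors_finish(Order::YieldThenWait, 3)` returns `true`, while `all_executors_finish(Order::WaitThenYield, 3)` returns `false`: every executor waits for a commit that can only happen after all of them have yielded.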

Though the logic needs careful handling in all stateful executors, correctness can be ensured in CI as long as all executors are covered in CI.

With this proposal, we can do a configuration change in a single barrier without the need to pause and resume the stream.

wenym1 commented 2 weeks ago

cc @hzxa21 @fuyufjh @shanicky

fuyufjh commented 2 weeks ago

Sounds good to me.

If I understand correctly, in this design there is still a short period of "stop-the-world", which starts after an actor receives the Mutation barrier and ends when the Mutation is finally checkpointed in Meta. Theoretically, this halves the duration of "stop-the-world" rather than eliminating it entirely.

fuyufjh commented 2 weeks ago

By the way, both @shanicky and @BugenZhao proposed abandoning the Mutation barrier in the scaling out/in process and triggering a global recovery instead. This approach can significantly simplify the implementation, at the cost of dropping the operator cache, which seems to be less useful than we thought. In our experience, most cases didn't get visible performance drops when doing scaling.

lmatz commented 2 weeks ago

most cases didn't get visible performance drops when doing scaling.

Just to provide one data point:

During a recent POC for a customer in the streaming media business, with a 30+ CPU, 100+ GB compute node, scaling up/down (although not scaling out/in as mentioned here) led to a significant performance drop, e.g. high barrier latency, whenever it triggered remote fetches from S3.

Backfilling the entire operator cache took more than 2 hours after scaling up and clearing the cache. During these 2 hours, the barrier latency could spike to 1~3 min from time to time.

That being said, it seems that only deploying a memory caching layer such as Alluxio on the cloud (optional, for latency-sensitive customers only) can mitigate the performance drops for both the scaling up/down and scaling out/in cases, while mutation only works for the scaling out/in cases.