risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.
https://go.risingwave.com/slack
Apache License 2.0
6.78k stars 561 forks source link

Problem of online scaling for source backfill #18300

Closed xxchan closed 4 days ago

xxchan commented 2 weeks ago

Problem

Currently, SourceBackfill has a hacky stuff: It needs to scan() the whole state table, including splits written by other actors.

https://github.com/risingwavelabs/risingwave/blob/29db1d95c416ea14c8ebb39ba74857d01c631d34/src/stream/src/executor/source/source_backfill_executor.rs#L627-L642

The reason is to handle split migration (i.e., online scaling). SourceBackfill has 2 stages (backfill -> forward upstream). After it entered stage 2, it cannot go back to stage 1. So if an unfinished backfill work is migrated to an actor in stage 2, it cannot do backfilling.

However, the hack doesn't work correctly now, as shown in https://github.com/risingwavelabs/risingwave/pull/18033#discussion_r1716245495. It's because at the beginning, the actor will only read the state written by itself. It needs to wait until it can read all actors' written data. i.e., wait for the first checkpoint has been available.

Note1: checkpoint is async, so we cannot rely on sth like "wait for N barriers to pass". Note2: an actor doesn't know the total number of splits, so we cannot rely on the condition like "states.len() == num_splits". This is unlike WatermarkFilter executor, which has a similar hack, but it relies on "all vnodes are written". However, source splits are not distributed by vnodes.

Solution

There are several solutions to fix this bug:

  1. Patch the hack: Wait until the "inited" state. We can use try_wait_epoch.
  2. Rewrite the code to allow transition between stage 1 and 2: I don't think there's any technical restrictions to prevent us doing this. The reason why we didn't do it in the first place might just be an overlook. (But single direction transition looks slightly more natural though).
  3. Disallow online scaling: Then the problem will disappear! This is inspired by @BugenZhao: MV backfilling actually is in a similar situation. It also goes from stage 1 to 2 in one direction. And currently we disallow online scaling for it. So we can do the same thing for Source backfilling.

    More precisely,

    • For foreground (blocking) DDL, scaling will be rejected. If cluster is down, the backfill will fail.
    • For background DDL, scaling is done by rebuilding the actors. (So the new actor can start from stage 1. But the code doesn't need to handle stage 2 -> 1)

    More on this: https://risingwave-labs.slack.com/archives/C05AZUR6Q5P/p1724227403344349

I prefer solution 3 because it can remove the hack and make the logic simpler to understand. In the long term, if we want to allow online scaling for foreground DDL, solution 2 might be the best (both for Source and MV). But it will need larger effort to implement.

_edit: Now we did both 1 & 3. 3 is automatically done after we do blocking DDL, because of reschedule_lock. But we found it's not enough and still need 1. See #18112_

edit: Imagine we don't have blocking DDL and don't track progress, it seems there's no way to disallow online scaling. So either 1 or 2 must be needed.

A little more

Relationship of the split migration problem with #18338 (blocking DDL) and #18299 (Finish backfill faster):

Blocking DDL requires finishing backfill faster for better UX. And if we finish backfill faster, the chances we need to handle split migration is lower..

Previously we only considered background backfill, and the backfill can be blocked for a long time, and we have to handle split migration gracefully. (But we forgot about the idea of rebuilding actors.)

xxchan commented 1 week ago

I think there's no special code change needed to disallow online scaling for blocking DDL on source.

The existing mechanism is to grab reschedule_lock when creating streaming job. For blocking DDL, create_streaming_job will not return until backfill finished, so scaling is naturally prevented when we have blocking DDL for source.

https://github.com/risingwavelabs/risingwave/blob/602c6adccaf6de63b430bfdfce045653172a015d/src/meta/src/rpc/ddl_controller_v2.rs#L82

fuyufjh commented 1 week ago

Does this problem exist for a normal SourceExecutor? It also has to handle split migration (i.e., online scaling), and it stores offset in its state table.

xxchan commented 1 week ago

No, because SourceExecutor is a single loop. It doesn't need to transition between stages (Backfilling -> Finished).

fuyufjh commented 1 week ago

I think the transition between stages (Backfilling -> Finished) doesn't contribute to the problem here 🤔

Considering a general stateful executor such as HashJoinExecutor, it will not read stale data because we do 3 stages during scale-out: 1) pause the entire graph and ensure all barriers are finished 2) manipulate the actors 3) resume barriers. On step 3, we can ensure that the executor can read the latest checkpoint, otherwise, the correctness is broken (for every stateful executors).

xxchan commented 1 week ago

See this PR for more https://github.com/risingwavelabs/risingwave/pull/18112