risingwavelabs / risingwave


Tracking: support partial checkpoint #14041

Open wenym1 opened 7 months ago

wenym1 commented 7 months ago

Original RFC: https://github.com/risingwavelabs/rfcs/pull/84

This RFC involves changes in three layers: batch query, streaming job, and Hummock storage. We are going to implement the whole RFC step by step, from the bottom layer to the upper layers. Since the current global checkpoint is a special case of partial checkpoint, the upper layers can keep the existing global checkpoint logic even after some partial checkpoint features have been implemented in the bottom layer, until the upper-layer logic is implemented as well.

Once partial checkpoint is supported, we can work on partial recovery to isolate failures between different MVs.

The following are some of the changes in each layer:

Hummock storage

First, we will refactor the current code to implement some of the features required by partial checkpoint while keeping the behavior the same as the current logic. This includes the following:

Meanwhile, we can develop L0 as a log so that we can reuse the data of the MV.

streaming job

First, we will implement a new streaming executor, similar to a source executor, that consumes the logs of the upstream MV.

Second, we will implement a partial checkpoint manager that understands the streaming graph, collects the barriers reported from each parallelism of each MV, and triggers partial checkpoints.
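
To make the barrier-collection step concrete, here is a minimal sketch of what such a manager could look like. Everything in it is hypothetical and not taken from the RisingWave codebase: the class name `PartialCheckpointManager`, the `report_barrier` method, and the representation of the streaming graph as a plain mapping from MV name to parallelism ids are all assumptions for illustration. It also shows how the current global checkpoint falls out as a special case: the globally committed epoch is the minimum over the per-MV committed epochs.

```python
from collections import defaultdict


class PartialCheckpointManager:
    """Hypothetical sketch: tracks barrier reports per MV and per epoch."""

    def __init__(self, parallelism_by_mv):
        # MV name -> set of parallelism ids that must report each barrier.
        # This mapping stands in for "understanding the streaming graph".
        self.expected = {mv: set(ids) for mv, ids in parallelism_by_mv.items()}
        # (MV name, epoch) -> parallelism ids that have reported so far.
        self.reported = defaultdict(set)
        # MV name -> latest epoch for which a partial checkpoint was triggered.
        self.committed = {mv: 0 for mv in parallelism_by_mv}

    def report_barrier(self, mv, parallelism_id, epoch):
        """Record one report; return True if it completes a partial checkpoint."""
        if parallelism_id not in self.expected[mv]:
            raise ValueError(f"unknown parallelism {parallelism_id} for {mv}")
        key = (mv, epoch)
        self.reported[key].add(parallelism_id)
        if self.reported[key] == self.expected[mv]:
            # All parallelisms of this MV reported: checkpoint this MV only.
            del self.reported[key]
            self.committed[mv] = max(self.committed[mv], epoch)
            return True
        return False

    def global_committed_epoch(self):
        """Global checkpoint as a special case: the epoch every MV has reached."""
        return min(self.committed.values())
```

In this sketch an MV with a single parallelism can checkpoint an epoch without waiting for a slower MV, while `global_committed_epoch()` still reports the epoch that every MV has reached.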

batch query

Currently, a batch query uses the global max committed epoch as its query epoch. We can implement different query consistency levels for batch queries, where a query consistency level is the policy for choosing the query epoch of each state table. The default is to use the global max committed epoch.
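
As an illustration of what "policy for choosing the query epoch" could mean, the sketch below compares two policies. The names `QueryConsistency`, `GLOBAL`, `PER_TABLE`, and `choose_query_epochs` are hypothetical, and the sketch assumes that, once tables commit independently, the global max committed epoch is the largest epoch that every state table has committed (that is, the minimum of the per-table committed epochs).

```python
from enum import Enum


class QueryConsistency(Enum):
    # Hypothetical policy names, for illustration only.
    GLOBAL = "global"        # every table is read at the same global epoch
    PER_TABLE = "per_table"  # each table is read at its own latest epoch


def choose_query_epochs(committed_epochs, tables, policy=QueryConsistency.GLOBAL):
    """Return {table: query_epoch} for the tables a batch query reads.

    committed_epochs maps every state table to its latest committed epoch.
    """
    if policy is QueryConsistency.GLOBAL:
        # Assumption: the global epoch is what all tables have committed.
        global_epoch = min(committed_epochs.values())
        return {table: global_epoch for table in tables}
    # PER_TABLE: fresher reads, but tables may be at different epochs.
    return {table: committed_epochs[table] for table in tables}
```

With committed epochs `{"t1": 5, "t2": 3, "t3": 7}`, a query over `t1` and `t3` reads both at epoch 3 under `GLOBAL`, and at epochs 5 and 7 respectively under `PER_TABLE`.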

partial recovery

First, we can support failure isolation among MVs that are not connected.

Second, we can support failure isolation between upstream and downstream MVs.
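
The first step can be pictured as a connected-components problem: when one MV fails, only the MVs reachable from it through upstream or downstream edges need to recover. The sketch below is a hypothetical illustration, not the planned implementation. The function name `mvs_to_recover` and the edge-list representation of the MV graph are assumptions, and it covers only the first step (isolation among unconnected MVs).

```python
from collections import defaultdict, deque


def mvs_to_recover(edges, failed_mv):
    """Return the MVs in the same connected component as failed_mv.

    edges is a list of (upstream_mv, downstream_mv) pairs. Direction is
    ignored here because, in this first step, connected MVs recover together.
    """
    neighbors = defaultdict(set)
    for upstream, downstream in edges:
        neighbors[upstream].add(downstream)
        neighbors[downstream].add(upstream)

    # Breadth-first search from the failed MV.
    seen = {failed_mv}
    queue = deque([failed_mv])
    while queue:
        current = queue.popleft()
        for neighbor in neighbors[current]:
            if neighbor not in seen:
                seen.add(neighbor)
                queue.append(neighbor)
    return seen
```

For example, with edges `mv_a -> mv_b -> mv_c` and `mv_x -> mv_y`, a failure in `mv_b` affects `mv_a`, `mv_b`, and `mv_c`, while `mv_x` and `mv_y` keep running.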

Development progress:

kwannoel commented 3 months ago

> batch query
>
> Currently, a batch query uses the global max committed epoch as its query epoch. We can implement different query consistency levels for batch queries, where a query consistency level is the policy for choosing the query epoch of each state table. The default is to use the global max committed epoch.

For batch queries, can we use the global checkpoint of created streaming jobs only, i.e. excluding streaming jobs that are still being created? That way, streaming jobs that are being created will not affect the freshness of batch queries.
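
A minimal sketch of this suggestion, under stated assumptions: each streaming job is represented as a hypothetical `(status, committed_epoch)` pair, the status strings `"created"` and `"creating"` are made up for illustration, and the batch query epoch is taken as the minimum committed epoch over created jobs only.

```python
def batch_query_epoch(jobs):
    """Return the query epoch computed over created jobs only.

    jobs maps a job name to a (status, committed_epoch) pair. Jobs that are
    still being created are ignored so they cannot hold the epoch back.
    Returns None if no job has finished creation.
    """
    created_epochs = [
        epoch for status, epoch in jobs.values() if status == "created"
    ]
    if not created_epochs:
        return None
    return min(created_epochs)
```

With jobs `mv_a` (created, epoch 10), `mv_b` (created, epoch 9), and `mv_new` (creating, epoch 2), the query epoch is 9 rather than 2, so the backfilling job does not make batch reads stale.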