risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.
https://www.risingwave.com/slack
Apache License 2.0

feat(state store): fetch last committed watermark #17741

Closed. stdrc closed this issue 1 month ago

stdrc commented 1 month ago

As discussed in https://github.com/risingwavelabs/risingwave/issues/17711, we found that things can be a lot easier if StateTable can persist the watermark (this seems to be done already, as we can query it after #15344) and restore it on recovery.

What's more, with the committed watermark information in StateTable, executors no longer need to maintain their own (inaccurate) table_cleaning_watermark state, and they can just blindly write any changes to state table and the latter will help ignore any changes that are below watermark.

This can actually give executors a new, coherent view of state: state tables now logically hold all intermediate data generated by executors, and watermarks received at the executor level are just hints for the storage to clean state. Executors don't need to care whether update_watermark will really delete entries or not.
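To make the proposed model concrete, here is a minimal sketch of a table that silently ignores writes below its watermark and treats update_watermark as a cleaning hint. Everything in it (ToyStateTable, the i64 key, the method signatures) is a hypothetical illustration, not RisingWave's actual StateTable API.

```rust
use std::collections::BTreeMap;

/// Toy stand-in for a state table keyed by an i64 watermark column.
/// Hypothetical: this is not RisingWave's `StateTable`.
#[derive(Default)]
struct ToyStateTable {
    rows: BTreeMap<i64, String>,
    /// Watermark already applied to the table, if any.
    watermark: Option<i64>,
}

impl ToyStateTable {
    /// Insert a row. Returns false when the row is ignored because its key
    /// is below the current watermark, so callers may write blindly.
    fn insert(&mut self, key: i64, value: &str) -> bool {
        if self.watermark.is_some_and(|wm| key < wm) {
            return false;
        }
        self.rows.insert(key, value.to_string());
        true
    }

    /// Treat the watermark as a cleaning hint: drop rows below it.
    /// A watermark that does not advance is ignored.
    fn update_watermark(&mut self, wm: i64) {
        if self.watermark.is_some_and(|cur| wm <= cur) {
            return;
        }
        // `split_off` returns the entries with key >= wm; keep only those.
        self.rows = self.rows.split_off(&wm);
        self.watermark = Some(wm);
    }
}
```

With this shape, the executor never has to track its own cleaning watermark; the table decides which writes survive.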

What do you think? cc @wenym1

wenym1 commented 1 month ago

> What's more, with the committed watermark information in StateTable, executors no longer need to maintain their own (inaccurate) table_cleaning_watermark state

Any reason why it's inaccurate, or why getting the per-vnode table watermark can be more accurate than reading the state table?

I think that, in terms of visibility or freshness of the watermark from other parallelisms, writing to a state table should be the same as writing a new per-vnode watermark.

stdrc commented 1 month ago

  1. The watermark maintained in executor runtime variables cannot be kept in sync with StateTable.state_cleaning_watermark and HummockReadVersion.table_watermarks, because executors don't know whether table.update_watermark takes effect or not.
  2. An executor cannot persist the watermark, so after recovery there may be a period during which the executor is not aware of any watermark. In this period, the executor doesn't know whether it can write changes (which may come from an input side other than the watermark side) to the state table.

stdrc commented 1 month ago

If we can simply call table.get_watermark(), life will be much easier 🤔🥹
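A rough sketch of what such a call could enable on recovery follows. The types ToyTable and ToyExecutor, the Decision enum, and the get_watermark signature are all assumptions made for illustration; the thread does not specify the real API.

```rust
/// Hypothetical storage handle exposing the last committed watermark.
struct ToyTable {
    committed_watermark: Option<i64>,
}

impl ToyTable {
    /// Assumed accessor; the real method name and signature may differ.
    fn get_watermark(&self) -> Option<i64> {
        self.committed_watermark
    }
}

#[derive(Debug, PartialEq)]
enum Decision {
    Write,
    Skip,
    /// No watermark is known yet, so the executor cannot decide.
    Unknown,
}

struct ToyExecutor {
    watermark: Option<i64>,
}

impl ToyExecutor {
    /// Recovery without persisted state: the watermark is lost until the
    /// first watermark message arrives.
    fn recover_without_table() -> Self {
        Self { watermark: None }
    }

    /// Recovery that seeds the watermark from the table.
    fn recover_from(table: &ToyTable) -> Self {
        Self { watermark: table.get_watermark() }
    }

    /// Decide what to do with a row arriving from the non-watermark side.
    fn decide(&self, key: i64) -> Decision {
        match self.watermark {
            None => Decision::Unknown,
            Some(wm) if key < wm => Decision::Skip,
            Some(_) => Decision::Write,
        }
    }
}
```

In this toy model, the only difference between the two recovery paths is whether decide can return a definite answer before the first watermark message arrives.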

wenym1 commented 1 month ago

I think the only benefit we can get from this get_watermark is that we don't need an extra state table to store the watermark, and the rest is the same.

The per-vnode table watermark can actually be treated as a special table. For any specified visibility (committed, current, or whatever), the watermark we get from the per-vnode table watermark should be consistent with the watermark we can read from the current watermark state table. So for the issue "In this period, executor doesn't know whether it can write changes (may come from other input side than the watermark side) to state table", the two are the same.

The only inconsistency between them may come from "executors don't know whether table.update_watermark take effect or not", which is caused by the logic that buffers the watermark in the state table and delays writing it to storage. This logic was previously a workaround for tombstone-based range deletes, to reduce the number of range tombstones we write. It can actually be removed now that we support per-vnode table watermarks.
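The buffering behaviour described above can be illustrated with a small sketch. It assumes a table that only applies the latest pending watermark once every `period` epochs; the type BufferedTable, its fields, and that policy are hypothetical and are not taken from the RisingWave code base.

```rust
/// Toy table that buffers watermarks and applies them only periodically.
/// Hypothetical: names and policy are illustrative only.
struct BufferedTable {
    /// Latest watermark received but not yet written to storage.
    pending: Option<i64>,
    /// Latest watermark actually written to storage.
    applied: Option<i64>,
    epochs_since_apply: u32,
    period: u32,
}

impl BufferedTable {
    fn new(period: u32) -> Self {
        Self { pending: None, applied: None, epochs_since_apply: 0, period }
    }

    /// Only records the watermark; nothing reaches storage yet.
    fn update_watermark(&mut self, wm: i64) {
        self.pending = Some(self.pending.map_or(wm, |p| p.max(wm)));
    }

    /// Called once per epoch. Returns the watermark written in this epoch,
    /// or None if the write was delayed or nothing was pending.
    fn commit(&mut self) -> Option<i64> {
        self.epochs_since_apply += 1;
        if self.epochs_since_apply < self.period {
            return None;
        }
        self.epochs_since_apply = 0;
        let wm = self.pending.take()?;
        self.applied = Some(wm);
        Some(wm)
    }
}
```

Between calls that return Some, the executor's view (the last value passed to update_watermark) and the storage's view (applied) can disagree, which is the inconsistency being discussed.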

In brief, I think it's still doable if it can simplify the code of watermark filter, but it doesn't seem to be a solution to any of the related issues.

stdrc commented 1 month ago

> The per-vnode table watermark can actually be treated as a special table. For any specified visibility (committed, current, or whatever), the watermark we get from the per-vnode table watermark should be consistent with the watermark we can read from the current watermark state table. So for the issue "In this period, executor doesn't know whether it can write changes (may come from other input side than the watermark side) to state table", the two are the same.
>
> The only inconsistency between them may come from "executors don't know whether table.update_watermark take effect or not", which is caused by the logic that buffers the watermark in the state table and delays writing it to storage. This logic was previously a workaround for tombstone-based range deletes, to reduce the number of range tombstones we write. It can actually be removed now that we support per-vnode table watermarks.

The problem is actually not "executors don't know whether table.update_watermark take effect or not", because we can easily add a method to get state_clean_watermark from StateTable. The key problem is recovery. When recovery happens, executors cannot know the last watermark applied to the state table, so some decisions cannot be made before the first watermark message is received.

This is a blocking issue in any executor that uses watermarks from one input side to state-clean a state table that materializes the other input side.

So to clarify, the last updated watermark (or last committed watermark) is part of the executor's state. Losing this state on recovery is actually a bug, not an inconvenience.

wenym1 commented 1 month ago

> The key problem is about recovery. When recovery happens, executors cannot know the last watermark updated to the state table, so that some decisions cannot be made before receiving the first watermark message.

But things are the same for the per-vnode table watermark. The table watermark and the watermark state table are always consistent. They are written to the shared buffer and committed to the LSM manifest (which we call HummockVersion) at the same time, as in a transaction. After recovery, if we can't know the latest watermark written to the state table, there is no way to get the latest table watermark either.
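The "committed together, like in a transaction" argument can be sketched as follows. ToyStore and ToyVersion are hypothetical stand-ins for the shared buffer and the committed version; they only model the property that rows and the table watermark become visible in the same commit and are recovered together.

```rust
use std::collections::BTreeMap;

/// Toy committed snapshot: rows and the table watermark live in one version.
/// Hypothetical stand-in, not the real `HummockVersion`.
#[derive(Clone, Debug, Default, PartialEq)]
struct ToyVersion {
    epoch: u64,
    rows: BTreeMap<i64, String>,
    table_watermark: Option<i64>,
}

/// Toy store with an uncommitted buffer and one committed version.
#[derive(Default)]
struct ToyStore {
    committed: ToyVersion,
    buffered_rows: Vec<(i64, String)>,
    buffered_watermark: Option<i64>,
}

impl ToyStore {
    fn write(&mut self, key: i64, value: &str) {
        self.buffered_rows.push((key, value.to_string()));
    }

    fn update_watermark(&mut self, wm: i64) {
        self.buffered_watermark = Some(wm);
    }

    /// Commit buffered rows and the buffered watermark in a single step.
    fn commit(&mut self, epoch: u64) {
        let mut next = self.committed.clone();
        next.epoch = epoch;
        next.rows.extend(self.buffered_rows.drain(..));
        if let Some(wm) = self.buffered_watermark.take() {
            // Keep only rows at or above the watermark.
            next.rows = next.rows.split_off(&wm);
            next.table_watermark = Some(wm);
        }
        self.committed = next;
    }

    /// Recovery discards uncommitted data; both rows and watermark fall
    /// back to the same committed version.
    fn recover(&mut self) -> &ToyVersion {
        self.buffered_rows.clear();
        self.buffered_watermark = None;
        &self.committed
    }
}
```

In this model there is no state in which the recovered rows come from one epoch and the recovered watermark from another, which is the consistency property claimed in the comment.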

stdrc commented 1 month ago

Completed by https://github.com/risingwavelabs/risingwave/pull/17767