risingwavelabs / risingwave

SQL engine for event-driven workloads. Perform streaming analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch processing. PostgreSQL compatible.
Apache License 2.0

bug(dyn-filter): left side changes after state cleaning with right watermark #17711

Closed: stdrc closed this issue 3 weeks ago

stdrc commented 1 month ago

In the DynamicFilter executor, we use watermarks from the RHS to clean the state of the left table, via left_table.update_watermark(rhs_watermark). However, if update_watermark does take effect, later changes on the left side will cause inconsistent table operations (e.g. double delete) on the left table.
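To make the failure mode concrete, here is a minimal toy model in Rust. It is not RisingWave's StateTable: `ToyStateTable`, its fields, the timestamps, and the error message are all hypothetical, and only the method name update_watermark is borrowed from the description above. It sketches how a watermark-driven range deletion makes a later delete of the same key look like a double delete.

```rust
use std::collections::BTreeMap;

/// Toy stand-in for a state table keyed by timestamp
/// (hypothetical, not the real StateTable).
#[derive(Default)]
struct ToyStateTable {
    rows: BTreeMap<i64, i32>,
}

impl ToyStateTable {
    fn insert(&mut self, ts: i64, value: i32) {
        self.rows.insert(ts, value);
    }

    /// Strict delete: errors when the key is absent, mimicking a consistency check.
    fn delete(&mut self, ts: i64) -> Result<i32, String> {
        self.rows
            .remove(&ts)
            .ok_or_else(|| format!("inconsistent delete: key {ts} not found"))
    }

    /// State cleaning: drop every row whose key is below the watermark.
    fn update_watermark(&mut self, watermark: i64) {
        // `split_off` returns the entries with key >= watermark; keep only those.
        self.rows = self.rows.split_off(&watermark);
    }
}

/// Replays the scenario: two rows, a watermark that cleans the older one, then a
/// left-side change that tries to delete the already-cleaned row.
fn replay_double_delete() -> Result<i32, String> {
    let mut left = ToyStateTable::default();
    left.insert(100, 111);
    left.insert(160, 222);
    left.update_watermark(150); // removes the row at ts = 100
    left.delete(100) // an upstream change to that row arrives as a delete
}
```

In this model `replay_double_delete()` returns an `Err`, which corresponds to the inconsistent operation described above; a delete of the row at ts = 160 would still succeed.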

An intuitive way to resolve this is to check whether the changed rows on the left side are below the watermark, and simply ignore those that are. But another fact is that state cleaning watermark is not persisted, so after recovery we will temporarily have None watermark.
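A rough sketch of that idea, and of why a missing watermark after recovery undermines it, might look like the following. `LeftChangeFilter` and its methods are hypothetical names used only for illustration; the sketch assumes the watermark lives purely in memory, as described above.

```rust
/// Hypothetical filter state: the last cleaning watermark, held only in memory.
struct LeftChangeFilter {
    watermark: Option<i64>,
}

impl LeftChangeFilter {
    /// After recovery the in-memory watermark is gone.
    fn recovered() -> Self {
        Self { watermark: None }
    }

    /// Record a watermark from the right side; watermarks never move backwards.
    fn on_right_watermark(&mut self, wm: i64) {
        self.watermark = Some(self.watermark.map_or(wm, |old| old.max(wm)));
    }

    /// Returns true when a left-side change should be applied to the state table.
    fn should_apply(&self, ts: i64) -> bool {
        match self.watermark {
            Some(wm) => ts >= wm,
            // Nothing is known yet, so a change to an already-cleaned row slips through.
            None => true,
        }
    }
}
```

Right after `LeftChangeFilter::recovered()`, `should_apply(100)` is true even if the row at ts = 100 was cleaned before the restart; only once `on_right_watermark(150)` has been called does the same check return false.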

I guess for the sake of simplicity, we may just use state table with inconsistent_op for the left table of DynamicFilter.

(This is blocking #17694.)

stdrc commented 1 month ago

To Reproduce

First, change risingwave_stream::common::table::state_table::STATE_CLEANING_PERIOD_EPOCH to 0, so that the state cleaning watermark is applied to state tables immediately and range deletion happens right away.

Then, run:

create table t (ts timestamp, foo int);

insert into t values (now() - interval '2 min', 111), (now() - interval '1 min', 222);

create materialized view mv as select * from t where ts >= now() - interval '1 minute';

After the MV is created, mv should be empty, because no record satisfies the condition. __internal_mv_3_dynamicfilterleft_3 should also be empty, because StreamNow should have already emitted watermarks that clean the state in this internal table.

Now, update the left table:

update t set foo = 123 where foo = 111;

This is perfectly valid, because t is not append-only and is therefore open to change. However, it causes DynamicFilter to panic when it tries to update the record in the __internal_mv_3_dynamicfilterleft_3 table.

stdrc commented 1 month ago

cc @st1page @fuyufjh @BugenZhao @yuhao-su @kwannoel

kwannoel commented 1 month ago

I got the first part, which is that within a single epoch, if the ordering of watermark happens like:

  1. Clean watermark for A
  2. Delete / Update A

Then the state for A is inconsistent.

Didn't quite get the second part:

But another fact is that state cleaning watermark is not persisted, so after recovery we will temporarily have None watermark.

If no watermark, doesn't that just mean we don't encounter the inconsistency case?

fuyufjh commented 1 month ago

This is perfectly valid, because t is not append-only and is therefore open to change. However, it causes DynamicFilter to panic when it tries to update the record in the __internal_mv_3_dynamicfilterleft_3 table.

From my understanding,

stdrc commented 1 month ago

  • Meanwhile, the DynamicFilter should ignore the update because it was 2 minutes ago, i.e. under the watermark, which means it has been purged from the state table as well as downstream.

Yes, so as I said in the issue description:

An intuitive way to resolve this is to check whether the changed rows on the left side are below the watermark, and simply ignore those that are. But another fact is that state cleaning watermark is not persisted, so after recovery we will temporarily have None watermark.

If we adopt the approach of ignoring changes below the watermark, we face the problem that the (state table) watermark is not persisted.

stdrc commented 1 month ago

Didn't quite get the second part:

But another fact is that state cleaning watermark is not persisted, so after recovery we will temporarily have None watermark.

If no watermark, doesn't that just mean we don't encounter the inconsistency case?

There is watermark on the right column, but on recovery it seems not guaranteed that the first right watermark will come before left changes.

kwannoel commented 1 month ago

Didn't quite get the second part:

But another fact is that state cleaning watermark is not persisted, so after recovery we will temporarily have None watermark.

If no watermark, doesn't that just mean we don't encounter the inconsistency case?

There is watermark on the right column, but on recovery it seems not guaranteed that the first right watermark will come before left changes.

But if it comes after the left changes, there won't be inconsistency right, because the left changes will be processed first. Then the watermark will just not clean it.

stdrc commented 1 month ago

Didn't quite get the second part:

But another fact is that state cleaning watermark is not persisted, so after recovery we will temporarily have None watermark.

If no watermark, doesn't that just mean we don't encounter the inconsistency case?

There is watermark on the right column, but on recovery it seems not guaranteed that the first right watermark will come before left changes.

But if it comes after the left changes, there won't be inconsistency right, because the left changes will be processed first. Then the watermark will just not clean it.

But the left changes may actually be below the watermark produced before the recovery, which DynamicFilter does not know yet because it has just recovered.

fuyufjh commented 1 month ago

But another fact is that state cleaning watermark is not persisted, so after recovery we will temporarily have None watermark.

The watermark is persisted in Hummock's metadata. See https://github.com/risingwavelabs/risingwave/pull/15344. If the storage (StateTable) can provide such a function to get the latest watermark of the table, the problem will be resolved, right?

st1page commented 1 month ago

But another fact is that state cleaning watermark is not persisted, so after recovery we will temporarily have None watermark.

The watermark is persisted in Hummock's metadata. See #15344. If the storage (StateTable) can provide such a function to get the latest watermark of the table, the problem will be resolved, right?

Can the CN obtain accurate and timely watermark information? If it is trustworthy, I suggest obtaining the current watermark when the state table is created, and it should be able to automatically accept and filter out all operations below the watermark. 🤔 cc @wenym1
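One way to picture this suggestion is the toy sketch below. It assumes the storage layer can hand the table a trustworthy watermark at creation time, which is exactly the open question here; `WatermarkedTable`, `WriteOutcome`, and the `persisted_watermark` parameter are hypothetical and do not correspond to an existing RisingWave API.

```rust
use std::collections::BTreeMap;

/// Hypothetical state table that is seeded with a persisted watermark when created.
struct WatermarkedTable {
    rows: BTreeMap<i64, i32>,
    watermark: Option<i64>,
}

/// Outcome of a write, so callers can see whether it was filtered out.
#[derive(Debug, PartialEq)]
enum WriteOutcome {
    Applied,
    SkippedBelowWatermark,
}

impl WatermarkedTable {
    /// `persisted_watermark` stands in for whatever the storage layer could report.
    fn new(persisted_watermark: Option<i64>) -> Self {
        Self { rows: BTreeMap::new(), watermark: persisted_watermark }
    }

    fn below_watermark(&self, ts: i64) -> bool {
        self.watermark.map_or(false, |wm| ts < wm)
    }

    /// Inserts are silently dropped when the key is below the watermark.
    fn insert(&mut self, ts: i64, value: i32) -> WriteOutcome {
        if self.below_watermark(ts) {
            return WriteOutcome::SkippedBelowWatermark;
        }
        self.rows.insert(ts, value);
        WriteOutcome::Applied
    }

    /// Deletes below the watermark are skipped instead of failing a consistency check.
    fn delete(&mut self, ts: i64) -> WriteOutcome {
        if self.below_watermark(ts) {
            return WriteOutcome::SkippedBelowWatermark;
        }
        self.rows.remove(&ts);
        WriteOutcome::Applied
    }
}
```

With `WatermarkedTable::new(Some(150))`, a delete or insert at ts = 100 is reported as `SkippedBelowWatermark` from the very first operation after recovery, while writes at ts = 160 are applied as usual.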

stdrc commented 1 month ago

Raised another issue to discuss whether we should provide watermark information at the state store level: #17741.