risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.
https://go.risingwave.com/slack
Apache License 2.0

Avoid relying on barrier read in executor #13687

Open hzxa21 opened 11 months ago

hzxa21 commented 11 months ago

As mentioned in #12393, we realized that barrier and checkpoint decoupling is orthogonal to enabling reads on non-ckpt barriers. Storage can enable try_flush instead of force_flush on high-frequency non-checkpoint barriers to reduce the chance of OOM while avoiding too many small immutable memtables, provided that reads on non-ckpt barriers are not allowed.

hzxa21 commented 11 months ago

IIUC, arrangement backfill and log store currently rely on non-ckpt barrier reads to work correctly if checkpoint frequency > 1. @wenym1 @kwannoel @chenzl25 Can you help confirm? Is it possible to avoid non-ckpt barrier reads in these cases?

kwannoel commented 11 months ago

> IIUC, arrangement backfill and log store currently rely on non-ckpt barrier reads to work correctly if checkpoint frequency > 1. @wenym1 @kwannoel @chenzl25 Can you help confirm? Is it possible to avoid non-ckpt barrier reads in these cases?

Arrangement backfill does indeed still do a snapshot read on every barrier (checkpoint or not).

We could delay the read to every checkpoint barrier, but that could make the checkpoint barrier take a long time if there is a lot of data between 2 checkpoint barriers, and we would also need to buffer updates between those two barriers.

Could you elaborate more on why we should "avoid reading on non-ckpt barrier"?

kwannoel commented 11 months ago

> Storage can enable try_flush instead of force_flush on high-frequency non-checkpoint barriers to reduce the chance of OOM while avoiding too many small immutable memtables, provided that reads on non-ckpt barriers are not allowed.

This seems to suggest if we have any non-ckpt read, then we cannot have the stated optimization. Is my understanding correct?

kwannoel commented 11 months ago

no_shuffle_backfill and arrangement_backfill both do snapshot read whenever there's a new barrier (regardless of checkpoint).

hzxa21 commented 11 months ago

> Storage can enable try_flush instead of force_flush on high-frequency non-checkpoint barriers to reduce the chance of OOM while avoiding too many small immutable memtables, provided that reads on non-ckpt barriers are not allowed.

> This seems to suggest if we have any non-ckpt read, then we cannot have the stated optimization. Is my understanding correct?

Correct

> Could you elaborate more on why we should "avoid reading on non-ckpt barrier"?

If we still need to support reads on non-ckpt barriers, the executor needs to force flush its memtable to storage, which results in many small immutable memtables. If the number of memtables is too large, read performance suffers, which is why we introduced extra logic to merge these memtables in addition to memtable spill and SST compaction. That being said, if we want both the benefit of spilling state on non-ckpt barriers and the ability to read on non-ckpt barriers, we need complicated logic to keep non-ckpt barrier read performance stable.
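To illustrate why many small immutable memtables hurt reads and why merging helps, here is a minimal Python sketch. It is a toy model under stated assumptions, not RisingWave's storage code: memtables are plain dicts, and `get` and `merge_imms` are hypothetical names.

```python
def get(imms, key):
    """Look up `key` in a list of immutable memtables, newest first.

    `imms` is ordered oldest to newest. Returns (value, probes), where
    `probes` is the number of memtables that had to be checked.
    """
    probes = 0
    for imm in reversed(imms):
        probes += 1
        if key in imm:
            return imm[key], probes
    return None, probes


def merge_imms(imms):
    """Merge all immutable memtables into one; newer values win."""
    merged = {}
    for imm in imms:  # oldest to newest, so later updates overwrite earlier ones
        merged.update(imm)
    return [merged]
```

In this model a point lookup may probe every memtable before it finds (or fails to find) a key, so the worst-case cost grows with the number of memtables; after merging, every lookup probes exactly one.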

There are 4 options here:

  1. Disallow non-ckpt barriers entirely, meaning that checkpoint frequency can only be 1.
  2. Do force flush on each barrier and enable non-ckpt barrier read optimizations (i.e. merging small memtables). This is the current implementation on main.
  3. Do force flush on each barrier and disable non-ckpt barrier read optimizations. This maintains the non-ckpt barrier read capability but provides no guarantees on its performance.
  4. Do try flush on each barrier and do not support non-ckpt barrier read at all.

Given that the usage of barrier read for batch query is narrow and only our executors rely on non-ckpt barrier read, if it is not hard to modify our executors, my preference is 4 > 3 > 2 > 1
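The difference between force flush and try flush on non-checkpoint barriers can be sketched as follows. This is a toy model under assumptions, not the actual storage implementation: `LocalStore`, `run`, and the byte threshold are hypothetical, and the sketch assumes a checkpoint barrier always seals the memtable.

```python
from dataclasses import dataclass, field


@dataclass
class LocalStore:
    """Toy executor-local state store (hypothetical, not RisingWave's API)."""
    spill_threshold: int                            # bytes buffered before try_flush seals
    memtable_bytes: int = 0                         # size of the mutable memtable
    imm_sizes: list = field(default_factory=list)   # sizes of sealed immutable memtables

    def write(self, nbytes):
        self.memtable_bytes += nbytes

    def _seal(self):
        if self.memtable_bytes > 0:
            self.imm_sizes.append(self.memtable_bytes)
            self.memtable_bytes = 0

    def force_flush(self):
        # Always seal, so that the data becomes readable at this barrier.
        self._seal()

    def try_flush(self):
        # Seal only under memory pressure; small memtables keep accumulating.
        if self.memtable_bytes >= self.spill_threshold:
            self._seal()


def run(policy, bytes_per_barrier, checkpoint_frequency, barriers, threshold):
    """Return the sizes of immutable memtables produced under `policy`."""
    store = LocalStore(spill_threshold=threshold)
    for i in range(1, barriers + 1):
        store.write(bytes_per_barrier)
        is_checkpoint = i % checkpoint_frequency == 0
        if is_checkpoint or policy == "force":
            store.force_flush()
        else:
            store.try_flush()
    return store.imm_sizes
```

With 10 bytes written per barrier, a checkpoint every 4 barriers, and a 25-byte threshold, `run("force", 10, 4, 8, 25)` produces 8 memtables of 10 bytes each, while `run("try", 10, 4, 8, 25)` produces 4 memtables. The price of try flush in this model is that data still sitting in the mutable memtable is not sealed at a non-checkpoint barrier.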

chenzl25 commented 11 months ago

Besides backfill, non-ckpt read is used by DeltaJoin and TemporalJoin right now. DeltaJoin needs to read current epoch and the previous epoch data which means that it must contain a non-ckpt barrier read. If we don't support non-ckpt barrier read anymore, I think we should deprecate DeltaJoin as well.

As for backfill, removing non-ckpt barrier read seems easy, we just need to buffer upstream data between 2 checkpoint barriers and don't need to handle barrier alignment required by joins.

wenym1 commented 11 months ago

For log store, I can try letting the log reader read from the same local state store as the log writer. The drawback is that we will need an async mutex on the shared local state store. I will investigate the feasibility of doing so.
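The shared-store idea can be sketched with an async mutex as below. This is only an illustration in Python's `asyncio`, assuming a dict-backed store; `SharedLocalStore`, `put`, `scan`, and `demo` are hypothetical names, not the log store's real interface.

```python
import asyncio


class SharedLocalStore:
    """Toy local state store shared by a log writer and a log reader."""

    def __init__(self):
        self._lock = asyncio.Lock()  # async mutex guarding the shared state
        self._rows = {}

    async def put(self, seq, value):
        async with self._lock:
            self._rows[seq] = value

    async def scan(self, start, end):
        # The reader sees the writer's uncommitted rows because both sides
        # go through the same in-memory store.
        async with self._lock:
            return [(k, v) for k, v in sorted(self._rows.items()) if start <= k < end]


async def demo():
    store = SharedLocalStore()
    for seq in range(5):              # writer side
        await store.put(seq, f"log-{seq}")
    return await store.scan(0, 3)     # reader side
```

Every read and write now contends on the same lock, which is the drawback mentioned above.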

kwannoel commented 11 months ago

> Besides backfill, non-ckpt read is used by DeltaJoin and TemporalJoin right now. DeltaJoin needs to read current epoch and the previous epoch data which means that it must contain a non-ckpt barrier read. If we don't support non-ckpt barrier read anymore, I think we should deprecate DeltaJoin as well.
>
> As for backfill, removing non-ckpt barrier read seems easy, we just need to buffer upstream data between 2 checkpoint barriers and don't need to handle barrier alignment required by joins.

For ckpt read, the data must have been committed. So it seems we need to keep buffering until the previous checkpoint barrier completes, which could extend past 2 checkpoint barriers.

This means we need to either:

  1. Let the state store notify us when the checkpoint completes, and then discard the buffer
  2. OR on a checkpoint, wait for the epoch to be committed before reading from the snapshot.

kwannoel commented 11 months ago

> This means we need to either:
>
>   1. Let the state store notify us when the checkpoint completes, and then discard the buffer
>   2. OR on a checkpoint, wait for the epoch to be committed before reading from the snapshot.

@chenzl25 @hzxa21 will there be any issues with option 2? That's the approach I would prefer. Could a checkpoint take a long time to complete?
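The second approach (on a checkpoint, wait for the epoch to be committed, then read) can be sketched roughly as below. It is a simplified model, not the backfill executor: `CheckpointOnlyBackfill`, `snapshot_read`, and `wait_committed` are hypothetical, and the real executor's progress tracking is omitted.

```python
class CheckpointOnlyBackfill:
    """Toy executor that reads storage only on checkpoint barriers."""

    def __init__(self, snapshot_read, wait_committed):
        self.snapshot_read = snapshot_read    # callable(epoch) -> list of rows
        self.wait_committed = wait_committed  # callable(epoch), blocks until committed
        self.buffer = []                      # upstream updates since the last read

    def on_upstream(self, update):
        self.buffer.append(update)

    def on_barrier(self, epoch, is_checkpoint):
        if not is_checkpoint:
            # No storage read on non-ckpt barriers: keep buffering.
            return None
        # Wait for the epoch to be committed before reading the snapshot.
        self.wait_committed(epoch)
        rows = self.snapshot_read(epoch)
        buffered, self.buffer = self.buffer, []
        return rows, buffered
```

In this sketch the buffer grows across all non-checkpoint barriers, and `wait_committed` blocks barrier handling, so a slow checkpoint would directly delay the executor.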

hzxa21 commented 11 months ago

> Besides backfill, non-ckpt read is used by DeltaJoin and TemporalJoin right now. DeltaJoin needs to read current epoch and the previous epoch data which means that it must contain a non-ckpt barrier read. If we don't support non-ckpt barrier read anymore, I think we should deprecate DeltaJoin as well.
>
> As for backfill, removing non-ckpt barrier read seems easy, we just need to buffer upstream data between 2 checkpoint barriers and don't need to handle barrier alignment required by joins.

That is true. It seems that there are many places to change and deprecate. How about deprecating barrier read only for batch queries and still providing barrier read for internal usage? We can simply choose option 3:

  3. Do force flush on each barrier and disable non-ckpt barrier read optimizations. This maintains the non-ckpt barrier read capability but provides no guarantees on its performance.

because in most cases decoupling will be disabled with checkpoint_frequency=1.

wcy-fdu commented 11 months ago

> 3. disable non-ckpt barrier read optimizations

Is there any other kind of optimization besides merge imm?

hzxa21 commented 11 months ago

> 3. disable non-ckpt barrier read optimizations

> Is there any other kind of optimization besides merge imm?

That is the only one I can think of.

github-actions[bot] commented 5 months ago

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.