risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.
https://go.risingwave.com/slack
Apache License 2.0

RFC: Support disable checkpoint #4392

Closed fuyufjh closed 1 year ago

fuyufjh commented 2 years ago

Background

In real-world use cases, not all streaming tasks require checkpoints or persisted state. For example, in most ETL use cases, users can simply recover from some offset in the source MQ, so persistence is effectively guaranteed by the source side.

On the other hand, Hummock is still under heavy development, and it's hard for us to guarantee backward compatibility of persisted data at this stage. So I guess early users may have no choice but to use RisingWave as a "non-persisted" streaming database.

Design

RisingWave is designed to have persisted storage from day 1. Luckily, since we are working on barrier-checkpoint decoupling #4290, it's possible to support this with relatively few changes.

Here are my rough ideas. Correct me if anything is missing or wrong.

Further Optimization

An optional optimization is to keep the state of streaming operators in the operator cache only. Otherwise, all rows of internal state will be stored twice in memory. To achieve this,

Notes

twocode commented 2 years ago

If the purpose is for the compute team to benchmark streaming performance, I suggest we keep barriers, since they bring only trivial cost anyway while keeping the semantics the same. Keeping all data in the shared buffer will not help with performance stability, since it could thrash the memory system.

If correctness matters and we want to cater to real customers without the long-tail persistence cost, we can use an in-memory object store. If correctness is not the main target, I suggest we implement a blackhole object store that swallows all I/O operations.
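To make the two options above concrete, here is a minimal sketch of how they differ. The `ObjectStore` trait, `InMemObjectStore`, and `BlackholeObjectStore` are hypothetical, simplified names for illustration only and are not RisingWave's actual object store interface.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified object store interface (an assumption for this sketch).
pub trait ObjectStore {
    /// Stores `data` under `path`.
    fn upload(&mut self, path: &str, data: Vec<u8>) -> Result<(), String>;
    /// Returns the object at `path`, if the store still has it.
    fn read(&self, path: &str) -> Result<Option<Vec<u8>>, String>;
}

/// Keeps every object in process memory, so reads stay correct
/// but nothing survives a restart.
#[derive(Default)]
pub struct InMemObjectStore {
    objects: HashMap<String, Vec<u8>>,
}

impl ObjectStore for InMemObjectStore {
    fn upload(&mut self, path: &str, data: Vec<u8>) -> Result<(), String> {
        self.objects.insert(path.to_string(), data);
        Ok(())
    }

    fn read(&self, path: &str) -> Result<Option<Vec<u8>>, String> {
        Ok(self.objects.get(path).cloned())
    }
}

/// Swallows all I/O: uploads report success but the data is dropped,
/// so later reads find nothing.
#[derive(Default)]
pub struct BlackholeObjectStore {
    /// Total number of bytes accepted and discarded.
    pub bytes_swallowed: usize,
}

impl ObjectStore for BlackholeObjectStore {
    fn upload(&mut self, _path: &str, data: Vec<u8>) -> Result<(), String> {
        self.bytes_swallowed += data.len();
        Ok(())
    }

    fn read(&self, _path: &str) -> Result<Option<Vec<u8>>, String> {
        Ok(None)
    }
}
```

The in-memory variant preserves read-after-write semantics at the cost of memory, while the blackhole variant only suits workloads that never read uploaded data back.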

neverchanje commented 2 years ago

FYI @lmatz is working on a benchmark that aims at comparing the performance of Flink and RisingWave in terms of stateless computation. We will compare the CPU and memory consumption when each of the systems deals with a heavily loaded stream. In this case, the benchmark would be unfair since RisingWave materializes the result while Flink doesn't. So we are planning to hack a "blackhole MaterializeExecutor" that does nothing but return an OK(). This executor won't be merged into the main branch but is only for testing. So it would be great if we had a better solution.

lmatz commented 2 years ago

I suggest we keep barriers

I thought "never emits checkpoint barrier" means that the system keeps freshness barriers but does not emit the checkpoint barriers that tell the system to checkpoint. So probably the same thing? I'd like clarification on this.

fuyufjh commented 2 years ago

I thought "never emits checkpoint barrier" means that the system keeps freshness barriers but does not emit the checkpoint barriers that tell the system to checkpoint.

Yes, I think we are talking about the same thing.
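The distinction agreed on here can be sketched as follows. `BarrierKind`, `BarrierScheduler`, and the `checkpoint_frequency` parameter are hypothetical names chosen for illustration, not the identifiers used in RisingWave; `None` models "never emit checkpoint barriers".

```rust
/// Hypothetical barrier kinds, following the distinction drawn in this thread.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BarrierKind {
    /// Advances the epoch so results become visible, but persists nothing.
    Freshness,
    /// Additionally tells the system to checkpoint state for this epoch.
    Checkpoint,
}

/// Decides the kind of each injected barrier.
/// `checkpoint_frequency == None` means checkpoint barriers are never emitted;
/// `Some(n)` makes every n-th barrier a checkpoint barrier.
pub struct BarrierScheduler {
    checkpoint_frequency: Option<u64>,
    emitted: u64,
}

impl BarrierScheduler {
    pub fn new(checkpoint_frequency: Option<u64>) -> Self {
        Self { checkpoint_frequency, emitted: 0 }
    }

    /// Returns the kind of the next barrier to inject.
    pub fn next_barrier(&mut self) -> BarrierKind {
        self.emitted += 1;
        match self.checkpoint_frequency {
            Some(n) if n > 0 && self.emitted % n == 0 => BarrierKind::Checkpoint,
            _ => BarrierKind::Freshness,
        }
    }
}
```

With `BarrierScheduler::new(None)`, barriers keep flowing and epochs keep advancing, but no barrier ever asks the system to persist state.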

liurenjie1024 commented 2 years ago

In the stateless computing case, what's Flink's sink?

lmatz commented 2 years ago

In the stateless computing case, what's Flink's sink?

A blackhole sink, I suppose. Is that correct?

BugenZhao commented 2 years ago

Can we also disable barrier alignment in this case?

fuyufjh commented 2 years ago

Can we also disable barrier alignment in this case?

No, we need aligned barriers to propagate epochs through the system. https://singularity-data.slack.com/archives/C034TRPKN1F/p1659434112846699?thread_ts=1659433808.483719&cid=C034TRPKN1F
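As a rough illustration of why alignment matters for epochs, the sketch below shows an operator with several inputs forwarding a barrier only once every input has delivered it. `BarrierAligner` and `on_barrier` are hypothetical names, and the sketch assumes an input that has already delivered its barrier is blocked until the round completes.

```rust
use std::collections::HashSet;

/// Aligns barriers for an operator with `num_inputs` upstream inputs:
/// the barrier for an epoch is forwarded only after every input delivered it.
pub struct BarrierAligner {
    num_inputs: usize,
    current_epoch: Option<u64>,
    arrived: HashSet<usize>,
}

impl BarrierAligner {
    pub fn new(num_inputs: usize) -> Self {
        Self { num_inputs, current_epoch: None, arrived: HashSet::new() }
    }

    /// Records a barrier for `epoch` from `input`. Returns `Some(epoch)` once
    /// all inputs have delivered it, i.e. the epoch can be propagated downstream.
    pub fn on_barrier(&mut self, input: usize, epoch: u64) -> Option<u64> {
        assert!(input < self.num_inputs, "unknown input index");
        // The first barrier of a round fixes the epoch being aligned.
        let aligning = *self.current_epoch.get_or_insert(epoch);
        assert_eq!(aligning, epoch, "inputs must agree on the epoch being aligned");
        self.arrived.insert(input);
        if self.arrived.len() == self.num_inputs {
            // Round complete: reset and let the epoch move downstream.
            self.arrived.clear();
            self.current_epoch = None;
            Some(epoch)
        } else {
            None
        }
    }
}
```

Without this step, a downstream operator could see data from two different epochs interleaved, so the epoch boundary would no longer be well defined.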

fuyufjh commented 2 years ago

Waiting for https://github.com/risingwavelabs/risingwave/pull/4966

fuyufjh commented 1 year ago

Closed by #4966