
feat(sink): enhance more sinks to support `commit_checkpoint_interval` #17383

Open · fuyufjh opened this issue 2 weeks ago

fuyufjh commented 2 weeks ago

Is your feature request related to a problem? Please describe.

Many OLAP systems, including BigQuery, ClickHouse, and Iceberg, perform poorly when data is inserted in small batches. Currently, constrained by our checkpoint interval, RisingWave can only batch data within 1 second. With parallelism in the tens, this results in poor sink performance.

Describe the solution you'd like

In the Iceberg sink we have introduced an option `commit_checkpoint_interval`.

https://github.com/risingwavelabs/risingwave/blob/378a04adbd1266cdba32c1533e6ac76e2349d5dc/src/connector/src/sink/iceberg/mod.rs#L131-L133

Backed by the log store, it can batch data over a wider window, spanning multiple checkpoint epochs.
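
For illustration, here is a minimal Rust sketch of the idea (hypothetical names, not RisingWave's actual sink trait): the log-store-backed sink keeps buffering data across checkpoint barriers and only commits to the downstream system every `commit_checkpoint_interval` checkpoints, after which the consumed portion of the log store can be truncated.

```rust
// Minimal sketch of decoupled commits (hypothetical names, not RisingWave's
// actual sink trait): buffer data across checkpoints and only flush to the
// downstream system every `commit_checkpoint_interval` checkpoints.
struct DecoupledCommitter {
    commit_checkpoint_interval: u64,
    checkpoints_since_commit: u64,
    buffered_rows: Vec<String>, // stand-in for buffered stream chunks
}

impl DecoupledCommitter {
    fn new(commit_checkpoint_interval: u64) -> Self {
        Self {
            commit_checkpoint_interval: commit_checkpoint_interval.max(1),
            checkpoints_since_commit: 0,
            buffered_rows: Vec::new(),
        }
    }

    /// Buffer rows read from the log store between checkpoint barriers.
    fn write(&mut self, rows: Vec<String>) {
        self.buffered_rows.extend(rows);
    }

    /// Called on every checkpoint barrier. Returns a batch to commit downstream
    /// only on every `commit_checkpoint_interval`-th checkpoint; the log store
    /// can then be truncated up to the committed epoch.
    fn on_checkpoint(&mut self) -> Option<Vec<String>> {
        self.checkpoints_since_commit += 1;
        if self.checkpoints_since_commit >= self.commit_checkpoint_interval {
            self.checkpoints_since_commit = 0;
            Some(std::mem::take(&mut self.buffered_rows))
        } else {
            None
        }
    }
}

fn main() {
    let mut committer = DecoupledCommitter::new(10);
    for epoch in 1..=25u64 {
        committer.write(vec![format!("row-{epoch}")]);
        if let Some(batch) = committer.on_checkpoint() {
            println!("commit {} rows at epoch {}", batch.len(), epoch);
        }
    }
}
```

With the 1-second checkpoint interval mentioned above, `commit_checkpoint_interval = 10` would turn roughly ten per-second commits into one commit every ten seconds.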

Describe alternatives you've considered

NA

Additional context

Today I ran into a case of poor performance with the BigQuery sink. We may start from that.

xxhZs commented 2 weeks ago

Similar issue: https://github.com/risingwavelabs/risingwave/issues/17223

wenym1 commented 2 weeks ago

We can run some benchmarks to test how the throughput is affected when we commit more data at once in a batch. We can test BigQuery and ClickHouse first. cc @xxhZs

wcy-fdu commented 2 weeks ago

The same issue also needs to be considered on the file sink side. Currently the file sink can only write files when a checkpoint arrives. Flink also forces a flush when a checkpoint arrives, but its checkpoint frequency is much lower, so the impact is not significant. For this type of sink, decoupling the sink from checkpoints is effectively equivalent to batching file writes in some way, as Flink does, for example.
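
For illustration, a minimal sketch (hypothetical thresholds and names, not RisingWave's file sink code) of what decoupled file writing could look like: roll the in-progress file when a size or age threshold is reached, rather than on every checkpoint, similar in spirit to Flink's rolling policies.

```rust
// Roll files by buffered size or elapsed time instead of checkpoint frequency.
// Thresholds and names are hypothetical.
use std::time::{Duration, Instant};

struct RollingPolicy {
    max_bytes: usize,
    max_age: Duration,
    buffered_bytes: usize,
    opened_at: Instant,
}

impl RollingPolicy {
    fn new(max_bytes: usize, max_age: Duration) -> Self {
        Self { max_bytes, max_age, buffered_bytes: 0, opened_at: Instant::now() }
    }

    /// Account for bytes appended to the current in-progress file.
    fn record(&mut self, bytes: usize) {
        self.buffered_bytes += bytes;
    }

    /// Decide whether to close the current file and start a new one,
    /// independently of when checkpoints arrive.
    fn should_roll(&self) -> bool {
        self.buffered_bytes >= self.max_bytes || self.opened_at.elapsed() >= self.max_age
    }

    fn reset(&mut self) {
        self.buffered_bytes = 0;
        self.opened_at = Instant::now();
    }
}

fn main() {
    let mut policy = RollingPolicy::new(128 << 20, Duration::from_secs(600));
    for _ in 0..3 {
        policy.record(64 << 20);
        if policy.should_roll() {
            // Close the current file, open a new one, then reset the counters.
            println!("rolling to a new file");
            policy.reset();
        }
    }
}
```

The point is only that the flush decision is driven by data volume or elapsed time rather than by checkpoint frequency.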

xxhZs commented 1 week ago

> We can run some benchmarks to test how the throughput is affected when we commit more data at once in a batch. We can test BigQuery and ClickHouse first. cc @xxhZs

After running the bench, I found that the throughput with the option on and off is basically the same.

xxhZs commented 1 week ago

And this PR also includes a benchmark on sr: https://github.com/risingwavelabs/risingwave/pull/16816

wenym1 commented 1 week ago

> We can run some benchmarks to test how the throughput is affected when we commit more data at once in a batch. We can test BigQuery and ClickHouse first. cc @xxhZs

> After running the bench, I found that the throughput with the option on and off is basically the same.

@xxhZs Could you paste some concrete numbers for the throughput, along with the experiment settings?

xxhZs commented 1 week ago

We used the sink bench and ran each test for 300 s. The results:

| Sink | `commit_checkpoint_interval` | avg | p90 | p95 | p99 |
|------|------------------------------|-----|-----|-----|-----|
| BigQuery | 1 | 105691 | 112191 | 115124 | 119399 |
| BigQuery | 10 | 101587 | 116913 | 118546 | 122634 |
| BigQuery | 50 | 104062 | 113777 | 116502 | 119873 |
| ClickHouse | 1 | 259874 | 282974 | 288061 | 301896 |
| ClickHouse | 10 | 262768 | 281498 | 288191 | 296637 |
| ClickHouse | 50 | 265784 | 289081 | 295188 | 315546 |

I also found that our default `bigquery.max_batch_rows` used to be too small. After increasing it, the throughput was much higher.
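
For context, a minimal sketch (hypothetical names, not the actual BigQuery sink code) of why `max_batch_rows` matters: rows are buffered and an append request is only issued once the buffer reaches the limit, so a small limit turns the same data into many tiny requests.

```rust
// Buffer rows and issue one append request per full batch; a small
// `max_batch_rows` means many small requests. Names are hypothetical.
struct RowBatcher {
    max_batch_rows: usize,
    buffer: Vec<String>, // stand-in for encoded rows
    requests_sent: usize,
}

impl RowBatcher {
    fn new(max_batch_rows: usize) -> Self {
        Self { max_batch_rows, buffer: Vec::new(), requests_sent: 0 }
    }

    fn push(&mut self, row: String) {
        self.buffer.push(row);
        if self.buffer.len() >= self.max_batch_rows {
            self.flush();
        }
    }

    /// Stand-in for sending one append request to the external system.
    fn flush(&mut self) {
        if !self.buffer.is_empty() {
            self.requests_sent += 1;
            self.buffer.clear();
        }
    }
}

fn main() {
    for limit in [1024usize, 10240] {
        let mut batcher = RowBatcher::new(limit);
        for i in 0..100_000 {
            batcher.push(format!("row-{i}"));
        }
        batcher.flush();
        println!("max_batch_rows = {limit}: {} requests", batcher.requests_sent);
    }
}
```

Running this prints 98 requests for a limit of 1024 versus 10 requests for 10240 over the same 100 000 rows.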

wenym1 commented 1 week ago

> I also found that our default `bigquery.max_batch_rows` used to be too small. After increasing it, the throughput was much higher.

Interesting. What's the new size, and how much does the throughput increase? If the throughput can be greatly increased just by raising this batch size, I suspect most of the time is wasted waiting for responses from the response stream. Can you try awaiting the responses from the stream asynchronously and doing the truncation there? That way we won't be blocked waiting for a response and may get better throughput.

Besides, currently we create an `AppendRowsRequestRows` for each stream chunk, so when we write rows we may issue several write requests. Have you tried combining all the rows to be written into a single `AppendRowsRequestRows`?

And what parallelism was used in the test?
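
For reference, a minimal sketch (hypothetical types, plain std threads instead of the actual gRPC append stream) of the asynchronous-response idea suggested above: the send path keeps appending batches without waiting for acks, while a separate handler consumes responses and advances the offset up to which the log store may be truncated.

```rust
// Decouple sending append requests from handling their responses; truncation
// of the log store only advances after a response is received.
use std::sync::mpsc;
use std::thread;

struct AppendResponse {
    epoch: u64, // epoch whose data has been durably appended downstream
}

fn main() {
    let (resp_tx, resp_rx) = mpsc::channel::<AppendResponse>();

    // Response handler: runs independently of the send path, so the sink
    // writer is never blocked waiting for an ack before sending the next batch.
    let acker = thread::spawn(move || {
        let mut truncate_up_to = 0u64;
        for resp in resp_rx {
            // Only after the ack arrives is it safe to truncate the log store.
            truncate_up_to = truncate_up_to.max(resp.epoch);
            println!("log store can be truncated up to epoch {truncate_up_to}");
        }
    });

    // Send path: fire off batches and continue immediately; in the real sink
    // this would be the append stream, here the ack is simulated inline.
    for epoch in 1..=5u64 {
        // ... send the batch for `epoch` ...
        resp_tx.send(AppendResponse { epoch }).unwrap();
    }
    drop(resp_tx);
    acker.join().unwrap();
}
```

The key property is that sending and acknowledging are decoupled, while truncation still only advances after an acknowledgement, so nothing is dropped from the log store before it is durable downstream.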

xxhZs commented 1 week ago

More detailed bench for the BigQuery sink (throughput in rows/s):

| async | `max_batch_rows` | `commit_checkpoint_interval` | avg | p90 | p95 | p99 |
|-------|------------------|------------------------------|-----|-----|-----|-----|
| no | 1024 | 1 | 4593 | 6144 | 6144 | 6144 |
| yes | 1024 | 1 | 4377 | 6144 | 6144 | 6144 |
| no | 10240 | 1 | 18202 | 20480 | 22528 | 28672 |
| yes | 10240 | 1 | 33255 | 40960 | 40960 | 42112 |
| no | 100000000000 | 1 | 60873 | 117888 | 122880 | 126976 |
| yes | 100000000000 | 1 | 118708 | 260096 | 260096 | 284672 |
| no | 100000000000 | 10 | 57629 | 118382 | 120832 | 126482 |
| yes | 100000000000 | 10 | 108911 | 116736 | 120832 | 268288 |
| no | 100000000000 | 50 | 60745 | 114688 | 118784 | 122880 |
| yes | 100000000000 | 50 | 96614 | 106496 | 110592 | 116736 |

Without async, the average throughput is lower because an oversized batch waits a long time while BigQuery is being written, so the throughput drops to 0 for that period.