risingwavelabs / risingwave


perf: nexmark q0 #8712

Open kwannoel opened 1 year ago

kwannoel commented 1 year ago

Background

In a recent benchmark, Flink had an average throughput of 1M rows/s while RW had an average throughput of 850K rows/s, so RW needs roughly a 17% improvement to match Flink. Thanks to @huangjw806 for spotting this.

The flamegraph can be found here, under Artifacts.

query

    CREATE SINK nexmark_q0
    AS
    SELECT auction, bidder, price, date_time
    FROM bid
    WITH ( connector = 'blackhole', type = 'append-only');

plan

   QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
 StreamSink { type: append-only, columns: [auction, bidder, price, date_time] }
 └─StreamProject { exprs: [$expr1, $expr2, $expr3, $expr4] }
   └─StreamProject { exprs: [Field(bid, 0:Int32) as $expr1, Field(bid, 1:Int32) as $expr2, Field(bid, 2:Int32) as $expr3, Field(bid, 5:Int32) as $expr4, _row_id] }
     └─StreamFilter { predicate: (event_type = 2:Int32) }
       └─StreamRowIdGen { row_id_index: 5 }
         └─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_rw_kafka_timestamp", "_row_id"] }

Here are screenshots of the flamegraph, highlighting cost centers.

[Flamegraph screenshots omitted.]
kwannoel commented 1 year ago

Please add to the screenshots above if you find something interesting in the flamegraph.

lmatz commented 1 year ago

https://github.com/risingwavelabs/risingwave/blob/main/src/common/src/array/mod.rs#L500-L514

[Flamegraph screenshot omitted.]

append_datum_n itself takes a non-negligible amount of time; does that make sense?

lmatz commented 1 year ago

https://github.com/risingwavelabs/risingwave/blob/main/src/connector/src/parser/json_parser.rs#L94-L109

Let's save the to_vec call.

Edit: Done in #8732
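
For context, a minimal sketch of the general idea (toy functions, not the actual json_parser code or the change in #8732): parsing the borrowed payload slice directly avoids an allocation and a memcpy per message.

    use serde_json::Value;

    // Before: copy the payload into an owned buffer, then parse the copy.
    fn parse_with_copy(payload: &[u8]) -> serde_json::Result<Value> {
        let owned = payload.to_vec(); // extra allocation + memcpy per message
        serde_json::from_slice(&owned)
    }

    // After: parse the borrowed bytes in place; no copy is needed, since the
    // resulting Value owns its own data anyway.
    fn parse_borrowed(payload: &[u8]) -> serde_json::Result<Value> {
        serde_json::from_slice(payload)
    }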

lmatz commented 1 year ago

https://github.com/risingwavelabs/risingwave/blob/main/src/connector/src/parser/json_parser.rs#L122

Can we do the to_ascii_lowercase() once in advance, outside the closure/loop?

Edit: Done in #8718
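
Roughly the shape of the hoisting being suggested (illustrative helper names, not the actual parser code): to_ascii_lowercase() allocates a new String, so calling it per column per record is repeated work that can move out of the hot loop.

    use serde_json::Value;

    // Done once, before processing records: normalize the column names up front.
    fn lowercase_keys(column_names: &[String]) -> Vec<String> {
        column_names.iter().map(|c| c.to_ascii_lowercase()).collect()
    }

    // Hot path, run per record: only cheap lookups remain inside the loop/closure.
    fn extract_fields<'a>(record: &'a Value, keys: &[String]) -> Vec<Option<&'a Value>> {
        keys.iter().map(|k| record.get(k.as_str())).collect()
    }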

lmatz commented 1 year ago

https://github.com/risingwavelabs/risingwave/blob/main/src/connector/src/parser/mod.rs#L200

Vec::with_capacity() instead of vec![]?

Edit: Done in #8718
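
A trivial illustration of the suggestion (not the actual parser code): when the batch size is known up front, pre-allocating with Vec::with_capacity avoids the grow-and-copy cycles that an empty vec![] goes through as rows are pushed.

    fn collect_rows(batch_size: usize) -> Vec<u64> {
        let mut rows = Vec::with_capacity(batch_size); // one allocation up front
        for i in 0..batch_size as u64 {
            rows.push(i); // never reallocates while filling the batch
        }
        rows
    }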

lmatz commented 1 year ago

I feel we need to find a good strategy to decide when to compact in the ProjectExecutor https://github.com/risingwavelabs/risingwave/blob/main/src/stream/src/executor/project.rs#L113

In this case, as the expr is really computation-light, compact itself introduces 11% overhead.

Edit: Probably not; after all, we only need to output the visible rows to an external sink, so we have to do some compaction somewhere before the final stage.
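
For readers unfamiliar with the term, a toy illustration of what compact means here (simplified single column type, not the real StreamChunk API): the operation materializes a fresh chunk containing only the rows whose visibility bit is set, and that copy is the overhead in question when the projection itself is cheap.

    struct ToyChunk {
        columns: Vec<Vec<i64>>, // column-oriented data
        visibility: Vec<bool>,  // one flag per row; false = filtered out upstream
    }

    impl ToyChunk {
        fn compact(&self) -> ToyChunk {
            // Indices of the rows we keep.
            let keep: Vec<usize> = self
                .visibility
                .iter()
                .enumerate()
                .filter_map(|(i, &v)| v.then_some(i))
                .collect();
            ToyChunk {
                // Every visible cell is copied into freshly allocated columns; when the
                // projection expressions are trivial, this copy dominates the cost.
                columns: self
                    .columns
                    .iter()
                    .map(|col| keep.iter().map(|&i| col[i]).collect())
                    .collect(),
                visibility: vec![true; keep.len()],
            }
        }
    }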

kwannoel commented 1 year ago

Probably not; after all, we only need to output the visible rows to an external sink, so we have to do some compaction somewhere before the final stage.

Not sure I understand why compaction is required in this case. Why can't we just output the visible rows to the external sink?

lmatz commented 1 year ago

https://github.com/risingwavelabs/risingwave/blob/main/src/connector/src/sink/remote.rs#L276-L294

It kind of depends on the output format requirement:

If it asks for JSON, then we iterate through each row of the chunk, so we can choose not to compact. But if it asks for the stream chunk format, then we have to compact.

Since this is a blackhole sink, we can save the compaction; you are right!

So whether to compact becomes something to determine at the optimization stage?
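
A sketch of the contrast being drawn (toy column/visibility types, not the actual remote sink code): a row-oriented encoder can honor visibility while iterating, so no up-front compaction is required on that path.

    fn encode_rows(columns: &[Vec<i64>], visibility: &[bool]) -> Vec<String> {
        (0..visibility.len())
            .filter(|&i| visibility[i]) // skip filtered-out rows inline
            .map(|i| {
                let row: Vec<i64> = columns.iter().map(|col| col[i]).collect();
                format!("{:?}", row) // stand-in for real per-row JSON serialization
            })
            .collect()
    }

A columnar chunk payload, by contrast, ships the raw columns, so invisible rows would ride along unless the chunk is compacted first.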

kwannoel commented 1 year ago

https://github.com/risingwavelabs/risingwave/blob/main/src/connector/src/sink/remote.rs#L276-L294

It kind of depends on the output format requirement:

If it asks for JSON, then we iterate through each row of the chunk, so we can choose not to compact. But if it asks for the stream chunk format, then we have to compact.

Since this is a blackhole sink, we can save the compaction; you are right!

Hmm, even with the StreamChunk format we can defer it to to_protobuf? https://github.com/risingwavelabs/risingwave/blob/96aa23dd0dc04c869a2ea5fdb4b7b1fc8fbcbf6d/src/connector/src/sink/remote.rs#L290-L294

lmatz commented 1 year ago

I think so, but this prost_stream_chunk doesn't seem to be aware of visibility.

kwannoel commented 1 year ago

But will any system typically consume StreamChunk at all? 👀 I thought it was only used internally.

lmatz commented 1 year ago

But will any system typically consume StreamChunk at all? 👀

I guess not, so we can save the final compact if it is connected to the sink/MV executor.

lmatz commented 1 year ago

For this particular query, we need to be cautious because of the two-Project issue (note the two StreamProject nodes in the plan); it's a bug that has not been fixed.

The Project below actually compacts in this case, so

save the final compact

is a wrong statement here; it would be correct if we fixed this particular bug first.

kwannoel commented 1 year ago

So whether to compact becomes something to determine at the optimization stage?

Makes sense to me.

kwannoel commented 1 year ago

For this particular query, we need to be cautious because of the two-Project issue (note the two StreamProject nodes in the plan); it's a bug that has not been fixed.

The Project below actually compacts in this case, so

save the final compact

is a wrong statement here; it would be correct if we fixed this particular bug first.

Linking it: https://github.com/risingwavelabs/risingwave/issues/8577

lmatz commented 1 year ago

https://github.com/risingwavelabs/risingwave/issues/8577#issuecomment-1471343129

Which means we actually do not need to have StreamRowIdGen { row_id_index: 5 }, right? @st1page

OK, not much overhead from the flamegraph though.

lmatz commented 1 year ago

I guess SourceStreamChunkRowWriter is not efficient enough for those insert-only sources 🤔

st1page commented 1 year ago

I feel we need to find a good strategy to decide when to compact in the ProjectExecutor https://github.com/risingwavelabs/risingwave/blob/main/src/stream/src/executor/project.rs#L113 In this case, as the expr is really computation-light, compact itself introduces 11% overhead.

done in https://github.com/risingwavelabs/risingwave/pull/8758

But will any system typically consume StreamChunk at all? 👀

I am not sure if we can sink the chunk into a system in the Arrow format 🤔

For this particular query, we need to be cautious because of the two-Project issue (note the two StreamProject nodes in the plan); it's a bug that has not been fixed. The Project below actually compacts in this case, so

save the final compact

is a wrong statement here; it would be correct if we fixed this particular bug first.

I don't think it can help with the compact performance issue, because the chunk will be compacted in some Project anyway. If we have a plan ProjA -> ProjB, the chunk will be compacted in ProjA and then ProjB::compact() will not be costly.

kwannoel commented 1 year ago

I think the idea I have is slightly different; it's more about avoiding compact even when there's a filter, as in the case of q0.

When sinking, we don't need to build a new chunk; instead we build a protobuf / JSON encoded chunk. We can delay the top-most compact call until that point, saving the cost of building a new chunk and relying on the protobuf / JSON encoding step to drop invisible rows.

@st1page's approach is still needed as a general optimization for when to compact. This approach is complementary: it always eliminates the top-most compact when sinking, regardless of selectivity, to avoid unnecessarily building a new chunk.

q0's compact call can be optimized via this complementary approach.

To implement this as an optimization requires a bit of refactoring to add should_compact as a plan-level attribute (or other suggestions?). A simple thing to do for now is to just disable compact for the top-most Project when sinking, since that's usually the most common case.
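
A hypothetical sketch of that direction (the should_compact flag and the function shape are made up for illustration, not an existing RisingWave attribute): the top-most projection feeding a sink keeps the visibility bitmap instead of materializing a compacted chunk, and the sink's encoder later skips invisible rows while serializing.

    fn project_output(
        projected_columns: Vec<Vec<i64>>,
        visibility: Vec<bool>,
        should_compact: bool, // hypothetical flag decided at plan/optimization time
    ) -> (Vec<Vec<i64>>, Vec<bool>) {
        if should_compact {
            // Materialize visible rows only (extra allocation + copies).
            let keep: Vec<usize> = visibility
                .iter()
                .enumerate()
                .filter_map(|(i, &v)| v.then_some(i))
                .collect();
            let compacted = projected_columns
                .iter()
                .map(|col| keep.iter().map(|&i| col[i]).collect())
                .collect();
            (compacted, vec![true; keep.len()])
        } else {
            // Pass the chunk through untouched; the sink's encoder honors `visibility`.
            (projected_columns, visibility)
        }
    }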

st1page commented 1 year ago

When sinking, we don't need to build a new chunk; instead we build a protobuf / JSON encoded chunk. We can delay the top-most compact call until that point, saving the cost of building a new chunk and relying on the protobuf / JSON encoding step to drop invisible rows.

Strongly +1.

To implement this as an optimization requires a bit of refactoring to add should_compact as a plan-level attribute (or other suggestions?). A simple thing to do for now is to just disable compact for the top-most Project when sinking, since that's usually the most common case.

I think it is not a plan-level attribute. Currently, we compact the input chunk just to simplify the executors' implementations, but in fact every executor should handle the visibility properly, e.g.

The issue here is that we do give the chunk's visibility to the SinkExecutor, which means it has the chance to do this optimization but does not. So we need to do the optimization in the SinkExecutor.

lmatz commented 1 year ago

[Throughput screenshot omitted.]

The peak throughput reaches 1M rows/s. Luckily, the imbalanced source throughput problem didn't happen today.

I guess the two remaining things are:

  1. avoid the unnecessary compact
  2. optimize SourceStreamChunkRowWriter, or rather have a customized code path for insert-only sources (a rough sketch of the overhead follows below). 🤔 At least it could avoid the data type match for every Datum, I suppose.
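
A toy illustration of the per-datum dispatch being described (made-up types, not the real SourceStreamChunkRowWriter or Datum): a generic writer re-matches the data type for every cell, whereas an insert-only fast path could resolve each column's type once per chunk and then append straight into the typed builder.

    enum ToyDatum {
        Int64(i64),
        Utf8(String),
    }

    // Generic path: one branch per cell, repeated millions of times per second.
    fn append_generic(ints: &mut Vec<i64>, strings: &mut Vec<String>, datum: ToyDatum) {
        match datum {
            ToyDatum::Int64(v) => ints.push(v),
            ToyDatum::Utf8(s) => strings.push(s),
        }
    }

    // Specialized path: the column type is resolved once, and the inner loop is a plain extend.
    fn append_int64_column(ints: &mut Vec<i64>, values: impl IntoIterator<Item = i64>) {
        ints.extend(values);
    }
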
lmatz commented 4 months ago

link #14815