risingwavelabs / risingwave

perf: Outer joins will always multiply their input when join key cardinality is 1. #17450

Closed: st1page closed this issue 2 months ago.

st1page commented 3 months ago

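Minimal reproduction case

As the title says: our current stream join implementation processes each row independently, so it emits intermediate NULL results. For example, when a t1 row arrives before its matching t2 row, the full outer join first emits a NULL-padded row (k, v1, NULL). It later retracts that row when the match arrives. https://github.com/risingwavelabs/risingwave/pull/17449/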
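
To make the intermediate NULL results concrete, here is a toy model of a per-row full outer join for the two-table case. It is a hypothetical Python sketch, not RisingWave's Rust executor. It assumes the join key is unique on each side (cardinality 1) and that changes are plain inserts ("+") and deletes ("-"). The class and method names are made up for illustration.

```python
from typing import Dict, List, Optional, Tuple

# One output change: (op, k, v1, v2), where op is "+" (insert) or "-" (delete).
Change = Tuple[str, int, Optional[int], Optional[int]]


class StreamingFullOuterJoin:
    """Toy full outer join on a key that is unique on each side (hypothetical).

    Each input change is handled on its own and its output is emitted
    immediately, with no buffering or compaction.
    """

    def __init__(self) -> None:
        self.left: Dict[int, int] = {}   # k -> v1 (t1 side)
        self.right: Dict[int, int] = {}  # k -> v2 (t2 side)

    def on_left(self, op: str, k: int, v: int) -> List[Change]:
        return self._process(op, k, v, self.left, self.right, is_left=True)

    def on_right(self, op: str, k: int, v: int) -> List[Change]:
        return self._process(op, k, v, self.right, self.left, is_left=False)

    @staticmethod
    def _process(op, k, v, mine, theirs, is_left) -> List[Change]:
        def row(m, t):
            # Always lay out the output columns as (k, v1, v2).
            return (k, m, t) if is_left else (k, t, m)

        out: List[Change] = []
        if k in theirs:
            other = theirs[k]
            if op == "+":
                # The other side's NULL-padded row is retracted and replaced.
                out.append(("-",) + row(None, other))
                out.append(("+",) + row(v, other))
            else:
                # The joined row turns back into a NULL-padded row.
                out.append(("-",) + row(v, other))
                out.append(("+",) + row(None, other))
        else:
            # No match yet: emit (or retract) a NULL-padded row right away.
            out.append((op,) + row(v, None))
        if op == "+":
            mine[k] = v
        else:
            del mine[k]
        return out


join = StreamingFullOuterJoin()
print(join.on_left("+", 1, 10))   # [('+', 1, 10, None)]
print(join.on_right("+", 1, 20))  # [('-', 1, 10, None), ('+', 1, 10, 20)]
```

Two input rows produce three output changes, even though the final result is a single row (1, 10, 20).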

Original case:

The user wants to use FULL OUTER JOINs to combine several tables that share the same primary key:

create table t1(k int primary key, v1 int);
create table t2(k int primary key, v2 int);
create table t3(k int primary key, v3 int);

create materialized view mv as select k, v1, v2, v3 from 
t1
full outer join t2 using(k)
full outer join t3 using(k);

Because of this, the number of changes emitted by the joins grows exponentially with the number of chained joins, which causes performance problems. The user's original query joins dozens, or even a hundred, tables.
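
How fast the output grows depends on arrival order. The toy chain below feeds one row per table, all with the same key, through a left-deep chain of full outer joins and counts what the last join emits. It is hypothetical Python under the same assumptions as a per-row join on a unique key.

Setting right_first=True models each join's right input (t2, t3, ...) arriving before the left subtree delivers its changes. FullOuterJoin and run_chain are illustrative names, not RisingWave code.

```python
from typing import Dict, List, Tuple

Payload = Tuple[object, ...]       # a row's columns; None stands for NULL
Change = Tuple[str, int, Payload]  # (op, join key, output columns)


class FullOuterJoin:
    """Toy per-row full outer join on a key that is unique on each side."""

    def __init__(self, left_width: int, right_width: int) -> None:
        self.widths = (left_width, right_width)
        self.state: Tuple[Dict[int, Payload], Dict[int, Payload]] = ({}, {})

    def push(self, side: int, change: Change) -> List[Change]:
        """Process one change; side 0 is the left input, side 1 the right."""
        op, k, payload = change
        mine, theirs = self.state[side], self.state[1 - side]
        pad_mine = (None,) * self.widths[side]
        pad_theirs = (None,) * self.widths[1 - side]

        def row(m: Payload, t: Payload) -> Payload:
            # Output columns are always (left columns..., right columns...).
            return m + t if side == 0 else t + m

        out: List[Change] = []
        if k in theirs:
            other = theirs[k]
            if op == "+":
                out += [("-", k, row(pad_mine, other)), ("+", k, row(payload, other))]
            else:
                out += [("-", k, row(payload, other)), ("+", k, row(pad_mine, other))]
        else:
            out.append((op, k, row(payload, pad_theirs)))
        if op == "+":
            mine[k] = payload
        else:
            del mine[k]
        return out


def run_chain(num_joins: int, right_first: bool) -> List[Change]:
    """Push one row per table (all with k = 1) through a left-deep join chain."""
    stream: List[Change] = [("+", 1, ("v1",))]  # t1's only row
    width = 1
    for i in range(num_joins):
        join = FullOuterJoin(left_width=width, right_width=1)
        right_row: Change = ("+", 1, (f"v{i + 2}",))
        out: List[Change] = []
        if right_first:
            out += join.push(1, right_row)
        for change in stream:
            out += join.push(0, change)
        if not right_first:
            out += join.push(1, right_row)
        stream, width = out, width + 1
    return stream


for n in range(1, 6):
    print(n, len(run_chain(n, right_first=True)), len(run_chain(n, right_first=False)))
```

Under these assumptions, the last of n joins behaves as follows:

- When the right rows arrive first, it emits 3, 7, 15, 31, 63 changes (2^(n+1) - 1).
- When the right rows arrive last, it emits 3, 5, 7, 9, 11 changes (2n + 1).

In both cases the final result is a single row.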

fuyufjh commented 3 months ago

Very interesting point! I don't have a good solution in mind yet, though 🤔

Just noting down some random ideas.

lmatz commented 3 months ago

In this particular user's case, events with the same primary key always arrive within the same epoch.

The events inserted into t1, t2, and t3 all originate from the same source event, but reach those tables through different intermediate MVs. The setup is closer to this:

create table t(k int primary key, v1 int);

create materialized view m1 as select ... from t ...
create materialized view m2 as select ... from t ...
create materialized view m3 as select ... from t ...

create materialized view mv as select k from 
m1
full outer join m2 using(k)
full outer join m3 using(k);
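
Same-key events arrive within one epoch. So buffering a join's output until the barrier and netting it per row would cancel the intermediate NULL-padded rows before they reach downstream.

Below is a minimal sketch of that compaction step over (op, row) pairs. The compact helper is hypothetical, not a RisingWave API. It assumes rows are hashable tuples and that a row's inserts and deletes can simply be summed.

```python
from collections import Counter
from typing import Dict, List, Tuple

Row = Tuple[object, ...]
Change = Tuple[str, Row]  # ("+" | "-", row)


def compact(changes: List[Change]) -> List[Change]:
    """Net out buffered changes, e.g. when flushing at a barrier (hypothetical)."""
    net: Dict[Row, int] = Counter()
    for op, row in changes:
        net[row] += 1 if op == "+" else -1
    # Emit surviving deletes before inserts so an update reads as delete-then-insert.
    deletes = [("-", row) for row, n in net.items() if n < 0 for _ in range(-n)]
    inserts = [("+", row) for row, n in net.items() if n > 0 for _ in range(n)]
    return deletes + inserts


buffered = [("+", (1, 10, None)), ("-", (1, 10, None)), ("+", (1, 10, 20))]
print(compact(buffered))  # [('+', (1, 10, 20))]
```

The drawback is that nothing is emitted before the barrier, which is the latency problem raised in the next paragraph.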

Besides, this user has a strict latency requirement. The output must reach downstream within 60 ms end-to-end, so emitting output only when a barrier arrives is not acceptable.

If possible, the solution should offer a knob that users can tune to decide when to emit output, similar to the "dirty_heap_size" parameter of the aggregation operator. (I imagine a customized periodic-timer-like stream that is selected together with the normal data/barrier stream. It would avoid extra latency when the barrier has not arrived yet and no data is coming in either.)
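
One possible reading of this knob is an output buffer that compacts changes and flushes on whichever comes first: a size threshold, a latency budget checked on periodic timer ticks, or a barrier.

The sketch below is hypothetical. CompactingOutputBuffer, max_buffered_rows, and max_delay_ms are made-up names, not RisingWave configuration, and the size threshold is only loosely modelled on the idea behind "dirty_heap_size".

```python
from collections import Counter
from typing import Dict, List, Optional, Tuple

Row = Tuple[object, ...]
Change = Tuple[str, Row]  # ("+" | "-", row)


class CompactingOutputBuffer:
    """Hypothetical output buffer with two user-tunable knobs.

    max_buffered_rows: flush once this many changes are buffered.
    max_delay_ms: flush once the oldest buffered change is this old,
        checked on every change and on periodic timer ticks.
    A barrier always flushes.
    """

    def __init__(self, max_buffered_rows: int, max_delay_ms: float) -> None:
        self.max_buffered_rows = max_buffered_rows
        self.max_delay_ms = max_delay_ms
        self.buffer: List[Change] = []
        self.oldest_ms: Optional[float] = None

    def on_change(self, change: Change, now_ms: float) -> List[Change]:
        if self.oldest_ms is None:
            self.oldest_ms = now_ms
        self.buffer.append(change)
        if len(self.buffer) >= self.max_buffered_rows:
            return self.flush()
        return self.on_tick(now_ms)

    def on_tick(self, now_ms: float) -> List[Change]:
        # Called by a timer stream even when no data or barrier is arriving.
        if self.oldest_ms is not None and now_ms - self.oldest_ms >= self.max_delay_ms:
            return self.flush()
        return []

    def on_barrier(self) -> List[Change]:
        return self.flush()

    def flush(self) -> List[Change]:
        # Net out inserts/deletes of the same row, then reset the buffer.
        net: Dict[Row, int] = Counter()
        for op, row in self.buffer:
            net[row] += 1 if op == "+" else -1
        self.buffer, self.oldest_ms = [], None
        deletes = [("-", r) for r, n in net.items() if n < 0 for _ in range(-n)]
        inserts = [("+", r) for r, n in net.items() if n > 0 for _ in range(n)]
        return deletes + inserts


buf = CompactingOutputBuffer(max_buffered_rows=1000, max_delay_ms=50)
buf.on_change(("+", (1, 10, None)), now_ms=0)
buf.on_change(("-", (1, 10, None)), now_ms=5)
buf.on_change(("+", (1, 10, 20)), now_ms=10)
print(buf.on_tick(now_ms=50))  # [('+', (1, 10, 20))]
```

The knobs trade latency against amplification. A short delay keeps end-to-end latency low, but a key's changes may then be split across flushes, so some intermediate NULL-padded rows can still leak downstream.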

Another issue is the shape of the plan. For five tables (t1 to t5), the planner produces a left-deep tree:

 StreamMaterialize { columns: [k, v1, v2, v3, v4, v5, t1.k(hidden), t2.k(hidden), t3.k(hidden), $expr1(hidden), t4.k(hidden), $expr2(hidden), t5.k(hidden), $expr3(hidden)], stream_key: [t1.k, t2.k, t3.k, $expr1, t4.k, $expr2, t5.k, $expr3], pk_columns: [t1.k, t2.k, t3.k, $expr1, t4.k, $expr2, t5.k, $expr3], pk_conflict: NoCheck }
 └─StreamExchange { dist: HashShard(t1.k, t2.k, t3.k, $expr1, t4.k, $expr2, t5.k, $expr3) }
   └─StreamProject { exprs: [Coalesce(t1.k, t2.k, t3.k, t4.k, t5.k) as $expr4, t1.v1, t2.v2, t3.v3, t4.v4, t5.v5, t1.k, t2.k, t3.k, $expr1, t4.k, $expr2, t5.k, $expr3] }
     └─StreamFilter { predicate: (((IsNotNull(t1.k) OR IsNotNull(t2.k)) OR (IsNotNull(t3.k) OR IsNotNull($expr1))) OR ((IsNotNull(t4.k) OR IsNotNull($expr2)) OR IsNotNull(t5.k))) }
       └─StreamHashJoin { type: FullOuter, predicate: $expr3 = t5.k }
         ├─StreamExchange { dist: HashShard($expr3) }
         │ └─StreamProject { exprs: [t1.k, t1.v1, t2.k, t2.v2, t3.k, t3.v3, t4.k, t4.v4, Coalesce(t1.k, t2.k, t3.k, t4.k) as $expr3, $expr1, $expr2] }
         │   └─StreamFilter { predicate: (((IsNotNull(t1.k) OR IsNotNull(t2.k)) OR (IsNotNull(t3.k) OR IsNotNull($expr1))) OR IsNotNull(t4.k)) }
         │     └─StreamHashJoin { type: FullOuter, predicate: $expr2 = t4.k }
         │       ├─StreamExchange { dist: HashShard($expr2) }
         │       │ └─StreamProject { exprs: [t1.k, t1.v1, t2.k, t2.v2, t3.k, t3.v3, Coalesce(t1.k, t2.k, t3.k) as $expr2, $expr1] }
         │       │   └─StreamFilter { predicate: ((IsNotNull(t1.k) OR IsNotNull(t2.k)) OR IsNotNull(t3.k)) }
         │       │     └─StreamHashJoin { type: FullOuter, predicate: $expr1 = t3.k }
         │       │       ├─StreamExchange { dist: HashShard($expr1) }
         │       │       │ └─StreamProject { exprs: [t1.k, t1.v1, t2.k, t2.v2, Coalesce(t1.k, t2.k) as $expr1] }
         │       │       │   └─StreamFilter { predicate: (IsNotNull(t1.k) OR IsNotNull(t2.k)) }
         │       │       │     └─StreamHashJoin { type: FullOuter, predicate: t1.k = t2.k }
         │       │       │       ├─StreamExchange { dist: HashShard(t1.k) }
         │       │       │       │ └─StreamTableScan { table: t1, columns: [k, v1] }
         │       │       │       └─StreamExchange { dist: HashShard(t2.k) }
         │       │       │         └─StreamTableScan { table: t2, columns: [k, v2] }
         │       │       └─StreamExchange { dist: HashShard(t3.k) }
         │       │         └─StreamTableScan { table: t3, columns: [k, v3] }
         │       └─StreamExchange { dist: HashShard(t4.k) }
         │         └─StreamTableScan { table: t4, columns: [k, v4] }
         └─StreamExchange { dist: HashShard(t5.k) }
           └─StreamTableScan { table: t5, columns: [k, v5] }

A left-deep tree seems to be a suboptimal plan for this particular case. The leftmost subtree has the longest path, so we expect the barrier to travel through it more slowly than through the other subtrees. Therefore, by the time the barrier arrives, all the intermediate joins have already emitted NULL-padded output, and that output then triggers the amplification.
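
A rough way to compare plan shapes is the number of joins the deepest input passes through. The sketch below builds a left-deep tree and a balanced (bushy) tree over the same tables and measures that depth. It is plain hypothetical Python, not an optimizer API, and it assumes the optimizer may reassociate these same-key full outer joins.

```python
from typing import List, Union

Tree = Union[str, tuple]  # a table name, or a (left, right) pair meaning one join


def left_deep(tables: List[str]) -> Tree:
    tree: Tree = tables[0]
    for t in tables[1:]:
        tree = (tree, t)  # each new table joins the accumulated result
    return tree


def bushy(tables: List[str]) -> Tree:
    # Split the tables in half recursively to get a balanced join tree.
    if len(tables) == 1:
        return tables[0]
    mid = len(tables) // 2
    return (bushy(tables[:mid]), bushy(tables[mid:]))


def join_depth(tree: Tree) -> int:
    """Largest number of joins any input row passes through."""
    if isinstance(tree, str):
        return 0
    return 1 + max(join_depth(tree[0]), join_depth(tree[1]))


for n in (5, 100):
    tables = [f"t{i}" for i in range(1, n + 1)]
    print(n, join_depth(left_deep(tables)), join_depth(bushy(tables)))
```

For the five tables in the plan above, the left-deep tree is 4 joins deep and the balanced one is 3. For 100 tables the depths are 99 and 7, which leaves fewer levels at which per-row amplification can compound.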

st1page commented 3 months ago

A simple method to fix this: https://github.com/risingwavelabs/risingwave/pull/17568