Open lmatz opened 1 year ago
query:
create materialized view AuctionBids
as
SELECT
bid.auction,
count(*) AS num,
window_start AS starttime
FROM
HOP(bid, date_time, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
GROUP BY
window_start,
bid.auction;
plan:
StreamMaterialize { columns: [maxn, starttime_c], pk_columns: [starttime_c] }
└─StreamProject { exprs: [max(count), window_start] }
└─StreamHashAgg { group_key: [window_start], aggs: [count, max(count)] }
└─StreamExchange { dist: HashShard(window_start) }
└─StreamProject { exprs: [Field(bid, 0:Int32), window_start, count] }
└─StreamAppendOnlyHashAgg { group_key: [Field(bid, 0:Int32), window_start], aggs: [count, count] }
└─StreamExchange { dist: HashShard(Field(bid, 0:Int32), window_start) }
└─StreamHopWindow { time_col: Field(bid, 5:Int32), slide: 00:00:02, size: 00:00:10, output: [Field(bid, 0:Int32), window_start, _row_id] }
└─StreamProject { exprs: [Field(bid, 0:Int32), Field(bid, 5:Int32), _row_id] }
└─StreamFilter { predicate: (event_type = 2:Int32) }
└─StreamRowIdGen { row_id_index: 4 }
└─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }
query:
create materialized view MaxBids
as
SELECT
max(CountBids.num) AS maxn,
CountBids.starttime_c
FROM (
SELECT
count(*) AS num,
window_start AS starttime_c
FROM HOP(bid, date_time, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
GROUP BY
bid.auction,
window_start
) AS CountBids
GROUP BY
CountBids.starttime_c;
plan:
StreamMaterialize { columns: [maxn, starttime_c], pk_columns: [starttime_c] }
└─StreamProject { exprs: [max(count), window_start] }
└─StreamHashAgg { group_key: [window_start], aggs: [count, max(count)] }
└─StreamExchange { dist: HashShard(window_start) }
└─StreamProject { exprs: [Field(bid, 0:Int32), window_start, count] }
└─StreamAppendOnlyHashAgg { group_key: [Field(bid, 0:Int32), window_start], aggs: [count, count] }
└─StreamExchange { dist: HashShard(Field(bid, 0:Int32), window_start) }
└─StreamHopWindow { time_col: Field(bid, 5:Int32), slide: 00:00:02, size: 00:00:10, output: [Field(bid, 0:Int32), window_start, _row_id] }
└─StreamProject { exprs: [Field(bid, 0:Int32), Field(bid, 5:Int32), _row_id] }
└─StreamFilter { predicate: (event_type = 2:Int32) }
└─StreamRowIdGen { row_id_index: 4 }
└─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }
It seems StreamShare can be lifted up by two operators because
StreamHopWindow { time_col: Field(bid, 5:Int32), slide: 00:00:02, size: 00:00:10, output: [Field(bid, 0:Int32), window_start, _row_id] }
└─StreamProject { exprs: [Field(bid, 0:Int32), Field(bid, 5:Int32), _row_id] }
above it is the same for the two subqueries?
Edit: three, even the StreamExchange.
Yes, this is an advanced feature: common sub-plan detection. Currently, we only plan to support sharing for the source, CTE, view and subquery domains. Anyway, we can support it if necessary. @wsx-ucb seems interested in this feature.
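For readers unfamiliar with the idea, here is a minimal sketch of common sub-plan detection. It is not RisingWave's optimizer code: the plan representation (a tuple of an operator label and its children) and the function names are assumptions made for illustration only.

```python
from collections import Counter


def count_subplans(roots):
    """Count how often each subtree occurs across the given plan roots.

    A plan node is modeled as (operator_label, tuple_of_children), so two
    structurally identical subtrees compare and hash as equal.
    """
    counts = Counter()

    def visit(node):
        counts[node] += 1
        for child in node[1]:
            visit(child)

    for root in roots:
        visit(root)
    return counts


def largest_shared_subplans(roots):
    """Return shared subtrees that are not nested inside another shared subtree."""
    counts = count_subplans(roots)
    shared = {node for node, n in counts.items() if n > 1}
    # Every child of a shared node is itself shared, so dropping the direct
    # children of shared nodes leaves only the topmost shared subtrees.
    nested = {child for node in shared for child in node[1]}
    return shared - nested


# Hypothetical, simplified versions of the two branches discussed above.
source = ("StreamSource", ())
project = ("StreamProject", (source,))
hop = ("StreamHopWindow", (project,))
exchange = ("StreamExchange", (hop,))
count_agg = ("StreamAppendOnlyHashAgg", (exchange,))
left = ("StreamProject[auction, count, window_start]", (count_agg,))
right = ("StreamHashAgg[max(count)]", (("StreamExchange[window_start]", (count_agg,)),))

print(largest_shared_subplans([left, right]) == {count_agg})
```

In this toy model the share point is the first aggregation, which is as far up as the two branches stay identical.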
Would you mind also pasting the plan of Flink SQL here as the comparison? 🥺
Query:
SELECT AuctionBids.auction, AuctionBids.num
FROM (
SELECT
B1.auction,
count(*) AS num,
HOP_START(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS starttime,
HOP_END(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS endtime
FROM bid B1
GROUP BY
B1.auction,
HOP(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
) AS AuctionBids
JOIN (
SELECT
max(CountBids.num) AS maxn,
CountBids.starttime,
CountBids.endtime
FROM (
SELECT
count(*) AS num,
HOP_START(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS starttime,
HOP_END(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS endtime
FROM bid B2
GROUP BY
B2.auction,
HOP(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
) AS CountBids
GROUP BY CountBids.starttime, CountBids.endtime
) AS MaxBids
ON AuctionBids.starttime = MaxBids.starttime AND
AuctionBids.endtime = MaxBids.endtime AND
AuctionBids.num >= MaxBids.maxn;
Optimized Physical Plan:
Calc(select=[auction, num])
+- Join(joinType=[InnerJoin], where=[AND(=(starttime, starttime0), =(endtime, endtime0), >=(num, maxn))], select=[auction, num, starttime, endtime, maxn, starttime0, endtime0], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
:- Exchange(distribution=[hash[starttime, endtime]])
: +- Calc(select=[$f0 AS auction, num, w$start AS starttime, w$end AS endtime])
: +- GroupWindowAggregate(groupBy=[$f0], window=[SlidingGroupWindow('w$, dateTime, 10000, 2000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[$f0, COUNT(*) AS num, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
: +- Exchange(distribution=[hash[$f0]])
: +- Calc(select=[bid.auction AS $f0, dateTime], where=[=(event_type, 2)])
: +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
: +- Calc(select=[event_type, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
: +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])
+- Exchange(distribution=[hash[starttime, endtime]])
+- Calc(select=[maxn, starttime, endtime])
+- GroupAggregate(groupBy=[starttime, endtime], select=[starttime, endtime, MAX(num) AS maxn])
+- Exchange(distribution=[hash[starttime, endtime]])
+- Calc(select=[w$start AS starttime, w$end AS endtime, num])
+- GroupWindowAggregate(groupBy=[$f0], window=[SlidingGroupWindow('w$, dateTime, 10000, 2000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[$f0, COUNT(*) AS num, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[$f0]])
+- Calc(select=[bid.auction AS $f0, dateTime], where=[=(event_type, 2)])
+- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
+- Calc(select=[event_type, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
+- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])
Optimized Execution Plan:
Calc(select=[auction, num])
+- Join(joinType=[InnerJoin], where=[((starttime = starttime0) AND (endtime = endtime0) AND (num >= maxn))], select=[auction, num, starttime, endtime, maxn, starttime0, endtime0], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
:- Exchange(distribution=[hash[starttime, endtime]])
: +- Calc(select=[$f0 AS auction, num, w$start AS starttime, w$end AS endtime])
: +- GroupWindowAggregate(groupBy=[$f0], window=[SlidingGroupWindow('w$, dateTime, 10000, 2000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[$f0, COUNT(*) AS num, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])(reuse_id=[1])
: +- Exchange(distribution=[hash[$f0]])
: +- Calc(select=[bid.auction AS $f0, dateTime], where=[(event_type = 2)])
: +- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])
: +- Calc(select=[event_type, bid, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
: +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])
+- Exchange(distribution=[hash[starttime, endtime]])
+- Calc(select=[maxn, starttime, endtime])
+- GroupAggregate(groupBy=[starttime, endtime], select=[starttime, endtime, MAX(num) AS maxn])
+- Exchange(distribution=[hash[starttime, endtime]])
+- Calc(select=[w$start AS starttime, w$end AS endtime, num])
+- Reused(reference_id=[1])
Some differences:
1. Flink's share is lifted up, so the common sub-plan includes more operators.
2. Flink has an exchange based on the hash value of bid.auction. RW does not have this exchange, only the exchange based on the hash value of both bid.auction and window_start.
3. Flink has exchanges on starttime and endtime. Seems unnecessary?
The explanation for point (3) above is that: ... starttime and endtime ...
Flink merges the final filter into the hash join. Notice that at https://github.com/risingwavelabs/risingwave/blob/main/src/stream/src/executor/hash_join.rs#L225, RW's hash join is able to accept a non-equal condition. Wonder what the criteria are for choosing between a separate filter on top of the join and a non-equal hash join?
We think the Filter operator can process more efficiently because of the vectorized execution.
Another difference is that Flink adds a watermark definition on the column date_time and uses the GroupWindowAggregate, which is more like our EMIT ON WINDOW CLOSE query. So the first aggregation can buffer the data until the window has been closed and emit an append-only stream, and then the second aggregation can be more efficient with an append-only input.
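A rough sketch of why an append-only input helps the second aggregation. The two classes below are hypothetical and are not RisingWave's aggregators; they only contrast the amount of state that max() needs in each mode.

```python
from collections import Counter


class AppendOnlyMax:
    """max() over an insert-only stream: a single value of state is enough."""

    def __init__(self):
        self.value = None

    def insert(self, x):
        if self.value is None or x > self.value:
            self.value = x

    def result(self):
        return self.value


class RetractableMax:
    """max() over a stream with deletes: every live value must be remembered,
    because deleting the current maximum requires finding the next one."""

    def __init__(self):
        self.values = Counter()

    def insert(self, x):
        self.values[x] += 1

    def delete(self, x):
        self.values[x] -= 1
        if self.values[x] == 0:
            del self.values[x]

    def result(self):
        return max(self.values) if self.values else None


# Updatable input: the per-auction count grows 1 -> 2 -> 3 as updates.
retractable = RetractableMax()
retractable.insert(1)
for old, new in [(1, 2), (2, 3)]:
    retractable.delete(old)
    retractable.insert(new)

# Append-only input: only the final count arrives once the window closes.
append_only = AppendOnlyMax()
append_only.insert(3)

print(retractable.result(), append_only.result())
```

Both reach the same answer, but the retractable version processes five operations and keeps a multiset, while the append-only version processes one insert and keeps one value.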
Exactly, just realized today and put the watermark into the tracking issue.
> we think the Filter operator can process more efficiently because of the vectorized execution.
It may be worth benchmarking once. I wonder if having another operator would mess up the cache; I honestly don't know.
Very similar to https://github.com/risingwavelabs/risingwave/issues/7244#issuecomment-1376126069. Rewritten SQL:
SELECT
B.auction,
B.num
FROM (
SELECT
auction,
num,
/*use rank here to express top-N with ties*/
rank() over (partition by starttime order by num desc) as num_rank
FROM (
SELECT bid.auction, count(*) AS num, window_start AS starttime
FROM HOP(bid, date_time, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
GROUP BY window_start, bid.auction
)
) B
WHERE num_rank <= 1;
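As a sanity check on the rewrite, the sketch below computes what keeping only rank 1 (with ties) per window returns, which matches the join formulation's "num >= maxn" condition. The function name and the in-memory row format are assumptions for illustration.

```python
from collections import defaultdict


def top_auctions_per_window(rows):
    """rows: iterable of (auction, num, starttime).

    Returns the (auction, num) pairs whose num ties for the maximum within
    their window, i.e. the rows that rank() over (partition by starttime
    order by num desc) would place at rank 1.
    """
    by_window = defaultdict(list)
    for auction, num, starttime in rows:
        by_window[starttime].append((auction, num))

    result = []
    for starttime in sorted(by_window):
        entries = by_window[starttime]
        best = max(num for _, num in entries)
        result.extend((auction, num) for auction, num in entries if num == best)
    return result


rows = [(1, 3, 0), (2, 3, 0), (3, 1, 0), (1, 2, 2)]
print(top_auctions_per_window(rows))
```

Auctions 1 and 2 tie in the window starting at 0, so both are kept, which is why rank() rather than row_number() is the right function here.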
Good point, let me add it into the performance dashboard tomorrow
There is a HOP window that causes 5x data amplification. We use a separate hop executor to expand the data in chunks, whereas Flink SQL handles the time window inside the aggregation executor, which may not need to expand the data physically. We might need to do operator fusion between hop and other operators.
However, after an offline discussion with @TennyZhuang, we are not sure how much the overhead is, because the HopExecutor does not do much physical copying (we use the Arc of ArrayImpl).
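The 5x figure follows from size / slide = 10s / 2s. A small sketch of the logical expansion, assuming integer timestamps in seconds, windows aligned to 0, and a size that is a multiple of the slide (the function names are illustrative):

```python
def hop_window_starts(ts, slide, size):
    """Start times of all hopping windows [start, start + size) that contain ts."""
    last_start = ts - ts % slide             # latest window starting at or before ts
    first_start = last_start - size + slide  # earliest window still covering ts
    return list(range(first_start, last_start + 1, slide))


def hop_expand(rows, slide, size):
    """Expand each (auction, ts) row into one row per containing window."""
    return [
        (auction, window_start)
        for auction, ts in rows
        for window_start in hop_window_starts(ts, slide, size)
    ]


bids = [(1, 13), (2, 13), (1, 4)]
expanded = hop_expand(bids, slide=2, size=10)
print(len(expanded) // len(bids))
```

Every input row appears in exactly size / slide windows, so any operator placed after the hop sees five times as many rows.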
Will Flink also accelerate computation by reusing the aggregation results between different windows?
A kind of 2-phase Agg optimization? :hot_face:
Before:
SELECT
count(*) AS num,
window_start AS starttime_c
FROM HOP(bid, date_time, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
GROUP BY bid.auction, window_start
After:
with partial_agg as (
SELECT
bid.auction AS auction,
count(*) AS partial_num,
window_start AS tumble_window_start
FROM TUMBLE(bid, bid.date_time, INTERVAL '2' SECOND)
GROUP BY bid.auction, window_start
)
SELECT
sum(partial_num) AS num,
window_start AS starttime_c
FROM HOP(partial_agg, tumble_window_start, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
GROUP BY auction, window_start
c.c. @fuyufjh @chenzl25 @TennyZhuang @liurenjie1024 @BugenZhao
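A quick way to convince yourself the rewrite is equivalent: the sketch below computes the per-window counts both ways on a toy input. It assumes integer timestamps, windows aligned to 0, and a hop size that is a multiple of the slide; the function names are made up for the example.

```python
from collections import Counter

SLIDE, SIZE = 2, 10


def hop_counts_direct(bids):
    """Before: expand every bid into each hop window, then count."""
    counts = Counter()
    for auction, ts in bids:
        last = ts - ts % SLIDE
        for start in range(last - SIZE + SLIDE, last + 1, SLIDE):
            counts[(auction, start)] += 1
    return counts


def tumble_partial_counts(bids):
    """Phase 1: count per (auction, tumble window of length SLIDE)."""
    partial = Counter()
    for auction, ts in bids:
        partial[(auction, ts - ts % SLIDE)] += 1
    return partial


def hop_counts_two_phase(bids):
    """After: expand the partial rows into hop windows and sum them."""
    counts = Counter()
    for (auction, tumble_start), n in tumble_partial_counts(bids).items():
        for start in range(tumble_start - SIZE + SLIDE, tumble_start + 1, SLIDE):
            counts[(auction, start)] += n
    return counts


bids = [(1, 0), (1, 1), (1, 3), (2, 1), (2, 9), (1, 11), (1, 11)]
print(hop_counts_direct(bids) == hop_counts_two_phase(bids))
print(len(bids), len(tumble_partial_counts(bids)))
```

The second line shows where the saving would come from: the hop expansion runs over the partial rows rather than over every bid, so it only pays off when many bids share the same auction and 2-second tumble window.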
IIUC, Flink has no such optimization, since it introduces another state and exchange, and in fact the computing is not as heavy as accessing state. In streaming, 2-phase agg works best when there is data skew, which is hard to predict. If there is no obvious data skew, simply increasing the aggregation parallelism is good enough.
> since it introduces another state and exchange
The distribution will be the same (sharded by bid.auction) if we adopt https://github.com/risingwavelabs/risingwave/issues/3255.
> in fact the computing is not as heavy as accessing state
I think this optimization can reduce the number of state accesses, because the tumble window agg runs before the data amplification caused by the HopWindow.
Checking the Flink code, I found that they also generate multiple rows.
The method replace is evaluated lazily, so no actual data copy happens. We achieve the same goal by sharing the data columns using Arc.
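A toy illustration of the column-sharing idea. Python object references stand in for Rust's Arc here, and the chunk layout (a dict of column lists) is an assumption, not the real executor's data structure: each output chunk reuses the input columns and only materializes a new window_start column.

```python
def hop_expand_chunk(columns, time_col, slide, size):
    """Yield one output chunk per window offset.

    `columns` maps column name -> list of values. The input column lists are
    shared by reference across all output chunks; only window_start is new.
    """
    for i in range(size // slide):
        window_start = [t - t % slide - i * slide for t in columns[time_col]]
        out = dict(columns)  # shallow copy: the column lists themselves are shared
        out["window_start"] = window_start
        yield out


chunk = {"auction": [1, 2], "date_time": [13, 4]}
outputs = list(hop_expand_chunk(chunk, "date_time", slide=2, size=10))
print(len(outputs), all(o["auction"] is chunk["auction"] for o in outputs))
```

The downstream operators still see five times as many rows, so this only avoids the copy cost of the expansion, not the extra work per row.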
Current plan after the common sub-plan rewriting optimization:
StreamMaterialize { columns: [auction, num, window_start(hidden), window_start#1(hidden)], pk_columns: [auction, window_start, window_start#1], pk_conflict: "no check" }
└─StreamProject { exprs: [bid.auction, count, window_start, window_start] }
└─StreamFilter { predicate: (count >= max(count)) }
└─StreamHashJoin { type: Inner, predicate: window_start = window_start, output: all }
├─StreamExchange { dist: HashShard(window_start) }
| └─StreamProject { exprs: [bid.auction, count, window_start] }
| └─StreamShare { id = 8 }
| └─StreamAppendOnlyHashAgg { group_key: [bid.auction, window_start], aggs: [count] }
| └─StreamExchange { dist: HashShard(bid.auction, window_start) }
| └─StreamHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start, bid._row_id] }
| └─StreamFilter { predicate: IsNotNull(bid.date_time) }
| └─StreamTableScan { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
└─StreamProject { exprs: [max(count), window_start] }
└─StreamHashAgg { group_key: [window_start], aggs: [max(count), count] }
└─StreamExchange { dist: HashShard(window_start) }
└─StreamProject { exprs: [bid.auction, window_start, count] }
└─StreamShare { id = 8 }
└─StreamAppendOnlyHashAgg { group_key: [bid.auction, window_start], aggs: [count] }
└─StreamExchange { dist: HashShard(bid.auction, window_start) }
└─StreamHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start, bid._row_id] }
└─StreamFilter { predicate: IsNotNull(bid.date_time) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
Analysis of the EMIT ON CLOSE plan's performance
- With the EMIT ON CLOSE property in the agg executor, it can produce an append-only stream, which lets the following executors process it more efficiently. This optimization is extremely important for the second Agg, because our updatable Max() aggregator is much worse than the append-only version.
- The EMIT ON WINDOW CLOSE aggregation will reduce its output operations to the number of time windows. Currently, we already batch by epoch (the agg only emits results when a barrier comes). But if a time window's data crosses multiple epochs, the operations are amplified "number_of_acrossed_epoch" times.
Current plan:
          SourceExec
              │
              ▼
           HopExec
              │
              ▼
  Append-only AggExec Count()
  group by window_start, auction
  Output Updatable Stream
              │
              ├───────────────────┐
              ▼                   │
        AggExec Max()             │
        group by window_start     │
        Output Updatable Stream   │
              │                   │
              └─────────┬─────────┘
                        ▼
                    JoinExec
                    join key: window_start
(Ideal EMIT ON WINDOW CLOSE PLAN)
          SourceExec
              │
              ▼
           HopExec
              │
              ▼
  Append-only AggExec Count()
  group by window_start, auction
  EMIT ON WINDOW CLOSE
  Output AppendOnlyStream
              │
              ├───────────────────┐
              ▼                   │
    Append-only AggExec Max()     │
    group by window_start         │
    EMIT ON WINDOW CLOSE          │
    Output AppendOnlyStream       │
              │                   │
              └─────────┬─────────┘
                        ▼
                 Append-only Join
                 join key: window_start
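To put a number on the second point of the analysis, here is a back-of-the-envelope model. It is a simplification: each input event is reduced to the epoch it arrived in and the window group it updates, and both function names are invented for this sketch.

```python
def outputs_per_barrier(events):
    """Emit-on-barrier: a group emits once for every epoch in which it changed."""
    return len({(epoch, group) for epoch, group in events})


def outputs_on_window_close(events):
    """EMIT ON WINDOW CLOSE: a group emits exactly once, when its window closes."""
    return len({group for _, group in events})


# (epoch, window_start) pairs: window 0 receives data in epochs 1-3,
# window 2 receives data in epochs 2-3.
events = [(1, 0), (1, 0), (2, 0), (3, 0), (2, 2), (3, 2)]
print(outputs_per_barrier(events), outputs_on_window_close(events))
```

In this model a window whose data spans k epochs produces k outputs under per-barrier emission and one under window-close emission, and each of those extra outputs is an update that the downstream Max() and join must also process.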
> the distribution will be the same (sharded by bid.auction) if we adopt https://github.com/risingwavelabs/risingwave/issues/3255.
Do you need to maintain states in the first agg?
> I think this optimization can reduce the accessing count of the state because we will do the operations on the tumble window agg before the data amplification because of the HopWindow.
True for the second agg.
IIUC, the Flink agg executor has some optimization to buffer updates before actually accessing RocksDB, to avoid ser/de cost.