risingwavelabs / risingwave


infer key dependency and prefetch state for downstream executor #686

Open lmatz opened 2 years ago

lmatz commented 2 years ago
statement ok
create table t (v1 int, v2 int, v3 int);
statement ok
explain create materialized view mv1 as 
select  c1, 
    count(*) 
    from 
    (select t1.v1 as c1, 
        t2.v2 as c2  
        from 
        t as t1 
        INNER JOIN 
        t as t2 
        ON t1.v1=t2.v1) 
group by c1;
----
RwStreamMaterialize(name=[mv1])
  RwStreamAgg(group=[{0}], agg#0=[COUNT()])
    RwStreamHashJoin(condition=[=($0, $2)], joinType=[inner])
      RwStreamExchange(distribution=[RwDistributionTrait{type=HASH_DISTRIBUTED, keys=[0]}], collation=[[]])
        RwStreamChain(all=[true], tableId=[0.0.17], primaryKeyIndices=[[1]], columnIds=[[0.0.17.0, 0.0.17.3]])
          RwStreamBatchPlan(table=[[main, t]], tableId=[0.0.17], primaryKeyIndices=[[1]], columnIds=[[0.0.17.0, 0.0.17.3]])
      RwStreamExchange(distribution=[RwDistributionTrait{type=HASH_DISTRIBUTED, keys=[0]}], collation=[[]])
        RwStreamChain(all=[true], tableId=[0.0.17], primaryKeyIndices=[[1]], columnIds=[[0.0.17.0, 0.0.17.3]])
          RwStreamBatchPlan(table=[[main, t]], tableId=[0.0.17], primaryKeyIndices=[[1]], columnIds=[[0.0.17.0, 0.0.17.3]])

In this case, Agg is downstream of HashJoin, and the group-by key of Agg happens to be the same as the join key of HashJoin. When a new input record arrives on the left input of HashJoin, we probe the state of the right join side. At the same time, we can prefetch the state of Agg for the same key:

  1. If HashJoin produces nothing, the prefetch does not help.
  2. If HashJoin does produce output, then ideally the fetch of Agg's state overlaps with the fetch of HashJoin's right-side state.

Since the latency of a get from S3 is likely to dominate the overall end-to-end latency, the improvement could be substantial.
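To make the overlap concrete, here is a minimal Python sketch, not RisingWave code. The names `fetch_state` and `process_left_row`, the in-memory `store`, and the latency constant are all assumptions made for illustration. It issues the join probe and the Agg prefetch together so that neither waits on the other.

```python
import asyncio

FETCH_LATENCY_S = 0.01  # stand-in for a remote (e.g. S3) get; the value is arbitrary


async def fetch_state(store, operator, key, log):
    """Simulate a high-latency state lookup for one operator and key (hypothetical helper)."""
    log.append(("start", operator))
    await asyncio.sleep(FETCH_LATENCY_S)
    log.append(("end", operator))
    return store.get((operator, key))


async def process_left_row(store, key, log):
    """Probe the join's right-side state and prefetch the Agg state concurrently."""
    # Both lookups are issued before either completes, so their latencies overlap.
    join_rows, agg_count = await asyncio.gather(
        fetch_state(store, "join_right", key, log),
        fetch_state(store, "agg", key, log),
    )
    if not join_rows:
        # Case 1: the join produces nothing, so the prefetched Agg state goes unused.
        return agg_count
    # Case 2: the join produced rows and the Agg state is already in hand.
    return (agg_count or 0) + len(join_rows)


# Toy state: two matching right-side rows and a current count of 3 for key 7.
store = {("join_right", 7): ["r1", "r2"], ("agg", 7): 3}
log = []
new_count = asyncio.run(process_left_row(store, 7, log))
```

In this sketch the log shows both fetches starting before either one ends, which is the overlap described in point 2.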

statement ok
explain create materialized view mv2 as
select  * 
    from 
    t as t1 
    INNER JOIN 
    (select t2.v1 as v1, 
        SUM(t2.v2) 
        from 
        t as t2 
        group by t2.v1) as t3 
    ON t1.v1 = t3.v1;
----
RwStreamMaterialize(name=[mv2])
  RwStreamHashJoin(condition=[=($0, $4)], joinType=[inner])
    RwStreamExchange(distribution=[RwDistributionTrait{type=HASH_DISTRIBUTED, keys=[0]}], collation=[[]])
      RwStreamChain(all=[true], tableId=[0.0.17], primaryKeyIndices=[[3]], columnIds=[[0.0.17.0, 0.0.17.1, 0.0.17.2, 0.0.17.3]])
        RwStreamBatchPlan(table=[[main, t]], tableId=[0.0.17], primaryKeyIndices=[[3]], columnIds=[[0.0.17.0, 0.0.17.1, 0.0.17.2, 0.0.17.3]])
    RwStreamExchange(distribution=[RwDistributionTrait{type=HASH_DISTRIBUTED, keys=[0]}], collation=[[]])
      RwStreamProject(v1=[$0], EXPR$1=[$2])
        RwStreamAgg(group=[{0}], agg#0=[COUNT()], EXPR$1=[SUM($1)])
          RwStreamExchange(distribution=[RwDistributionTrait{type=HASH_DISTRIBUTED, keys=[0]}], collation=[[]])
            RwStreamChain(all=[true], tableId=[0.0.17], primaryKeyIndices=[[2]], columnIds=[[0.0.17.0, 0.0.17.1, 0.0.17.3]])
              RwStreamBatchPlan(table=[[main, t]], tableId=[0.0.17], primaryKeyIndices=[[2]], columnIds=[[0.0.17.0, 0.0.17.1, 0.0.17.3]])

In this case, we have an Agg as the right child of HashJoin. (The Exchange between them should be optimized away as the right child has already been partitioned by key=[0] once before.)

Unlike the case above, the Agg in this case is upstream of HashJoin and must produce some outputs. Therefore, prefetching the state for HashJoin while fetching the state for Agg is very likely to be beneficial.


Just a thought. The scenarios may be quite limited in real workloads, though. It is somewhat like operator fusion, except that it fuses only the state access and not the computation.

fuyufjh commented 2 years ago

Good point!

This can also be done by query optimization:

  1. Join -> Agg (-> means before) with the same group key and join key can be optimized into Agg -> Join (see also Eager Aggregation and Lazy Aggregation).
  2. Agg -> Join with the same group key and join key can be optimized into a single special physical operator HashGroupJoin to reuse the hash table between HashAgg and HashJoin.

In this way, Agg and Join will be fused into one operator.
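As a small worked example of the first rewrite, the Python sketch below checks the equivalence for the `COUNT(*)` case from `mv1` only, where the count per key after the join equals the product of the per-key counts on each side. The function names and sample keys are made up for illustration; other aggregates need the conditions discussed in the referenced paper.

```python
from collections import Counter


def join_then_agg(left_keys, right_keys):
    """Join -> Agg: enumerate every join match, then count matches per key."""
    counts = Counter()
    for left_key in left_keys:
        for right_key in right_keys:
            if left_key == right_key:
                counts[left_key] += 1
    return dict(counts)


def agg_then_join(left_keys, right_keys):
    """Agg -> Join: count each side per key first, then join the two small aggregates."""
    left_counts = Counter(left_keys)
    right_counts = Counter(right_keys)
    return {
        key: left_counts[key] * right_counts[key]
        for key in left_counts
        if key in right_counts
    }


# Hypothetical join-key columns of the two inputs.
left_keys = [1, 1, 2, 3]
right_keys = [1, 2, 2, 4]
lazy_result = join_then_agg(left_keys, right_keys)
eager_result = agg_then_join(left_keys, right_keys)
```

Both plans give the same per-key counts here, but the second one never materializes the individual join matches.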

liurenjie1024 commented 2 years ago

Seems like group join optimization.
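For readers unfamiliar with the idea, here is a rough batch-style Python sketch of a group join shaped like the `mv2` query: a single hash table keyed by the shared join/group key holds both the left rows and the running `SUM` of the right side. `GroupJoinEntry` and `hash_group_join` are hypothetical names, and a streaming implementation would additionally need to handle incremental updates and retractions.

```python
from dataclasses import dataclass, field


@dataclass
class GroupJoinEntry:
    """One hash-table slot shared by the join side and the aggregate side."""
    left_rows: list = field(default_factory=list)
    right_sum: int = 0
    right_seen: bool = False


def hash_group_join(left, right):
    """Inner-join left rows with SUM(right value) per key using one hash table."""
    table = {}
    for key, payload in left:
        table.setdefault(key, GroupJoinEntry()).left_rows.append(payload)
    for key, value in right:
        entry = table.setdefault(key, GroupJoinEntry())
        entry.right_sum += value
        entry.right_seen = True
    # Emit a row only for keys present on both sides (inner join semantics).
    return [
        (key, payload, entry.right_sum)
        for key, entry in table.items()
        if entry.right_seen
        for payload in entry.left_rows
    ]


left = [(1, "a"), (1, "b"), (2, "c")]   # (v1, other columns) from t1
right = [(1, 10), (1, 5), (3, 7)]       # (v1, v2) from t2
result = hash_group_join(left, right)
```

Each key is hashed and looked up in one table, instead of once in a HashAgg table and again in a HashJoin table.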

twocode commented 2 years ago

Very interesting. I wonder what kind of runtime support is needed to support "state prefetching". 🤔

lmatz commented 2 years ago

> Very interesting. I wonder what kind of runtime support is needed to support "state prefetching". 🤔

If we target this same-key scenario, then query optimization is the way to go. We fuse the two operators into one, which essentially eliminates the need for prefetching: the single fused operator fetches the state that the two separate operators would otherwise have fetched. No additional runtime support is needed.

If we target other scenarios where operator fusion cannot help and prefetching must be done at runtime, we can let each executor handle not only StreamChunk and Barrier but also a PrefetchHint. Downstream executors, both local and remote, can then be asked to prefetch their state in the same way. This may be a good solution engineering-wise but not performance-wise.
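A rough Python sketch of that message-level idea follows. `StreamChunk`, `Barrier`, and `PrefetchHint` are named in the comment above, but their fields, the `SketchExecutor` class, and its dictionary-backed state are assumptions for illustration rather than RisingWave's actual types. The loads here are synchronous for brevity; a real executor would presumably issue them asynchronously.

```python
from dataclasses import dataclass


@dataclass
class StreamChunk:
    rows: list  # (key, value) pairs; the layout is assumed for this sketch


@dataclass
class Barrier:
    epoch: int


@dataclass
class PrefetchHint:
    keys: list  # keys whose state a later StreamChunk is expected to touch


class SketchExecutor:
    """Toy executor that keeps a running sum per key and counts remote reads."""

    def __init__(self, remote_state):
        self.remote_state = remote_state
        self.cache = {}
        self.remote_reads = 0

    def _load(self, key):
        # Only a cache miss pays for a (simulated) remote read.
        if key not in self.cache:
            self.remote_reads += 1
            self.cache[key] = self.remote_state.get(key, 0)
        return self.cache[key]

    def handle(self, message):
        if isinstance(message, PrefetchHint):
            # Warm the cache ahead of the data; no output is produced.
            for key in message.keys:
                self._load(key)
            return None
        if isinstance(message, StreamChunk):
            for key, value in message.rows:
                self.cache[key] = self._load(key) + value
            return message
        return message  # Barrier: pass through unchanged


executor = SketchExecutor(remote_state={"a": 1})
executor.handle(PrefetchHint(keys=["a"]))
reads_after_hint = executor.remote_reads
executor.handle(StreamChunk(rows=[("a", 2)]))
```

In this toy run the chunk for key `"a"` finds its state already cached, so the only remote read is the one triggered by the hint.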

On the storage side, we may need to pin the prefetched state in the cache until the StreamChunk to be processed arrives; alternatively, a scan-resistant caching policy may also do the trick.
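One way to picture the pinning option is the Python sketch below: an LRU cache whose eviction skips pinned keys. `PinnableLruCache` and its methods are hypothetical and do not describe RisingWave's storage cache. Note that in this sketch the cache can temporarily exceed its capacity when every entry is pinned.

```python
from collections import OrderedDict


class PinnableLruCache:
    """LRU cache in which pinned entries are never chosen for eviction."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.entries = OrderedDict()  # oldest entry first
        self.pinned = set()

    def put(self, key, value, pin=False):
        self.entries[key] = value
        self.entries.move_to_end(key)
        if pin:
            self.pinned.add(key)
        self._evict()

    def get(self, key):
        if key not in self.entries:
            return None
        self.entries.move_to_end(key)  # mark as most recently used
        return self.entries[key]

    def unpin(self, key):
        # Called once the StreamChunk that needed this state has been processed.
        self.pinned.discard(key)
        self._evict()

    def _evict(self):
        # Walk from least to most recently used, skipping pinned keys.
        for key in list(self.entries):
            if len(self.entries) <= self.capacity:
                break
            if key not in self.pinned:
                del self.entries[key]


cache = PinnableLruCache(capacity=2)
cache.put("prefetched", 1, pin=True)  # state loaded because of a prefetch
cache.put("x", 2)
cache.put("y", 3)                     # over capacity: "x" is evicted, not "prefetched"
```

Without the pin, `"prefetched"` would be the least recently used entry and would be the first one evicted before its chunk arrives.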

kwannoel commented 1 year ago

> Good point!
>
> This can also be done by query optimization:
>
> 1. `Join -> Agg` (`->` means before) with the same group key and join key can be optimized into `Agg -> Join` (see also [Eager Aggregation and Lazy Aggregation](https://www.vldb.org/conf/1995/P345.PDF)).
>
> 2. `Agg -> Join` with the same group key and join key can be optimized into a single special physical operator `HashGroupJoin` to reuse the hash table between HashAgg and HashJoin.
>
> In this way, Agg and Join will be fused into one operator.

I'm wondering if anyone is working on this at the moment? If not I'm interested in implementing this.

fuyufjh commented 1 year ago

> I'm wondering if anyone is working on this at the moment? If not I'm interested in implementing this.

This might need some work, and I would recommend deferring it until some real cases appear.