ArroyoSystems / arroyo

Distributed stream processing engine in Rust
https://arroyo.dev
Apache License 2.0

Support Joining windowed and non-windowed datasets #165

Open jacksonrnewhouse opened 1 year ago

jacksonrnewhouse commented 1 year ago

We'd like to support joins between a windowed aggregate and a non-windowed stream, which would allow us to run Nexmark Query 6. The Arroyo version of the query is below. Ideally, we'd also inspect the WHERE clause so that we don't keep data around after the window has passed, since the only data from B1 we care about is that within active windows.

WITH 
auction as (
    SELECT auction.category as category, 
        auction.datetime as datetime, 
        auction.expires as expires,
        auction.id as id 
    FROM nexmark where auction is not null),
bid as (
    SELECT bid.auction as auction,
        bid.bidder as bidder, 
        bid.extra as extra,
        bid.datetime as datetime,
        bid.price as price
    FROM nexmark where bid is not null)

SELECT B.auction, B.price, B.bidder, B.dateTime, B.extra
from bid B
JOIN (
  SELECT MAX(B1.price) AS maxprice, tumble(INTERVAL '10' SECOND) as window
  FROM bid B1
  GROUP BY 2
) B1
ON B.price = B1.maxprice
WHERE B.dateTime BETWEEN B1.window.start_time AND B1.window.end_time;
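
To make the intended result of the query concrete, here is a minimal batch sketch in Python of what the join should return. It is not Arroyo code: the function names, the dict-based bid records, and the use of integer epoch seconds are all assumptions made for illustration. It also assumes that `end_time` is `start_time` plus 10 seconds and follows SQL's inclusive `BETWEEN`.

```python
WINDOW_SECONDS = 10  # mirrors tumble(INTERVAL '10' SECOND)


def window_start(ts: int) -> int:
    """Start of the tumbling window containing ts (epoch seconds)."""
    return ts - ts % WINDOW_SECONDS


def highest_bids(bids):
    """Reference (non-streaming) evaluation of the query.

    bids: list of dicts with keys auction, price, bidder, datetime, extra.
    Returns one row per (bid, window) pair that satisfies the join.
    """
    # Inner subquery: MAX(price) per 10-second tumbling window.
    max_price = {}
    for b in bids:
        w = window_start(b["datetime"])
        if w not in max_price or b["price"] > max_price[w]:
            max_price[w] = b["price"]

    # Outer join: B.price = B1.maxprice, then the BETWEEN filter (inclusive).
    rows = []
    for b in bids:
        for w, p in max_price.items():
            if b["price"] == p and w <= b["datetime"] <= w + WINDOW_SECONDS:
                rows.append(b)
    return rows


# Hypothetical sample data, not taken from the Nexmark generator.
sample = [
    {"auction": 1, "price": 100, "bidder": 7, "datetime": 1, "extra": ""},
    {"auction": 1, "price": 250, "bidder": 8, "datetime": 7, "extra": ""},
    {"auction": 2, "price": 90, "bidder": 9, "datetime": 12, "extra": ""},
    {"auction": 3, "price": 250, "bidder": 7, "datetime": 25, "extra": ""},
]
result = highest_bids(sample)
```

With this sample, the bid at `datetime` 1 is dropped because it is not the maximum of its window, and each remaining bid matches only the window it falls in.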

edmondop commented 1 year ago

This is the DAG produced by the query:

2023-08-31T22:56:30.082233Z  INFO arroyo_controller::compiler: digraph {
    0 [ label = "nexmark_0:Nexmark<10 eps>" ]
    1 [ label = "watermark_1:Watermark" ]
    2 [ label = "fused_9:expression<sql_fused<value_project,filter,value_project,value_project,value_project,key_project>:OptionalRecord>" ]
    3 [ label = "tumbling_window_two_phase_aggregator_3:TumblingWindowAggregator<TumblingWindow(10s)>" ]
    4 [ label = "fused_4:expression<sql_fused<filter,value_project,value_project,key_project>:OptionalRecord>" ]
    5 [ label = "join_pair_merge_2:expression<api_fused<join_merge,sql_fused<filter,value_project,value_project>>:OptionalRecord>" ]
    6 [ label = "sink_web_6:WebSink" ]
    7 [ label = "join_with_expiration_7:JoinWithExpiration<left_expire: 86400s, right_expire: 86400s, join_type: Inner>" ]
    8 [ label = "fused_8:expression<sql_fused<value_project,filter,value_project,value_project,value_project,key_project>:OptionalRecord>" ]
    0 -> 1 [ label = "() → arroyo_types :: nexmark :: Event" ]
    2 -> 7 [ label = "generated_struct_3484707784589493356 -left→ generated_struct_15490570238723732322" ]
    4 -> 7 [ label = "generated_struct_3484707784589493356 -right→ generated_struct_16745226037119149261" ]
    1 -> 8 [ label = "() → arroyo_types :: nexmark :: Event" ]
    7 -> 5 [ label = "generated_struct_3484707784589493356 → (generated_struct_15490570238723732322 , generated_struct_16745226037119149261)" ]
    5 -> 6 [ label = "() → generated_struct_6614185072020766535" ]
    8 -> 3 [ label = "generated_struct_17942395924573474124 ⤨ generated_struct_17349934388688571038" ]
    1 -> 2 [ label = "() → arroyo_types :: nexmark :: Event" ]
    3 -> 4 [ label = "generated_struct_17942395924573474124 → generated_struct_18028448699472152953" ]
}

No output is produced.
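
The DAG above joins through `JoinWithExpiration` with both sides expiring after 86400s, whereas the issue asks to keep bids only while their window is active. A rough Python sketch of that state-retention idea follows. It is an illustration under stated assumptions, not Arroyo's operator: the class and method names are hypothetical, timestamps are integer epoch seconds, windows are treated as half-open (a simplification of the query's inclusive `BETWEEN`), and bids arriving after their window has closed are not handled.

```python
from collections import defaultdict

WINDOW_SECONDS = 10  # mirrors TumblingWindow(10s) in the DAG


class WindowedMaxJoin:
    """Buffers bids per tumbling window and frees them once the window closes."""

    def __init__(self):
        # window start -> bids seen so far in that window
        self.buffered = defaultdict(list)

    def on_bid(self, bid):
        start = bid["datetime"] - bid["datetime"] % WINDOW_SECONDS
        self.buffered[start].append(bid)

    def on_watermark(self, watermark):
        """Emit the max-price bids of every window that ended at or before
        the watermark, and drop that window's state."""
        out = []
        closed = sorted(w for w in self.buffered if w + WINDOW_SECONDS <= watermark)
        for w in closed:
            bids = self.buffered.pop(w)  # state is released here
            top = max(b["price"] for b in bids)
            out.extend(b for b in bids if b["price"] == top)
        return out


# Hypothetical usage with made-up bids.
op = WindowedMaxJoin()
op.on_bid({"auction": 1, "price": 100, "datetime": 1})
op.on_bid({"auction": 1, "price": 250, "datetime": 7})
op.on_bid({"auction": 2, "price": 90, "datetime": 12})
first = op.on_watermark(10)
remaining_windows = list(op.buffered)
second = op.on_watermark(20)
```

In this sketch the amount of buffered state is bounded by the windows still open at the current watermark, rather than by a fixed expiration interval.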

edmondop commented 1 year ago

[Screenshot 2023-08-31 at 3 58 33 PM]