ArroyoSystems / arroyo

Distributed stream processing engine in Rust
https://arroyo.dev
Apache License 2.0
3.81k stars, 222 forks

question: aggregate node should not be recalculating window, as input is windowed #752

Open zidaye opened 1 month ago

zidaye commented 1 month ago

My pipeline works as follows: I first join two windowed subqueries, and then run an aggregation over the joined result.

CREATE TABLE test_table1 ( 
    `attacker_ip` STRING,
    `victim_ip` STRING,
    `vuln_type` STRING,
    `vuln_name` STRING,
    `attack_result_cd` STRING,
    `occur_time` Timestamp
) WITH (
    connector = 'kafka',
    format = 'json',
    type = 'source',
    bootstrap_servers = 'localhost:9092',
    topic = 'test_table1',
    'source.group_id' = 'arroyo',
    'source.read_mode' = 'read_committed',
    'source.offset' = 'group'
);

CREATE TABLE test_table2( 
    `attacker_ip` STRING,
    `victim_ip` STRING,
    `vuln_type` STRING,
    `vuln_name` STRING,
    `attack_result_cd` STRING,
    `occur_time` Timestamp
) WITH (
    connector = 'kafka',
    format = 'json',
    type = 'source',
    bootstrap_servers = 'localhost:9092',
    topic = 'test_table2',
    'source.group_id' = 'arroyo',
    'source.read_mode' = 'read_committed',
    'source.offset' = 'group'
);

CREATE VIEW log_data AS (SELECT * FROM test_table1 UNION ALL SELECT * FROM test_table2);

CREATE VIEW hit_data_result AS (
    SELECT
        left_tb.window_time AS window_time,
        left_tb.attacker_ip AS attacker_ip,
        left_tb.victim_ip AS victim_ip,
        left_tb.attack_result_cd AS attack_result_cd,
        left_tb.occur_time AS occur_time
    FROM (
        SELECT tumble(interval '10 second') AS window_time, attacker_ip, victim_ip, attack_result_cd, occur_time
        FROM log_data
        WHERE vuln_type LIKE '%type_A%' AND vuln_name LIKE '%name_A%' AND attack_result_cd != '4'
        GROUP BY 1, 2, 3, 4, 5
    ) left_tb
    JOIN (
        SELECT tumble(interval '10 second') AS window_time, attacker_ip, victim_ip, attack_result_cd, occur_time
        FROM log_data
        WHERE vuln_type LIKE '%type_B%' OR vuln_name LIKE '%name_B%'
        GROUP BY 1, 2, 3, 4, 5
    ) right_tb
    ON left_tb.attacker_ip = right_tb.attacker_ip AND left_tb.victim_ip = right_tb.victim_ip
);

SELECT
    window_time,
    max(occur_time) AS log_end_time,
    min(occur_time) AS log_start_time,
    array_agg(attacker_ip) AS attacker_ip_list,
    array_agg(victim_ip) AS victim_ip_list,
    last_value(attack_result_cd) AS attack_result_cd
FROM hit_data_result
GROUP BY 1
HAVING count(*) >= 1;

The final query fails with the following error: Error during planning: aggregate node should not be recalculating window, as input is windowed.

How should I rewrite my SQL? Any help would be much appreciated.

mwylde commented 1 month ago

Thanks for reporting the issue. After investigating, this does seem to be a bug in the planner, which is not correctly identifying the window that should be aggregated over in the select statement. We'll take a look at this.
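
For reference while this is investigated, the result the final query is meant to produce can be modeled outside Arroyo. The following is a minimal plain-Python sketch of that intent. It is not Arroyo code and it is not a workaround for the planner error. Everything in it beyond the column names and filter strings from the issue is an assumption made for illustration: the helper names (tumble_start, windowed_distinct, hit_data_result, aggregate), the sample rows, the rule that the join only pairs rows from the same 10-second window (the ON clause above does not state this), and the choice to model last_value as the last row in sorted order.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=10)
EPOCH = datetime(1970, 1, 1)

def tumble_start(ts, width=WINDOW):
    """Start of the fixed-width (tumbling) window that contains ts."""
    return ts - (ts - EPOCH) % width

def left_match(r):
    # WHERE vuln_type LIKE '%type_A%' AND vuln_name LIKE '%name_A%' AND attack_result_cd != '4'
    return ("type_A" in r["vuln_type"] and "name_A" in r["vuln_name"]
            and r["attack_result_cd"] != "4")

def right_match(r):
    # WHERE vuln_type LIKE '%type_B%' OR vuln_name LIKE '%name_B%'
    return "type_B" in r["vuln_type"] or "name_B" in r["vuln_name"]

def windowed_distinct(rows, predicate):
    """Model of SELECT tumble(...), cols ... GROUP BY 1, 2, 3, 4, 5 (distinct rows per window)."""
    return {
        (tumble_start(r["occur_time"]), r["attacker_ip"], r["victim_ip"],
         r["attack_result_cd"], r["occur_time"])
        for r in rows if predicate(r)
    }

def hit_data_result(rows):
    """Join both sides on attacker_ip and victim_ip, keeping the left-side columns."""
    left = sorted(windowed_distinct(rows, left_match))
    right = sorted(windowed_distinct(rows, right_match))
    # Assumption: only rows that fall in the same window are paired.
    return [l for l in left for r in right
            if l[0] == r[0] and l[1] == r[1] and l[2] == r[2]]

def aggregate(joined):
    """Model of the final SELECT ... GROUP BY window_time."""
    groups = defaultdict(list)
    for row in joined:
        groups[row[0]].append(row)
    return {
        window: {
            "log_start_time": min(r[4] for r in rs),
            "log_end_time": max(r[4] for r in rs),
            "attacker_ip_list": [r[1] for r in rs],
            "victim_ip_list": [r[2] for r in rs],
            # Assumption: "last" means last in this model's sorted order.
            "attack_result_cd": rs[-1][3],
        }
        for window, rs in groups.items()
    }

def make_row(vuln_type, vuln_name, result_cd, second):
    # Hypothetical sample data; all rows share one attacker/victim pair.
    return {"attacker_ip": "10.0.0.1", "victim_ip": "10.0.0.2",
            "vuln_type": vuln_type, "vuln_name": vuln_name,
            "attack_result_cd": result_cd,
            "occur_time": datetime(2024, 1, 1, 0, 0, second)}

rows = [
    make_row("type_A", "name_A", "1", 3),   # left side, window starting 00:00:00
    make_row("type_B", "other", "2", 7),    # right side, same window
    make_row("type_A", "name_A", "1", 15),  # left side, next window, no partner
]
result = aggregate(hit_data_result(rows))
```

With these sample rows, only the window starting at 00:00:00 has both a left-side and a right-side event, so result holds a single entry for that window. The event at 00:00:15 has no right-side partner in its window and is dropped by the join.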