timeplus-io / proton

A streaming SQL engine, a fast and lightweight alternative to ksqlDB and Apache Flink, 🚀 powered by ClickHouse.
https://timeplus.com
Apache License 2.0

Shuffle and substream #460

Status: Open · chenziliang opened this issue 9 months ago

chenziliang commented 9 months ago

Describe what's wrong

We should not use substream aggregation when the query does a light shuffle, because substream aggregation is more expensive than regular aggregation.
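To make the cost argument concrete, here is a toy Python model. It is not Proton's implementation; the function names and the dict-based state are assumptions chosen for illustration. Regular aggregation keeps one hash table mapping group key → count. The substream variant keeps one such table per substream ID, so every row pays an extra lookup and the state is split per substream. With a single substream, both produce the same counts.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, Tuple

def regular_count(rows: Iterable[Hashable]) -> Dict[Hashable, int]:
    """Toy regular aggregation: one hash table keyed by the GROUP BY key."""
    counts: Dict[Hashable, int] = defaultdict(int)
    for group_key in rows:
        counts[group_key] += 1
    return dict(counts)

def substream_count(
    rows: Iterable[Tuple[Hashable, Hashable]],
) -> Dict[Hashable, Dict[Hashable, int]]:
    """Toy substream aggregation (hypothetical model): one table per substream ID.

    Each row first looks up its substream's table and then its group key,
    so every row pays an extra hash lookup and each substream keeps its own state.
    """
    tables: Dict[Hashable, Dict[Hashable, int]] = defaultdict(lambda: defaultdict(int))
    for substream_id, group_key in rows:
        tables[substream_id][group_key] += 1
    return {sid: dict(t) for sid, t in tables.items()}

# With a single substream, both models produce identical counts.
keys = ["a", "b", "a", "c", "a"]
regular = regular_count(keys)
substream = substream_count(("only", k) for k in keys)
print(regular)                       # {'a': 3, 'b': 1, 'c': 1}
print(substream["only"] == regular)  # True
```

In this model, the substream layer only adds overhead when there is exactly one substream. That matches the point raised later in the thread.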

timeplus :) explain pipeline SELECT 
              ip_prefix, count() AS count
            FROM 
              (
              SELECT 
                ipv4_cidr_to_range(ip, 28).1 AS ip_prefix, bit_shift_right(ip, 24) AS shuffle_key
              FROM 
                ip_rand
            )
            SHUFFLE BY 
              shuffle_key
            GROUP BY 
              ip_prefix;

EXPLAIN PIPELINE
SELECT
  ip_prefix, count() AS count
FROM
  (
    SELECT
      ipv4_cidr_to_range(ip, 28).1 AS ip_prefix, bit_shift_right(ip, 24) AS shuffle_key
    FROM
      ip_rand
  )
SHUFFLE BY
  shuffle_key
GROUP BY
  ip_prefix

Query id: 34c8c2a1-7b03-4463-a688-3093f2e18c96

┌─explain────────────────────────────────────────┐
│ (Expression)                                   │
│ ExpressionTransform × 32                       │
│   (StreamingAggregatingWithSubstream)          │
│   GlobalAggregatingTransformWithSubstream × 32 │
│     (Expression)                               │
│     ExpressionTransform × 32                   │
│       (LightShufflingStep)                     │
│       LightShufflingTransform 1 → 32           │
│         (WatermarkStep)                        │
│         WatermarkStamperTransform              │
│           (Expression)                         │
│           ExpressionTransform                  │
│             (ReadFromStorage)                  │
│             Random 0 → 1                       │
└────────────────────────────────────────────────┘

14 rows in set. Elapsed: 0.012 sec. 

timeplus :) show create ip_rand

SHOW CREATE STREAM ip_rand

Query id: 800c973f-4e80-4c43-90a5-f7b511b874af

┌─statement─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ CREATE RANDOM STREAM default.ip_rand
(
  `ip` ipv4,
  `_tp_time` datetime64(3, 'UTC') DEFAULT now64(3, 'UTC') CODEC(DoubleDelta, LZ4),
  INDEX _tp_time_index _tp_time TYPE minmax GRANULARITY 2
)
SETTINGS eps = 0 │
└───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

1 row in set. Elapsed: 0.001 sec. 
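The plan above splits rows with `LightShufflingTransform 1 → 32` and then aggregates each branch. For this particular query, there is one property that makes an independent per-branch aggregation sufficient. `ipv4_cidr_to_range(ip, 28).1` is the lower bound of the /28 range, so it keeps the top 28 bits of the 32-bit `ip`. `bit_shift_right(ip, 24)` is the top 8 bits of the same address. Every row with the same `ip_prefix` therefore has the same `shuffle_key` and lands in the same branch.

The Python sketch below checks that property on random addresses. The `key % 32` routing rule is an assumption standing in for whatever function the shuffle actually uses. The property holds for any deterministic routing of `shuffle_key`.

```python
import random
from collections import Counter
from typing import Dict, List

NUM_BRANCHES = 32  # matches "LightShufflingTransform 1 → 32" in the plan

def ip_prefix(ip: int) -> int:
    """Lower bound of the /28 range containing ip (keeps the top 28 bits)."""
    mask = (0xFFFFFFFF << (32 - 28)) & 0xFFFFFFFF
    return ip & mask

def shuffle_key(ip: int) -> int:
    """Equivalent of bit_shift_right(ip, 24): the top 8 bits of the address."""
    return ip >> 24

def route(key: int) -> int:
    # ASSUMPTION: simple modulo routing; Proton's real shuffle function may differ.
    return key % NUM_BRANCHES

def shuffled_counts(ips: List[int]) -> List[Counter]:
    """Route each row by shuffle_key, then count ip_prefix within each branch."""
    branches: List[Counter] = [Counter() for _ in range(NUM_BRANCHES)]
    for ip in ips:
        branches[route(shuffle_key(ip))][ip_prefix(ip)] += 1
    return branches

def prefixes_span_branches(branches: List[Counter]) -> bool:
    """True if any ip_prefix group appears in more than one branch."""
    seen: Dict[int, int] = {}
    for branch_id, counts in enumerate(branches):
        for prefix in counts:
            if seen.setdefault(prefix, branch_id) != branch_id:
                return True
    return False

random.seed(0)
ips = [random.getrandbits(32) for _ in range(10_000)]
branches = shuffled_counts(ips)
print(prefixes_span_branches(branches))  # False: each group lives in one branch
# Because groups never span branches, merging per-branch counts equals one global count.
merged = sum(branches, Counter())
print(merged == Counter(ip_prefix(ip) for ip in ips))  # True
```

In this sketch, no group key is split across branches. Each branch's aggregation is therefore complete on its own and needs no further merging per group.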


yl-lisen commented 9 months ago

In fact, for SHUFFLE BY, GlobalAggregatingTransformWithSubstream behaves the same as regular aggregation: there are no additional substreams (just one).

chenziliang commented 9 months ago

Then do we need to use WithSubstream at all?