Eventual-Inc / Daft

Distributed data engine for Python/SQL designed for the cloud, powered by Rust
https://getdaft.io
Apache License 2.0

Window function support #2108

Open jaychia opened 5 months ago

jaychia commented 5 months ago

Is your feature request related to a problem? Please describe.

Window functions: functions that are applied over windows of data. Here is a great illustration from DuckDB: [image: windowing illustration from the DuckDB documentation]

Valid expressions that can be run over windows are:

  1. Aggregation expressions (e.g. sum(), mean())
  2. Window expressions (e.g. first(), last())

In this proposal, the Window API is similar to the PySpark API.

  1. We will create a new group of Expressions called WindowExpressions (similar to how we have "aggregation expressions")
  2. Unlike aggregation expressions, however, WindowExpressions do not change the cardinality of the dataframe/partitions
  3. WindowExpressions also have attached context of the WindowSpec that they should run over
  4. WindowSpec consists of (see the sketch after this list):
     a. (Non-optional, for now) partition_by: a column to partition the overall dataframe by - windows do not cross partition boundaries
     b. (Optional) order_by: an expression to order each partition by
     c. (Optional) rows_between OR range_between: the granularity of each frame within each partition. Defaults to (START_PARTITION, END_PARTITION). rows_between defines each frame by how many rows before/after the current row to include (e.g. (-1 row, +1 row)). range_between defines each frame by a range of values around the current row's order_by value (e.g. (-1 minute, +1 minute)).
  5. Aggregation expressions can also be run over a WindowSpec. In this case they do not change the cardinality of the data; the aggregated value is automatically "broadcasted" to every row in the window.
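
To make (4) concrete, here is a minimal sketch of what the WindowSpec could look like as a plain Python dataclass. Everything here is illustrative rather than a committed API; it only encodes points 4a-4c above, with timedelta standing in for whatever ordered value type range_between ends up accepting:

from __future__ import annotations

import datetime
from dataclasses import dataclass

@dataclass(frozen=True)
class Window:
    # 4a. Required (for now): windows never cross partition boundaries.
    partition_by: list[str]
    # 4b. Optional: column(s) to order each partition by.
    order_by: list[str] | None = None
    # 4c. Optional, mutually exclusive frame bounds relative to the current row.
    # rows_between counts rows, e.g. (-1, 1) = 1 preceding through 1 following;
    # range_between bounds the order_by value, e.g. +/- 1 minute.
    rows_between: tuple[int, int] | None = None
    range_between: tuple[datetime.timedelta, datetime.timedelta] | None = None

    def __post_init__(self) -> None:
        # When neither bound is given, the frame defaults to
        # (START_PARTITION, END_PARTITION), i.e. the whole partition.
        if self.rows_between is not None and self.range_between is not None:
            raise ValueError("specify at most one of rows_between / range_between")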

As an initial implementation, Daft will not support:

  1. WindowSpec without a partition_by ("global window specs")
  2. WindowExpressions without a WindowSpec ("window expressions over a global window spec")
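
Under the sketch above, enforcing both restrictions is mechanical (using names from the proposed API below; the error types and messages are illustrative only):

# Restriction 1 falls out of construction if partition_by is a required argument:
Window(order_by=["time"])  # TypeError: missing required argument: 'partition_by'

# Restriction 2 would surface at logical-plan time, when the planner finds a
# WindowExpression with no .over(...) attached:
df.with_column("row_number", F.row_number())  # plan-time error, not a valid projection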

Proposed API:

To mirror the DuckDB simple examples:

-- generate a "row_number" column containing incremental identifiers for each row
SELECT row_number() OVER () FROM sales;
-- generate a "row_number" column, by order of time
SELECT row_number() OVER (ORDER BY time) FROM sales;
-- generate a "row_number" column, by order of time partitioned by region
SELECT row_number() OVER (PARTITION BY region ORDER BY time) FROM sales;
-- compute the difference between the current amount and the previous amount,
-- by order of time
SELECT amount - lag(amount) OVER (ORDER BY time) FROM sales;
-- compute the percentage of the total amount of sales per region for each row
SELECT amount / sum(amount) OVER (PARTITION BY region) FROM sales;

from daft import col
from daft.exprs import Function as F, Window

# generate a "row_number" column containing incremental identifiers for each row
df.with_column("row_number", F.row_number())  # ERROR: Daft will not support this global WindowSpec

# generate a "row_number" column, by order of time
df.with_column("row_number", F.row_number().over(Window(sort_by=["time"]))) # ERROR: Daft will not support this global WindowSpec

# generate a "row_number" column, by order of time partitioned by region
df.with_column("row_number", F.row_number().over(Window(partition_by=["region"], sort_by=["time"])))

# compute the difference between the current amount and the previous amount, by order of time
df.with_column("prev_amount", col("amount").lag(1)) \
    .with_column("amount_diff", col("amount") - col("prev_amount"))

# compute the percentage of the total amount of sales per region for each row
df \
    .with_column("region_sum", col("amount").sum().over(Window(partition_by=["region"]))) \
    .with_column("pct_sales_per_region", col("amount") / col("region_sum"))

Here are the examples with frames:

DuckDB:

-- Row-based ranges
SELECT points,
    sum(points) OVER (
        ROWS BETWEEN 1 PRECEDING
                 AND 1 FOLLOWING) we
FROM results;

-- Value-based ranges
SELECT "Plant", "Date",
    avg("MWh") OVER (
        PARTITION BY "Plant"
        ORDER BY "Date" ASC
        RANGE BETWEEN INTERVAL 3 DAYS PRECEDING
                  AND INTERVAL 3 DAYS FOLLOWING)
        AS "MWh 7-day Moving Average"
FROM "Generation History"
ORDER BY 1, 2;
import datetime

# ERROR: Daft won't support global window specs (no partition_by)
df.select("points", col("points").sum().over(Window(rows_between=(-1, 1))))

# ERROR: Daft won't support global window specs (no partition_by)
df.select("points", col("points").sum().over(Window(range_between=(datetime.timedelta(days=-3), datetime.timedelta(days=3)))))

See Also:

GitHunter0 commented 1 week ago

These kinds of operations are essential.

Polars' rolling functions are a good reference too: https://docs.pola.rs/api/python/stable/search.html?q=rolling
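
For instance, the row-based frame from the DuckDB example above (ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) maps onto a centered rolling sum in Polars today:

import polars as pl

df = pl.DataFrame({"points": [1, 2, 3, 4]})
# window_size=3 with center=True covers 1 preceding + current + 1 following;
# min_periods=1 keeps partial frames at the edges instead of producing nulls
df.with_columns(
    pl.col("points").rolling_sum(window_size=3, min_periods=1, center=True).alias("we")
)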