dask-contrib / dask-sql

Distributed SQL Engine in Python using Dask
https://dask-sql.readthedocs.io/
MIT License

Implement windowing functions #43

Open nils-braun opened 4 years ago

nils-braun commented 4 years ago

So far, aggregations only work without windowing (i.e. without the OVER keyword).

Unfortunately, I (still) do not know how to implement SQL's very broad windowing functionality in Dask.

beckernick commented 3 years ago

There's been a good chunk of effort put into an efficient hash-partitioning shuffle in Dask this year. Perhaps this could be leveraged here?

I could imagine parsing the relational algebra / query plan and mapping func() OVER (PARTITION BY hash_cols ORDER BY col) AS ... to a custom task composed of several standard Dask operations. With the hash repartition, all rows corresponding to a specific hash value of hash_cols would be wholly contained in the same partition, somewhat equivalent to collecting each individual PARTITION BY group into larger partitions. This should allow for within-group and within-partition operations for the functional logic, assuming we preserve the ordering of rows within groups (implicitly or explicitly, depending on the groupby).

As a small example:

from pyspark.sql import SparkSession
from dask.datasets import timeseries

# Local Spark session, used only to verify the Dask results against Spark SQL
spark = SparkSession.builder \
    .master("local") \
    .getOrCreate()

# Synthetic Dask timeseries demo data (one month at 30-second frequency)
time_df = timeseries(
    start='2000-01-01',
    end='2000-01-31',
    freq='30s'
)
time_df = time_df.persist()
time_pdf = time_df.compute()

# Register the same data as a Spark temporary view for comparison
sdf = spark.createDataFrame(time_pdf)
sdf.createOrReplaceTempView("time_df")
sdf.cache()

Rank (dense_rank in this case)

q = """
SELECT
    id,
    name,
    x,
    y,
    CAST(dense_rank() OVER (PARTITION BY name ORDER BY y ASC) AS DOUBLE) as rank
FROM
    time_df
"""

def rankfunc(df, keys, col, ascending):
    # df is a single pandas partition; after the shuffle, every PARTITION BY
    # group is fully contained in one partition, so a plain pandas groupby-rank works
    keys = [keys] if isinstance(keys, str) else keys
    col = [col] if isinstance(col, str) else col

    new = df.copy()
    new['rank'] = new.groupby(keys)[col].rank(method="dense", ascending=ascending)

    # sort to mimic the ORDER BY within each partition
    order_by_col_order = [True] * len(keys) + [ascending]
    new = new.sort_values(keys + col, ascending=order_by_col_order).reset_index(drop=True)
    return new

def sql_window_rank(df, partition_cols, order_by_col, ascending=False):
    # hash-shuffle so that all rows with the same partition_cols values land in
    # the same Dask partition, then compute the rank partition-wise
    new = df.copy()
    new = new.shuffle(on=partition_cols)
    new = new.map_partitions(rankfunc, partition_cols, order_by_col, ascending)
    return new

# Execute and Verify
spark_res = spark.sql(q)
dask_res = sql_window_rank(time_df, "name", "y", True)

dask_result = dask_res.compute()
spark_result = spark_res.toPandas()

sort_order = ["id", "name", "x", "y"]
equality = dask_result.sort_values(sort_order).reset_index(drop=True).equals(
    spark_result.sort_values(sort_order).reset_index(drop=True)
)
print(equality)
True

Lag (Lead would be shift(-1) instead)

q = """
SELECT
    id,
    name,
    x,
    y,
    lag(y) OVER (PARTITION BY name ORDER BY y DESC) as lagged
FROM
    time_df
"""

def lagfunc(df, keys, col, ascending):
    # df is a single pandas partition; sort by the ORDER BY column first,
    # then take the previous row's value within each group
    keys = [keys] if isinstance(keys, str) else keys
    col = [col] if isinstance(col, str) else col

    new = df.copy()
    order_by_col_order = [True] * len(keys) + [ascending]
    new = new.sort_values(keys + col, ascending=order_by_col_order).reset_index(drop=True)
    new['lagged'] = new.groupby(keys)[col].shift(1)
    return new

def sql_window_lag(df, partition_cols, order_by_col, ascending=False):
    # same pattern as for rank: shuffle so groups are partition-local, then
    # apply the pandas logic per partition
    new = df.copy()
    new = new.shuffle(on=partition_cols)
    new = new.map_partitions(lagfunc, partition_cols, order_by_col, ascending)
    return new

# Execute and Verify
spark_res = spark.sql(q)
dask_res = sql_window_lag(time_df, "name", "y", False)

dask_result = dask_res.compute()
spark_result = spark_res.toPandas()

sort_order = ["id", "name", "x", "y"]
equality = dask_result.sort_values(sort_order).reset_index(drop=True).equals(
    spark_result.sort_values(sort_order).reset_index(drop=True)
)
print(equality)
True

dwy904 commented 3 years ago

Any update on this one?

nils-braun commented 3 years ago

Not so far from my side, unfortunately. Thanks to @beckernick for providing some code here! Just out of curiosity: is there a reason you chose shuffle and not just a "usual" groupby-apply? (I am fine with both.)
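
For reference, a rough sketch of what I mean by the groupby-apply variant, reusing the time_df and the ranking logic from the example above (the exact meta= specification here is just a guess):

def rank_within_group(pdf):
    # pdf is one pandas group; rank it by y and keep all original columns
    pdf = pdf.sort_values("y").reset_index(drop=True)
    pdf["rank"] = pdf["y"].rank(method="dense", ascending=True)
    return pdf

dask_res = time_df.groupby("name").apply(
    rank_within_group,
    meta={**time_df.dtypes.to_dict(), "rank": "f8"},
)

Under the hood this also shuffles the data, so both variants should end up doing comparable work.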

However, I still have some difficulties implementing this: imagine we have a SQL query like this:

SELECT 
    *, 
    ROW_NUMBER() OVER (PARTITION BY a) AS row_a, 
    ROW_NUMBER() OVER (PARTITION BY b) AS row_b
FROM df

In this case I would need to partition by a for the row_a column and by b for row_b. Each of them on its own is not a problem, but combining them is: the partitioning divisions will not match (and neither will the index within each partition). So my best guess here would be to shuffle by the partition columns of an OVER clause, calculate the new column, and then shuffle back to the original partitioning to attach it to the result.

While that will give the correct result, there is a lot of shuffling involved (which is typically quite slow). Even worse, when there are two OVER calls involved, I might do this dance multiple times.
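
To make that concrete, here is a rough sketch of what repeating this dance for the query above could look like (the add_row_number helper is hypothetical, and ROW_NUMBER() without an ORDER BY is taken as an arbitrary numbering within each group):

def add_row_number(pdf, partition_col, out_col):
    # pdf is one pandas partition; after the shuffle every value of
    # partition_col is fully contained in it, so cumcount gives a row number
    pdf = pdf.copy()
    pdf[out_col] = pdf.groupby(partition_col).cumcount() + 1
    return pdf

# first dance: shuffle on "a" and number the rows within each a-group
result = df.shuffle(on="a").map_partitions(add_row_number, "a", "row_a")
# second dance: shuffle everything again, this time on "b"
result = result.shuffle(on="b").map_partitions(add_row_number, "b", "row_b")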

Does someone have a good idea of how to solve that?

nils-braun commented 3 years ago

Hi @dwy904, @beckernick! It took some time, but I finally managed to add a first implementation of windowing to dask-sql in PR #157. It might not be optimal yet, but at least it works in my tests. If you want, feel free to have a look and test it. I have started with only a few functions; more (e.g. lag and rank) can be added (if you would like to do a PR, that would be really cool! I am very happy to guide you through the process).

nils-braun commented 3 years ago

Here is the list of window operations and their implementation status: