ibis-project / ibis

the portable Python dataframe library
https://ibis-project.org

feat(pyspark): support windowing functions in Pyspark backend #8847

Open chloeh13q opened 5 months ago

chloeh13q commented 5 months ago

Is your feature request related to a problem?

Windowing functions are not supported in the PySpark backend right now.

Describe the solution you'd like

I'd like to have support for windowing functions in the Pyspark backend: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#types-of-time-windows

The traditional SQL windowing with over() is not supported in Spark Structured Streaming (https://stackoverflow.com/questions/55591774/how-to-use-a-non-time-based-window-with-spark-data-streaming-structure)

What version of ibis are you running?

main

What backend(s) are you using, if any?

Pyspark


chloeh13q commented 5 months ago

I looked at streaming window aggregations for Flink, Spark, RW, Decodable, Materialize.

Note that Spark also has something called window functions, which corresponds to the over() syntax in Ibis. It's not supported in streaming.
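For reference, this is roughly what the over()-style expressions look like in Ibis (a hedged sketch; the table and column names are illustrative and mirror the examples below), i.e., the kind of query Spark Structured Streaming rejects:

import ibis

t = ibis.table(
    {"createTime": "timestamp", "provinceId": "int32", "payAmount": "float64"},
    name="payment_msg",
)
# classic over()-style window function: a grouped running aggregate per row,
# not a time-bucketed aggregation
expr = t.mutate(
    total_by_province=t.payAmount.sum().over(ibis.window(group_by=t.provinceId))
)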

Syntax for streaming windows in Spark

window_agg = session.sql("""
SELECT
    window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
    FROM streaming_df
    GROUP BY provinceId, window(createTime, '1 hour', '30 minutes')
""")

Spark supports three types of time windows: tumbling (fixed), sliding, and session.

We will address session windows separately because 1) they are not currently supported in Ibis' Flink backend and 2) they have different characteristics and use a different API in Spark.

Tumbling and sliding windows both use the window function in Spark; the slide duration is an optional parameter. The window function accepts either a timestamp column or a time window column. The windowing function produces window.start and window.end. It's also possible to call window_time on top of the window to get a timestamp representation of the time window, e.g.,

SELECT a, window.start AS start, window.end AS end, window_time(window), cnt
FROM (
    SELECT a, window, count(*) AS cnt
    FROM VALUES
        ('A1', '2021-01-01 00:00:00'),
        ('A1', '2021-01-01 00:04:30'),
        ('A1', '2021-01-01 00:06:00'),
        ('A2', '2021-01-01 00:01:00') AS tab(a, b)
    GROUP BY a, window(b, '5 minutes')
    ORDER BY a, window.start
);
+---+-------------------+-------------------+--------------------+---+
|  a|              start|                end| window_time(window)|cnt|
+---+-------------------+-------------------+--------------------+---+
| A1|2021-01-01 00:00:00|2021-01-01 00:05:00|2021-01-01 00:04:...|  2|
| A1|2021-01-01 00:05:00|2021-01-01 00:10:00|2021-01-01 00:09:...|  1|
| A2|2021-01-01 00:00:00|2021-01-01 00:05:00|2021-01-01 00:04:...|  1|
+---+-------------------+-------------------+--------------------+---+
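The same two window types via the DataFrame API, as a hedged sketch (assuming the streaming_df and column names from the SQL examples above, and pyspark.sql.functions.window):

from pyspark.sql import functions as F

# tumbling (fixed) window: only a window duration
tumbling = streaming_df.groupBy(
    F.window("createTime", "1 hour"), "provinceId"
).agg(F.sum("payAmount").alias("totalPayAmount"))

# sliding window: window duration plus a slide duration
sliding = streaming_df.groupBy(
    F.window("createTime", "1 hour", "30 minutes"), "provinceId"
).agg(F.sum("payAmount").alias("totalPayAmount"))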

Syntax in other streaming engines/databases

Flink SQL and Decodable SQL both use table-valued functions. RW uses tumble() and hop() functions. The result of a time window function is a table in which each row carries data for a time window. A time window function extends the schema of the original table with two new columns, window_start and window_end, which indicate the start and end of time windows respectively. Materialize has something called a date_bin function.
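For comparison, the Flink-style TVF syntax looks roughly like this (a sketch; table and column names mirror the Spark example above):

SELECT window_start, window_end, provinceId, SUM(payAmount) AS totalPayAmount
FROM TABLE(
    TUMBLE(TABLE payment_msg, DESCRIPTOR(createTime), INTERVAL '1' HOUR))
GROUP BY window_start, window_end, provinceId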

Proposal for implementation

Ibis has a window function but I think the semantics are too different. We also introduced a window_by method that returns a WindowedTable object when we implemented windowing TVFs in Flink.

In Flink, windowing TVFs return tables. Spark's window() function returns a single column. The output column will be a struct called 'window' by default with the nested columns 'start' and 'end', where 'start' and 'end' will be of pyspark.sql.types.TimestampType. You can use this column like you would any other column - it doesn't have to be combined with a groupby, e.g.,

from pyspark.sql.functions import window

streaming_df = streaming_df.withColumn(
    'window',
    window(streaming_df.createTime, '1 hour', '30 minutes')
)

This outputs records like

{"createTime":"2024-04-10T20:55:07.629-07:00","orderId":1712783621,"payAmount":64630.64838013934,"payPlatform":0,"provinceId":3,"window":{"start":"2024-04-10T20:00:00.000-07:00","end":"2024-04-10T21:00:00.000-07:00"}}

Usage of the window_by method in Ibis:

expr = (
    simple_table.window_by(time_col=simple_table.i)
    .tumble(window_size=ibis.interval(minutes=15))
    .group_by(["window_start", "window_end", "g"])
    .aggregate(mean=_.d.mean())
)

We can have .window_by().tumble() just return a table with all the columns plus an additional window column in Spark. Then group by behaves the same way. Basically this deconstructs the query into two steps, i.e.,

# Dataframe API
streaming_df = streaming_df.withColumn(
    'window', 
    window(streaming_df.createTime, '1 minute', '30 seconds')
)

streaming_df = streaming_df.groupBy("window").count()

# equivalent SQL
streaming_df = session.sql("""
SELECT window, provinceId, sum(payAmount) FROM
  (SELECT *, window(createTime, '1 minute', '30 seconds') FROM streaming_df) AS t0
GROUP BY t0.window, t0.provinceId
""")

What we will need to do to implement this approach:


chloeh13q commented 5 months ago

We had a discussion around this. There are two options that I have considered:

  1. Something along the lines of the TimestampBucket operation.

I think Spark's window() function is sort of similar to the .bucket() method in Ibis.

Example usage of bucket():

mean_by_bucket = (
     t.group_by(t.ts.bucket(minutes=5).name("bucket"))
     .agg(mean=_.val.mean())
     .order_by("bucket")
)

It feels like a streaming equivalent of the TimestampBucket op. But the buckets here are non-overlapping, i.e., this approach cannot express sliding windows. This is an alternative syntax for windowing that we may want to explore.

  2. Use the window_by API as I have mentioned above. We discussed whether this seems to be a reasonable abstraction, an important part of the consideration being "capturing user intent in a generic way and translating that to the backends", as opposed to the other way around. While it does make specific operations more expensive (particularly if the user just wants the window column: due to this constraint we would basically build an entire table and then select that column, which may be slower depending on how the query is optimized under the hood), I think it's reasonable to assume that the affected usage patterns are rare (i.e., it's rare for the user to want only the window column as output, because that's basically generating a floor/ceiling and it doesn't make much sense if it's not connected to the rest of the data).

The conclusion was to potentially implement both in the Spark streaming backend, with window_by being more of a syntactic sugar, but a more natural way for people coming from Spark/Flink/etc. But I also want to call out that the benefit of implementing bucket may be limited because it only supports 1 out of the 4 window types.

Separately, I'm still debating whether it makes sense for us to build the "group by" into the windowing, the way it works in pandas. In Spark/Flink/other streaming engines, all the window does is create the time buckets; in order to actually do anything meaningful, you need to group by the time buckets. Arguably that seems obvious and perhaps a bit of boilerplate, but there is more flexibility, say, if you only want to group by the start of the window or the end of the window. By contrast, a pandas-like API that does the group by as part of the windowing does not provide that flexibility.

I'm leaning towards avoiding large changes to the window_by API right now and continuing to collect feedback.

jcrist commented 4 months ago

We can have .window_by().tumble() just return a table with all the columns + an additional window column in Spark. Then group by behaves the same way.

This is what I would do, except I'd make it match the schema of the existing window_by operations. We can generate output tables with the same schema that Flink generates by adding an extra projection to create the window_start/window_end/window_time columns. The window_by functionality as it exists should map well enough onto how Spark does things, and I'd expect/hope that their optimizer will elide the extra projected columns if they're not referenced later on, so I wouldn't expect coercing Spark's output to match this schema to come with a large perf cost.
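A rough sketch of the projection being described (column names are illustrative; assumes Spark 3.4+ for pyspark.sql.functions.window_time). As the later comments note, Spark's streaming grouping constraints complicate how these flattened columns can actually be used:

from pyspark.sql import functions as F

# Spark's native windowed table, with the struct-typed "window" column
windowed = streaming_df.withColumn(
    "window", F.window("createTime", "1 hour", "30 minutes")
)

# extra projection to match the schema the Flink backend produces for window_by
flink_shaped = windowed.select(
    "*",
    F.col("window.start").alias("window_start"),
    F.col("window.end").alias("window_end"),
    F.window_time("window").alias("window_time"),
).drop("window")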

Make the 3 TVF ops more generic windowing ops, i.e., remove the TVF from the name

I don't think we necessarily need to rename them - conceptually they're still table-valued functions (take in a table and some options, return a table). Not opposed to renaming though.

Something along the lines of the TimestampBucket operation.

I added TimestampBucket originally to be able to implement window_by for other non-streaming backends (I still intend to finish this up). Spark's window is a superset of this functionality, so while we could implement bucket using Spark's window, IMO we should aim towards supporting window_by in Spark first.

chloeh13q commented 4 months ago

@jcrist Makes sense, I think we're on the same page. Just wondering: what is the motivation behind matching the schema of the existing window_by operations? I'm not opposed to it and I think it's one of the two options we have, but just wondering what your considerations there are.

jcrist commented 4 months ago

Mainly I think we want to ensure the same semantics across backends. Since Flink seems to be the most restrictive in what's required for downstream operations consuming window_by outputs (regarding usage of a time attribute column), it makes the most sense to me to stick with the current output schemas and do the bit of extra work to massage Spark's output to be semantically consistent.

chloeh13q commented 4 months ago

It also seems that Flink is the strictest about its timestamp datatype... It requires the watermark column to be a timestamp type of precision 3, and returns window_start, window_end, and window_time all with timestamp type of precision 3. I don't think Spark and RW distinguish between timestamps of different precisions...

In Flink, window_time casts the original timecol so that it becomes timestamp(3). After the windowing TVF, the original timecol becomes a regular timestamp column and window_time becomes the new time attribute field of the returned table. I'm a little hesitant about how we want to imitate this behavior in other backends - duplicate the original timecol?

chloeh13q commented 4 months ago

I was really close to getting this to work, but it turns out that you can't group by window.start and window.end separately in Spark because it loses the timestamp column... So we can't flatten the window column before groupby. The best we can do is something like

SELECT
    `t1`.`window`,
    `t1`.`provinceId`,
    AVG(`t1`.`payAmount`) AS `avg_amount`
FROM (
    SELECT
        `__windowed`.`createTime`,
        `__windowed`.`orderId`,
        `__windowed`.`payAmount`,
        `__windowed`.`payPlatform`,
        `__windowed`.`provinceId`,
        `__windowed`.`window`,
        `__windowed`.`createTime` AS `window_time`
    FROM (
        SELECT `t0`.*, WINDOW(`createTime`, '60 SECOND')
        FROM `payment_msg` AS `t0`
    ) AS `__windowed`
) AS `t1`
GROUP BY 1, 2

Should we just forgo trying to unify the output schema across backends?

I think a possible alternative is to combine window_by + group by + agg into a single API, with the caveat that users cannot use the output of the window TVF as a normal table. In that case it'd be easier for us to reconcile the differences across the underlying engines.

chloeh13q commented 4 months ago

We had a discussion this morning on possibly rethinking the API.

The open question was: does Spark allow windowing ops to be chained in such a way that, if we project the window column into separate columns, it would place constraints on downstream processing?

--

TL;DR:

Yes, Spark does allow chaining windowing ops, and, AFAIK, it only works with the window column and not a projection of it. I think what we'll need to do is to create some type of a "window column" concept that materializes differently depending on the backend.

--

I think an important gap/distinction here is the concept of a "time attribute column", which exists in Flink but does not exist in Spark.

Flink has this concept of "time attribute column", which is a special column defined for the table, sort of like a special attribute / property of the table.

Spark doesn't have this "time attribute column" concept. Instead, it requires that 1) the aggregation must have either the event-time column, or a window on the event-time column and 2) withWatermark must be called on the same column as the timestamp column used in the aggregate.
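A minimal sketch of those two requirements in the DataFrame API (column names are illustrative):

from pyspark.sql import functions as F

agg = (
    streaming_df
    # 2) the watermark is defined on the event-time column...
    .withWatermark("createTime", "10 seconds")
    # 1) ...and the aggregation groups by a window over that same column
    .groupBy(F.window("createTime", "1 minute"), "provinceId")
    .agg(F.avg("payAmount").alias("avgPayAmount"))
)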

--

  1. Flink requires the window_time column to chain windowing ops.

The window_start and window_end columns are regular timestamp columns, not time attributes, so they can't be used as time attributes in subsequent time-based operations. In order to propagate time attributes, you need to additionally add the window_time column into the GROUP BY clause. window_time is the third column produced by windowing TVFs and is a time attribute of the assigned window. Adding window_time into the GROUP BY clause makes it a group key that can be selected, and following queries can then use this column for subsequent time-based operations, such as cascading window aggregations and Window TopN.

  2. Spark's window function can take in either a timestamp column or a time window column. So even though the new window column is not a "time attribute column" in Spark, it is a valid input into the window() function, which allows chaining windowing ops.

I manually validated the following two queries, which should compute the same results:

# a) redefine watermark
v0 = session.sql("""
SELECT
    window.start AS window_start, window.end AS window_end, provinceId, avg(payAmount) as avgPayAmount
    FROM payment_msg
    GROUP BY provinceId, window(createTime, '1 minute', '30 seconds')
""")
v0.withWatermark("window_start", "1 second").createOrReplaceTempView("v0")
streaming_df = session.sql("""
SELECT window, provinceId, avg(avgPayAmount) as avgPayAmount
    FROM v0
    GROUP BY provinceId, window(window_start, '2 minute')
""")

# b) chained aggregation
session.sql("""
CREATE TEMPORARY VIEW v0 AS SELECT window AS w0, provinceId, avg(payAmount) as avgPayAmount
    FROM payment_msg
    GROUP BY provinceId, window(createTime, '1 minute', '30 seconds')
""")
streaming_df = session.sql("""
SELECT window, provinceId, avg(avgPayAmount) as avgPayAmount
    FROM v0
    GROUP BY provinceId, window(w0, '2 minute')
""")

Both execute, but a) doesn't write anything into the sink (which is weird, I feel like it should work semantically at least). But, regardless, because Spark doesn't support defining watermarks in SQL, we or the user would have to use the DataFrame API in order to define the watermark, which would add another layer of complexity if we want to go this route...

chloeh13q commented 4 months ago

I'm thinking about implementing the following API. It's inspired somewhat by groupby-aggregates.

t = ibis.table(
    ibis.schema(
        {
            "createTime": "timestamp(3)",
            "orderId": "int64",
            "payAmount": "float64",
            "payPlatform": "int32",
            "provinceId": "int32",
        }
    ),
    name="payment_msg",
)
expr = t.window_by(
    window_type="tumble", 
    time_col="createTime", 
    window_size=ibis.interval(seconds=30)
).agg(
    by=["provinceId"], 
    avgPayAmount=_.payAmount.mean()
)

What I realized did not work in the first iteration is that, in chained windowed aggregations, only the first windowby-aggregate needs the user to provide a time_col. We need to internally manage the time col for all subsequent windowby-aggregations because the time_col is no longer a field in the original table. I did consider alternatively using a builder pattern, but I'm not sure that fits super well with the existing Ibis API, so I am thinking about just making the time_col parameter optional. So a chained windowed aggregation would look like the following:

expr = t.window_by(
    window_type="tumble", 
    time_col="createTime", 
    window_size=ibis.interval(seconds=30)
).agg(
    by=["provinceId"], 
    avgPayAmount=_.payAmount.mean()
)
expr.window_by(
    window_type="tumble", 
    window_size=ibis.interval(minutes=30)
).agg(
    by=["provinceId"], 
    avgPayAmount=_.avgPayAmount.mean()
)

I think there are some complexities around how a query would look if the two chained windowby aggregations are not immediately next to each other. I think we should limit what we support right now, probably starting with just window aggregation (group-by aggregation and global aggregation), window top-n, window join (I'm tempted to drop this in the first iteration too), and consecutive chained window aggregation.

# group by aggregate
t.window(
    window_type="tumble", 
    time_col="createTime", 
    window_size=ibis.interval(seconds=30)
).agg(
    by=["provinceId"], 
    avgPayAmount=_.payAmount.mean()
)

# global aggregate
t.window(
    window_type="tumble", 
    time_col="createTime", 
    window_size=ibis.interval(seconds=30)
).agg(
    avgPayAmount=_.payAmount.mean()
)

# window topn
t.window(
    window_type="tumble", 
    time_col="createTime", 
    window_size=ibis.interval(seconds=30)
).topn()

chloeh13q commented 3 months ago

@jcrist and I discussed a few different options for APIs.

  1. We decided to drop chained window aggregations for now, because they're less common.
  2. We're going to focus on supporting window aggregation (groupby and global), topk, distinct, and join for tumble and hop windows.
  3. window_by will return an intermediate object similar to GroupedTable that holds information about the windowing semantics.
  4. Aggregation results will contain window_start, window_end, and the aggregation metrics.

Example:

t = ibis.table({"time": "timestamp(3)", "a": "int", "b": "int", "c": "int"})

r1 = t.window_by("time").tumble(size="30 seconds")
r1.agg(...)
r1.group_by(...).agg(...)
r1.topk(...)
r1.distinct(...)

Window joins are a little more complex because they involve two WindowedTables. We could have something like an as_table() method, with usage as follows:

r1.as_table().columns == ["window_start", "window_end", *t.columns]
r1.as_table().join(r2.as_table(), ("a", "b"))