risingwavelabs / risingwave

SQL stream processing, analytics, and management. We decouple storage and compute to offer instant failover, dynamic scaling, speedy bootstrapping, and efficient joins.

Proposal: A simple approach towards Stateless UDAF #16767

Open fuyufjh opened 1 month ago

fuyufjh commented 1 month ago

Is your feature request related to a problem? Please describe.

Sharing some ideas about UDAF from our discussion with @wangrunji0408.

Stateless UDAF and stateful UDAF are very different in both implementation and use cases.

Describe the solution you'd like

We don’t really need to offer a framework for stateless UDAF. Instead, we can reuse the interface of UDF and accept an ARRAY as input argument.

For example, suppose the user has provided such a UDF:

def my_simple_udaf(values: list[int]):
    ...

or

class MyEvent:
    def __init__(self, ts, attr1, attr2):
        self.ts = ts
        self.attr1 = attr1
        self.attr2 = attr2

def my_fancy_udaf(rows: list[MyEvent]):
    ...

Then they can not only use it as a normal UDF but also as a UDAF in an aggregation or a window aggregation:

select my_udaf(a, [b, ...]) from t
select my_udaf(a, [b, ...]) over (...) from t

A trivial approach is to use array_agg() and then call the UDF on the resulting array. However, in this way, both the result array and the input rows are persisted, resulting in 2 copies of data in the state table. Besides, users have to write the array_agg() themselves, which is rather unfriendly.

To make it more efficient and to make the syntax above just work, we need a special code path to handle these use cases.
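To illustrate the kind of function this targets, here is a minimal sketch of a possible body for my_simple_udaf, assuming plain Python with no RisingWave registration step shown. The choice of median absolute deviation is only an illustration and is not part of the proposal; it is picked because it needs two passes over the whole input, so it does not fit a row-at-a-time API naturally.

```python
from statistics import median
from typing import Optional


def my_simple_udaf(values: list[int]) -> Optional[float]:
    """Median absolute deviation (illustrative choice, not from the proposal)."""
    if not values:
        # Mirror SQL aggregates: no input rows gives NULL.
        return None
    # First pass: the median of the raw values.
    center = median(values)
    # Second pass: the median of the distances to that center.
    return float(median(abs(v - center) for v in values))
```

Called with an array it behaves like any other UDF; used as an aggregate, the engine would have to collect the group's rows into the list before making the call.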


st1page commented 1 month ago

Stateless UDAF is as simple as a normal UDF, except that it accepts multiple rows at a time

Yes, but that doesn't mean the implementor has to physically take in all the data in one call, which can be fatal when there is a lot of input data (maybe millions of rows?). I think it is up to the implementor of the stateless UDAF to make this tradeoff, i.e. which data structure to buffer the data in along the way, or whether to compact it or do intermediate computation ahead of time, depending on the semantics of the aggregator.

In other words, in my mind, the implementor of stateless UDAF should implement these interfaces to achieve pipelined computation

  • init() -> void
  • accumulate(input) -> void
  • finish() -> result

It is still much simpler than Stateful UDAF because it does not need to

xxchan commented 1 month ago

I feel when talking about "aggregation", the incremental API is more intuitive (i.e., fn(Acc,T)->Acc, or the function you pass to fold).
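For concreteness, a minimal sketch of that incremental shape in plain Python, with functools.reduce playing the role of fold. The names step and mean and the (sum, count) accumulator are made up for this example.

```python
from functools import reduce
from typing import Optional

Acc = tuple[int, int]  # (running sum, row count)


def step(acc: Acc, value: int) -> Acc:
    """fn(Acc, T) -> Acc: fold one more row into the accumulator."""
    total, count = acc
    return (total + value, count + 1)


def mean(values: list[int]) -> Optional[float]:
    # The accumulator stays constant-size no matter how many rows arrive.
    total, count = reduce(step, values, (0, 0))
    return total / count if count else None
```

Here the state never grows with the input, which is what makes this shape attractive for streaming.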

TBH I've never seen such a "batch API" before. Users may also find it strange that they need to iterate over the input on their own. And +1 that the performance may be unacceptable, as there's no intermediate result.

xxchan commented 1 month ago

If the performance of passing the whole input at once is acceptable, I guess array_agg() persisting twice might also be acceptable.


both the result array and the input rows are persisted, resulting in 2 copies of data in the state table.

Wait, I don't get this. Why 2 copies? I think only one copy for array_agg's state?

fuyufjh commented 1 month ago

In other words, in my mind, the implementor of stateless UDAF should implement these interfaces to achieve pipelined computation

  • init() -> void
  • accumulate(input) -> void
  • finish() -> result
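To make the quoted interface concrete, here is a minimal Python sketch of an aggregator that compacts its buffer as rows arrive. The class name TopK and the use of a bounded heap are assumptions for illustration; only the three method names come from the list above.

```python
import heapq


class TopK:
    """Keeps only the k largest values seen so far (hypothetical example)."""

    def __init__(self, k: int = 3):
        self.k = k
        self.init()

    def init(self) -> None:
        # The buffer is a min-heap holding at most k values.
        self.heap: list[int] = []

    def accumulate(self, value: int) -> None:
        if len(self.heap) < self.k:
            heapq.heappush(self.heap, value)
        else:
            # Push the new value, then drop the smallest of the k + 1.
            heapq.heappushpop(self.heap, value)

    def finish(self) -> list[int]:
        return sorted(self.heap, reverse=True)
```

The buffer never holds more than k values, so this kind of aggregator stays cheap even with millions of input rows.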

Keep in mind that the stateless UDAF needs to handle the cases that cannot be made incremental. This might be more common than you imagine. Many streaming jobs were migrated from batch, where complicated algorithms were used freely and were not designed for incremental computation at all.

For a textbook example, combined with a session window, a UDAF accepts tens of events (browsing history, clicks, impressions, etc.), runs some statistical regression, and then outputs a score for the likelihood of a scam.

For those algorithms that can be made incremental, https://github.com/risingwavelabs/arrow-udf/pull/23 will solve it.

xxchan commented 1 month ago

https://spark.apache.org/docs/latest/sql-ref-functions-udf-aggregate.html Spark's UDAF is also such an “incremental” API. So I’m confused about what “cannot be incremental” and “stateless” really mean. 🤔

xxchan commented 1 month ago

I just discussed this with @st1page, and we clarified some points:

xxchan commented 1 month ago

Well, I think some algorithms indeed cannot be incremental, e.g., if you need to iterate over the whole input multiple times.

In this case, all computation is performed in finish, and BUF is just the whole input. Or are there any other ways to do it in batch frameworks?


Ohhh, I finally understand, the API @fuyufjh proposed is equivalent to only having finish().

So there are 3 cases:

  1. small BUF. This is OK for streaming
  2. large BUF, but can do incremental computation in accumulate. That's what @st1page and I are thinking about. This is not good for streaming.
  3. extreme case of 2: large BUF, and only compute in finish. That's the OP's case
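Case 3 can be sketched as a thin adapter: a minimal example, assuming plain Python, that wraps any array-taking function into the init/accumulate/finish shape. The class name BufferAll is hypothetical.

```python
from typing import Any, Callable


class BufferAll:
    """Case 3: BUF is the whole input and all the work happens in finish()."""

    def __init__(self, batch_fn: Callable[[list], Any]):
        self.batch_fn = batch_fn
        self.init()

    def init(self) -> None:
        self.buf: list = []

    def accumulate(self, row: Any) -> None:
        # No computation here: just remember the row.
        self.buf.append(row)

    def finish(self) -> Any:
        # Hand the complete input to the batch function in one call.
        return self.batch_fn(self.buf)
```

For example, BufferAll(lambda rows: max(rows) - min(rows)) turns a batch function into this interface without the user writing any buffering code.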

fuyufjh commented 1 month ago

I guess @fuyufjh means OUT isn't always the same as BUF?

Yeah, of course. Rather than "isn't always", I tend to say "is almost never" for user-defined functions.

This is why @st1page proposed the slightly different API (return BUF vs return void, i.e., let user code manage state). I proposed this can be done by the computation framework via a flag and there's no need to distinguish the API for users.

I understand @st1page's concern, but that would be putting the cart before the horse. The users need to ingest a non-incremental algorithm here, and this is the requirement.

If the only concern is the amount of data, we can add some limits, or use a streaming API, etc. Let's do it in the future if necessary.

I proposed this can be done by the computation framework via a flag and there's no need to distinguish the API for users.

Again, keep in mind the algorithm is not incremental. You are basically saying that users should manually create a buffer and put rows into it for each accumulate() call. This sounds stupid for both users and us.

fuyufjh commented 1 month ago

Besides, this idea doesn't conflict with https://github.com/risingwavelabs/arrow-udf/pull/23. I'd like to offer both to users and see their reactions.

xxchan commented 1 month ago

You are basically saying that users should manually create a buffer and put rows into it for each accumulate() call. This sounds stupid for both users and us.

Yes, but I'm curious how users of batch frameworks like Spark do it, because that seems to be the only way with the UDAF API.