pathwaycom / pathway

Python ETL framework for stream processing, real-time analytics, LLM pipelines, and RAG.
https://pathway.com

[QUESTION] How to create accumulated batch for processing? #17

Open ilyanoskov opened 3 months ago

ilyanoskov commented 3 months ago

Hello,

I have a use-case where I need to process 10 million rows. First, I want to process 1M rows when they arrive, then I want to process 2M rows (1M previous + 1M new), then 3M rows, and so on in that order. How can I do it with Pathway?

I was able to do something like this with AsyncTransformer and accumulating the rows inside of it, but I find this solution clunky and perhaps there is more Pathway-like approach I could take here?

Thank you very much in advance for your help

dxtrous commented 3 months ago

Hey Ilia! Since you ask about the Pathway-like approach: the "mental model" of Pathway computations is that of reactive computation. The best-known tool with a similar model is a spreadsheet (Excel), so I will use this analogy to answer.

In Excel, you would probably paste in the rows of your table into the spreadsheet, one under another. Excel would recompute all the cells that depend on them, as soon as it can. Pathway does the same in streaming mode, as soon as you connect your inputs to a connector.

From what I see, your question covers two separate elements:

  1. How to trigger recomputation for old rows each time 1M new rows arrive;
  2. (Potentially) how to force new rows to wait (be queued) until a batch of exactly 1M rows has formed before processing them.

For point 1, there are two separate topics: what we would call in Excel AGGREGATES (like a sum over rows) and VOLATILE functions (like calls to a financial/currency API that may return a different answer each time you call it). I'm guessing your question is more about volatiles, but let's cover both.

Starting with AGGREGATES: Pathway recomputes aggregates whenever fresh data arrives. As in SQL, the mechanism is based on "groupby" followed by "reduce". So what you would write in Excel as summing column A, SUM(A:A), is written for a Pathway table t with column colA as t.groupby().reduce(sum=pw.reducers.sum(t.colA)), or equivalently t.reduce(sum=pw.reducers.sum(t.colA)). Other reducers include count, max, etc.; for a list of available reducers, please see https://pathway.com/developers/api-docs/reducers.

Two of these reducers are special in that they let you apply your own function to all of the rows of the table: pw.reducers.tuple, which returns one (possibly enormous) Python tuple containing all the aggregated values from a column (slow, but a convenient escape hatch that lets you postprocess data in Python!), and pw.reducers.udf_reducer, the proper but more advanced way to write a custom reducer efficiently (see: https://pathway.com/developers/user-guide/data-transformation/custom-reducers).

Now, moving on to VOLATILE functions. How to force the recompute of a VOLATILE function when a trigger event happens (e.g., every 1M new rows)? This one is a bit magical to handle - as it is in Excel! You will need to quantify the precise trigger that you want to activate the recompute, as a sort of global variable. You will then pass this variable as a parameter to your transformers to make sure results are recomputed each time this variable changes.

For example, consider a table t which currently has the following contents:

colA    | colB
Alice   | -1
Bob     |  1
Charlie |  2
David   |  4
Eve     |  4
Fred    |  7

And perform on it:

t_count = t.reduce(count=pw.reducers.count())

This creates a single-row table t_count with a single column count like this:

count
6

Next, divide this value by 1M rounding down:

t_count_1M = t_count.select(count_1M=t_count.count // 1_000_000)

to get:

count_1M
0

Finally, join this in with t to get an extra column in t:

t = t.join(t_count_1M).select(t.colA, t.colB, recompute_trigger = t_count_1M.count_1M)

And here is what t now looks like with its extra new column:

colA    | colB | recompute_trigger
Alice   | -1   | 0
Bob     | 1    | 0
Charlie | 2    | 0
David   | 4    | 0
Eve     | 4    | 0
Fred    | 7    | 0

The value of recompute_trigger will jump from 0 to 1 as soon as the number of rows of t exceeds 1M, then to 2 as soon as it exceeds 2M. Now, when you create your AsyncTransformer to call into your API, make sure to pass recompute_trigger as an extra parameter to the invoke, in addition to the arguments it actually needs. It will be useless for the actual API call, but it will trigger a refresh whenever it changes :-).

Then, for point 2 above: if you really need it (there are not many use cases where you actually do; it's an artificial slow-down), there is a way to achieve this with a queue using tumbling-window mechanisms. If you do need it, I'll ask someone from the team to contribute the cleanest answer. In that case, please specify whether the computation must be done every 1M rows, precisely and always, or whether you can tolerate skipping over a batch of 1M and bundling it together with the next one when your system is not catching up (this may happen, e.g., if 1M rows arrive every second).

Happy to clarify further if needed!

ilyanoskov commented 3 months ago

Hey @dxtrous, thank you very much for such a detailed answer! This is incredibly helpful. I have spent more than half the week reading the entire Pathway documentation and doing POC implementations to see what works best.

One thing I do not fully understand is how we force the AsyncTransformer to recompute once 2M rows are available (and we have already processed the first 1M rows downstream)? I also need to do this for 2M, 3M, 4M rows, and so on. Do I define 10 different AsyncTransformers in this case, or is it possible to do it all with one AsyncTransformer? There is essentially a need to accumulate the data for all years and then run the computation... I also do not see a recompute_trigger flag anywhere in the docs for this?

Or is it possible to take the entire data available in t inside the AsyncTransformer when the invoke is called? I thought it only takes the latest row? Or can I reference t inside the AsyncTransformer as an external variable (I have not tried this yet)?

To add more context here, I actually need to compute things per year, which is not too different from the count of rows (I will just rewrite the logic to indicate that as soon as a new year appears in the rows - run calc). So this particular operation I am trying to achieve is more of a batch pattern, but then I will need streaming per row everywhere else in the system, so I am looking for a way to combine this.

Perhaps the second point is more applicable here? I do not think I will actually be getting 1M rows a second, or if I do, it's ok to wait for the system to catch up in this case I think.

Thank you very much in advance

dxtrous commented 3 months ago

Hey @ilyanoskov, thanks for the additional context!

Please disregard the discussion of "volatile" functions and triggers from my previous reply in that case.

Please note that AsyncTransformer is called row by row, for each row separately. It is very similar to apply or transform in pandas (but, additionally, asynchronous). It would typically be used to call an external API, or possibly to run a heavy computation on a GPU.

So, to pass multiple (say 1M) rows into AsyncTransformer in one go, you will first need to reduce these rows to one. If you have a date or some other form of timestamp as a column in your data and want to group data by year, that's brilliant; it is likely to simplify the logic a lot. With this in mind, my understanding of what you need is the following (listing keywords): windowby on your date column with a tumbling window of 1 year, reduce with pw.reducers.tuple to gather each window's rows into a single value, then feed the resulting one-row-per-year table into your AsyncTransformer. Hopefully it will be one longish line of code :-).

Would this make sense?

By the way, if you are looking for an introduction to Tumbling Windows as a concept, you can also take a look at this tutorial https://pathway.com/developers/showcases/suspicious_activity_tumbling_window/ to see if it resonates.

ilyanoskov commented 3 months ago

@dxtrous thank you very much ! I will try just that and report back :)

dxtrous commented 3 months ago

@ilyanoskov Yes, do keep us posted! I missed one point/question: whether, say, in year 7, you would want just the data from year 7 to go into the 7th model, or the data from all years 1..7. (If it's the latter, you will eventually need to replace the tumbling window by a sliding window of length 10 years and hop of 1 year. It's a tiny change; I'd do it at the end, once you are comfortable with tumbling windows working.)

ilyanoskov commented 3 months ago

I was able to do this with the tuple reducer and the sliding window of length 10 years and hop of 1 year. Thank you!

@dxtrous do you by any chance know, how to prevent the sliding window from sliding beyond available data? I did the duration for 10 years, and for 2017 the window is looking into 2027 (and creating more rows). How do I stop at 2024?

dxtrous commented 3 months ago

do you by any chance know, how to prevent the sliding window from sliding beyond available data? I did the duration for 10 years, and for 2017 the window is looking into 2027 (and creating more rows). How do I stop at 2024?

This is a great question, which applies to finite (bounded) data streams. @KamilPiechowiak would it make sense for us to envisage a scenario/flag in which an incomplete window with e.g. ExactlyOnceBehavior does not have its computation triggered by end-of-data (it is simply left out of the results if the data ends before the window does)?

@ilyanoskov an (ugly) workaround is to filter out results larger than some given year (or max - 9 years) in post-processing of the windowing results.

KamilPiechowiak commented 3 months ago

@ilyanoskov an (ugly) workaround is to filter out results larger than some given year (or max - 9 years) in post-processing of the windowing results.

I'd say the workaround is not that ugly. If the windows that are filtered out are not a significant majority of all the windows, the performance overhead should be small. Syntax-wise, I think an additional filter is easier to understand than an additional parameter in windowby/window definition/window behavior.

@KamilPiechowiak would it make sense for us to envisage a scenario/flag in which an incomplete window having e.g. ExactlyOnceBehavior does not have compute triggered by end-of-data (is simply left out from results if data ends before the window ends)?

It makes things more complicated, as entries need to be aware of the maximal entry in the stream. If they're not aware of it, we can trigger window production later (or never), but the window's state will be initialized anyway, so performance-wise it will be similar to filtering out unwanted windows. Once we optimize the performance of windows further, we can think of such a scenario, but in the current implementation it wouldn't make much difference.