pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Sink parquet as non-terminal node in dag #13792

Open m00ngoose opened 9 months ago

m00ngoose commented 9 months ago

Description

I want to do something like:

1) Scan lots of parquets, do some processing
2) (Optionally) write the rows to an output parquet for debugging
3) Aggregate the rows

E.g.

foo = pl.scan_parquet(...).select(...)
if debugging:
    foo.sink_parquet("debugging.parquet")
return foo.group_by(...).agg(...)

but this would perform the input IO and the processing twice.

For some suitable implementation of sink_parquet2, I want to be able to write

foo = pl.scan_parquet(...).select(...)
if debugging:
    foo = foo.sink_parquet2("debugging.parquet")
return foo.group_by(...).agg(...)

I'm not sure how to make sure the output IO only happens once per .collect() / .collect_all(); is that a similar problem to the one solved by .cache()?
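
One workaround available today, at the cost of giving up streaming, is to materialize once, write the debug copy eagerly, and continue lazily from the in-memory result. A minimal sketch, assuming the intermediate fits in RAM (the paths and the "key" column are illustrative):

import polars as pl

debugging = True

foo = pl.scan_parquet("inputs/*.parquet").select(pl.all())
if debugging:
    df = foo.collect()                   # single pass over the inputs
    df.write_parquet("debugging.parquet")
    foo = df.lazy()                      # resume lazy execution from memory
result = foo.group_by("key").agg(pl.len()).collect()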

m00ngoose commented 9 months ago

> You effectively have to collect all your work at the point where you write to parquet. You might as well do it explicitly, I suppose.

I don't understand. I thought the point of sink_parquet was that it doesn't have to load all the data into memory at once?

kszlim commented 9 months ago

I think this is somewhat related to the concept of durable execution: enabling the intermediate stages in the compute graph to be dumped to disk and resumed later.
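
A sketch of that dump-and-resume idea under today's API, where the checkpoint is an ordinary terminal sink and a later run scans it back (file names and the "key" column are illustrative):

import polars as pl

# stage 1: stream the intermediate result to disk (a terminal sink today)
pl.scan_parquet("inputs/*.parquet").select(pl.all()).sink_parquet("checkpoint.parquet")

# stage 2, possibly in a later run or another process: resume from the checkpoint
result = (
    pl.scan_parquet("checkpoint.parquet")
    .group_by("key")
    .agg(pl.len())
    .collect()
)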

deanm0000 commented 9 months ago

When I think about "non-terminal", I think about making sink_parquet chainable, so that it returns the LazyFrame itself and doesn't write anything until collect time.

so you'd write

foo = pl.scan_parquet(...).select(...)
if debugging:
    foo = foo.sink_parquet("debugging.parquet", chain=True)
foo.group_by(...).agg(...).collect()

You'd have to tell it that you want it to chain, so that it knows not to start writing immediately, or make it a separate method, but I'd favor the parameter approach personally.

Since sinking inherently uses the streaming engine, the optimizer already turns off some of the normal optimizations that might logically preclude having an intermediate step. It doesn't turn off everything relevant, though, such as predicate and projection pushdown, so either those have to be turned off as well or they have to be applied per section of the logical plan.
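
A runnable way to see that pushdown interaction (a sketch; the exact explain output varies by version): the final query only needs one column, so the optimized plan prunes the rest, which is exactly what a sink chained in before the aggregation would not want.

import polars as pl

lf = pl.DataFrame({"a": [1, 2], "b": [3, 4], "key": ["x", "y"]}).lazy()
plan = lf.group_by("key").agg(pl.len())
# the optimized plan shows a projection like PROJECT 1/3 COLUMNS:
# only "key" survives, so columns a and b would never reach a sink
# placed before the group_by unless pushdown stopped at the sink boundary
print(plan.explain())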

m00ngoose commented 8 months ago

Thinking a bit more, this is not the most general version of the API. Maybe you only want to write out a different transform of the data, e.g. something semantically equivalent to

foo = pl.scan_parquet(...).select(...)
if debugging:
    foo.select(...).sink_parquet(...)
return foo.group_by(...).agg(...)

The chain=True approach only works when you want to write out something that is precisely in the DAG already. You might instead model a parquet sink as something collectable only for its side effect:

other_terminal_nodes = []
foo = pl.scan_parquet(...)
if debugging:
    other_terminal_nodes.append(foo.select(...).sink_parquet(...))
bar = foo.group_by(...).agg(...).collect(other_terminal_nodes=other_terminal_nodes)

But this is unpleasant, as you then have to pass this list of extra terminal nodes through to the point of collection, rather than simply attaching it to the DAG and not worrying further. Thus I think the API should be

foo = pl.scan_parquet(...).select(...)
if debugging:
    foo.attach_for_side_effect_only().select(...).sink_parquet(...)
return foo.group_by(...).agg(...)

where attach_for_side_effect_only forces its child DAG nodes to be evaluated whenever the parent is.
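
An approximation of that attach-for-side-effect behaviour that exists today is pl.collect_all, which can share the common scan between the two plans via common-subplan elimination, though the debug branch then materializes in memory instead of streaming to disk. A sketch (paths and the "key" column are illustrative):

import polars as pl

foo = pl.scan_parquet("inputs/*.parquet").select(pl.all())
debug = foo.select(pl.all())                 # branch collected only to be written out
agg = foo.group_by("key").agg(pl.len())      # the result we actually want
debug_df, result = pl.collect_all([debug, agg])
debug_df.write_parquet("debugging.parquet")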