Eventual-Inc / Daft

Distributed DataFrame for Python designed for the cloud, powered by Rust
https://getdaft.io
Apache License 2.0

Do correct resource accounting if running multiple dataframe collections in the pyrunner #2493

Open samster25 opened 1 month ago

samster25 commented 1 month ago

Failure case:

```python
iter1 = daft.read_parquet(...).iter_partitions()
iter2 = daft.read_parquet(...).iter_partitions()
```
jaychia commented 1 month ago

I think #2502 should fix most of this. However, there is still a potential problem when multiple dataframe iterators are running in parallel.

Our current implementation doesn't place any hard limit on how many in-flight partitions each generator can have running. That is, if we run it1 = df1.iter_partitions() alongside it2 = df2.iter_partitions() and call next(it1); next(it2), we will most likely hit a situation where it1 schedules a batch of tasks (up until it either exhausts the "inflight resources" or some partition task has results available), potentially taking up all of the "inflight resources" and starving it2.
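To illustrate the starvation scenario, here is a hypothetical, simplified sketch (not Daft's actual scheduler): two iterators draw tasks from one shared in-flight budget, and the first greedy iterator can fill the entire budget with its own tasks before the second one gets a turn. The `GreedyIterator` class, the `INFLIGHT_LIMIT` constant, and the raised error are all illustrative assumptions.

```python
# Hypothetical model (not Daft internals): a shared in-flight budget that a
# greedy iterator can exhaust, starving a second iterator.

INFLIGHT_LIMIT = 4  # shared budget of concurrently running partition tasks

class GreedyIterator:
    def __init__(self, name, num_partitions, inflight):
        self.name = name
        self.pending = list(range(num_partitions))
        self.inflight = inflight  # shared list modelling in-flight tasks

    def _schedule(self):
        # Greedily schedule tasks until the shared budget is exhausted.
        while self.pending and len(self.inflight) < INFLIGHT_LIMIT:
            self.inflight.append((self.name, self.pending.pop(0)))

    def __next__(self):
        self._schedule()
        mine = [t for t in self.inflight if t[0] == self.name]
        if not mine:
            # The budget is full of the *other* iterator's tasks.
            raise RuntimeError(f"{self.name} starved")
        self.inflight.remove(mine[0])
        self._schedule()  # immediately backfill the freed slot
        return mine[0]

inflight = []
it1 = GreedyIterator("it1", num_partitions=8, inflight=inflight)
it2 = GreedyIterator("it2", num_partitions=8, inflight=inflight)

next(it1)      # it1 fills the whole budget with its own tasks
try:
    next(it2)  # it2 finds the budget full, with none of its own tasks
except RuntimeError as e:
    print(e)   # it2 starved
```

In a real runner the second iterator would block rather than error, but the effect is the same: it makes no progress until the first iterator drains.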

To fix this, we need something like https://github.com/Eventual-Inc/Daft/pull/2279 to help us limit the number of in-flight tasks per iterator.
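A minimal sketch of the idea behind a per-iterator limit (the actual mechanism is whatever lands in that PR, not this code): a semaphore caps how many tasks a single iterator may have in flight, so one iterator cannot monopolize the shared pool. The function name, the `limit` parameter, and the stand-in task body are all assumptions for illustration.

```python
# Hypothetical sketch: cap in-flight tasks *per iterator* with a semaphore.
import threading
from concurrent.futures import ThreadPoolExecutor

def bounded_iter_partitions(partitions, pool, limit=2):
    """Yield partition results with at most `limit` tasks in flight."""
    sem = threading.Semaphore(limit)

    def run(p):
        try:
            return f"result-{p}"  # stand-in for executing a partition task
        finally:
            sem.release()  # a completed task frees one in-flight slot

    pending = iter(partitions)
    futures, exhausted = [], False
    while futures or not exhausted:
        # Top up to `limit` in-flight tasks without blocking on results.
        while not exhausted and sem.acquire(blocking=False):
            try:
                p = next(pending)
            except StopIteration:
                sem.release()
                exhausted = True
                break
            futures.append(pool.submit(run, p))
        if futures:
            yield futures.pop(0).result()

with ThreadPoolExecutor(max_workers=4) as pool:
    print(list(bounded_iter_partitions(range(4), pool)))
    # ['result-0', 'result-1', 'result-2', 'result-3']
```

With a cap like this, `next(it1)` can only claim its own slice of the pool, leaving capacity for `next(it2)` to schedule tasks immediately.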