pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Implementing iter_rows() on a LazyFrame #10683

Open svaningelgem opened 1 year ago

svaningelgem commented 1 year ago

Problem description

This way you could stream a DataFrame in full, even when it's larger than life (or at least larger than memory).

yonil7 commented 1 year ago

iter_slices() is also supported only by DataFrame and not by LazyFrame, so it would be nice to have this function supported as well.
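
For context, both of these iterators exist on the eager DataFrame today and have no lazy counterpart. A minimal sketch of the current eager API, on toy data:

import polars as pl

df = pl.DataFrame({"a": [1, 2, 3, 4, 5]})

# Eager row iteration -- requires the whole frame in memory.
for row in df.iter_rows(named=True):
    print(row)

# Eager chunked iteration -- likewise DataFrame-only for now.
for chunk in df.iter_slices(n_rows=2):
    print(chunk)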

evbo commented 10 months ago

This would be a great enhancement, and really, wouldn't it just be syntactic sugar over LazyFrame.slice? https://docs.pola.rs/py-polars/html/reference/lazyframe/api/polars.LazyFrame.slice.html

For instance, I think doing this currently just involves the following:

import polars as pl

step = 3

lf = (
    pl.from_dicts([{"a": 1}, {"a": 2}, {"a": 3}, {"a": 4}, {"a": 5}])
    .lazy()
    .with_row_count()
)

# Row index of the last row, i.e. height - 1.
count = lf.last().collect().item(0, "row_nr")

for offset in range(0, count + 1, step):
    micro_batch = lf.slice(offset, step).collect()
    print(micro_batch)

fdcastel commented 8 months ago

@evbo Why range(0, count + 1, step) instead of range(0, count, step)?

fdcastel commented 8 months ago

My attempt:

# Clone of DataFrame.iter_slices() with LazyFrame support
from collections.abc import Iterator

import polars as pl
from polars import DataFrame, LazyFrame


def iter_slices(df: DataFrame | LazyFrame, n_rows: int = 10_000) -> Iterator[DataFrame]:
    if isinstance(df, DataFrame):
        for offset in range(0, df.height, n_rows):
            yield df.slice(offset, n_rows)
    else:
        # Get row count for LazyFrame -- https://stackoverflow.com/a/75523731/33244
        row_count = df.select(pl.len()).collect().item()

        for offset in range(0, row_count, n_rows):
            yield df.slice(offset, n_rows).collect()
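
A usage sketch for the helper above; data.parquet is a hypothetical input file:

import polars as pl

lf = pl.scan_parquet("data.parquet")  # hypothetical input
for chunk in iter_slices(lf, n_rows=1_000):
    print(chunk.shape)  # each chunk is a collected DataFrame
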
evbo commented 8 months ago

@fdcastel with_row_count (which I think is now deprecated) generates indices starting from 0, so count is the index of the last row (height - 1) rather than the row count; without the + 1, the range can stop one batch short.
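
A quick demonstration of that off-by-one, reusing the approach from the earlier snippet: with four rows, count is 3 (the last row's index), so the exclusive end of range(0, count, step) drops the final batch.

import polars as pl

lf = pl.from_dicts([{"a": 1}, {"a": 2}, {"a": 3}, {"a": 4}]).lazy().with_row_count()
count = lf.last().collect().item(0, "row_nr")  # 3, i.e. height - 1

step = 3
print(list(range(0, count, step)))      # [0] -- slice(0, 3) misses row 3
print(list(range(0, count + 1, step)))  # [0, 3] -- covers all four rows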

I'm unsure of the performance implications of using len vs last. Do you have any insights?
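
One way to compare them, as a sketch: print the optimized query plans. I'd expect select(pl.len()) to let the optimizer answer with a simple count (for file scans, potentially from metadata alone), while the with_row_count().last() route has to materialize a row index across the whole frame. data.parquet is a hypothetical input:

import polars as pl

lf = pl.scan_parquet("data.parquet")  # hypothetical input

# Plan for the pl.len() approach.
print(lf.select(pl.len()).explain())

# Plan for the row-index approach.
print(lf.with_row_count().last().explain())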

douglas-raillard-arm commented 4 months ago

It currently does not work, as LazyFrame.slice() only streams with offset=0; anything else seems to load all the data into memory anyway. But I suppose that if that is fixed, this pure-Python code should work without knowing the length upfront, since LazyFrame.slice() does not seem to have a problem being asked for a bigger slice than what's left:

def iter_slices(df, batch_size):
    def get_batch(df, offset, batch_size):
        batch = df.slice(offset, batch_size)
        batch = batch.collect(streaming=True)
        return batch

    batch = get_batch(df, 0, batch_size)
    # Yield once even if we got passed an empty LazyFrame
    yield batch
    offset = len(batch)
    if offset:
        while True:
            batch = get_batch(df, offset, batch_size)
            len_ = len(batch)
            if len_:
                offset += len_
                yield batch
            else:
                break

for batch in iter_slices(df, 100):
    print(batch)

EDIT: added streaming=True to collect() call and renamed iter_batches() to iter_slices()

douglas-raillard-arm commented 3 months ago

So it looks like the snippet above is working (and has been working for a few releases at least). I initially forgot the streaming=True part; with it, it does run in constant memory.

EDIT: it does not, however, run in O(n) time. As the iterator progresses, it gets slower and slower, probably because each slice has to scan from the beginning of the input again.
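
A quick way to observe this, as a sketch: time each call to next() on the iterator above and watch the per-batch latency grow with the offset. data.parquet is a hypothetical large input:

import time

import polars as pl

lf = pl.scan_parquet("data.parquet")  # hypothetical large input

it = iter_slices(lf, 100_000)
while True:
    t0 = time.perf_counter()
    batch = next(it, None)
    elapsed = time.perf_counter() - t0
    if batch is None or batch.is_empty():
        break
    print(f"{len(batch)} rows in {elapsed:.3f}s")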

cmdlineluser commented 3 months ago

@douglas-raillard-arm I did notice a streaming slice was just added to the new streaming engine https://github.com/pola-rs/polars/pull/17451

EDIT: Please ignore.

Trying .collect(new_streaming=True) on 1.1.0 runs your example 10x faster in a simple test.

douglas-raillard-arm commented 3 months ago

@cmdlineluser That's interesting. I gave it a go, but the reason it works faster is that everything (or at least a lot) is loaded upfront. It almost instantly used all the memory on my machine.

So either way, it's not really usable yet, considering the O(N^2) time complexity at best or eager loading at worst. I can't really think of how we could avoid that O(N^2) issue without implementing the feature at a lower level. The Python version always has to "restart from the beginning" on the original LazyFrame and figure out where in the file the slice's data is. The only ways to avoid that would be to either:

  1. preserve the low-level state of the parser and resume from where it was left after the previous chunk (impossible from the Python API), or
  2. pre-compute some sort of array of file offsets that allows jumping straight to the data; this is also impossible to get out of polars and feed back in, AFAIK (though a coarse file-level approximation is sketched below).
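
Not a real fix, but when the dataset is already split across many files, a coarse file-level approximation of option 2 works from Python today: the file boundaries act as the precomputed offsets, so no batch rescans earlier data. A sketch, assuming a hypothetical data/ directory of Parquet files:

from pathlib import Path

import polars as pl

def iter_file_batches(paths: list[Path]):
    # Each file is read independently; nothing before it is rescanned.
    for path in paths:
        yield pl.read_parquet(path)

for batch in iter_file_batches(sorted(Path("data").glob("*.parquet"))):
    print(batch.shape)
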
orlp commented 2 months ago

Hi, we've decided that we want functionality like this in the new streaming engine: ideally an API that can iteratively return partial DataFrames as well as individual rows.

However, it is blocked on the new streaming engine, so don't hold your breath; it's going to take a while.