pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

DeltaLake I/O sometimes shuffles the DataFrame #9635

Open · danielgafni opened this issue 1 year ago

danielgafni commented 1 year ago

Polars version checks

Issue description

Sometimes the rows get shuffled when a DataFrame is written with Delta Lake and read back.
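To check that a failing round trip is a pure reordering rather than lost or corrupted values, the rows can be compared as multisets. This is a minimal stdlib sketch, assuming both frames have already been turned into lists of row tuples (for example with polars' `DataFrame.rows()`); `classify_roundtrip` is a hypothetical helper, not part of polars:

```python
from collections import Counter
from collections.abc import Hashable, Sequence

# One DataFrame row as a tuple of hashable values (hypothetical alias).
Row = tuple[Hashable, ...]


def classify_roundtrip(written: Sequence[Row], read_back: Sequence[Row]) -> str:
    """Return 'identical', 'reordered' or 'different'.

    Caveat: float NaN compares unequal to itself, so rows containing NaN
    may be reported as 'different' even when only the order changed.
    """
    if list(written) == list(read_back):
        return "identical"
    # Counter compares rows as multisets: order is ignored, duplicates count.
    if Counter(written) == Counter(read_back):
        return "reordered"
    return "different"


written = [(1, "a"), (2, "b"), (3, "c")]
print(classify_roundtrip(written, [(3, "c"), (1, "a"), (2, "b")]))  # reordered
```

A "reordered" result points at row ordering during write or read, while "different" would suggest a data or dtype problem instead.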

Link to CI logs

Reproducible example

```python
import shutil

import polars as pl
import polars.testing as pl_testing
import pytest
from hypothesis import given, settings
from polars.testing.parametric import dataframes

# TODO: remove pl.Time once it's supported
# TODO: remove pl.Duration once it's supported
# https://github.com/pola-rs/polars/issues/9631
# TODO: remove UInt types once they are fixed:
#  https://github.com/pola-rs/polars/issues/9627
@given(
    df=dataframes(
        excluded_dtypes=[
            pl.Categorical,
            pl.Duration,
            pl.Time,
            pl.UInt8,
            pl.UInt16,
            pl.UInt32,
            pl.UInt64,
            pl.Datetime("ns", None),
        ],
        min_size=5,
        allow_infinities=False,
    )
)
@settings(max_examples=500, deadline=None)
def test_polars_delta_io(df: pl.DataFrame, tmp_path_factory: pytest.TempPathFactory):
    # tmp_path_factory is session-scoped, so it can be used inside a @given test;
    # mktemp() creates a fresh numbered directory for every example
    tmp_path = tmp_path_factory.mktemp("data")
    df.write_delta(str(tmp_path))
    pl_testing.assert_frame_equal(df, pl.read_delta(str(tmp_path)))
    # clean up manually: Hypothesis runs all examples within one test invocation
    shutil.rmtree(str(tmp_path))
```

Expected behavior

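This should not happen: reading the table back with `pl.read_delta` should return the same DataFrame that was written with `write_delta`, in the same row order.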

Installed versions

I filled this in manually since the failure occurs in my [GitHub Actions CI](https://github.com/danielgafni/dagster-polars/actions/runs/5422842774). It happens sometimes with polars 0.18.2, and always on macOS.

```
--------Version info---------
Polars:              0.17.15, 0.18.2
Index type:          UInt32
Platform:            Ubuntu, MacOS
Python:              3.9-3.11

----Optional dependencies----
numpy:               1.24.3
pandas:              2.0.2
pyarrow:             12.0.0
connectorx:
deltalake:           0.10.0
fsspec:              2023.5.0
matplotlib:
xlsx2csv:
xlsxwriter:
```
stinodego commented 1 year ago

@chitralverma Could you take a look at this one?

chitralverma commented 1 year ago

Sure, but I'm on leave at the moment, so it may take some time.

My guess is that this is caused by the parallel readers.
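As a toy model of the parallel-reader hypothesis (not the actual polars or delta-rs reader code): if a table is stored as several fragments that are read concurrently and concatenated in the order the reads finish, the resulting row order depends on I/O timing. Concatenating in the order the reads were submitted keeps it stable. The fragment layout and `read_fragment` below are hypothetical stand-ins:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# Hypothetical fragments (e.g. data files) each holding a contiguous slice of rows.
FRAGMENTS = [[0, 1], [2, 3], [4, 5], [6, 7]]


def read_fragment(index: int) -> list[int]:
    # Simulate uneven I/O latency: earlier fragments take longer to "read".
    time.sleep(0.01 * (len(FRAGMENTS) - index))
    return FRAGMENTS[index]


def read_in_completion_order() -> list[int]:
    """Concatenate fragments as they finish, so row order depends on timing."""
    rows: list[int] = []
    with ThreadPoolExecutor(max_workers=len(FRAGMENTS)) as pool:
        futures = [pool.submit(read_fragment, i) for i in range(len(FRAGMENTS))]
        for future in as_completed(futures):
            rows.extend(future.result())
    return rows


def read_in_submission_order() -> list[int]:
    """Executor.map yields results in input order, so row order is stable."""
    rows: list[int] = []
    with ThreadPoolExecutor(max_workers=len(FRAGMENTS)) as pool:
        for chunk in pool.map(read_fragment, range(len(FRAGMENTS))):
            rows.extend(chunk)
    return rows


print(read_in_completion_order())  # order varies with timing
print(read_in_submission_order())  # [0, 1, 2, 3, 4, 5, 6, 7]
```

Both variants return the same rows; only the order differs, which matches the "shuffled but otherwise equal" symptom described in the issue.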

danielgafni commented 1 year ago

Interesting. Just to clarify, the same test passes with Parquet.

danielgafni commented 1 year ago

I tried adding a 0.5s sleep, but it didn't really help (see CI in https://github.com/danielgafni/dagster-polars/pull/10)

danielgafni commented 1 year ago

Hey @chitralverma, any idea what's going on here? Even a 0.5s sleep doesn't help. Is this expected?

P.S. Sorry if this ping is annoying; I thought you might have lost track of this issue while on leave.

chitralverma commented 1 year ago

Hi @danielgafni, I tried reproducing this but couldn't. Can you please post a minimal reproducible example using only polars code?

Also, were you able to reproduce this without polars at all, using delta-rs directly?

danielgafni commented 1 year ago

Hey, actually I can't reproduce it without the `hypothesis.given` decorator. This test should be logically equivalent to the one I initially provided, but it passes with no errors:

```python
import shutil

import polars as pl
import polars.testing as pl_testing
import pytest
from polars.testing.parametric import dataframes

strategy = dataframes(
    excluded_dtypes=[
        pl.Categorical,
        pl.Duration,
        pl.Time,
        pl.UInt8,
        pl.UInt16,
        pl.UInt32,
        pl.UInt64,
        pl.Datetime("ns", None),
    ],
    min_size=5,
    allow_infinities=False,
)

def test_polars_delta_io(tmp_path_factory: pytest.TempPathFactory):
    for _ in range(500):
        tmp_path = tmp_path_factory.mktemp("data")
        # draw one random DataFrame from the strategy outside of @given
        df = strategy.example()
        assert isinstance(df, pl.DataFrame)

        df.write_delta(str(tmp_path))
        pl_testing.assert_frame_equal(df, pl.read_delta(str(tmp_path)))
        shutil.rmtree(str(tmp_path))
```

As far as I know, Hypothesis doesn't parallelize test execution, so this is surprising to me.

So could the issue be on the Hypothesis side?

danielgafni commented 1 year ago

Still happening in my CI even after I added a 0.1s sleep between tests. It's rare but it happens.