
Ability to append to an existing directory of parquet files with new partitions (mode=append) #18750

pascalwhoop commented 1 month ago

Description

Hey. Spark has mode=append for writing parquet files. It's quite useful: it just adds more partitions to the folder of an existing dataset. Great for writing in batches across multiple runs.

How would you solve this in polars? I know appending to an existing parquet file is a whole different game, but just adding more files to the directory should be fairly OK, no? I suspect simply not overwriting / deleting the whole existing folder structure would do the trick.

Edit

Digging into this, I realize there's already a way with partitioned data, as long as the partition we write to is unique / always new (e.g. by generating a run_id column).

Polars writes partitioned parquet like this:

pa.parquet.write_to_dataset(
    table=tbl,
    root_path=file,
    **(pyarrow_options or {}),
)

and pyarrow's existing_data_behavior parameter defaults to "overwrite_or_ignore", so it should just add more files and ignore the existing ones. Exactly what I was looking for. Will whip up a quick example.
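
A minimal sketch of that approach (hedged: the run_id column, the ./append_dataset path, and routing through pyarrow_options are illustrative assumptions based on the write_to_dataset call quoted above, not a confirmed recipe):

import uuid

import polars as pl

df = pl.DataFrame({"type": ["a", "b"], "value": [68, 70]})

# Tag the batch with a unique partition value so every run lands in a
# fresh directory and never touches files from earlier runs.
df = df.with_columns(pl.lit(str(uuid.uuid4())).alias("run_id"))

# use_pyarrow=True goes through pa.parquet.write_to_dataset (quoted above),
# whose default existing_data_behavior="overwrite_or_ignore" leaves
# existing files alone.
df.write_parquet(
    "./append_dataset",
    use_pyarrow=True,
    pyarrow_options={"partition_cols": ["run_id"]},
)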

deanm0000 commented 1 month ago

It does seem nuts to me that it silently overwrites files: https://github.com/pola-rs/polars/issues/18242.

cceyda commented 3 weeks ago

(versions 1.7.1 & 1.9.0) I think this may be solved, because I was able to append just fine (as long as it is a new partition; I haven't tested existing-partition behaviour).

Example code, modified from the linked issue:

import polars as pl

df_a = pl.DataFrame(
    {
        "type": ["a", "b"],
        "date": ["2024-08-15", "2024-08-16"],
        "value": [68, 70],
    }
)
df_a.write_parquet("./example_part.parquet", partition_by="date")

df_b = pl.DataFrame(
    {
        "type": ["a", "b"],
        "date": ["2024-08-17", "2024-08-18"],
        "value": [72, 74],
    }
)
df_b.write_parquet("./example_part.parquet", partition_by="date")

pl.read_parquet("./example_part.parquet")

returns:

| type | date | value |
| --- | --- | --- |
| str | str | i64 |
| "a" | "2024-08-15" | 68 |
| "b" | "2024-08-16" | 70 |
| "a" | "2024-08-17" | 72 |
| "b" | "2024-08-18" | 74 |