mansueto-institute / property-pipeline


Incremental write of partitions to parquet directory #1

Open nmarchio opened 9 months ago

nmarchio commented 9 months ago

Figure out the correct procedure for performing incremental writes to a partitioned parquet directory. This is necessary when working with files that exceed available memory. Dask DataFrame supports appends natively, but the polars documentation redirects to pyarrow, which does not provide a very robust example. Note that it is important to update the dataset metadata after writing each new partition.

Here is starter code that needs further testing:

from pathlib import Path
import os

import polars as pl
import pyarrow.parquet as pq

root_path = Path('test.parquet')
parquet_file = Path('ValueHistory.parquet')

# Distinct assessment years, sorted descending
year_list = (pl.scan_parquet(parquet_file)
    .group_by('AssdYear')
    .agg([pl.count().alias('count')])
    ).collect(streaming=True)

year_list = year_list.select(pl.col('AssdYear')).sort('AssdYear', descending=True)

metadata_collector = []

for year in year_list['AssdYear'].unique():
    print(year)
    print(metadata_collector)

    # Pull one year at a time to stay within memory
    query_table = (pl.scan_parquet(parquet_file)
        .filter(pl.col('AssdYear') == year)
        ).collect(streaming=True).to_arrow()
    print('queried')

    part_dir = root_path / f"year={year}"
    os.makedirs(part_dir, exist_ok=True)

    pq.write_table(
        table=query_table,
        where=root_path / f"year={year}/data.parquet",
        # metadata_collector=metadata_collector
    )
    print('partition written')

    # metadata_collector[-1].set_file_path(f"year={year}/data.parquet")

    # pq.write_metadata(
    #     schema=query_table.schema,
    #     where=root_path / "_metadata",
    #     metadata_collector=metadata_collector)
    # print('metadata updated')
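
The metadata step still needs work. A possible pattern (untested here, adapted from the "Writing _metadata and _common_metadata files" example in the pyarrow documentation, and building on the same setup as above) is to pass a metadata_collector list to each write_table call, set the relative file path on the collected FileMetaData, and write _metadata once after the loop rather than rewriting it for every partition:

metadata_collector = []

for year in year_list['AssdYear'].unique():
    query_table = (pl.scan_parquet(parquet_file)
        .filter(pl.col('AssdYear') == year)
        ).collect(streaming=True).to_arrow()

    part_path = f"year={year}/data.parquet"
    os.makedirs(root_path / f"year={year}", exist_ok=True)

    # Collect the FileMetaData of each partition as it is written
    pq.write_table(query_table, root_path / part_path,
                   metadata_collector=metadata_collector)

    # File paths in the collected metadata must be relative to the dataset root
    metadata_collector[-1].set_file_path(part_path)

# _common_metadata holds the schema only; _metadata adds row group statistics
pq.write_metadata(query_table.schema, root_path / '_common_metadata')
pq.write_metadata(query_table.schema, root_path / '_metadata',
                  metadata_collector=metadata_collector)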

Check the test.parquet result against the pyarrow write_to_dataset function:

query_table = pl.scan_parquet(parquet_file).collect(streaming=True).to_arrow()

pq.write_to_dataset(query_table,
                    root_path='/Users/nm/Downloads/firstamerican/test2.parquet',
                    partition_cols=['AssdYear'],
                    use_legacy_dataset=False)
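
One quick way to compare the two outputs (a sketch, reusing the paths above) is to read both directories back with pyarrow and compare row counts and schemas:

manual = pq.read_table('test.parquet')
native = pq.read_table('/Users/nm/Downloads/firstamerican/test2.parquet')

# Row counts should match. Schemas will differ in the partition column:
# the manual write uses year=... directories, while write_to_dataset
# creates AssdYear=... directories and drops AssdYear from the data files.
print(manual.num_rows, native.num_rows)
print(manual.schema)
print(native.schema)
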
nmarchio commented 9 months ago

Also: oddly, the pl.read_parquet function works for reading the partitioned directory, while pl.scan_pyarrow_dataset() throws an error.

check = pl.read_parquet(source = '/Users/nm/Downloads/firstamerican/test.parquet',
                        use_pyarrow = True)
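
If the error comes from how the dataset object is constructed, one thing to try (a sketch, not verified against this data) is building the pyarrow dataset explicitly with hive partitioning before handing it to polars:

import pyarrow.dataset as ds

# Declare hive-style partitioning so the year=... directories are parsed
# into a 'year' column rather than being ignored
dset = ds.dataset('/Users/nm/Downloads/firstamerican/test.parquet',
                  format='parquet', partitioning='hive')

lazy = pl.scan_pyarrow_dataset(dset)
print(lazy.filter(pl.col('year') == 2021).collect())
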
nmarchio commented 9 months ago

More examples:

from pathlib import Path

import dask.dataframe as dd
import geopandas as gpd
import pandas as pd
import polars as pl

# Pandas
df = pd.read_parquet(path=Path(file_dir) / f'_{name}.parquet', filters=[("foo", ">", 2)])
df.to_parquet(path=Path(file_dir) / f'_{name}.parquet', engine='auto',
              compression='snappy', partition_cols=['foo'])

# GeoPandas
gdf = gpd.read_parquet(path=Path(file_dir) / f'_{name}.parquet', memory_map=True,
                       filters=[('AssdYear', 'in', [2021, 2020])])
gdf.to_parquet(Path(file_dir) / f'_{name}.parquet')

# Dask DataFrame (allows incremental appends to an existing parquet directory)
root_path = Path('test.parquet')
parquet_file = Path('ValueHistory.parquet')

year_list = (pl.scan_parquet(parquet_file)
    .group_by('AssdYear')
    .agg([pl.count().alias('count')])
    ).collect(streaming=True)

year_list = year_list.select(pl.col('AssdYear')).sort('AssdYear', descending=True)

for year in year_list['AssdYear'].unique():
    print(year)
    # Pull one year at a time, then append it as a new set of partitions
    query_table = (pl.scan_parquet(parquet_file)
        .filter(pl.col('AssdYear') == year)
        ).collect(streaming=True).to_pandas()
    ddf = dd.from_pandas(data=query_table, npartitions=1)
    dd.to_parquet(df=ddf, path=Path('/Users/nm/Downloads/firstamerican') / 'test3.parquet',
                  engine='pyarrow', compression='snappy', append=True,
                  ignore_divisions=True, partition_on=['AssdYear'])
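
To confirm the appended partitions are usable, the directory can be read back with dask (a sketch, using the same path as above); columns written via partition_on typically come back as pandas categoricals:

check = dd.read_parquet('/Users/nm/Downloads/firstamerican/test3.parquet',
                        engine='pyarrow',
                        filters=[('AssdYear', 'in', [2021, 2020])])
print(check['AssdYear'].unique().compute())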