mansueto-institute / dejure-defacto-taxrates

Code to process First American data to generate property tax rates for the entire US.
MIT License

test speed improvements on agg functions by modifying scan_parquet() arguments and using collect_async() #4

Open claireboyd opened 8 months ago

claireboyd commented 8 months ago

In thinking through ways to improve the speed of the aggregation functions (e.g. rank) on large parquet files, two real opportunities emerged: modifying the scan_parquet() arguments and using collect_async().

A few tests using 1.1MB and 1.2GB parquet files produced a few key takeaways (see the summary table for all data collected from the tests).

The current implementation uses collect() with parallel='auto', use_statistics=True, and hive_partitioning=True. This is most similar to the first row of the summary table, with a speed of 762 (used here as the benchmark for the recommendations below).
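For reference, that benchmark configuration corresponds to a call roughly like the one below. This is a minimal sketch, not the repo's actual code, and the filepath is a placeholder:

import polars as pl
from pathlib import Path

# Benchmark configuration (first row of the summary table, speed 762);
# "<filepath to parquet file>" is a placeholder, not a real path.
df = (
    pl.scan_parquet(
        Path("<filepath to parquet file>"),
        parallel="auto",
        use_statistics=True,
        hive_partitioning=True,
    )
    .collect()
)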

Here are three recommended tests to try, based on the 1.2GB run:
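To compare parameter combinations like these, a timing harness along the following lines could work. This is a sketch rather than the original test code: the helper name time_rank_query and the placeholder filepath are assumptions, while the query mirrors the rank example further down.

import time
from pathlib import Path
import polars as pl

def time_rank_query(input_dir, **scan_kwargs):
    # Time one scan_parquet parameter combination end-to-end.
    start = time.perf_counter()
    (
        pl.scan_parquet(Path(input_dir), low_memory=True, **scan_kwargs)
        .with_columns(
            pl.col("AssdTotalValue")
            .rank(method="random", descending=True, seed=1)
            .over(["SaleRecordingYear", "county_code"])
            .alias("highestvalbycountybyyear")
        )
        .collect()
    )
    return time.perf_counter() - start

# Example: the parameter set from the second recommended test.
elapsed = time_rank_query(
    "<filepath to parquet file>",
    parallel="row_groups",
    use_statistics=True,
    hive_partitioning=True,
)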

claireboyd commented 8 months ago

@nmarchio Here's an example of how collect_async() works (it needs to be wrapped in a function and run via asyncio rather than chained directly onto the query). The code chunk below uses the params for the second recommended test above:

import asyncio
from pathlib import Path

import polars as pl

INPUT_FILENAME = "<filepath to parquet file>"  # placeholder path
PARAMS_test2 = {"parallel": "row_groups", "use_statistics": True, "hive_partitioning": True}

async def test_collect_async(input_dir, **kwargs):
    return await (
        pl.scan_parquet(Path(input_dir), low_memory=True, **kwargs)
        .with_columns(
            # REPLACE COL NAMES HERE FOR THE RELEVANT OPERATION
            pl.col("AssdTotalValue")
            .rank(method="random", descending=True, seed=1)
            .over(["SaleRecordingYear", "county_code"])
            .alias("highestvalbycountybyyear")
        )
        .collect_async(streaming=True, simplify_expression=True)
    )

# Returns a DataFrame (not a LazyFrame)
df = asyncio.run(test_collect_async(INPUT_FILENAME, **PARAMS_test2))
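One caveat: asyncio.run() only works from synchronous code. Inside a Jupyter notebook, where an event loop is already running, it raises a RuntimeError; there you would await the coroutine directly instead:

df = await test_collect_async(INPUT_FILENAME, **PARAMS_test2)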