catalyst-cooperative / pudl-catalog

An Intake catalog for distributing open energy system data liberated by Catalyst Cooperative.
https://catalyst.coop/pudl/
MIT License

Use external metadata to improve filter/cache performance #4

Open zaneselvans opened 2 years ago

zaneselvans commented 2 years ago

Maybe this shouldn't be surprising, but when you query the whole collection of Parquet files with caching on, they all get downloaded, even if you're only reading data from a few of them, because as things stand you still need to access the metadata inside each Parquet file to figure out which ones contain the data you're looking for.

This defeats some of the purpose of caching, since the first time you do a query/filter you have to wait 10+ minutes for everything to download. This probably wouldn't be an issue on cloud resources with 1-10 Gbps of network bandwidth, but it's a pain on our home network connections.

It looks like pyarrow supports _metadata sidecar files, which would hopefully speed up scanning the whole dataset considerably. But it also looks like that support is tied to writing out a PyArrow dataset, rather than just a collection of files that share a schema in the same directory (where every column appears in every file, and the schema applies to all of them uniformly without needing to munge around in the partitioning columns).
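
For reference, the pattern the pyarrow docs show for producing the sidecar looks roughly like the sketch below; table and the paths are placeholders, and this sidesteps the question of how the individual year/state files actually get written:

import pyarrow.parquet as pq

# Write the data files and collect each file's footer metadata as we go.
metadata_collector = []
pq.write_to_dataset(
    table,  # a pyarrow.Table with the same schema as the existing files
    root_path="hourly_emissions_epacems",
    metadata_collector=metadata_collector,
)

# Consolidate the collected footers into a _metadata sidecar next to the
# data files, so readers can prune files without opening each one.
pq.write_metadata(
    table.schema,
    "hourly_emissions_epacems/_metadata",
    metadata_collector=metadata_collector,
)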

So far as I can tell, writing pandas_metadata into the parquet files (see #7) also requires using df.to_parquet() rather than a ParquetWriter directly or some other method of writing the dataframes out to parquet files, which is frustrating.
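
For what it's worth, here's a quick way to check whether a given file actually carries that pandas metadata (the path here is just a placeholder):

import pyarrow.parquet as pq

# The pandas metadata, if present, lives under the b"pandas" key of the
# Parquet schema's key-value metadata.
schema = pq.read_schema("hourly_emissions_epacems.parquet")
print(b"pandas" in (schema.metadata or {}))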

Using pd.read_parquet()

When using pd.read_parquet(), reading data from a collection of remote parquet files over the gcs:// protocol takes twice as long as reading from a single parquet file, but no similar slowdown occurs locally:

import pandas as pd

# Select ~1% of the 800M rows in the dataset, from 6 of 1274 row groups:
filters = [
    [('year', '=', 2019), ('state', '=', 'ID')],
    [('year', '=', 2019), ('state', '=', 'CO')],
    [('year', '=', 2019), ('state', '=', 'TX')],
    [('year', '=', 2020), ('state', '=', 'ID')],
    [('year', '=', 2020), ('state', '=', 'CO')],
    [('year', '=', 2020), ('state', '=', 'TX')]
]

single_file_local = pd.read_parquet("../data/hourly_emissions_epacems.parquet", filters=filters)
# CPU times: user 2.58 s, sys: 778 ms, total: 3.35 s
# Wall time: 2.23 s

multi_file_local = pd.read_parquet("../data/hourly_emissions_epacems", filters=filters)
# CPU times: user 4.57 s, sys: 1.01 s, total: 5.58 s
# Wall time: 2.67 s

single_file_remote = pd.read_parquet("gcs://catalyst.coop/intake/test/hourly_emissions_epacems.parquet", filters=filters)
# CPU times: user 5.33 s, sys: 1.22 s, total: 6.56 s
# Wall time: 25 s

multi_file_remote = pd.read_parquet("gcs://catalyst.coop/intake/test/hourly_emissions_epacems", filters=filters)
# CPU times: user 16.2 s, sys: 2.61 s, total: 18.8 s
# Wall time: 51.7 s

Using intake_parquet

Even ignoring the nearly 12 minutes of apparent network transfer time, the same query took only 25 seconds with pd.read_parquet(), while here it took 3 minutes. I really need to be able to toggle caching on and off before I can experiment further; one possible workaround is sketched after the snippet below.


# Not sure giving it empty storage options had the effect of disabling caching.
# It seems to have re-downloaded the whole dataset and put it... where?
single_file_intake = pudl_cat.hourly_emissions_epacems(
    storage_options={}, filters=filters
).to_dask().compute()
# CPU times: user 2min 17s, sys: 44.2 s, total: 3min 1s
# Wall time: 14min 49s
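
One thing I might try for toggling caching outside of Intake is fsspec's simplecache protocol chaining; this is just a sketch, and the cache directory is a placeholder:

# Prefixing the URL with simplecache:: should make fsspec download each file
# it touches into a local cache directory and reuse it on later reads;
# dropping the prefix reads straight from GCS with no cache.
cached = pd.read_parquet(
    "simplecache::gcs://catalyst.coop/intake/test/hourly_emissions_epacems.parquet",
    filters=filters,
    storage_options={"simplecache": {"cache_storage": "/tmp/pudl_cache"}},
)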
zaneselvans commented 2 years ago

I'm pretty sure that both the slow querying of remote parquet data and the ability to do per-file caching are tied to changing how the metadata is stored, consolidating it in a sidecar file that's available alongside the parquet files themselves, so I'm folding both of those things into this issue and closing #8.
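
On the reading side, my understanding is that once a _metadata sidecar exists, pyarrow can build the dataset from it and prune row groups using only the consolidated statistics. A rough sketch (the path and filter values are placeholders):

import pyarrow.dataset as ds

# Build the dataset from the consolidated footer metadata instead of
# listing and opening every individual parquet file.
dataset = ds.parquet_dataset("hourly_emissions_epacems/_metadata")

# Row group statistics in the sidecar let pyarrow skip files and row
# groups that can't match the filter.
id_2019 = dataset.to_table(
    filter=(ds.field("year") == 2019) & (ds.field("state") == "ID")
)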