catalyst-cooperative / pudl

The Public Utility Data Liberation Project provides analysis-ready energy system data to climate advocates, researchers, policymakers, and journalists.
https://catalyst.coop/pudl
MIT License
482 stars 111 forks source link

Investigate internal Parquet file partitioning options #1012

Closed zaneselvans closed 2 years ago

zaneselvans commented 3 years ago

The EPA CEMS data is "only" a few GB when compressed and stored on disk as Parquet, and could easily be stored in a single file, if there were other ways to partition the data for efficient reading of selected states or years internally. PyArrow clearly has a lot of dataset partitioning options, but I'm not sure which if any of them other than "hive" style partitioning can be used seamlessly with Pandas. See also #924

Based on the Parquet Intake catalogs that Carbon Plan has published, it seems like Intake expects files named by partition, rather than the directory partitioning that's built in to the pd.to_parquet() method and PyArrow. (e.g. epacems-2019.pq). Can we still have fast access to individual year/state combinations if the data is aggregated into larger chunks? It seems like this is supposed to be possible, with the different columns and row-groups individually addressable within the Parquet format. But I'm not sure I understand how to ensure that row groups correspond to state/year combinations, or whether the performance difference is even worth the trouble.

To be able to split the data processing up into a bunch of independent tasks, it seems like we do still need to be writing to different files, but if the performance of a single large file is good enough, maybe what we do is write out to separate annual files (each of which is like 200MB) and then concatenate them all into a single Parquet file at the end for distribution / use. That way we wouldn't need to worry about the directory based partitioning.

zaneselvans commented 2 years ago

Okay, re-writing the EPA CEMS hourly dataset using pyarrow.parquet.ParquetWriter to append individual state-year row groups like this:

def epacems_one_file(
    years: List[int],
    input_dir: str,
    outfile: str,
    states: List[str] = pudl.metadata.enums.EPACEMS_STATES,
) -> None:
    schema = pa.parquet.read_table(
        source=input_dir,
        filters=year_state_filter(years=[2020], states=["ID"])
    ).schema

    pqwriter = pa.parquet.ParquetWriter(
        where=outfile, schema=schema, compression="snappy", version="2.6",
    )

    for year in years:
        for state in states:
            table = pa.parquet.read_table(
                source=input_dir, filters=year_state_filter(years=[year], states=[state]),
            )
            pqwriter.write_table(table)
            del table

    if pqwriter:
        pqwriter.close()

Allows the data to be read out quickly using DNF filters just like with the Hive partitioned data, but it lets you put the data in whatever arrangement of files in a directory that you want, including a single big file as above.

epacems_dir = pudl_settings["parquet_dir"] + "/epacems"

emissions_categories =  {
    "so2_mass_measurement_code": pd.CategoricalDtype(),
    "co2_mass_measurement_code": pd.CategoricalDtype(),
    "nox_mass_measurement_code": pd.CategoricalDtype(),
    "nox_rate_measurement_code": pd.CategoricalDtype(),
}

epacems_one_file(
    years=range(1995,2021),
    input_dir=epacems_dir,
    outfile="test-epacems-monolithic.pq",
)
epacems_monolithic_df = pd.read_parquet(
    "test-epacems-monolithic.pq",
    engine="pyarrow",
    filters=year_state_filter(years=[2019, 2020], states=["CO", "TX", "ID"]),
    use_nullable_dtypes=True,
).astype(emissions_categories)
epacems_monolithic_df.info(show_counts=True, memory_usage="deep")

It takes the same amount of time to read the data out of one big file as a bunch of little ones, so it seems like performance isn't a consideration, except insofar as we need to make sure each state-year row group is written individually.

However, if we're distributing the EPA CEMS processing task across multiple threads / CPUs / nodes, they can't all be writing into the same big Parquet file. So it might make sense to break up the work into (say) annual chunks each of which takes a couple of minutes, and then once they're done, combine all of those files into a single Parquet file for simple distribution. But it could also be 26 yearly files, or 1274 state-year files if that's better for some reason in the context of the Intake catalogs (e.g. if it wants to local caching per-file, then it would make sense to break the data down into state-year chunks so that those smaller chunks can be downloaded by users more conveniently in the background, and we don't have to pay more data egress fees than necessary).

There's something I don't understand going on with the schema. For some reason the emissions mass/rate codes are coming out as strings rather than categories, which doubles the size of the dataframes, and doesn't match the schema they should have been written with. There's also some difference between the Resource.to_pyarrow() schema, and the schema as it's read off of the disk above, maybe due to the hive partitioning? Or the fact that it's got pandas metadata embedded in it? Will have to investigate more, but that's not this issue.

zaneselvans commented 2 years ago

I think we know enough about how these partitions / row groups work to close this issue now and move on to #1495.