catalyst-cooperative / pudl

The Public Utility Data Liberation Project provides analysis-ready energy system data to climate advocates, researchers, policymakers, and journalists.
https://catalyst.coop/pudl
MIT License

Convert CEMS ETL to assets #2084

Closed. bendnorman closed this issue 1 year ago.

bendnorman commented 1 year ago

Convert the EPA CEMS ETL to yield software-defined assets. There are a couple of ways we can use dagster concepts here:

bendnorman commented 1 year ago

I'm running into one pesky issue. When I run only the most recent years of the ETL, the Ohio partition fails because the plant with plant_id_eia = 55248 only exists in plant_entity_eia through 2018, but the plant id still shows up in the 2021 Ohio CEMS data. Running CEMS against the full pudl.sqlite database doesn't fail because the plant's timezone is pulled from an older year.

Have other people run into this issue? Should we force people to have the full pudl.sqlite database before running any subset of EPA CEMS years? Should we flag this edge case in the CEMS pipeline?

zaneselvans commented 1 year ago

I think it's the current expectation that you'll have a full SQLite DB before running the CEMS ETL. It's a bit janky.

However, there's a small manually compiled file containing a list of additional plant-years that you might be able to add this case to as a workaround. It's at src/pudl/package_data/epacems/additional_epacems_plants.csv

bendnorman commented 1 year ago

Got it. I added plant 55248 to additional_epacems_plants.csv so folks can run the entire fast ETL.

bendnorman commented 1 year ago

EPA CEMS is turning out to be a design stinker. There are a few ways to handle larger partitioned datasets with dagster, but none of them seem to work perfectly:

Options

Process all partitions in one asset

We could process all partitions in one asset, concatenate them, and have an IO Manager write the combined dataframe to a single parquet file. This is simple, but it would blow up memory because thousands of partitions would be held in memory at once, and the partition processing wouldn't happen in parallel.
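For illustration, a minimal sketch of this option; extract_epacems, transform_epacems, and epacems_partitions are hypothetical stand-ins for our existing ETL functions, and the io_manager_key is made up:

import pandas as pd
from dagster import asset


@asset(io_manager_key="pandas_parquet_io_manager")
def hourly_emissions_epacems(context) -> pd.DataFrame:
    """Process every year/state partition and concatenate the results."""
    dfs = [
        transform_epacems(extract_epacems(year, state))  # hypothetical helpers
        for year, state in epacems_partitions()  # hypothetical helper
    ]
    # Everything stays in memory until this concat, which is the problem.
    return pd.concat(dfs)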

Dynamic Graph

Dynamic graphs seem like a decent fit. Partitions are cleaned in separate processes and concatenated together. The transform op results can be written to individual parquet files with an IO Manager.

There are two downsides to this option. First, accessing the partition name in the IO Manager is a bit hacky:

import re

import pandas as pd
from dagster import InputContext, OutputContext
from upath import UPath

# PandasParquetIOManager is our existing parquet IO Manager base class.


class PartitionedEpaCemsPandasParquetIOManager(PandasParquetIOManager):
    """An IO Manager that writes EPA CEMS partitions to individual parquet files."""

    def _get_path_without_extension(
        self, context: InputContext | OutputContext
    ) -> UPath:
        """Get the partition name of an op output."""
        if context.has_asset_key:
            # We are dealing with an asset.
            raise NotImplementedError("This IO manager does not support assets.")
        # We are dealing with an op output.
        # TODO (bendnorman): indexing to get step_key. Raise a warning if we
        # get an unexpected identifier.
        # This regex grabs the partition name from the step_key.
        match = re.search(r"\[([A-Za-z0-9_]+)\]", context.get_identifier()[1])
        if match is None:
            raise ValueError(
                f"Could not parse a partition name from {context.get_identifier()}"
            )
        context_path = [match.group(1)]

        return self._base_path.joinpath(*context_path)

    def load_from_path(self, context: InputContext, path: UPath) -> pd.DataFrame:
        """Load a dataframe from a parquet file."""
        with path.open("rb") as file:
            return pd.read_parquet(file)

Second, we run into the same memory issues when concatenating all of the partitions together. Those memory issues could be avoided if we don't concat the dataframes and just produce a parquet file for each partition.
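To make the fan-out concrete, here's a rough sketch of the dynamic graph itself, again with hypothetical extract/transform helpers; the mapping_key is what ends up in the step_key that the IO Manager above parses out:

import pandas as pd
from dagster import DynamicOut, DynamicOutput, Out, graph, op


@op(out=DynamicOut())
def fan_out_partitions():
    """Yield one dynamic output per year/state partition."""
    for year, state in epacems_partitions():  # hypothetical helper
        yield DynamicOutput((year, state), mapping_key=f"{state}_{year}")


@op(out=Out(io_manager_key="epacems_partitioned_io_manager"))
def transform_partition(partition) -> pd.DataFrame:
    """Clean one partition; each mapped invocation runs as its own step."""
    year, state = partition
    return transform_epacems(extract_epacems(year, state))  # hypothetical


@graph
def epacems_graph():
    # map() fans the transform out across partitions so they can run in
    # parallel; skipping collect() avoids concatenating everything in memory.
    fan_out_partitions().map(transform_partition)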

Partitioned Asset

Using partitioned assets seems like the proper way to handle CEMS (a sketch follows the pros and cons below).

Pros:

Cons:
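As a point of reference, a partitioned CEMS asset might look roughly like this; the partition scheme, the EPACEMS_STATES and EPACEMS_YEARS constants, and the extract/transform helpers are all hypothetical:

import pandas as pd
from dagster import StaticPartitionsDefinition, asset

# Hypothetical partition space: one key per state/year, e.g. "OH_2021".
epacems_partitions_def = StaticPartitionsDefinition(
    [f"{state}_{year}" for state in EPACEMS_STATES for year in EPACEMS_YEARS]
)


@asset(
    partitions_def=epacems_partitions_def,
    io_manager_key="epacems_partitioned_io_manager",
)
def hourly_emissions_epacems(context) -> pd.DataFrame:
    """Clean a single partition; dagster tracks each partition key separately."""
    state, year = context.partition_key.split("_")
    return transform_epacems(extract_epacems(int(year), state))  # hypothetical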

Op

Running our existing CEMS code in a single op might be the easiest option for now. We can specify the IO location by configuring the op, as I did for the ferc_to_sqlite ops. Downstream assets can access CEMS data using an IO Manager and a SourceAsset, as in the sketch after the pros and cons below.

Pros:

Cons:
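A rough sketch of this option, where run_existing_cems_etl stands in for our current ETL entry point and the config key is made up:

from dagster import AssetKey, SourceAsset, job, op


@op(config_schema={"epacems_output_dir": str})
def etl_epacems(context):
    """Run the existing CEMS ETL in one op, writing parquet files directly
    to the configured location instead of going through an IO Manager."""
    run_existing_cems_etl(output_dir=context.op_config["epacems_output_dir"])


@job
def etl_epacems_job():
    etl_epacems()


# Downstream assets depend on the parquet output via a SourceAsset that is
# read back in with a custom IO Manager.
hourly_emissions_epacems = SourceAsset(
    key=AssetKey("hourly_emissions_epacems"),
    io_manager_key="epacems_io_manager",
)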

bendnorman commented 1 year ago

I ended up creating a non-partitioned asset for hourly_emissions_epacems that doesn't return any data and instead writes the data out within the asset body rather than using an IO Manager. This isn't ideal because all partitions are processed sequentially and we can't take advantage of an IO Manager for output. That isn't a huge issue, though: I created an epacems_io_manager IO Manager which allows assets to load the CEMS asset as a dask dataframe.
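Roughly, the shape of the solution looks like the sketch below; the EPACEMS_PARQUET_DIR path and the extract/transform helpers are illustrative, not the exact code:

import dask.dataframe as dd
from dagster import IOManager, asset


@asset
def hourly_emissions_epacems(context) -> None:
    """Process partitions sequentially, writing each one to parquet from
    inside the asset body, so no output IO Manager is involved."""
    for year, state in epacems_partitions():  # hypothetical helper
        df = transform_epacems(extract_epacems(year, state))  # hypothetical
        df.to_parquet(EPACEMS_PARQUET_DIR / f"epacems-{year}-{state}.parquet")


class EpaCemsDaskIOManager(IOManager):
    """Sketch of epacems_io_manager: load the parquet outputs lazily."""

    def handle_output(self, context, obj):
        # The asset writes its own files and returns None, so there is
        # nothing to persist here.
        assert obj is None

    def load_input(self, context) -> dd.DataFrame:
        # dask reads the directory of partitioned parquet files without
        # pulling everything into memory at once.
        return dd.read_parquet(EPACEMS_PARQUET_DIR)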

We should revisit using partitioned assets in the future because they seem like the proper way to handle large partitioned datasets with dagster. MultiPartitionsDefinition will eventually be stable, and the new dagster dev command launches both the daemon and dagit, which will make partitioned assets easier to manage.