catalyst-cooperative / pudl

The Public Utility Data Liberation Project provides analysis-ready energy system data to climate advocates, researchers, policymakers, and journalists.
https://catalyst.coop/pudl
MIT License
456 stars 106 forks source link

Prototype CEMS pipeline using Dagster #1432

Closed bendnorman closed 2 years ago

bendnorman commented 2 years ago

Based on our discussion in #1406, we have decided to prototype a Dagster version of our CEMS pipeline.

There are a number of features I'd like to test out:

Ideally, I keep this to one sprint. We should have a hard cutoff at 2 sprints and discuss what we've found.

Anything else @zaneselvans?

zaneselvans commented 2 years ago

I wasn't really aware of the asset lineage or re-run on code changes features.

I'm not really understanding how the re-run on changes works... like how can it keep track of all the potential upstream code changes? What if there's other code (not data flow) dependencies that have changed? It also seems like a feature that would be most appropriate for a big data warehouse context, where lots of versions can be persisted and accessed easily. Is that a model of usage that we're anticipating? Could still be useful in development though, to speed up re-running only those ops that you've modified and their descendants.

I'm also definitely not understanding the gist of the asset lineage feature. How is it different from the inferred dependencies between ops? Is it that the asset which is being persisted is also included in the chain of dependencies, allowing upstream ops to be skipped if the asset is present?

bendnorman commented 2 years ago

I highly recommend you read the Dagster Docs before diving into my findings, . I found the tutorial and concepts sections helpful. The docs are well written and should only take 15 - 20 min to read. Having other folks read the docs will help us understand Dagster better and will provide context for my findings.

I will cover some background on Dagster, my review of various features, and how to refactor PUDL using Dagster.

Dagster Background

Airflow is the most established orchestration tool. It has a high barrier to entry but is a well-proven open-source tool.

Prefect and Dagster are two new tools ascending the open-source data stack world with other popular tools like Airbyte and DBT. In the last five years, a bunch of additional orchestration tools have popped up to address the shortcomings of Airflow. This post discusses the failures of Airflow and the high-level differences between Prefect, Dagster, and Prefect.

Prefect is more popular than Dagster, but both have growing communities and codebases. We have nearly completed refactor of PUDL using Prefect but wanted to explore Dagster before fully committing to Prefect. Both tools make it easy to turn existing python code into a DAG and parallelize data processing. We are intrigued by Dagster because it seems to care more about the data moving through the DAG. It provides useful tools for validating DataFrames, tracking asset dependencies, rerunning parts of the DAG, and configuring resources. You can read more about our reasoning for exploring Dagster here.

The Prototype

We picked our EPA CEMS pipeline to convert to Dagster because it is decoupled from our other datasets, has clear ETL steps, and could benefit from parallelization.

I mostly did not change the ETL functions and just wrapped them in Dagster @op decorators. Ops are the smallest unit of work in dagster (similar to Prefect tasks). The ETL ops are linked together in pudl.dagster_etl.etl_epacems_dagster() which is a Dagster job.

“Jobs are the main unit of execution and monitoring in Dagster. The core of a job is a graph of ops - connected via data dependencies. Jobs can also contain resources and configuration, which determine some of the behavior of those ops.”

The etl_epacems_dagster job requires a couple of Dagster Resources.

“Resources provide a way to manage dependencies to external components and share implementations across multiple ops in a job.”

The EPA CEMS ETL requires a couple of resources that vary between environments and runs. I made Datastore and the pudl_settings Dagster Resources because they might differ by deployment environment. This allows us to change their configuration within the Dagit UI. The Dagster ops access the Resources using a context keyword.

The pipeline performs the same ETL steps for about 1,250 partitions. I create a new function pudl.extract.epacems.gather_partitions() that yields EpaCemsPartitions as Dagster DynamicOuts. This allows Dagster to process each partition in parallel. gather_partitions() is an op with a ConfigSchema, which allows you to specify which states and partitions you want to process via Dagit.

The ETL ops process each partition. The pudl.dagster_etl.load_epacems() op writes a DataFrame to a parquet file. The op also yields a Dagster AssetMaterialization.

“Asset" is Dagster's word for an entity, external to ops, that is mutated or created by an op. An asset might be a table in a database that an op appends to, an ML model in a model store that an op overwrites, or even a slack channel that an op writes messages to.”

Producing Dagster assets allows us to track intermediate and final ETL outputs in Dagster’s Asset Catalog. Each asset can have metadata associated with it, for example, the number of rows in the asset or the amount of time it took to process the asset.

That is the gist of how this prototype works. See the PR for instruction on how to run it.

Features

Implementing this prototype was a fairly pleasant experience. It took me about 15 hours to become familiar with the documentation and implement the prototype. The concepts feel clear, and there are many helpful examples. Dagster makes it easy to add helpful data engineering tools to an ETL.

Setup

The setup was straightforward. The UI and the core library are packaged together so you can execute DAGs via the UI in one command. In Prefect, you need to set up the UI (Prefect Server) separately; however, Orion will add the UI to the core.

I initially tried to add a job decorator to pudl.etl.etl_epacems() but ended up getting strange import errors saying PUDL sub-modules couldn’t be found or there were circular imports. Once I created pudl.dagster_etl, the errors went away.

Ops and Jobs

Dagster ops force you to be deliberate about the input and outputs of functions. An op only starts to execute once all of its inputs have been resolved. Inputs can be resolved in two ways: The upstream output that the input depends on has been successfully emitted and stored. The input was stubbed through config.

It felt like these rules guided my refactor. For example: pudl.extract.epacems.extract() has three parameters, states, years and datastore. They aren’t data output from another function and feel more like config or resources. I made state and years a ConfigSchema and Datastore a configurable resource.

If you’re an input, you’re either data or a configurable resource. This feels like a helpful paradigm, and I hope it applies nicely to other parts of our ETL.

Config

ConfigSchemas would probably replace our settings yaml files. I like how the UI provides configuration autocompletion and linting. You can easily access configuration using the context keyword in ops.

You can use Fields to specify default values, required fields, and descriptions. Ideally, we would do more robust config validation using Dagster tools as we do in our pudl.settings classes. This isn’t a huge issue because we can still use our settings classes to validate the config values inside of ops.

While ConfigSchemas are useful, they have their limitations. You can only configure ops, not jobs. Dagster requires you to specify the name of the op or resource in the configuration:

ops:
  gather_partitions:
    config:
      states:
        - all

This isn’t necessarily a bad thing, but it is a deviation from our current settings yaml structure.

I did enjoy working with the context keyword because it was clear what lives in it: resources and op configuration. Prefect is less specific about what prefect.context should contain and doesn’t provide schema validation.

Dagster_pandas

The dagster_pandas package provides tools for data validation, summary statistics, and dataframe serialization. To perform validation and report summary statistics, you need to create a DagsterType that specifies the dataframe schema and summary statistics. The DagsterType can then be used to set an op’s input datatype.

I created Resource.to_dagster() method to create dagster dataframe schemas from our metadata. I used this to validate the input of pudl.dagster_etl.load_epacems. It is a little silly to validate the dataframe right before we validate it again when we load it to a parquet file but a good test for Dagster’s pandas integration.

This integration sets Dagster apart from other orchestration tools like Prefect. I think it would be great to develop dataframe input schemas of intermediate steps, so data errors are caught early, and developers will better understand what data to expect at each step.

I do have a couple of critiques. It feels a little awkward to pack data validation and summary statistics into a DagsterType. This isn’t a huge problem, but it might be challenging to generate table schemas based on metadata for each op programmatically.

The summary statistics are helpful, but I would love to track the same stats for one asset over multiple ops. This could make it much easier to debug data errors.

Assets and Outputs

We could use Assets to save intermediate and final tables in the PUDL ETL. Dagster allows folks to track how assets move through ops and how Assets relate to one another.

The prototype creates one asset with many partitions. You can view the assets by clicking on the Assets tab in the upper right corner.

Dagster will be expanding on Assets 0.14.0 with Software Defined Assets. I haven’t had time to learn about them and how they could be used in PUDL.

Partitions

Dagster allows you to partition jobs but only for a single dimension. I think this is mostly used for running batch processing jobs on a schedule. There is no smooth way to use this feature for multidimensional partitions like state and year.

Performance

Processing all years of Idaho CEMS data using the default multiprocess_executor took about 4 minutes. This is much slower than our current ETL or the Prefect version, which makes me think I could be doing something wrong. I couldn’t run the ETL on the entire dataset because Dagster caches the outputs of all the steps (40+ GB of CEMS dataframes).

UI

The UI provides helpful information like the DAG, input and output types, op descriptions, Gantt charts, and an asset catalog. Also, it is trivial to rerun parts of the DAG using the UI.

Future Work

I love Dagster’s data-focused features like Assets and pandas integrations. To move forward with it we need to research the performance issue and develop a refactoring plan.

TODO: I’m still working on fleshing out a refactoring plan.

zaneselvans commented 2 years ago

Dagster makes it easy to add helpful data engineering tools to an ETL.

What were you thinking of there?

I like that there's an explicit pattern for nesting graphs, since this would let us group different sections of the overall ETL process logically. I'm sure this is possible in Prefect too, but it seemed kind of obscure. The way a graph with multiple outputs expects a dictionary also happens to map directly to our dictionaries-of-dataframes pattern. It seems like the ferc1 and EIA transform modules (eia860, eia861, eia923) would each be good candidates for conversion into sub-graphs, yielding multiple outputs (currently dicts of dfs).

It's unclear to me how tightly bound Dagster is to their web interface / UI. Do you get the sense that that's really the primary way it's intended to be used? Not that that would be bad (especially in our automated CI/CD context) but it would be very different from our current setup.

The use of globally unique keys for all of the inputs / outputs / assets seems like it could get hairy if we aren't intentional about it. We already have proto-names for these things, but it seems like some better defined naming conventions would be helpful.

While ConfigSchemas are useful, they have their limitations. You can only configure ops, not jobs.

It seemed like there was a facility for job-level configuration. Is that different from what you're talking about here?

It feels a little awkward to pack data validation and summary statistics into a DagsterType. This isn’t a huge problem, but it might be challenging to generate table schemas based on metadata for each op programmatically.

It seems like the detail with which we would be validating / defining the schema of the data being passed between ops would increase as we get further into the ETL process -- as we know more things that should be true about the data. Initially it would probably just be things like which columns are present and whether there are any rows, but by the end we'd have columns, number of rows, data types, and (in the data validation case) expectations about the contents of the columns and/or their relationships with each other. The more detailed expectations / metadata which pertain to the end of the ETL process are already largely available, while the less detailed information about the data at the beginning and intermediary steps might need to be compiled or made explicit. Could this be an incremental process? Where we have things to validate at some steps, or for only a subset of the columns in a dataframe, and we add more information over time?

Is it possible to do validations like "output df has the same number of rows and the same columns as the input df." I didn't see this built in but it seems like a common kind of check -- ensuring that some attribute isn't mutated over the course of the op, even if you don't know exactly what the value of the attribute is.

The summary statistics are helpful, but I would love to track the same stats for one asset over multiple ops.

Is this possible? Can the same asset be a consequence of multiple ops? Or does each new op / ETL step create a new asset? And is there a structure which links together those different assets as they change over the course of the ETL process? Is that what an "asset lineage" is? If a given asset with a unique asset key does get changed over time then it seems like there's another descriptive dimension like "stage" that's required to describe the expected status of an asset. Is that right? Does it happen in the asset metadata or somewhere else?

Dagster allows you to partition jobs but only for a single dimension.

As I commented over here I'm not sure the one dimensional partitioning is a bad thing. I think it might just be an inevitable outcome of data partitioning being messier in general than cartesian products of several possible dimensions.

I couldn’t run the ETL on the entire dataset because Dagster caches the outputs of all the steps (40+ GB of CEMS dataframes).

Whoa, that seems bad/weird/wrong. There must be some way to say "No really I'm done with this now." Part of what we need out of this framework is the ability to orchestrate a bunch of jobs within whatever the CPU / memory constraints of the system it's running on are.

You didn't implement any validation of the EPA CEMS dataframes did you? I could see that being a performance hit. But also it should be running on multiple cores.

bendnorman commented 2 years ago

I will have more on the performance issues soon.

bendnorman commented 2 years ago

Performance

I made some improvements to the pipeline performance. Each year, state tuple is a Dagster partition. This results in a Dagster run for each partition. I got the recommended QueuedRunCoordinator working which adds each partition run to a queue.

0.14.0 supports forkserver for starting sub-processes, reducing the sub-process launch time from 5-10 seconds to less than a second. Processing all 2020 data took about 5 minutes. So 24 years of data will take about 120 minutes which is slower than sequential execution. I haven't run all the years because of storage constraints on my computer. All the data is processed in about 60 min using the multithreaded prefect executor.

However, launching the initial process still takes about 9 seconds. Launching processes for each small partition might not make the most sense because the EPA CEMS pipeline is mostly IO bound. Dagster currently does not support in-process parallelism.

It will be interesting to see how Dagster performs with more complex DAGs like our EIA processing. Will there be a bunch of overhead? How exactly does Prefect spawn processes?

Update

The in_process_executor spawns a single process for each run. The initial process launch is instantaneous and there is no lag between each op because each op is executed within the process. I think this is a better option for the CEMS pipeline. 2020 data took about 3 minutes to process (apologies for these rough estimates, Dagster doesn't report backfill execution time). Concurrency is limited by the QueuedRunCoordinator. Increasing the max_concurrent_runs did not increase the performance.

bendnorman commented 2 years ago

We have decided to move forward with Dagster but have a few lingering questions:

Dagster Cloud has a Viewer Role but the user must be a part of the organization.

Yes, Resources use a ConfigSchema for parameterization. ConfigSchemas accept StringSources which can read from env vars.

We can use the Dagster CLI or run our ETL as a script.

bendnorman commented 2 years ago

We have decided to move forward with Dagster. See epic #1487 for the full reasoning.