To reiterate some of our goals with dagster:
My initial iterations of using dagster for PUDL mostly used ops and graphs. Wrapping pudl functions in these abstractions felt like adding an additional layer of complexity (see discussion in #1835). Wrapping functions in ops also didn't provide a clear way to persist interim, output, and analysis tables to the database. These issues led me to dagster's other paradigm: software-defined assets.
Pros of software-defined assets:
My plan is to convert each pudl function that extracts or cleans a dataframe into an asset or multi_asset. For example, extraction functions will be wrapped in multi_assets, producing one asset per raw dataframe. Most individual table transform functions will become assets because they can depend on multiple tables and produce a single asset. There will be no need for loading functions because IO Managers will handle loading to storage (sqlite and parquet). A rough sketch follows.
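For concreteness, here's a minimal sketch of the shape I have in mind (the table names are illustrative and the extraction/cleaning logic is elided):

```python
import pandas as pd
from dagster import AssetOut, Output, asset, multi_asset


@multi_asset(
    outs={
        "raw_plants_eia860": AssetOut(),
        "raw_generators_eia860": AssetOut(),
    }
)
def extract_eia860():
    """Extract raw EIA-860 dataframes; each one becomes its own asset."""
    raw_dfs = {
        "raw_plants_eia860": pd.DataFrame(),      # placeholder for real extraction
        "raw_generators_eia860": pd.DataFrame(),  # placeholder for real extraction
    }
    for name, df in raw_dfs.items():
        yield Output(df, output_name=name)


@asset
def plants_eia860(raw_plants_eia860, raw_generators_eia860):
    """Transform raw inputs into one cleaned table; the IO manager persists it."""
    return raw_plants_eia860  # real cleaning logic would go here
```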
This refactor will touch everything, and it will take a while! To keep the new branch and dev from getting super out of sync, I propose we apply these changes in two phases. The first phase will convert the ETL processes to assets, and the second phase will convert the output and analysis tables. This will make an already-monster PR a little less monstrous. I believe converting just the ETL initially won't be an issue because the PudlTabl class is pretty distinct from the ETL.
Here are some rough steps for the first phase:
At each step, I'd love to get feedback to make sure I'm building the right stuff! Once people approve the EIA ETL changes, I'll start to update the test suite to accommodate the dagster changes.
We can use the Resource.from_id() method to generate the dtypes to pass to the dtype arg of the pd.DataFrame().to_sql() method. There's also the apply_pudl_dtypes() helper function, which looks up a dataframe's dtypes in our metadata and converts the columns to the correct dtypes. This is great for reading the fundamental tables, but the fields in our interim and output tables aren't guaranteed to be in our metadata right now. We either have to add metadata for our output and interim tables or rely on pandas' convert_dtypes() function.
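As a sketch of that choice (enforce_dtypes() is a hypothetical wrapper, not an existing pudl function, and I'm assuming apply_pudl_dtypes() can be called with just a dataframe):

```python
import pandas as pd
from pudl.helpers import apply_pudl_dtypes


def enforce_dtypes(df: pd.DataFrame, in_metadata: bool) -> pd.DataFrame:
    """Apply PUDL metadata dtypes when the table is covered, else fall back."""
    if in_metadata:
        # Columns are looked up in the PUDL metadata and cast accordingly.
        return apply_pudl_dtypes(df)
    # Interim/output tables without metadata: pandas' best-effort nullable dtypes.
    return df.convert_dtypes()
```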
Our current dagster refactor plan doesn't describe how to persist interim tables or how to apply dagster concepts to the output and analysis layers.
Data Persistence questions
Is it possible for multiple processes to write to a sqlite file one at a time? If not, we'll need to create a postgres db, most likely using docker. (See the sqlite concurrency sketch after this list.)
How do we persist interim data with schemas? The current metadata system needs all table schemas to be created at the same time.
Will performance be an issue when we have smaller ops? There is overhead in launching a process for every op.
What is the proper way to use a database as an IO Manager? Based on my understanding, the default IO Manager creates a storage directory for each run. How can we create a database for each run? Is this what we want? (A rough database IO Manager sketch also follows this list.)
If IO Managers aren't the best way to persist data to a database, what is a sensible way to write interim tables? A MaterializeAssets and a function that writes a table to the db? Is it sketchy for the interim table in the db and the in-memory dataframe to diverge? How should we manage the schemas for interim tables?
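On the first question: as a hedged aside, sqlite does serialize writers through file locking, so multiple processes can write one at a time if they're willing to wait for the lock. A minimal sketch using only the stdlib (WAL mode allows one writer alongside concurrent readers):

```python
import sqlite3

# Each process opens its own connection; a generous timeout makes writers
# queue for the file lock instead of failing with "database is locked".
con = sqlite3.connect("pudl.sqlite", timeout=30.0)
con.execute("PRAGMA journal_mode=WAL;")  # readers no longer block the writer

with con:  # transaction scope; the write lock is held only while writing
    con.execute("CREATE TABLE IF NOT EXISTS interim_demo (id INTEGER, val TEXT)")
    con.execute("INSERT INTO interim_demo VALUES (?, ?)", (1, "a"))

con.close()
```

This gives orderly turn-taking rather than truly concurrent writes, so heavy write contention could still push us toward postgres.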
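And on the IO Manager question, here's my rough guess at the shape of a database-backed IO Manager (a sketch, not a vetted implementation; the one-table-per-asset naming and the hard-coded path are assumptions):

```python
import sqlite3

import pandas as pd
from dagster import IOManager, io_manager


class SQLiteIOManager(IOManager):
    """Persist each asset as a table in a single sqlite file."""

    def __init__(self, db_path: str):
        self.db_path = db_path

    def handle_output(self, context, df: pd.DataFrame) -> None:
        table = context.asset_key.path[-1]  # asset name -> table name
        with sqlite3.connect(self.db_path) as con:
            df.to_sql(table, con, if_exists="replace", index=False)

    def load_input(self, context) -> pd.DataFrame:
        table = context.asset_key.path[-1]
        with sqlite3.connect(self.db_path) as con:
            return pd.read_sql_query(f"SELECT * FROM {table}", con)


@io_manager
def sqlite_io_manager(init_context):
    # A per-run database could parameterize this path instead of hard-coding it.
    return SQLiteIOManager("pudl.sqlite")
```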
Output layer questions
Which output tables need CEMS data?
How can we give the output and analysis tables access to the ETL outputs? Sensors? Passing ETL op outputs to output ops (this might require merging everything into one job)? (One asset-based option is sketched after this list.)
How would SDA work with SQL views?
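On giving output tables access to ETL outputs: one option (hedged; mcoe is just an example name, and this assumes the output tables live in the same asset graph as the ETL) is to define them as ordinary downstream assets, so dagster wires the dependency for us instead of needing sensors:

```python
import pandas as pd
from dagster import asset


@asset
def mcoe(plants_eia860: pd.DataFrame, generation_fuel_eia923: pd.DataFrame) -> pd.DataFrame:
    """An output-layer table as a downstream asset: dagster loads the upstream
    ETL assets via the IO manager and persists this result the same way."""
    return plants_eia860.merge(generation_fuel_eia923, on="plant_id_eia", how="left")
```

For SQL views, the asset body could presumably execute a CREATE VIEW statement instead of returning a dataframe, though how that interacts with IO Managers is exactly the open question above.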