zschira opened this issue 2 years ago
Given the need to integrate a whole second stream of data extraction and transformation into the FERC 1 process here, and the fact that we have wanted to disentangle some of the extraction vs. transformation code (deferring some complicated steps until later in the process), it might be worth thinking about those tasks together, rather than shoehorning all of the XBRL data into the existing pipeline and then subsequently disassembling the now even more complicated pipeline.
It seems like after the data is in the PUDL DB, there will be another layer of work that has to happen to integrate it with the FERC-EIA record linkage (and any other downstream analysis), since some of the old columns (`record_id`) won't exist any more.
There are 4 combinations of data source / existing code. In the order we want to tackle them:
Transform step design thoughts:
- An `execute()` or coordinating method that has standard cleaning/renaming/reshaping/encoding/etc. steps.

Comments on Dagster:
I don't think it is possible for only part of our ETL to be implemented using Dagster. If the new FERC data were going to be an entirely separate pipeline then I'd say we should use dagster, but given that it will be intertwined with our existing ETL, I don't think we should.
However, we should keep some dagster concepts in mind while designing these FERC transformations. I would highly recommend reading the dagster concepts docs and our rough plan for applying these concepts to PUDL.
Each transform function will eventually be turned into a dagster "op" which is a node in the DAG. An op takes some upstream inputs, does some processing then passes the result to the next op in the DAG. Dagster is strict about what can be fed into ops:
An op only starts to execute once all of its inputs have been resolved. Inputs can be resolved in two ways:
- The upstream output that the input depends on has been successfully emitted and stored.
- The input was stubbed through config.
In other words, the input of an op has to be the output of another op OR configuration. Configuration can come in the form of Resources and Config, which allow you to configure DAG-wide resources in a yaml file. ETL-wide abstractions like the Datastore, pudl_settings, and dataset settings will likely become Resources or Config. This means ops won't accept plain Python constants passed into functions. For example, this:
```python
import pandas as pd
from dagster import graph, op

@op
def extract():
    return pd.read_csv("poopy_data.csv")

@op
def fillnans(df, fill_value):
    return df.fillna(fill_value)

@graph
def run_etl():
    df = extract()
    fillnans(df, "Gas")
```
won't work, because the `fill_value` parameter is a Python string, not the output of another op or a Dagster Resource or Config. You could fix this by configuring the `fill_value` parameter using Dagster Configs.
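For illustration, here is a minimal sketch of that config-based fix, using Dagster's documented `config_schema` / `context.op_config` mechanism (the default value shown is just an assumption):

```python
import pandas as pd
from dagster import Field, graph, op

@op
def extract():
    return pd.read_csv("poopy_data.csv")

@op(config_schema={"fill_value": Field(str, default_value="Gas")})
def fillnans(context, df):
    # The fill value now comes from run configuration rather than a
    # hard-coded Python constant passed between ops.
    return df.fillna(context.op_config["fill_value"])

@graph
def run_etl():
    fillnans(extract())
```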
Also, these input rules only apply to functions you think should be included in the DAG. For example, helper functions probably won't be included in the DAG.
```python
def fillnans(df, fill_value):
    return df.fillna(fill_value)

@op
def transform(df):
    df = fillnans(df, "Gas")
    return df
```
The dagster refactor and the xbrl work will probably be happening in parallel, so Zane and I will have more guidance as we refactor!
Some more thoughts on Dagster:
You can create nested graphs in Dagster to allow for some logic hierarchy. @zaneselvans and I envision incrementally applying dagster graphs to our ETL. There are multiple levels we've identified; the simplest is to turn the `_etl_{dataset}()` functions into ops and construct a graph from them. This is a very simple DAG, but it would enable us to run the ETLs in separate processes. When we're writing XBRL functions we don't need to think much about the first two levels, because we are only making changes within the `ferc1.transform()` function.
I wrote up a little prototype for option 3. It's ok but it seems awkward to have to define all of the transformed tables in the transform graph's outs param:
This is the recommended method for returning multiple outputs from graphs. I'm curious if it is possible for graphs to treat dictionaries as a single output instead of multiple.
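For reference, a minimal sketch of the out-mapping pattern being described, using Dagster's documented `GraphOut` approach (the op bodies and table names here are placeholders):

```python
from dagster import GraphOut, graph, op

@op
def transform_fuel(raw_dfs):
    ...

@op
def transform_steam(raw_dfs, fuel_df):
    ...

@graph(out={"fuel_ferc1": GraphOut(), "plants_steam_ferc1": GraphOut()})
def transform(raw_dfs):
    # Every transformed table has to be declared in the graph's out mapping
    # and returned explicitly, which is the awkwardness noted above.
    fuel_df = transform_fuel(raw_dfs)
    steam_df = transform_steam(raw_dfs, fuel_df)
    return {"fuel_ferc1": fuel_df, "plants_steam_ferc1": steam_df}
```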
Satisfying option three shouldn't be that difficult because we can use the existing structure in `transform.ferc1`. I do have a couple of questions:

- How do we want to handle the fact that `plants_steam()` depends on the `fuel_ferc1()` table for plant_id assignment?
- Do any transform functions depend on multiple raw tables in `ferc1_raw_dfs`? This might not be the case in Form 1 land, but I'm pretty sure final EIA transform functions rely on multiple raw tables to produce a single cleaned table.

Option 4 is a bit trickier because we want generic cleaning functions that can be parameterized for each table but do not have to use dagster config. Dagster recommends using Op factories for this situation. They work but feel a little kludgy. Here is an example of a generic transform op:
```python
import pandas as pd
from dagster import job, op

def rename_columns_factory(
    name="default_name",
    ins=None,
    column_mapping=None,
    **kwargs,
):
    """Create an op that renames the columns of a dataframe.

    Args:
        name (str): The name of the new op.
        ins (Dict[str, In]): Any Ins for the new op. Default: None.
        column_mapping (dict): Mapping of old to new column names.

    Returns:
        function: The new op.
    """
    @op(name=name, ins=ins, **kwargs)
    def rename_df(context, df):
        context.log.info(f"\n The DataFrame: {df}\n")
        context.log.info(f"\n The Op Ins: {context.op_def.ins}\n")
        t_df = df.rename(columns=column_mapping)
        context.log.info(f"\n The Transformed DataFrame: {t_df}\n")
        return t_df

    return rename_df

@op
def extract():
    return pd.DataFrame([1, 2], columns=["col"])

@job()
def etl():
    df = extract()
    column_mapping = {"col": "column"}
    transformed_df = rename_columns_factory(column_mapping=column_mapping)(df)

etl.execute_in_process()
```
`rename_columns_factory()` parametrizes the inner function `rename_df()`, which is an op. It's kind of mind-bending because there is a lot of function wrapping / decorating going on here. If we like this pattern, this is what a dagster-friendly version without dagster abstractions would look like:
```python
import pandas as pd

def rename_columns_factory(
    column_mapping=None,
):
    """
    Args:
        column_mapping: Dict of column rename mappings.

    Returns:
        function: the rename_df function.
    """
    def rename_df(df):
        print(f"\n The DataFrame: {df}\n")
        t_df = df.rename(columns=column_mapping)
        print(f"\n The Transformed DataFrame: {t_df}\n")
        return t_df

    return rename_df

def extract():
    return pd.DataFrame([1, 2], columns=["col"])

def etl():
    df = extract()
    column_mapping = {"col": "column"}
    transformed_df = rename_columns_factory(column_mapping=column_mapping)(df)

etl()
```
An open question here is where we want to store the transform parameters.
This is kind of an out-there idea, but could we develop our own op factory decorator that makes creating op factories a little less verbose? Something like:
```python
from functools import partial, wraps
from dagster import op

def op_factory(name="default_name", ins=None, **kwargs):
    def decorator(op_func):
        @wraps(op_func)
        def wrapper(**op_kwargs):
            return op(name=name, ins=ins, **kwargs)(partial(op_func, **op_kwargs))
        return wrapper
    return decorator
```
I'm not 100% sure that this would work as is (decorators are confusing), but the idea here is that it could be applied like:
```python
import pandas as pd
from dagster import job, op

@op_factory()
def rename_df_factory(context, df, column_mapping=None):
    context.log.info(f"\n The DataFrame: {df}\n")
    context.log.info(f"\n The Op Ins: {context.op_def.ins}\n")
    t_df = df.rename(columns=column_mapping)
    context.log.info(f"\n The Transformed DataFrame: {t_df}\n")
    return t_df

@op
def extract():
    return pd.DataFrame([1, 2], columns=["col"])

@job()
def etl():
    df = extract()
    column_mapping = {"col": "column"}
    transformed_df = rename_df_factory(column_mapping=column_mapping)(df)
```
Here, when you call `rename_df_factory` with `column_mapping` as a keyword arg, the decorator will create a partially specified method, then apply the `op` decorator to that method, and return the created op.
I updated the FERC cleaning dagster prototype so we don't have to pass the `transformed_dfs` dictionary between table cleaning functions.

Passing individual table dependencies makes it easy to understand which tables depend on one another, instead of passing around the entire `transformed_dfs` dictionary. @cmgosnell let me know if this answers some of your questions in the `xbrl_steam` PR.
It creates this fun dagster graph:
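For illustration, a minimal before/after sketch of that change (the op and table names are hypothetical placeholders, not the actual prototype code):

```python
from dagster import op

# Before: each cleaning op took and returned the whole transformed_dfs
# dictionary, so table-to-table dependencies were invisible in the DAG.
@op
def plants_steam_from_dict(transformed_dfs):
    ...

# After: the op declares exactly the dataframes it needs, so the
# fuel -> steam dependency shows up as an explicit edge in the graph.
@op
def plants_steam(raw_steam_df, fuel_df):
    ...
```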
We want to develop a way to convert generic cleaning functions to dagster ops so they are documented in the DAG and could be validated using dagster-pandera.
There are two ways to configure ops: `ConfigSchema` and op factories. Here are a couple of options for Dagster-friendly generic transform functions:

`ConfigSchema`s for each generic transform function

Description of `ConfigSchema` from the dagster docs:
Various Dagster entities (ops, assets, resources) can be individually configured. When launching a job that executes (ops), materializes (assets), or instantiates (resources) a configurable entity, you can provide run configuration for each entity. Within the function that defines the entity, you can access the passed-in configuration off of the context. Typically, the provided run configuration values correspond to a configuration schema attached to the op/asset/resource definition. Dagster validates the run configuration against the schema and proceeds only if validation is successful.
A common use of configuration is for a schedule or sensor to provide configuration to the job run it is launching. For example, a daily schedule might provide the day it's running on to one of the ops as a config value, and that op might use that config value to decide what day's data to read.
Configs are mostly used for parameters that change between runs, for example a `pudl_out` path or dataset processing settings. We initially did not entertain this option because these generic cleaning functions will be used dozens of times and the parameters will not change between runs. Given that we need to store all of the transformation parameters somewhere, we realized it might make sense to set a default dagster config with all of the static generic transform function metadata.
You can view the pseudo code here.
One downside: the `transform_steam` graph has an op named `rename_columns` and an op named `rename_columns_2`, because `rename_columns()` is called for both the dbf and xbrl dataframes. This prevents us from understanding which config is associated with xbrl and which with dbf.

Op factories that use the `context` keyword

This option grabs the transform metadata for a given table in a table transform function and passes the relevant data to an op factory of a generic cleaning function. You can view the pseudo-code here.

Op factories where the `context` keyword isn't needed

This option grabs the relevant transform parameters from the transform metadata structure given the table name. You can view the pseudo-code here. A rough sketch of this last option follows below.
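For illustration only, a rough sketch of that table-name lookup pattern (the `TRANSFORM_PARAMS` dictionary, table names, and factory name here are hypothetical, not the actual pseudo-code linked above):

```python
import pandas as pd
from dagster import op

# Hypothetical static metadata structure holding per-table transform parameters.
TRANSFORM_PARAMS = {
    "plants_steam_ferc1": {"rename_columns": {"plant_name": "plant_name_ferc1"}},
    "fuel_ferc1": {"rename_columns": {"fuel": "fuel_type_code"}},
}

def rename_columns_op_factory(table_name):
    """Build a rename op whose parameters are looked up by table name."""
    params = TRANSFORM_PARAMS[table_name]["rename_columns"]

    # Giving each op a table-specific name also avoids the ambiguous
    # rename_columns / rename_columns_2 naming noted above.
    @op(name=f"rename_columns_{table_name}")
    def rename_columns(df: pd.DataFrame) -> pd.DataFrame:
        return df.rename(columns=params)

    return rename_columns
```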
We might get diminishing returns applying dagster deeper into our call stack. Are the UI documentation and potential pandera validation worth the complexity of converting generic cleaning functions into ops? Most generic cleaning functions will be applied sequentially so there won't be complex dependencies to manage.
@bendnorman I think I understand these options, and it seems like option 2 is our best option, at least to move things forward before we fully integrate dagster as our orchestrator. Does this sound right to you?
Yeah, I think option 2 or 4 are the front runners. All options rely on a non-dagster data structure to store cleaning metadata. If we went with option 2, y'all could just create a library of generic cleaning functions that can be converted to ops. If we go with option 4, the cleaning functions would likely be methods in a transformer class that couldn't be easily converted to ops.
I'm kind of cooling off on converting cleaning functions to dagster ops:
I'm excited to use the integrated dataframe schemas provided by the Pandera/Dagster integration, but there are going to be a whole lot of table-level transformations and storing schemas for every one of them is probably overkill. I think we're going to be persisting more intermediate steps with the new setup than we were previously, and probably it makes sense to just check the schemas of the dataframes that we're persisting, since we'll need to think about those schemas for the database anyway.
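As a sketch of what checking only the persisted tables might look like, using the `pandera_schema_to_dagster_type` helper from the dagster-pandera integration (the schema columns and checks here are hypothetical):

```python
import pandas as pd
import pandera as pa
from dagster import Out, op
from dagster_pandera import pandera_schema_to_dagster_type

# Hypothetical schema for one of the persisted output tables.
plants_steam_schema = pa.DataFrameSchema(
    {
        "plant_name_ferc1": pa.Column(str),
        "capacity_mw": pa.Column(float, pa.Check.ge(0)),
    }
)

@op(out=Out(dagster_type=pandera_schema_to_dagster_type(plants_steam_schema)))
def plants_steam_ferc1(steam_df: pd.DataFrame):
    # All of the table-level transforms would happen here; only this final,
    # persisted dataframe gets validated against the schema.
    return steam_df
```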
It seems like something analogous to Option 4 could also be implemented by composing functions rather than using classes to store the transformation logic. The parameterizations of the transformations would be stored in the Pydantic classes, and then they would be consumed as arguments by individual transform functions. Only the top-level, table-specific transform functions (the functions containing all of the operations to be applied between persisted versions of a given table, like `plants_steam_ferc1`) would be Dagster ops. They would know what table they pertain to, and be able to call both generic cleaning functions (looking up their parameters in the Pydantic data structure using the table ID) and table-specific functions, which could be defined within the table-level cleaning function or declared as `_private_functions()` elsewhere in the module.
I'm not sure which of these would feel cleaner, and will write some pseudo-code trying them both out.
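Here is a rough sketch of the function-composition flavor of that idea, assuming (hypothetically) that per-table parameters live in a Pydantic model keyed by table ID; none of these names come from the actual codebase:

```python
from typing import Dict, Optional

import pandas as pd
from dagster import op
from pydantic import BaseModel

class TableTransformParams(BaseModel):
    """Hypothetical container for one table's transform parameters."""
    rename_columns: Dict[str, str] = {}
    fillna_value: Optional[str] = None

# Hypothetical parameter store, keyed by table ID.
TRANSFORM_PARAMS = {
    "plants_steam_ferc1": TableTransformParams(
        rename_columns={"plant_name": "plant_name_ferc1"},
        fillna_value="unknown",
    ),
}

def rename_columns(df: pd.DataFrame, params: TableTransformParams) -> pd.DataFrame:
    """Generic cleaning function that consumes its parameters as a plain argument."""
    return df.rename(columns=params.rename_columns)

def _fix_steam_plant_names(df: pd.DataFrame) -> pd.DataFrame:
    """Table-specific private helper declared elsewhere in the module."""
    return df

@op
def plants_steam_ferc1(raw_steam_df: pd.DataFrame) -> pd.DataFrame:
    # Only this top-level, table-specific transform is a Dagster op. It knows
    # which table it pertains to, looks up parameters by table ID, and
    # composes generic and table-specific cleaning functions.
    params = TRANSFORM_PARAMS["plants_steam_ferc1"]
    df = rename_columns(raw_steam_df, params)
    return _fix_steam_plant_names(df)
```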
Background
After creating tools to translate the FERC XBRL filings into SQLite databases, we decided that the old Visual FoxPro DBF and new XBRL data will need their own independent extract + transform processes. The new data has much more structure and metadata, and will be quite a bit easier to work with than the historical data, so it doesn't make sense to convert the new data into the old structure just so we can run it through the same old transformations (this is discussed in catalyst-cooperative/pudl#1579).
This means a lot of new code and transformations, and has precipitated a major refactor of the FERC Form 1 transformations -- especially since we are going to be going after many additional Form 1 tables beyond the ones we've already cleaned up and integrated into the PUDL database.
Now that we have access to the raw XBRL data, we've been working on several areas in parallel:
FERC XBRL Data Acquisition (@zschira)
- `pudl-scrapers` & `pudl-zenodo-storage` repositories.
- Issues
Pre-extract (@zschira)
- `ferc-xbrl-extractor` repository.
- `xbrl_integration` branch.
- Release Issues
- Post-release Issues
Update Existing ETL
Updating our transformations is a mix of software engineering and data wrangling tasks. We want to get the software somewhat stable and documented before involving lots of people working in parallel on unfamiliar data, and so we've broken this into 3 phases of work:
Phase 1: Software Design and Alpha Testing (@zaneselvans & @cmgosnell)
- `fuel_ferc1` and `plants_steam_ferc1` are loading into the DB successfully, and provide functionality comparable to the old DBF, extending data coverage through 2021.
- Work is happening on `xbrl_steam`.
- Merge the `xbrl_steam` branch into `xbrl_integration`.
- Issues
Phase 2: Beta Testing w/ Familiar Tables (#1801)
- Beta testing with the `fuel_ferc1` and `plants_steam_ferc1` tables.
- Work is happening on the `xbrl_integration` branch.
- Merge `xbrl_integration` into `dev` in the PUDL repository, so we can make an initial release of this data publicly.
- Issues
Phase 3: Integrate New FERC 1 Data & Methods
- Work is happening on `dev`.
- Key Issues
- 2023 Q1 Issues
- Icebox