dbt-labs / dbt-core

dbt enables data analysts and engineers to transform their data using the same practices that software engineers use to build applications.
https://getdbt.com
Apache License 2.0

[CT-3225] [Feature] permit python models to declare `ref` and `source` models independently of data-access #8858

Open mattgiles opened 9 months ago

mattgiles commented 9 months ago

Is this your first time submitting a feature request?

Describe the feature

Permit python models to declare upstream refs/sources separately from accessing the data in those upstream models.

Maybe like this:


@ref('model_A')
def model(dbt, session):
    pass

Or this:

def model(dbt, session):
    dbt.declare_ref('model_A')
    pass

Python models currently rely on dbt.ref and dbt.source to implicitly declare dependence on other models. This pattern follows the use of the ref and source macros in .sql models insofar as the declaration coincides with data access.
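
For reference, the current coupled pattern looks roughly like this (a minimal sketch; model_A stands in for any upstream model):

def model(dbt, session):
    # dbt.ref loads model_A as a dataframe AND registers the dependency in one call
    df = dbt.ref('model_A')
    return df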

This is really cool!

However, these functions are not the only way a user might access an upstream model's data using python, and, depending on circumstance, they may not be preferred (or even possible).

Describe alternatives you've considered

1. Don't do this

The main alternative to consider seems to be: don't do this.

Reading through the original proposal for python models, I gather that this functionality may have been deliberately omitted.

I think I understand the core motivation (maybe not!): python models, by definition, represent a significant break from strictly ELT workflows, but the spirit of ELT might be preserved so long as computation remains relatively remote from whatever client is issuing commands.

It's possible that making the lineage graph accessible outside the context of a full-blown adapter (where compute is sufficiently scalable to handle warehouse-sized workloads) encourages non-idiomatic, local usage, and that the better alternative is to leave things the way they are.

However, I can, today, create python models that ignore the dbt object entirely, and they work. I get my materialization, out of the box. I'm just penalized insofar as that materialization has no access to the core utility of dbt, and I'm forced to hack it back into the lineage graph by declaring the resulting table as a source. Because this remains possible, the penalty feels more like a nuisance.

Not to mention popular adapters like dbt-duckdb, which somewhat give the lie to the notion that remote computation is essential.

2. dbt.ref could accept a custom handler

If the desire is to continue to couple the declaration of an upstream relationship with data access, the same end could be accomplished with something like:

def custom_handler(*args, **kwargs):
    return warehouse.to_dataframe("select * from model_A")

def model(dbt, session):
    df = dbt.ref('model_A', handler=custom_handler)

There may be other motivations for this kind of pattern, but it seems heavy-handed as a solution to the current issue.

3. Supporting an optional config object, and supporting ref/source declarations there

To increase parity with sql models, instead of looking only for the model function, dbt could look for an optional module-level variable named config:

config = {
    'materialized': 'table',
    'refs': ['model_A']
}

def model(dbt, session):
    pass

This seems like a fine thing, but, similar to the custom handler idea, feels like a bigger design choice than is justified by the limited scope of this issue.

Who will this benefit?

All users who implement or want to implement "plain old" python models, without using the dbt object to load data. Currently these users lose the magic of the ref function and all that comes with it. Their lineage graph/dag is broken.

Are you interested in contributing this feature?

Happy to, if the preferred implementation can be briefly sketched out.

Anything else?

No response

dbeatty10 commented 9 months ago

Thanks for reaching out about this @mattgiles!

However, I can, today, create python models that ignore the dbt object entirely, and they work. I get my materialization, out of the box. I'm just penalized insofar as that materialization has no access to the core utility of dbt, and I'm forced to hack it back into the lineage graph by declaring the resulting table as a source. Because this remains possible, the penalty feels more like a nuisance.

Not to mention popular adapters like dbt-duckdb, which somewhat give the lie to the notion that remote computation is essential.

Would you mind sketching this out in more detail? i.e. could you share a small working code example of "ignor[ing] the dbt object entirely" and "hack[ing] it back into the lineage graph by declaring the resulting table as a source"?

This would help us assess the kind of use-cases you are imagining and how you are proposing to reduce the penalties/nuisance you are experiencing.

mattgiles commented 9 months ago

Thanks for responding @dbeatty10. Sure thing!

Context:

So I have written a python model that looks like:

from google.cloud import bigquery

QUERY = """
select *
from `my-project.my_dataset.model_known_to_dbt`
"""

DATETIME_FIELDS = [
    'some',
    'datetime',
    'fields'
]

CLASSIFIERS = {
    'some': generate_classifier(...),
    'generated': generate_classifier(...),
    'functions': generate_classifier(...)
}

def model(dbt, session):
    client = bigquery.Client()

    df = client.query(QUERY).to_dataframe()

    for dt in DATETIME_FIELDS:
        df[dt] = df[dt].astype("datetime64")

    for col, func in CLASSIFIERS.items():
        df[col] = df.apply(func, axis=1)

    return df

Running dbt run --models this_model gives me my model at my-project.my_dataset.this_model in gcp just fine.

However, I found myself going so far as to organize the directory structure of my project models around this break point. This model sits in its own directory. It needs to be run after some of the models in the preceding directory have been run, and before the following directory of models. In the downstream directory, I have added an external.yml to declare sources. And I have declared this dbt-generated model, which is a transformation of another dbt-generated model, as an external source so that it can be known to downstream dbt models:

version: 2

sources:
  - name: python_models
    database: my-project
    schema: my_dataset
    tables:
      - name: this_model
        identifier: this_model

This is what I meant by "hack[ing] it back into the lineage graph by declaring the resulting table as a source."
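
Concretely, the directory layout ends up looking something like this (directory and file names are illustrative):

models/
  upstream_models/            # regular dbt models this python model reads from
  python_models/
    this_model.py             # the model shown above
  downstream_models/
    external.yml              # declares this_model as a source (the hack)
    some_downstream_model.sql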

It seems more in the spirit of dbt for me to organize my directories free of this concern, and expect (a) this model to be generated safely after its dependencies already exist, and (b) to ref it in downstream models like any other upstream model generated by dbt run.

It also seems (I am not an everyday dbt user, and haven't mucked around too much in dbt-core) that this could be addressed in the way proposed above. But again, I maybe don't appreciate the unintended consequences that need to be guarded against. It is definitely the case that dbt is so lovely in part because, in some ways, it is so opinionated.

Two last thoughts:

dbeatty10 commented 9 months ago

Thanks for all that amazing write-up @mattgiles !

High-level feature request

With SQL models there is a way to force dependencies like this:

 -- depends_on: {{ ref('upstream_parent_model') }}
 {{ your_macro('variable') }}

I'm not aware of anything similar for Python models, and it sounds like your proposal/request is to add it. This is an interesting proposal; it's just a matter of how impactful it would be and the effort it would take to implement.

Your specific use-case

I got a chance to briefly discuss this with @ChenyuLInx who wrote the code that powers Python models for BigQuery (and other platforms).

Something he pointed out to me is that python models will be submitted as a serverless dataproc job by default (rather than requiring a cluster). You can read more about serverless vs. cluster jobs by clicking the "BigQuery" tab here. Regardless of whether your submission_method is serverless or cluster, your job will be submitted to dataproc either way.
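
For reference, the submission method can also be set per model via dbt.config. A minimal sketch, assuming the standard dbt-bigquery python model configs (the cluster name is illustrative):

def model(dbt, session):
    # Submit the job via serverless dataproc (the default) ...
    dbt.config(submission_method="serverless")
    # ... or target a specific dataproc cluster instead:
    # dbt.config(submission_method="cluster", dataproc_cluster_name="my-cluster")
    return dbt.ref("model_known_to_dbt")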

So if the example you provided works (without needing to go through onerous permissions to set up dataproc), then we'd also expect the following to work:

DATETIME_FIELDS = [
    'some',
    'datetime',
    'fields'
]

CLASSIFIERS = {
    'some': generate_classifier(...),
    'generated': generate_classifier(...),
    'functions': generate_classifier(...)
}

def model(dbt, session):
    client = bigquery.Client()

    df = dbt.ref("model_known_to_dbt")

    for dt in DATETIME_FIELDS:
        df[dt] = df[dt].astype("datetime64")

    for col, func in CLASSIFIERS.items():
        df[col] = df.apply(func, axis=1)

    return df

Here's the small diff between the two:

1,7d0
< from google.cloud import bigquery
< 
< QUERY = """
< select *
< from `my-project.my_dataset.model_known_to_dbt`
< """
< 
23c16
<     df = client.query(QUERY).to_dataframe()
---
>     df = dbt.ref("model_known_to_dbt")

Question

So could you help us understand what about using df = dbt.ref("model_known_to_dbt") doesn't work for you? I think we're not understanding some key piece of your puzzle.

aboomer07 commented 8 months ago

@dbeatty10 I would also love this feature (I work in Data Science)! My main motivations for this feature are:

For large, complicated data science pipelines, I haven't found a better tool yet than DBT simply for organizing, sketching out, and detailing product requirements of a large modeling project. The strictness with which the data relationships and tabular schemas need to be defined in DBT really helps with defining project requirements, and visualizing missing pieces of the modeling process.

Outside of the planning stage of the project, for a data science pipeline that is mostly executed in SQL with only a few python steps, DBT is a relatively easy and user-friendly way to create a reproducible, trackable pipeline. Airflow may be the better solution for this, but it involves a lot more time and overhead for a pipeline that is mostly SQL transformations.

In both of these contexts it would be nice to be able to define "dummy" python models purely so that the entire DAG can be visualized and understood in a complete, uninterrupted fashion. As an example, I have some python steps in a pipeline that use Ray for distributed computing and couldn't realistically be done using PySpark. DBT is so good for scoping and organizing projects, I think I would probably use it even if it didn't perform the last step of actually running the SQL queries. It would be nice to be able to have a python model that kicked off a job on Ray, where I could list the tabular dependencies of that python model without using DBT to actually load those tables. It would be nice to have a complete DAG construction in this case; the non-software people who vet the methodology and requirements don't need to know whether each step was strictly done by DBT or not. (As an aside, this use-case is also a reason why having the ability to execute python locally with dbt-fal in a DBT step can be helpful.)
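
A hypothetical sketch of such a "dummy" model, using the declaration-only API proposed at the top of this issue (declare_ref is not an existing dbt function, and the model and table names are made up):

import pandas as pd
import ray  # assumes the environment running the model can reach a Ray cluster


def model(dbt, session):
    # Hypothetical: register upstream dependencies without loading them through dbt
    dbt.declare_ref("features_table")
    dbt.declare_ref("labels_table")

    # Hand the heavy lifting to Ray; dbt only tracks lineage and materializes a stub
    ray.init(address="auto", ignore_reinit_error=True)
    status = ray.get(run_segmentation.remote())

    # Return a tiny status table so downstream models can still depend on this step
    return pd.DataFrame({"status": [status]})


@ray.remote
def run_segmentation():
    # Placeholder for the distributed work described above
    return "done"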

Thanks! Let me know if you want any more info.

dbeatty10 commented 8 months ago

Thanks for sharing your use-case @aboomer07. Love to hear that dbt has been helpful with planning complicated data science pipelines!

In both of these contexts it would be nice to be able to define "dummy" python models purely so that the entire DAG can be visualized and understood in a complete, uninterrupted fashion.

Could you share more detail about the following?

  1. How you are hoping to define "dummy" python models
  2. How you would plan to visualize the entire DAG with those dummy python models.

i.e., if you can share the exact syntax you are proposing and the exact commands you'd like to execute in order to visualize the DAG, then that would help us better assess this feature request.

github-actions[bot] commented 5 months ago

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please comment on the issue or else it will be closed in 7 days.

github-actions[bot] commented 4 months ago

Although we are closing this issue as stale, it's not gone forever. Issues can be reopened if there is renewed community interest. Just add a comment to notify the maintainers.

OSalama commented 2 months ago

Thanks for all that amazing write-up @mattgiles !

High-level feature request

With SQL models there is a way to force dependencies like this:

 -- depends_on: {{ ref('upstream_parent_model') }}
 {{ your_macro('variable') }}

I'm not aware of anything similar for Python models, and it sounds like your proposal/request is to add it. This is an interesting proposal; it's just a matter of how impactful it would be and the effort it would take to implement.

I have a use case which could benefit from this feature, plus an extension to the dbt Python ref() function.

We have a team which is using Python models to build some complex queries that are then executed using the spark session object. They are not using the dbt.ref dataframe object because the Python logic is being reused in other parts of the stack, and they would prefer to keep the code identical, rather than having to implement and maintain logic using dataframes and SQL queries.

This worked quite well for a while, but we're now introducing multiple dbt environments, and would like to benefit from dbt's ability to parse and evaluate the database + schema for any tables we ref, so we can include them in the queries we build. Since we're not using the dataframes we get from the ref() call, we'd also benefit from just declaring a dependency.

Something like this would be awesome:

upstream = dbt.ref("my_upstream_model", name_only=True)

upstream_database = upstream.database
upstream_schema = upstream.schema

query = f"select * from {upstream_database}.{upstream_schema}.my_upstream_model"

Even just returning the full table path without breaking it apart would be good enough, and that is already taken care of by the refs object that you build inside the ref() function.

def ref(*args, **kwargs):
    refs = {}  # Do whatever macro magic you do to build this!
    key = '.'.join(args)
    version = kwargs.get("v") or kwargs.get("version")
    if version:
        key += f".v{version}"
    dbt_load_df_function = kwargs.get("dbt_load_df_function")
    name_only = kwargs.get("name_only")
    if name_only:
        return refs[key]
    return dbt_load_df_function(refs[key])

with usage as:

upstream_name = dbt.ref("my_upstream_model", name_only=True)
database, schema, table = upstream_name.split(".")
query = f"select * from {database}.{schema}.{table}"

dbeatty10 commented 2 months ago

Thanks for adding your experience and request @OSalama!

Could you provide simplified example code for this scenario you described? This will help us try it out ourselves so we can better assess your request.

We have a team which is using Python models to build some complex queries that are then executed using the spark session object. They are not using the dbt.ref dataframe object because the Python logic is being reused in other parts of the stack, and they would prefer to keep the code identical, rather than having to implement and maintain logic using dataframes and SQL queries.

Basically, we'd be looking to see some simple code examples that utilize the spark session object you are describing. Ideally, it would also demonstrate how the Python logic can be reused in other parts of a stack as well.

OSalama commented 2 months ago

@dbeatty10 The "how the logic is reused" part is basically that the team copy/pastes the core logic into a Streamlit module, and then the Streamlit app and the dbt model each use their own entrypoint to pull the required variables from the context, e.g.:

def model(dbt, session):
    source_project_id = dbt.config.get("source_project_id")
    shop_type = dbt.config.get("shop_type")
    dbt.ref("segments_axis_1") # Ensures dbt builds the DAG correctly
    dbt.ref("segments_axis_2")

    segment_tree = dbt.ref("segment_tree")
    restructured_segment_tree = [row.asDict() for row in segment_tree.collect()]

    table_df = perform_segmentation(session, restructured_segment_tree, source_project_id, shop_type)

where the rest of the model is code that gets copy/pasted between the two modules. In the Streamlit module, there's an equivalent to the model() function where we retrieve the same variables from the environment and do whatever processing is needed to adapt them to the perform_segmentation function.
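
A rough sketch of what that Streamlit-side entrypoint could look like, under those assumptions (the environment variable names, the direct BigQuery table read, and the stubbed perform_segmentation are all illustrative, not the actual codebase):

import os

from pyspark.sql import SparkSession


def perform_segmentation(session, segment_tree, source_project_id, shop_type):
    # In practice this is the shared core logic copy/pasted between the dbt model
    # and this Streamlit module; stubbed out here
    ...


def run_from_streamlit():
    # The same inputs the dbt model reads via dbt.config come from the environment here
    source_project_id = os.environ["SOURCE_PROJECT_ID"]
    shop_type = os.environ["SHOP_TYPE"]

    session = SparkSession.builder.getOrCreate()
    segment_tree = (
        session.read.format("bigquery")
        .option("table", f"{source_project_id}.mydataset.segment_tree")
        .load()
    )
    restructured_segment_tree = [row.asDict() for row in segment_tree.collect()]

    # Same call the dbt model makes, minus dbt's lineage tracking
    return perform_segmentation(session, restructured_segment_tree, source_project_id, shop_type)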

I can't show much of the code that is doing the segmentation processing, but a simplified example could be:

segment_queries = []
for axis in segment_tree:
    axis_table = f"{source_project_id}.mydataset.segments_{axis}"
    segment_queries.append(f"SELECT '{axis}' AS axis, COUNT(*) AS num_atoms FROM `{axis_table}`")

segment_query = " UNION ALL ".join(segment_queries)

session.conf.set("viewsEnabled", "true")
session.conf.set("materializationDataset", "mydataset")

result = session.read.format("bigquery").option("query", segment_query).load() # This runs our query against BigQuery
result_rows = result.collect()
atoms = further_processing(result_rows) # Now you can do some arbitrary processing of the results in Python

schema = get_output_schema()
table_df = session.createDataFrame(atoms, schema=schema)
return table_df