dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

support asset partitions for load_assets_from_dbt_project #7683

Closed: OwenKephart closed this issue 1 year ago

OwenKephart commented 2 years ago

We don't currently support partitioned dbt assets, but we should

Relevant mentions in Slack:

GeorgePearse commented 2 years ago

Would love this

aryan-tmg commented 2 years ago

I second the above. I see a ton of value in partitioning incremental models, because they are partitioned by nature in some sense; most cases I've seen partition by date, for example.

sryza commented 2 years ago

@GeorgePearse @aryan-tmg - just to deepen my understanding, are you using BigQuery? The main information I see when I Google dbt partitioning is about BigQuery.

GeorgePearse commented 2 years ago

My data warehouse is currently very budget: just Postgres, though I might migrate to Snowflake or Firebolt at some point in the future. I just don't want to have to run SQL queries over huge amounts of data for a computation that unnecessarily recreates everything from scratch.

I haven't thoroughly explored what would already be available for this though.

aryan-tmg commented 2 years ago

My use case is that I have a lot of data in my dbt models. If I were to materialize them as views each day, it would greatly extend my workflow run times. Right now, I have my models configured as incremental, and I pass a variable "run_date" whenever I do a dbt run. I'd like to be able to configure that somehow in Dagster so that I can pass a partitions definition to all assets in my workflow, including dbt.
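For context, a minimal sketch of what a partitioned non-dbt asset already looks like in Dagster, assuming a daily scheme; the asset name and body are purely illustrative:

from dagster import DailyPartitionsDefinition, asset

daily_partitions = DailyPartitionsDefinition(start_date="2022-01-01")

@asset(partitions_def=daily_partitions)
def raw_events(context):
    # For a daily partition, the partition key is the date string, e.g. "2022-06-06".
    run_date = context.partition_key
    context.log.info(f"Processing partition {run_date}")
    # ... load or transform only the data for run_date ...

The ask here is for dbt-backed assets to be able to pick up the same partitions_def so the whole asset graph shares one partitioning scheme.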

sryza commented 2 years ago

We're currently investigating adding this. Some questions for those who are interested in this:

In your project, what partitions do you use (i.e. daily), and are all models partitioned or are some partitioned and some not? Do models within a project have different partitions?

How do you change behavior of dbt to respect the partition (is it just vars?). Do you use incremental models to handle inserting just the records corresponding to the partition, or is there something else going on?

If you set a var named something like "run_date", does that typically correspond to the start of the time window or the end of the time window?

Tagging people who 👍 'ed this issue who I haven't yet reached out to in other contexts: @deugene @hungtd9 @xaniasd @nvinhphuc.

nvinhphuc commented 2 years ago

Hi @sryza , I will describe what we currently do.

I have an ingestion job running daily. When the ingestion job finishes, we get the start_time and end_time from the partition and run a dbt op with the variables start_time and end_time. A simple dbt model looks like this:

{{ config(
    materialized='incremental',
    unique_key=['id'],
) }}
SELECT 
    id::string,
    date_modified::timestamp,
    date_modified_partition::date
FROM {{ source('raw', 'source_table') }}
{% if is_incremental() %}
WHERE date_modified_partition >= '{{ var('start_time') }}'
    AND date_modified_partition < '{{ var('end_time') }}'
{% endif %}

This processes only the data from start_time to end_time, rather than fully refreshing all the data. However, we can run a full refresh with dbt run --full-refresh if we want to reprocess all data. The idea is to bring the partition in Dagster into DBT for coherence.
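A rough sketch of that pattern under a daily partitions definition, shelling out to the dbt CLI for simplicity; the op/job names and the way the window is derived from the partition key are illustrative, not the exact setup described above:

import json
import subprocess
from datetime import datetime, timedelta

from dagster import DailyPartitionsDefinition, OpExecutionContext, job, op

daily_partitions = DailyPartitionsDefinition(start_date="2022-01-01")

@op
def run_dbt_incremental(context: OpExecutionContext):
    # A daily partition key is the window start; the end is one day later.
    start = datetime.strptime(context.partition_key, "%Y-%m-%d")
    end = start + timedelta(days=1)
    dbt_vars = {
        "start_time": start.strftime("%Y-%m-%d"),
        "end_time": end.strftime("%Y-%m-%d"),
    }
    # Equivalent to: dbt run --vars '{"start_time": "...", "end_time": "..."}'
    subprocess.run(["dbt", "run", "--vars", json.dumps(dbt_vars)], check=True)

@job(partitions_def=daily_partitions)
def daily_dbt_job():
    run_dbt_incremental()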

xaniasd commented 2 years ago

Hi @sryza here's some input from my side as well

In your project, what partitions do you use (i.e. daily), and are all models partitioned or are some partitioned and some not? Do models within a project have different partitions?

It really depends on the model. Typically daily partitions, though they could be hourly as well. Event-like tables are typically partitioned, but there are also models for which partitioning is not needed, for instance small tables with master data (think zipcode per city).

How do you change behavior of dbt to respect the partition (is it just vars?). Do you use incremental models to handle inserting just the records corresponding to the partition, or is there something else going on?

For incremental models, you can also define static partitions. You could tell dbt to figure out all missing partitions itself and update the model accordingly, but this is not a great option when each partition contains gigabytes of data. By providing a partition as a variable, I can update exactly the partition that I want.

If you set a var named something like "run_date", does that typically correspond to the start of the time window or the end of the time window?

All I can say: please avoid the confusion Airflow created with their execution_date (I think they're trying to move past that as well) :) I'm trying to use the var as the partition to be processed wherever possible. Perhaps it would be good to leave the interpretation to the individual user, as it can differ wildly.

Hope this helps, happy to share more if necessary

OwenKephart commented 2 years ago

@xaniasd @nvinhphuc thanks for the feedback! You both mentioned sometimes partitioning by hour and sometimes by day, and I have some followup questions around that.

nvinhphuc commented 2 years ago

Hi @OwenKephart

OwenKephart commented 2 years ago

Hi everyone! As an initial implementation of this behavior, we're planning to add partitions_def and partition_key_to_vars_fn parameters to both load_assets_from_dbt_project and load_assets_from_dbt_manifest.

For simple cases, where you want to parameterize all models in a dbt project by (let's say) a run date, that would look something like:

from dagster import DailyPartitionsDefinition
from dagster_dbt import load_assets_from_dbt_project

my_partitions_def = DailyPartitionsDefinition(start_date="2022-02-02")

def partition_key_to_dbt_vars(partition_key):
    return {"run_date": partition_key}

dbt_assets = load_assets_from_dbt_project(
    ...,
    partitions_def=my_partitions_def,
    partition_key_to_vars_fn=partition_key_to_dbt_vars,
)

In this example, when Dagster is invoked to update the "2022-06-06" partition, it will invoke the dbt CLI with a vars argument of {"run_date": "2022-06-06"}.

This does not currently provide any special support for dbt projects that have multiple partitioning schemes (although you can invoke the load_assets_... function multiple times, once for each set of dbt assets).
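For projects that mix schemes, one possible shape of that workaround; the tag-based select strings, the project_dir value, and the var names below are assumptions for illustration rather than anything required by the API:

from dagster import DailyPartitionsDefinition, HourlyPartitionsDefinition
from dagster_dbt import load_assets_from_dbt_project

daily = DailyPartitionsDefinition(start_date="2022-02-02")
hourly = HourlyPartitionsDefinition(start_date="2022-02-02-00:00")

# One call per group of models that shares a partitioning scheme,
# split here by dbt tag selection.
daily_dbt_assets = load_assets_from_dbt_project(
    "my_dbt_project",
    select="tag:daily",
    partitions_def=daily,
    partition_key_to_vars_fn=lambda key: {"run_date": key},
)

hourly_dbt_assets = load_assets_from_dbt_project(
    "my_dbt_project",
    select="tag:hourly",
    partitions_def=hourly,
    partition_key_to_vars_fn=lambda key: {"run_hour": key},
)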

We're definitely interested in how well this setup maps to your use cases, so feel free to leave feedback. All of these changes are currently experimental and subject to change, so if this misses the mark, definitely let us know :)

xaniasd commented 2 years ago

awesome, I'll give it a try. Thanks for this!

its-a-gas commented 1 year ago

Hi @sryza

In your project, what partitions do you use (i.e. daily), and are all models partitioned or are some partitioned and some not? Do models within a project have different partitions?

When we supply partition information to a DB/query engine, a sensible default is daily. However, there are sometimes cases for going finer, e.g. hourly partitions for more recent data to make scans cheaper. Conversely, we might want to make partitions larger, e.g. BigQuery has a default max quota of 4000 partitions per table, so if we need to cover more than 4000 daily partitions we have to use a coarser grain (see the sketch at the end of this comment).

How do you change behavior of dbt to respect the partition (is it just vars?)

I've done it previously by partitioning BigQuery tables per date, and we've used incremental DBT models with variables representing the start/end dates passed into the is_incremental WHERE clause to do partial backfills.

Today we are using Dagster/DBT/Snowflake, letting Snowflake decide how it wants to partition/shard its data files, and we don't have anything better than passing timestamp bookmarks to DBT incremental models.

Do you use incremental models to handle inserting just the records corresponding to the partition, or is there something else going on?

Depends on the data requirements, but if it's just to create a clean copy ready for analysts, we might make an incremental model that inserts only rows where the incoming set has timestamps greater than the max timestamp in the downstream table, or alternatively we might filter with a WHERE NOT EXISTS that checks for the primary key in the downstream table.

If you set a var named something like "run_date", does that typically correspond to the start of the time window or the end of the time window?

Ideally we'd want to be able to set start/min and stop/max bookends, for example if we need to backfill 1 specific week, e.g. 2022-02-01 to 2022-02-07, without having to backfill from 2022-02-01 to whatever today's timestamp is.


One alternative is for us to do full loads, which is expensive when you're at or above 100s of TB in an insert statement. Another is to not invoke DBT via Dagster and have another runner/orchestrator invoke DBT, passing in the timestamp bookends, but we really like using Dagster.

It would be a massive time saving feature for Dagster to be able to push partition/date-ranges/id-ranges down to DBT to do partial backfills.
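On the granularity point above, a quick sketch of the finer and coarser time-window partition definitions Dagster already ships with, any of which could in principle be passed as partitions_def; the start dates are placeholders:

from dagster import (
    DailyPartitionsDefinition,
    HourlyPartitionsDefinition,
    MonthlyPartitionsDefinition,
    WeeklyPartitionsDefinition,
)

# Finer grain: cheaper scans over recent data, but many more partitions.
hourly = HourlyPartitionsDefinition(start_date="2022-01-01-00:00")
daily = DailyPartitionsDefinition(start_date="2022-01-01")

# Coarser grain: one way to stay under per-table partition quotas
# (e.g. BigQuery's default limit of 4000 partitions per table).
weekly = WeeklyPartitionsDefinition(start_date="2012-01-01")
monthly = MonthlyPartitionsDefinition(start_date="2012-01-01")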

sryza commented 1 year ago

Thanks a ton for the info @its-a-gas. Would the approach that @OwenKephart added support for and mentioned above work for you?

its-a-gas commented 1 year ago

Hi @sryza 👋

I think this approach would get us a step closer to having Dagster push the partitioned-asset concept down to the DBT layer. My interpretation is that the partition's start date would be passed to DBT to mutate that exact single date, as opposed to a range. Further, this proposal assumes all models within a DBT project are partitioned daily, to the exclusion of hourly/weekly/etc.

I think we all agree the ideal state is to pass start and end bookmarks of arbitrary granularity... but @OwenKephart's solution would get us closer to that ideal state.
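For what it's worth, even with the single-key hook, partition_key_to_vars_fn can derive both bookends from a daily partition key; a minimal sketch, where the start_time/end_time var names and the project path are illustrative (they happen to match the incremental model shown earlier in this thread):

from datetime import datetime, timedelta

from dagster import DailyPartitionsDefinition
from dagster_dbt import load_assets_from_dbt_project

daily = DailyPartitionsDefinition(start_date="2022-02-02")

def partition_key_to_bookend_vars(partition_key):
    # A daily partition key is the window start; the end is one day later.
    start = datetime.strptime(partition_key, "%Y-%m-%d")
    end = start + timedelta(days=1)
    return {
        "start_time": start.strftime("%Y-%m-%d"),
        "end_time": end.strftime("%Y-%m-%d"),
    }

dbt_assets = load_assets_from_dbt_project(
    "my_dbt_project",  # placeholder path
    partitions_def=daily,
    partition_key_to_vars_fn=partition_key_to_bookend_vars,
)

Each partition in a backfill still triggers its own dbt invocation, which is the per-job overhead discussed further down the thread.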

sryza commented 1 year ago

I'm going to close this because the partitions_def and partition_key_to_vars_fn arguments enable this.

astronautas commented 6 months ago

Hi @sryza

Ideally we'd want to be able to set start/min and stop/max bookends, for example if we need to backfill 1 specific week, e.g. 2022-02-01 to 2022-02-07, without having to backfill from 2022-02-01 to whatever today's timestamp is.

It would be a massive time saving feature for Dagster to be able to push partition/date-ranges/id-ranges down to DBT to do partial backfills.

Wouldn't pushing slices of the range, instead of pushing the whole range at once, achieve the same thing? E.g. to backfill 2022-02-01 to 2022-02-07, Dagster would just run 7 parallel dbt jobs, each receiving one day to backfill from Dagster.

sryza commented 6 months ago

Wouldn't pushing slices of the range, instead of pushing the whole range at once, achieve the same thing? E.g. to backfill 2022-02-01 to 2022-02-07, Dagster would just run 7 parallel dbt jobs, each receiving one day to backfill from Dagster.

That works too, but there's some fixed overhead with each job, so doing it all at once reduces that overhead.