dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

dagster-dbt test failures not respected across asset definitions #19821

Open jPinhao-Rover opened 9 months ago

jPinhao-Rover commented 9 months ago

Dagster version

1.5.12

What's the issue?

When defining different sets of @dbt_assets as part of a single job, Dagster correctly identifies dependencies across them and correctly creates and orchestrates tasks within the job so that downstream dependencies execute after the upstream assets have been materialised. In our case, we need separate @dbt_assets definitions to correctly configure and run partitioned and non-partitioned assets as part of a single job.

However, if one of the upstream assets' tests fails, Dagster does not take this into account when selecting downstream assets to execute, and runs them regardless of test success or failure.

If the assets are all defined within a single @dbt_assets definition, Dagster correctly respects test failures and doesn't execute downstream assets.

What did you expect to happen?

Test failures should be respected when deciding whether to execute downstream assets as part of a single job, whether the assets are defined in the same @dbt_assets definition or across separate ones.

How to reproduce?

Create 4 dbt models split across two tags (chain_1 and chain_2), where a model tagged chain_2 depends on a model tagged chain_1, and the upstream chain_1 model has a test that fails.

Create 2 @dbt_asset definitions:

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(
    manifest=dbt_manifest_path,
    select="tag:chain_1",
)
def dbt_test_chain_one(
    context: AssetExecutionContext, dbt: DbtCliResource
):
    # raise_on_error=False lets the build keep emitting events
    # even when a model or test fails.
    dbt_run_invocation = dbt.cli(["build"], context=context, raise_on_error=False)
    yield from dbt_run_invocation.stream()

@dbt_assets(
    manifest=dbt_manifest_path,
    select="tag:chain_2",
)
def dbt_test_chain_two(
    context: AssetExecutionContext, dbt: DbtCliResource
):
    dbt_run_invocation = dbt.cli(["build"], context=context, raise_on_error=False)
    yield from dbt_run_invocation.stream()

Create 1 job definition:

from dagster import define_asset_job

define_asset_job(name="test_deps", selection=[dbt_test_chain_one, dbt_test_chain_two])

Execute the job, and note that the downstream assets in the second @dbt_assets definition still execute despite the upstream test failure.

Deployment type

None

Deployment details

This occurs both in a local deployment and in a Dagster Helm chart deployment.

Additional information

As far as I'm aware, you have two alternative ways to handle this:

  1. You mark the upstream task as having failed (raise an exception if a model/test fails); then none of the downstreams in the second @dbt_assets definition will execute, even if their own upstreams succeeded
  2. You don't mark the task as failed, and all downstreams in the second @dbt_assets definition will execute
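To make the trade-off concrete, here is a minimal, self-contained sketch of the two modes. This is plain Python, not the dagster-dbt API: `run_dbt_build` is a hypothetical stand-in for `dbt.cli(["build"], ...)`, and its `raise_on_error` flag mimics the real parameter's semantics.

```python
class DbtBuildResult:
    """Stand-in for a dbt build invocation's outcome."""

    def __init__(self, success: bool):
        self.success = success


def run_dbt_build(success: bool, raise_on_error: bool = True) -> DbtBuildResult:
    """Hypothetical stand-in for dbt.cli(["build"], ...).

    Alternative 1: raise_on_error=True (the default) raises on any failure,
    so Dagster marks the whole step failed and no downstream assets in a
    second @dbt_assets definition run, even those whose upstreams succeeded.

    Alternative 2: raise_on_error=False suppresses the error, so every asset
    is reported as materialised and all downstreams run despite the failure.
    """
    result = DbtBuildResult(success)
    if raise_on_error and not success:
        raise RuntimeError("dbt build failed")
    return result
```

Neither mode gives per-asset granularity across @dbt_assets definitions, which is the gap this issue describes.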

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

jPinhao-Rover commented 8 months ago

For anyone interested, there is a non-ideal workaround: intercept the materialisation events before they're submitted to Dagster, and if there were test failures, don't report the asset as materialised. Dagster will see the asset as "skipped" and will prevent downstream dependencies from running.
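A rough sketch of that workaround, assuming the set of failed models (`failed_output_names`) has been derived separately, e.g. by inspecting dbt's run_results.json artifact and mapping failed test nodes back to their parent models. The helper name and the `output_name` matching here are illustrative, not part of the dagster-dbt API:

```python
def filter_failed_materializations(events, failed_output_names):
    """Yield dbt events, withholding materialization outputs for models
    whose tests failed, so Dagster records those assets as skipped.

    `events` stands in for the buffered stream from dbt.cli(...).stream();
    any event whose `output_name` is in `failed_output_names` is dropped.
    """
    for event in events:
        name = getattr(event, "output_name", None)
        if name is not None and name in failed_output_names:
            # Withholding the output means Dagster never sees a
            # materialisation, so this asset's downstreams will not run.
            continue
        yield event
```

Inside a @dbt_assets body, one would buffer the invocation's stream into a list, compute the failed set from the run artifacts, then `yield from filter_failed_materializations(events, failed)` instead of yielding the stream directly.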