dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

dbt custom schema not working in dagster #22771

Closed: SeppBerkner closed this issue 3 months ago

SeppBerkner commented 3 months ago

Dagster version

dagster, version 1.7.0

What's the issue?

dbt's custom schema setting does not work in Dagster. I materialize to Postgres and want the data written to different database schemas. To do this, I use dbt's custom schema approach with a config() block inside the dbt model:

my_model.sql

{{ config(schema='my_custom_schema') }}

SELECT 
    column_1
FROM 
    {{ref("stg_my_table")}}

schema.yml

version: 2

models:
  - name: my_model
    config:
        tags: 'category=my_category'
        meta:
          dagster:
            group: "my_group"

Running `dbt run` directly creates the table in the schema my_custom_schema. In Dagster, I then define a job that selects my_model together with all of its upstream models:

jobs.py

from dagster import define_asset_job

my_model_job = define_asset_job(
    name="my_model_job",
    selection="*my_model",  # my_model plus all of its upstream assets
    description="job to create my model",
)
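In Dagster's string selection syntax, a leading `*` selects the named asset plus all of its upstream (ancestor) assets, while a trailing `*` selects downstream ones. The sketch below illustrates that traversal in plain Python over a small, hypothetical lineage graph (`PARENTS` and `select_upstream` are invented for illustration; this is not Dagster's implementation). An unknown key raises an error, loosely analogous to the `DagsterInvalidSubsetError` shown below.

```python
from collections import deque

# Hypothetical dbt lineage: each model maps to the models it reads from (its parents).
PARENTS = {
    "stg_my_table": [],
    "my_model": ["stg_my_table"],
    "my_report": ["my_model"],
}


def select_upstream(graph: dict[str, list[str]], target: str) -> set[str]:
    """Return target plus every ancestor, mimicking what a leading '*' selects."""
    if target not in graph:
        # Loosely analogous to "no AssetsDefinition objects supply these keys".
        raise KeyError(f"unknown asset key: {target!r}")
    selected = {target}
    queue = deque([target])
    while queue:
        node = queue.popleft()
        for parent in graph[node]:
            if parent not in selected:
                selected.add(parent)
                queue.append(parent)
    return selected


print(sorted(select_upstream(PARENTS, "my_model")))  # ['my_model', 'stg_my_table']
```

Note that the downstream model `my_report` is not included when selecting `*my_model`.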

Without the config() block shown above, I can start Dagster normally with `dagster dev`, and the model is materialized when I run the job. After I add the config() block to the dbt model, however, I get the following error trace:

/Users/jbe/anaconda3/envs/lbd_data_env/lib/python3.11/site-packages/dagster/_core/workspace/context.py:622: UserWarning: Error loading repository location orchestrate:dagster._core.errors.DagsterInvalidSubsetError: AssetKey(s) ['my_model'] were selected, but no AssetsDefinition objects supply these keys. Make sure all keys are spelled correctly, and all AssetsDefinitions are correctly added to the `Definitions`.

For selected asset ["my_model"], did you mean one of the following?
        ["my_custom_schema", "my_model"]

Stack Trace:
  File "/Users/jbe/anaconda3/envs/lbd_data_env/lib/python3.11/site-packages/dagster/_grpc/server.py", line 408, in __init__
    self._loaded_repositories: Optional[LoadedRepositories] = LoadedRepositories(
                                                              ^^^^^^^^^^^^^^^^^^^
  File "/Users/jbe/anaconda3/envs/lbd_data_env/lib/python3.11/site-packages/dagster/_grpc/server.py", line 266, in __init__
    repo_def.load_all_definitions()
  File "/Users/jbe/anaconda3/envs/lbd_data_env/lib/python3.11/site-packages/dagster/_core/definitions/repository_definition/repository_definition.py", line 139, in load_all_definitions
    self._repository_data.load_all_definitions()
  File "/Users/jbe/anaconda3/envs/lbd_data_env/lib/python3.11/site-packages/dagster/_core/definitions/repository_definition/repository_data.py", line 206, in load_all_definitions
    self.get_all_jobs()
  File "/Users/jbe/anaconda3/envs/lbd_data_env/lib/python3.11/site-packages/dagster/_core/definitions/repository_definition/repository_data.py", line 435, in get_all_jobs
    self._all_jobs = self._jobs.get_all_definitions()
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jbe/anaconda3/envs/lbd_data_env/lib/python3.11/site-packages/dagster/_core/definitions/repository_definition/caching_index.py", line 101, in get_all_definitions
    sorted(
  File "/Users/jbe/anaconda3/envs/lbd_data_env/lib/python3.11/site-packages/dagster/_core/definitions/repository_definition/caching_index.py", line 132, in get_definition
    definition = cast(Callable, definition_source)()
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jbe/anaconda3/envs/lbd_data_env/lib/python3.11/site-packages/dagster/_core/definitions/repository_definition/repository_data_builder.py", line 117, in resolve_unresolved_job_def
    job_def = unresolved_job_def.resolve(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jbe/anaconda3/envs/lbd_data_env/lib/python3.11/site-packages/dagster/_core/definitions/unresolved_asset_job_definition.py", line 184, in resolve
    job_asset_graph = get_asset_graph_for_job(asset_graph, self.selection)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jbe/anaconda3/envs/lbd_data_env/lib/python3.11/site-packages/dagster/_core/definitions/asset_job.py", line 290, in get_asset_graph_for_job
    selected_keys = selection.resolve(parent_asset_graph)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jbe/anaconda3/envs/lbd_data_env/lib/python3.11/site-packages/dagster/_core/definitions/asset_selection.py", line 420, in resolve
    return self.resolve_inner(asset_graph, allow_missing=allow_missing)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jbe/anaconda3/envs/lbd_data_env/lib/python3.11/site-packages/dagster/_core/definitions/asset_selection.py", line 983, in resolve_inner
    selection = self.child.resolve_inner(asset_graph, allow_missing=allow_missing)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jbe/anaconda3/envs/lbd_data_env/lib/python3.11/site-packages/dagster/_core/definitions/asset_selection.py", line 897, in resolve_inner
    raise DagsterInvalidSubsetError(

  warnings.warn(f"Error loading repository location {location_name}:{error.to_string()}")

What did you expect to happen?

I expect Dagster to handle dbt custom schemas as described in the dbt documentation, creating the database schema named in the model's config() block.
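For reference, dbt's default `generate_schema_name` macro does not use the custom schema name on its own: it appends it to the target schema as `<target_schema>_<custom_schema>`, and uses the target schema unchanged when no custom schema is set. If the table lands in exactly `my_custom_schema`, the project presumably overrides that macro. The Python below only mirrors the default macro's logic for illustration; the function name and the target schema `public` are assumptions.

```python
from typing import Optional


def default_schema_name(target_schema: str, custom_schema_name: Optional[str]) -> str:
    """Python mirror of dbt's default generate_schema_name macro (illustration only)."""
    if custom_schema_name is None:
        # No config(schema=...) on the model: use the target schema as-is.
        return target_schema
    # dbt joins the target schema and the trimmed custom schema with "_".
    return f"{target_schema}_{custom_schema_name.strip()}"


print(default_schema_name("public", "my_custom_schema"))  # public_my_custom_schema
print(default_schema_name("public", None))  # public
```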

How to reproduce?

No response

Deployment type

Local

Deployment details

No response

Additional information

No response


marchinho11 commented 3 months ago

Hey @SeppBerkner! When you configure a custom schema, the asset key for your model changes: the schema is prepended, so the key becomes ["my_custom_schema", "my_model"] instead of ["my_model"]. See https://github.com/dagster-io/dagster/blob/3a990132069004eea64c823bdb7676dca8b006fa/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py#L431-L435

To select the asset by its new key when defining your job, use this selection:

from dagster import define_asset_job

my_model_job = define_asset_job(
    name="my_model_job",
    selection="*my_custom_schema/my_model",  # multi-part asset key joined with "/"
    description="job to create my model",
)
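The key change described above, visible in the error message's suggested key ["my_custom_schema", "my_model"], can be sketched in plain Python. This is a simplified, hypothetical re-implementation for illustration only (the function names and the trimmed-down manifest entries are invented); dagster-dbt's actual logic in the linked `asset_utils.py` also covers sources and other cases. It also shows how a multi-part key maps to the `/`-separated selection string.

```python
from typing import Any, Mapping


def hypothetical_asset_key_path(node: Mapping[str, Any]) -> list[str]:
    """Simplified sketch: prefix the model name with config.schema when one is set."""
    configured_schema = node.get("config", {}).get("schema")
    if configured_schema is not None:
        return [configured_schema, node["name"]]
    return [node["name"]]


def to_selection_string(path: list[str]) -> str:
    # String selections join the parts of a multi-part asset key with "/".
    return "/".join(path)


# Hypothetical, trimmed-down manifest entries for my_model.
with_schema = {"name": "my_model", "config": {"schema": "my_custom_schema"}}
without_schema = {"name": "my_model", "config": {}}

print(hypothetical_asset_key_path(with_schema))  # ['my_custom_schema', 'my_model']
print("*" + to_selection_string(hypothetical_asset_key_path(with_schema)))  # *my_custom_schema/my_model
print(hypothetical_asset_key_path(without_schema))  # ['my_model']
```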
JoBerkner commented 3 months ago

Great, that was it. Thanks @marchinho11!

tuantran4 commented 1 month ago

Hi, I ran into the same problem. How can I resolve it when defining my dbt assets with @dbt_assets? Here is my dbt asset code:

from dagster import AssetExecutionContext
from dagster_dbt import dbt_assets, DbtCliResource

from ..project import dbt_project

@dbt_assets(
    manifest=dbt_project.manifest_path,
)
def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["run"], context=context).stream()