dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

DagsterInstance() on Dagster Cloud Serverless #15062

Closed gnmjulius closed 1 year ago

gnmjulius commented 1 year ago

Dagster version

1.3.13

What's the issue?

The issue relates to this Slack thread

I'm using DagsterInstance.get() to list all dynamic partitions on Dagster Cloud Serverless, but it returns an empty list.

What did you expect to happen?

instance = DagsterInstance.get() 
all_partition = instance.get_dynamic_partitions(partitions_def_name=f"fivetran_{source}")
print(all_partition)

all_partition should be a list of the dynamic partitions.

How to reproduce?

I call DagsterInstance.get() inside the function I pass as partition_key_to_vars_fn when loading dbt assets with dagster-dbt.

# assets\__init__.py
dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_PATH,
    profiles_dir=DBT_PROFILES,
    select=f"tag:{source}",
    op_name=f"{dbt_ops}",
    partitions_def=partitions_dict[f"fivetran_{source}"],
    key_prefix=source,
    # Function that turns the partition key into dbt vars
    partition_key_to_vars_fn=date_from_partition_key,
    use_build_command=True,
    display_raw_sql=True,
)

And the partition_key_to_vars_fn function:

# date_from_partition_key
from dagster import DagsterInstance

def date_from_partition_key(input_str: str):
    source, comment, date, code = partition_key_split(input_str)
    print(date)
    # DagsterInstance used to get all dynamic partitions
    instance = DagsterInstance.get()
    all_partition = instance.get_dynamic_partitions(
        partitions_def_name=f"fivetran_{source}")
    # This prints an empty list on Dagster Cloud Serverless
    print(all_partition)

    index = all_partition.index(input_str)

    # For an "INIT" partition the function falls through and returns None
    if code != "INIT":
        _, _, last_incremental_load_end_date, _ = partition_key_split(all_partition[index - 1])
        incremental_load_end_date = date
        dbt_vars = {
            "last_incremental_load_end_date": last_incremental_load_end_date,
            "incremental_load_end_date": incremental_load_end_date  # used for dbt vars
        }
        return dbt_vars

It runs fine on my local machine, but when I test on Dagster Cloud Serverless, all_partition is an empty list.
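With an empty list, the `all_partition.index(input_str)` call in the function above raises ValueError, and for the first key in a non-empty list `index - 1` wraps around to the last element. A minimal sketch of a guarded lookup, assuming only that the keys form an ordered list of strings; `previous_partition_key` and the sample keys are hypothetical and not part of Dagster:

```python
from typing import Optional, Sequence


def previous_partition_key(all_keys: Sequence[str], key: str) -> Optional[str]:
    """Return the key that precedes `key` in `all_keys`, or None.

    None is returned when `key` is missing (for example, when the list is
    empty) or when `key` is the first entry, so the caller avoids both the
    ValueError from list.index and the wrap-around of index -1.
    """
    if key not in all_keys:
        return None
    index = list(all_keys).index(key)
    return all_keys[index - 1] if index > 0 else None


# Hypothetical keys, for illustration only; the real key format is not shown in this issue.
keys = ["shop_full_2023-01-01_INIT", "shop_incr_2023-01-02_INCR"]
```

A caller can then treat a None result as "no earlier partition known" instead of failing inside the dbt op.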

I also have to set DAGSTER_HOME = "/opt/dagster/app/dagster_home". If I don't set the DAGSTER_HOME environment variable, I get this error:

dagster._core.errors.DagsterHomeNotSetError: The environment variable $DAGSTER_HOME is not set. 
Dagster requires this environment variable to be set to an existing directory in your filesystem. This directory is used to store metadata across sessions, or load the dagster.yaml file which can configure storing metadata in an external database.
You can resolve this error by exporting the environment variable. For example, you can run the following command in your shell or include it in your shell configuration file:
    export DAGSTER_HOME=~"/dagster_home"
or PowerShell
    $env:DAGSTER_HOME = ($home + '\dagster_home')
or batch
    set DAGSTER_HOME=%UserProfile%/dagster_home
Alternatively, DagsterInstance.ephemeral() can be used for a transient instance.

  File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py", line 273, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 369, in core_dagster_event_sequence_for_step
    for user_event in check.generator(
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 90, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/compute.py", line 192, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn):
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/compute.py", line 161, in _yield_compute_results
    for event in iterate_with_context(
  File "/usr/local/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 445, in iterate_with_context
    next_output = next(iterator)
  File "/usr/local/lib/python3.9/site-packages/dagster_dbt/asset_defs.py", line 300, in _dbt_op
    kwargs["vars"] = partition_key_to_vars_fn(context.partition_key)
  File "/opt/dagster/app/dbt_vault/utils/dbt_utils.py", line 23, in date_from_partition_key
    instance = DagsterInstance.get()
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/instance/__init__.py", line 484, in get
    raise DagsterHomeNotSetError(

Deployment type

Dagster Cloud

Deployment details

Dagster Cloud Serverless with Github

Additional information

There is a change that would solve my problem.

At this line of asset_defs.py in the dagster_dbt library, I pass context instead of context.partition_key:

Original:

# variables to pass into the command
if partition_key_to_vars_fn:
    kwargs["vars"] = partition_key_to_vars_fn(context.partition_key)

Modified:

# variables to pass into the command
if partition_key_to_vars_fn:
    kwargs["vars"] = partition_key_to_vars_fn(context)

With this change it is no longer a partition_key_to_vars_fn but a context_to_vars_fn 😀. Back to my issue: instead of calling DagsterInstance.get(), I use context.instance.get_dynamic_partitions() to get all dynamic partitions. My partition_key_to_vars_fn function then becomes:

def date_from_context(context):
    # Get the partition key from the context
    partition_key = context.partition_key
    source, comment, date, code = partition_key_split(partition_key)
    # Get all dynamic partitions from the context's instance
    all_partition = context.instance.get_dynamic_partitions(
        partitions_def_name=f"fivetran_{source}")

    index = all_partition.index(partition_key)

    if code != "INIT":
        _, _, last_incremental_load_end_date, _ = partition_key_split(all_partition[index - 1])
        incremental_load_end_date = date
        dbt_vars = {
            "last_incremental_load_end_date": last_incremental_load_end_date,
            "incremental_load_end_date": incremental_load_end_date  # used for dbt vars
        }
        return dbt_vars

Passing context instead of context.partition_key to partition_key_to_vars_fn gives me more options when working with dbt. I hope you will consider updating the dagster_dbt library along these lines.
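One side effect of taking the context as the argument is that the function can be exercised without DAGSTER_HOME or a real instance. A rough sketch, assuming only that the context exposes `partition_key` and `instance.get_dynamic_partitions(partitions_def_name=...)` as used above; `FakeInstance`, `partitions_seen_by`, and the sample keys are hypothetical stand-ins, not Dagster classes:

```python
from types import SimpleNamespace
from typing import Dict, List


class FakeInstance:
    """Stand-in for the run's instance; not part of Dagster."""

    def __init__(self, partitions: Dict[str, List[str]]):
        self._partitions = partitions

    def get_dynamic_partitions(self, partitions_def_name: str) -> List[str]:
        # Unknown partition definitions yield an empty list.
        return list(self._partitions.get(partitions_def_name, []))


def partitions_seen_by(context, partitions_def_name: str) -> List[str]:
    """Read dynamic partitions through the context's own instance."""
    return context.instance.get_dynamic_partitions(
        partitions_def_name=partitions_def_name
    )


# Hypothetical context and keys, for illustration only.
context = SimpleNamespace(
    partition_key="shop_incr_2023-01-02_INCR",
    instance=FakeInstance(
        {"fivetran_shop": ["shop_full_2023-01-01_INIT", "shop_incr_2023-01-02_INCR"]}
    ),
)
```

The same stand-in context could be passed to a function shaped like date_from_context in a unit test.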

rexledesma commented 1 year ago

As mentioned in https://github.com/dagster-io/dagster/pull/15084#issuecomment-1618620381, you can use @dbt_assets to accomplish your use case, rather than using DagsterInstance.get().
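With @dbt_assets the decorated function receives the context itself, so the vars can be computed from context.partition_key and context.instance and then handed to dbt on the command line. A minimal sketch of only the argument-building step, assuming dbt's --vars flag is given a JSON string; `dbt_build_args` is a hypothetical helper, the dates are made up, and the wiring into @dbt_assets is not shown:

```python
import json
from typing import Dict, List


def dbt_build_args(dbt_vars: Dict[str, str]) -> List[str]:
    """Build dbt CLI arguments that pass `dbt_vars` through --vars."""
    return ["build", "--vars", json.dumps(dbt_vars)]


# Hypothetical values, for illustration only.
args = dbt_build_args(
    {
        "last_incremental_load_end_date": "2023-01-01",
        "incremental_load_end_date": "2023-01-02",
    }
)
```

Inside a @dbt_assets function, a list like this would typically be passed to the dbt CLI resource together with the context.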