dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Problem with `ins` using an asset factory #16364

Open slopp opened 1 year ago

slopp commented 1 year ago

Dagster version

1.4.4

What's the issue?

If you create assets dynamically (for example, with an asset factory), Dagster fails to materialize downstream assets that consume those dynamic assets through `ins`.

A sample is included below. The error message is:

__ASSET_JOB cannot be executed with the provided config. Please fix the following errors:

Missing required config entry "ops" at the root. Sample config for missing entry: {'ops': {'asset1_downstream': {'inputs': {'upstream': '<selector>'}}}}

What did you expect to happen?

Materializing dynamically created assets that rely on `ins` should work.

How to reproduce?

The following code creates a valid asset graph:

from itertools import chain

from dagster import AssetExecutionContext, AssetIn, Definitions, asset

specs = [
    {"name": "asset1", "sql": "sql for asset1"},
    {"name": "asset2", "sql": "sql for asset2"},
]

def execute_sql(context: AssetExecutionContext, sql: str) -> None:
    context.log.info(sql)

def build_assets(spec):
    @asset(name=spec["name"])
    def _asset_upstream(context: AssetExecutionContext):
        execute_sql(context, spec["sql"])
        return "upstream_test"

    @asset(
        name=spec["name"] + "_downstream",
        # Declaring the same upstream asset in both `deps` and `ins` triggers the config error.
        deps=[spec["name"]],
        ins={"upstream": AssetIn(key=spec["name"])},
    )
    def _asset_downstream(context: AssetExecutionContext, upstream):
        execute_sql(context, upstream + spec["sql"] + " downstream")

    return [_asset_upstream, _asset_downstream]

assets = [build_assets(spec) for spec in specs]

defs = Definitions(assets=list(chain.from_iterable(assets)))

If you select `asset1` and `asset1_downstream` and click "Materialize", you get the error:

__ASSET_JOB cannot be executed with the provided config. Please fix the following errors:

Missing required config entry "ops" at the root. Sample config for missing entry: {'ops': {'asset1_downstream': {'inputs': {'upstream': '<selector>'}}}}

However, if you remove `ins`, the two assets materialize successfully.

Deployment type

Local

Deployment details

No response

Additional information

No response


slopp commented 1 year ago

Update: this issue is caused by declaring the same upstream asset in both `deps` and `ins`. If you remove `deps`, the assets materialize as expected:

from itertools import chain

from dagster import AssetExecutionContext, AssetIn, Definitions, asset

specs = [
    {"name": "asset1", "sql": "sql for asset1"},
    {"name": "asset2", "sql": "sql for asset2"},
]

def execute_sql(context: AssetExecutionContext, sql: str) -> None:
    context.log.info(sql)

def build_assets(spec):
    @asset(name=spec["name"])
    def _asset_upstream(context: AssetExecutionContext):
        execute_sql(context, spec["sql"])
        return "upstream_test"

    @asset(
        name=spec["name"] + "_downstream",
        # No `deps` here: `ins` alone declares the dependency on the upstream asset.
        ins={"upstream": AssetIn(key=spec["name"])},
    )
    def _asset_downstream(context: AssetExecutionContext, upstream):
        execute_sql(context, upstream + spec["sql"] + " downstream")

    return [_asset_upstream, _asset_downstream]

assets = [build_assets(spec) for spec in specs]

defs = Definitions(assets=list(chain.from_iterable(assets)))
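Since the update traces the failure to listing the same upstream asset in both `deps` and `ins`, a factory that genuinely needs both (for example, extra non-data dependencies) could strip the overlap before calling `@asset`. The sketch below is plain Python and not part of Dagster's API: the helper names `overlapping_keys`, `dedupe_deps`, and `downstream_asset_kwargs` are hypothetical, asset keys are assumed to be single-component strings (mirroring `AssetIn(key=spec["name"])` above), and `raw_table` is a hypothetical extra dependency.

```python
from typing import Dict, Iterable, List


def overlapping_keys(deps: Iterable[str], ins: Dict[str, str]) -> List[str]:
    """Return upstream keys that appear both in `deps` and as an `ins` target."""
    in_targets = set(ins.values())
    return [d for d in deps if d in in_targets]


def dedupe_deps(deps: Iterable[str], ins: Dict[str, str]) -> List[str]:
    """Drop any dep already loaded through `ins`, keeping the original order."""
    in_targets = set(ins.values())
    return [d for d in deps if d not in in_targets]


def downstream_asset_kwargs(spec: Dict[str, str], extra_deps: Iterable[str] = ()) -> Dict[str, object]:
    """Hypothetical factory helper: build kwargs for the downstream asset.

    `ins` maps input name -> upstream asset key (as a plain string here);
    `deps` keeps only the upstream keys that are NOT already covered by `ins`.
    """
    ins = {"upstream": spec["name"]}
    deps = dedupe_deps([spec["name"], *extra_deps], ins)
    return {"name": spec["name"] + "_downstream", "ins": ins, "deps": deps}


kwargs = downstream_asset_kwargs({"name": "asset1", "sql": "sql for asset1"}, ["raw_table"])
print(kwargs)
```

In the factory itself, the string-valued `ins` mapping would be wrapped as `{name: AssetIn(key=key) for name, key in ins.items()}` before being passed to `@asset`, so each upstream asset is declared exactly once.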