dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

AssetOut io_manager_key argument not picked up when using graph_multi_asset #15001

Open pcadmanbosse opened 1 year ago

pcadmanbosse commented 1 year ago

Dagster version

1.3.4

What's the issue?

When a function is decorated with @graph_multi_asset, outs defined with an io_manager_key do not use the specified IO manager; the default io_manager is used instead.

What did you expect to happen?

I believe this behaviour is incorrect, as multi_asset does use the provided io_manager_key.

How to reproduce?

Reproducing the bug:

from dagster import AssetOut, define_asset_job, graph_multi_asset, op


@op
def return_number():
    return {"number": 1}


@graph_multi_asset(
    outs={
        "asset1": AssetOut(description="asset1", is_required=False, io_manager_key="asset_io_manager"),
        "asset2": AssetOut(description="asset2", is_required=False, io_manager_key="asset_io_manager"),
    }
)
def test_asset():
    return {
        "asset1": return_number(),
        "asset2": return_number(),
    }

test_job_multi_asset_graph = define_asset_job("test_job_multi_asset_graph", ["asset1", "asset2"])

As you can see in the logs, the wrong IOManager is used:

image

Here is the functioning case with @multi_asset:

from dagster import AssetOut, Output, define_asset_job, multi_asset


@multi_asset(
    outs={
        "asset3": AssetOut(description="asset3", is_required=False, io_manager_key="asset_io_manager"),
        "asset4": AssetOut(description="asset4", is_required=False, io_manager_key="asset_io_manager"),
    }
)
def multi_asset():
    yield Output(value={"number": 3}, output_name="asset3")
    yield Output(value={"number": 4}, output_name="asset4")

test_job_multi_asset = define_asset_job("test_job_multi_asset", ["asset3", "asset4"])

When run, this job uses the provided "asset_io_manager", as you can see:

image

FYI this is the init file:

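from dagster import Definitions, load_assets_from_modules
from dagster_aws.s3 import s3_pickle_io_manager

# `assets`, the two jobs, and `CustomIOManager` come from the project's own modules.

defs = Definitions(
    assets=[*load_assets_from_modules([assets])],
    jobs=[test_job_multi_asset_graph, test_job_multi_asset],
    sensors=[],
    schedules=[],
    resources={
        "io_manager": s3_pickle_io_manager.configured(
            {"s3_bucket": "test_bucket"}
        ),
        "asset_io_manager": CustomIOManager(),
    },
)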

Deployment type

None

Deployment details

No response

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

danielgafni commented 1 year ago

Lol I also reported this, but on Slack. And also linked the same issue.

pcadmanbosse commented 1 year ago

Just found your thread now! It gave me the idea of using an op's Out "io_manager_key" as a temporary workaround for our use case ;) so thanks for that!