dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

DagsterInvariantViolationError with asset returning `Output(None)` when `output_required=False` #24298

Open clayheaton opened 2 months ago

clayheaton commented 2 months ago

Dagster version

1.8.4

What's the issue?

Assets with output_required=False throw the following error when returning None.

dagster._core.errors.DagsterInvariantViolationError: Unexpected 'None' output value. If a 'None' value is intentional, set the output type to None by adding return type annotation '-> None'.

What did you expect to happen?

When output_required=False is set, Dagster should not throw an error when an asset returns None.

It would help if the documentation specified what the return type should be for assets with output_required=False.

How to reproduce?

My asset basically is:

@asset(
    output_required=False
)
def MY_ASSET(context: AssetExecutionContext) -> SOME_RETURN_TYPE:
    proceed = checkSomething()
    if not proceed:
        yield Output(None, output_name="result") # also tried just Output(None)
    else:
        df = returnsADataFrame()
        yield Output(df, output_name="result")

In place of SOME_RETURN_TYPE I have tried:

Deployment type

Docker Compose

Deployment details

This is a self-hosted Docker Compose instance, version 1.8.4. Everything else works smoothly. In this case, the asset uses the snowflake_pandas_io_manager, so returning a pandas DataFrame should materialize a table in Snowflake (as it does when a DataFrame is returned).

Additional information

It works fine when a DataFrame is returned but throws the error when None is returned, either as a bare None or Output(None).

clayheaton commented 2 months ago

What's it called when you figure out the answer right after typing up the problem? It works without an error if you don't yield anything at all when there's nothing to yield. In other words, this works:

@asset(
    output_required=False
)
def MY_ASSET(context: AssetExecutionContext) -> pd.DataFrame | None:
    proceed = checkSomething()
    if not proceed:
        context.log.info("Skipping")
    else:
        df = returnsADataFrame()
        yield Output(df, output_name="result")

I'm of the opinion that yield Output(None) should also work without an error. Either way, this should probably be specified somewhere in the documentation.
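To make the difference between the two versions concrete, here is a minimal plain-Python sketch. It is not Dagster code: `FakeOutput`, `skipping_asset`, and `asset_yielding_none` are hypothetical names invented for illustration, and the sketch only models how a generator behaves. A generator that skips its `yield` produces no items at all, while one that yields a wrapper around `None` still produces one item whose value is `None`. Presumably that value is what triggers the "Unexpected 'None' output value" check in the failing version.

```python
from dataclasses import dataclass
from typing import Any, Iterator


@dataclass
class FakeOutput:
    """Hypothetical stand-in for dagster.Output; not the real class."""
    value: Any
    output_name: str = "result"


def skipping_asset(proceed: bool) -> Iterator[FakeOutput]:
    """Yield one output when there is data, and nothing at all otherwise."""
    if not proceed:
        return  # skip: the generator finishes without emitting anything
    yield FakeOutput(value=[1, 2, 3])


def asset_yielding_none(proceed: bool) -> Iterator[FakeOutput]:
    """Yield an output in both branches; the skip branch wraps None."""
    if not proceed:
        yield FakeOutput(value=None)
    else:
        yield FakeOutput(value=[1, 2, 3])


# Collect whatever each generator emits when there is nothing to do.
skipped = list(skipping_asset(proceed=False))
wrapped_none = list(asset_yielding_none(proceed=False))

print(len(skipped))       # number of outputs emitted when skipping the yield
print(len(wrapped_none))  # number of outputs emitted when yielding a None wrapper
```

In the first case a consumer has nothing to validate or store, which matches the "don't yield anything" workaround above. In the second case it receives an output and has to decide what to do with a `None` value.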

garethbrickman commented 2 months ago

Using Generator[Optional[pd.DataFrame], None, None] as the return type worked for me, also on 1.8.4.

from dagster import Definitions, asset, AssetExecutionContext, Output
from typing import Generator, Optional
import pandas as pd

@asset(output_required=False)
def my_asset(context: AssetExecutionContext) -> Generator[Optional[pd.DataFrame], None, None]:

    data = {
        'Name': ['John', 'Jane', 'Tom', 'Lucy'],
        'Age': [28, 34, 29, 40],
        'City': ['New York', 'Los Angeles', 'Chicago', 'Houston'],
    }

    proceed = False

    if not proceed:
        yield Output(None, output_name="result") # also tried just Output(None)
    else:
        df = pd.DataFrame(data)
        yield Output(df, output_name="result")

defs = Definitions(assets=[my_asset])