dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

The type `Optional[T]` seems not to be "really" optional #7745

Open Byloth opened 2 years ago

Byloth commented 2 years ago

Summary

I was using the new op / graph APIs and testing Dagster's behaviour on some edge cases involving Optional types.

Specifically, I defined both a graph and an op that take an Optional[Int] input.
Passing an Int to this graph works fine.
Passing None to the same graph, however, raises an error:

dagster.core.errors.DagsterInvalidConfigError: Error in config for job
    Error 1: Value at path root:inputs:number:value must not be None. Expected "Int"

Reproduction

Here's the code I used:

from dagster import op
from dagster import GraphDefinition, InputDefinition, JobDefinition
from dagster import SolidInvocation
from dagster import Int, Optional

@op
def show_number(number: Optional[Int] = None):
    if number is None:
        print("Called 'show_number' without any number")

    else:
        print(f"Called 'show_number' with {number}")

def factory_optionals_job():
    children_ops = [show_number]
    children_depends = {SolidInvocation('show_number', 'show_number'): {}}
    children_inputs = [InputDefinition('number', Optional[Int]).mapping_to('show_number', 'number')]

    composite_graph = GraphDefinition('composite_graph', node_defs=children_ops,
                                                         input_mappings=children_inputs,
                                                         dependencies=children_depends)

    pipeline = JobDefinition(composite_graph, name='optionals_job')

    return pipeline

if __name__ == "__main__":
    config = {
        'inputs': {
            'number': {'value': None}
        }
    }

    optionals_job = factory_optionals_job()
    optionals_job.execute_in_process(run_config=config)

Message from the maintainers:

Impacted by this bug? Give it a 👍. We factor engagement into prioritization.

Byloth commented 2 years ago

This issue originated from Issue #7646.

prha commented 2 years ago

Hi Byloth.

I was able to get the job to run by slightly changing the run_config to the following:

config = {
    "inputs": {
        "number": None
    }
}

Would that work for you?

prha commented 2 years ago

I think in an ideal world, Optional[T] would also accept None inside the scalar union value.

The default scalar unions for Int, String, etc. accept either the raw value or a dict that specifies the value directly or via json/pickle.

To support this fully, we would need to plumb the "optional" part through the ScalarUnion all the way through to the base builtin type support.
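To make the distinction concrete, here is a minimal pure-Python sketch of the behaviour described above. It is not Dagster's code: the function names and the SELECTOR_KEYS set are hypothetical, and it assumes that the "optional" part wraps the whole scalar union, so only a top-level None is allowed while the "value" field inside the dict form is still a plain Int.

```python
# Hypothetical model of the validation discussed in this thread.
# It does not use or reproduce Dagster's implementation.

SELECTOR_KEYS = {"value", "json", "pickle"}  # assumed dict forms


def check_int(value, path):
    """Reject None and anything that is not an int."""
    if value is None:
        raise ValueError(
            f'Value at path {path} must not be None. Expected "Int"'
        )
    # bool is a subclass of int in Python, so exclude it explicitly.
    if isinstance(value, bool) or not isinstance(value, int):
        raise ValueError(f"Value at path {path} is not an Int: {value!r}")


def validate_int_scalar_union(config, path):
    """Accept a raw int, or a one-key dict such as {"value": 3}."""
    if isinstance(config, dict):
        if len(config) != 1 or next(iter(config)) not in SELECTOR_KEYS:
            raise ValueError(
                f"Value at path {path} must have exactly one of "
                f"{sorted(SELECTOR_KEYS)}"
            )
        key, inner = next(iter(config.items()))
        if key == "value":
            check_int(inner, f"{path}:value")
        # The "json" / "pickle" forms point at a file; not modelled here.
        return
    check_int(config, path)


def validate_optional_int(config, path):
    """Optional wraps the whole union: only a top-level None passes."""
    if config is None:
        return
    validate_int_scalar_union(config, path)


if __name__ == "__main__":
    path = "root:inputs:number"
    validate_optional_int(5, path)             # raw value: accepted
    validate_optional_int({"value": 5}, path)  # dict form: accepted
    validate_optional_int(None, path)          # top-level None: accepted
    try:
        validate_optional_int({"value": None}, path)  # nested None: rejected
    except ValueError as err:
        print(err)
```

In this model, supporting {"value": None} would mean letting validate_int_scalar_union know it was reached through an Optional, which is the "plumbing" mentioned above.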

Byloth commented 2 years ago

Hi, @prha!
Sorry for my late reply.

Yes, it works.
However, that was a simplified version of a more complex problem.

In my real case, the value I wrote here as "inputs": { "number": None } is actually the output of another solid, and it triggers the same error.

I think the right fix is the one you described in your second comment.

prha commented 2 years ago

Just so that I can figure out the severity of the problem, do you have an example of something not using input config where this is causing problems? I'm somewhat surprised that we'd hit issues from Optional outputs of solids triggering this type of error.

For example, in the simple job below, the upstream solid can return an int or None and the job executes fine.

from dagster import Int, Optional, job, op

@op
def upstream() -> Optional[Int]:
    return None

@op
def downstream(context, num: Optional[Int]) -> Optional[Int]:
    context.log.info(f'num is {num}, {type(num)}')

@job
def prha_job():
    downstream(upstream())