dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Improve syntax for op order dependencies #1861

Open helloworld opened 4 years ago

helloworld commented 4 years ago

Assume we have solid_a and solid_b. If we want solid_b to execute after solid_a, we can use composition functions to express the ordering:

@pipeline
def pipe():
    solid_b(solid_a())

However, if solid_b doesn't depend on solid_a's outputs, we need to define solid_b's input_defs as something like:

input_defs=[InputDefinition(_START, Nothing)]

We could probably come up with a way to express order dependencies in the composition function syntax, or at least create a simple alias for InputDefinition(_START, Nothing) to make it easier to understand.
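For context, here is a minimal sketch of what the full workaround looks like today (assuming the legacy solid/pipeline API; the Nothing-typed input name "_start" is arbitrary and carries no data):

from dagster import InputDefinition, Nothing, pipeline, solid

@solid
def solid_a(_):
    pass  # performs some side effect; produces no output that solid_b consumes

@solid(input_defs=[InputDefinition("_start", Nothing)])
def solid_b(_):
    pass  # must run after solid_a, but uses none of its outputs

@pipeline
def pipe():
    solid_b(solid_a())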

mgasner commented 4 years ago

maybe solid_a.done() or solid_a.after()

mgasner commented 4 years ago

it's tricky when a solid both has outputs that other solids depend on and is also used for pure sequencing

mgasner commented 4 years ago

(a, b) = solid_a()
solid_a_done = solid_a.done()
solid_b(a)
solid_c(a, b)
solid_d(solid_a_done)

natekupp commented 4 years ago

I really like being able to depend on a.done(), that feels nice and clean to me

schrockn commented 4 years ago

I think everyone is underestimating the conceptual confusion and implementation complexity of introducing a parallel system of dependency management that has different semantics and sidesteps the functional principles of the system.

I understand that there is a practical issue where it is annoying to string together solids whose data dependencies are unexpressed. However I think we should hold the line for a while and see what happens.

My starting point on this is twofold. When there is a Nothing input, there is generally either:

1) An unexpressed data dependency. I actually like, in some ways, the user having to declare this. When someone approaches a solid and sees a Nothing input, they know that there is an unexpressed dep on something.
2) Something that we haven't modeled properly in the system, likely on an operational axis (e.g. a nice way to model spinning up a globally available, expensive resource, such as an EMR cluster).

I admit this is a principled stance that may cause usability friction, especially in the short term. But it leads to a pretty compelling world, especially when combined with the "strict mode" for resources. This way you can approach a solid and it says "I explicitly depend on these inputs and these resources". You know the state of the world necessary for computation to complete successfully. I do think that allowing an arbitrary done() or finished() method subverts that goal.

helloworld commented 4 years ago

This makes sense, and I agree. It might be worth it to give a syntactically better + more ergonomic solution for users who want to express this ordering dependency while keeping system behavior exactly the same.

InputDefinition("arbitrary_name", Nothing) is really confusing, and Nothing inputs are confusing in general since we don't specify arguments for them in the solid compute function. The input name is not necessarily meaningful either.

Here's a concrete example of a pipeline with order dependencies:

import time

from dagster import InputDefinition, Int, Nothing, pipeline, solid

@solid
def slow(_):
    time.sleep(1)

@solid
def very_slow(_):
    time.sleep(5)

@solid
def one(_):
    return 1

@solid
def two(_):
    return 2

@solid(
    input_defs=[
        InputDefinition("number_one", Int),
        InputDefinition("number_two", Int),
        InputDefinition("order_dependency_1", Nothing),
        InputDefinition("order_dependency_2", Nothing),
    ]
)
def add(context, number_one, number_two):
    context.log.info("number_one {}".format(number_one))
    context.log.info("number two {}".format(number_two))
    return number_one + number_two

@pipeline
def addition_pipeline():
    A = slow()
    B = very_slow()
    add(one(), two(), A, B)

The questions that come to mind as a user when I write this pipeline are why add declares four input_defs when its compute function only takes two parameters, and what the Nothing inputs and their names actually mean.

What we could do is accept another argument, called after, in the solid invocation that specifies an input which is actually an order dependency but internally behaves exactly like an input. This is syntactic sugar at this level.

@pipeline
def addition_pipeline():
    A = slow()
    B = very_slow()
    add(one(), two(), after=[A, B])

Then, internally, we can add input_defs to the solid definition that are sequentially numbered order_dependency_1, order_dependency_2, etc. This removes the need to specify these inputs in input_defs, leaving us with just:

@solid(
    input_defs=[
        InputDefinition("number_one", Int),
        InputDefinition("number_two", Int),
    ]
)
def add(context, number_one, number_two):
    ...

Even though they're not explicitly defined here, they are added later and the system behavior is identical.
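A rough sketch of how that desugaring could work (a hypothetical helper, not an existing API; the name with_order_dependencies is purely illustrative):

from dagster import InputDefinition, Nothing

def with_order_dependencies(input_defs, after_count):
    # Append one autogenerated Nothing-typed input per upstream order dependency.
    extra = [
        InputDefinition("order_dependency_{}".format(i + 1), Nothing)
        for i in range(after_count)
    ]
    return list(input_defs) + extra

The composition function would then wire each value passed in after=[...] to one of these autogenerated inputs.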

We could go a step further and give these dependencies a slightly special treatment in dagit (the lines could have a different color, for example).

cc @mgasner

asingh16 commented 4 years ago

Oooh I did not see this issue. I want to take a step back here. From a user perspective, let's first think about "When would we end up in a situation where we want to model a pipeline that has a solid that needs to wait for another solid to complete".

A more realistic scenario is: "I want to run an sql query which creates a table, and then run another sql query which joins that created table to another table in order to create a rolled up view that I can use to answer a question".

Under the hood this would currently be described as:

# QueryError stands in for whatever exception the query layer raises
from dagster import InputDefinition, Nothing, pipeline, solid

@solid
def query_for_table_a(context):
    try:
        pass  # Use sqlalchemy to execute a query which updates table a
    except QueryError:
        context.log.error("Query failed for table a")
        raise

@solid
def query_for_table_b(context):
    try:
        pass  # Use sqlalchemy to execute a query which updates table b
    except QueryError:
        context.log.error("Query failed for table b")
        raise

@solid(
    input_defs=[
        InputDefinition('_TABLE_A', Nothing),
        InputDefinition('_TABLE_B', Nothing)
    ]
)
def execute_rollup_query(context):
    try:
        pass  # Use sqlalchemy to execute a query which joins a and b.
    except QueryError:
        context.log.error("Rollup query failed")
        raise

@pipeline
def create_rollup_view():
    execute_rollup_query(
        query_for_table_a(),
        query_for_table_b()
    )

So I think that we are actually getting the drivers behind the problem wrong here. The user problem here is:

"As a pipeline creator, I want my input/output definitions to consistently match up with the solid dependnecy semantics I expressed in my pipeline code."

Right now we are just thinking about solid completion, but this is more a question of control semantics, because what happens if I actually want execute_rollup_query to run a different query if query_for_table_a fails? It would be hella awkward to express this at the pipeline level, and thus the after or done idiom starts to break down.

I think instead of introducing more complexity and confusion at the pipeline level, we ought to just fix the simple problem, which is that "Nothing" is a super meaningless way of expressing control flow. Let's just have custom dagster constant types (SUCCESS, FAILED, ERROR, etc.) that folks can use, which would make the above pipeline look something like this:

SUCCESS = 1
FAILURE = 0

@solid
def query_for_table_a(context):
    try:
        # Use sqlalchemy to execute a query which updates table a
        return SUCCESS
    except QueryError as ex:
        context.log.error("Query failed for table a")
        # You can raise here too but let's say we want to adapt to failure
        return FAILURE

@solid
def query_for_table_b(context):
    try:
        # Use sqlalchemy to execute a query which updates table b
        # we can use output types here, but im "casting" for expediency
        return DagsterSignal(SUCCESS)
    except QueryError as ex:
        context.log.error("Query failed for table b")
        return DagsterSignal(FAILURE)

@solid(
    input_defs=[
        InputDefinition('_TABLE_A', DagsterSignal),
        InputDefinition('_TABLE_B', DagsterSignal)
    ]
)
def execute_rollup_query(context, _TABLE_A, _TABLE_B):
    if _TABLE_A == SUCCESS and _TABLE_B == SUCCESS:
        try:
            pass  # Use sqlalchemy to execute a query which joins a and b.
        except QueryError:
            context.log.error("Rollup query failed")
            raise
    elif _TABLE_A == SUCCESS and _TABLE_B == FAILURE:
        pass  # Do something special
    else:
        pass  # raise or whatever

@pipeline
def create_rollup_view():
    execute_rollup_query(
        query_for_table_a(),
        query_for_table_b()
    )

Now I am not sold on the DagsterSignal stuff, but I more so meant that we just need a better way to express control flow in a way that's consistent with our pipeline semantics.

schrockn commented 4 years ago

The problem with the FAILURE signal value is that it encapsulates the "Successful Failure" scenario but nothing else, and that is not what you want most of the time. I believe that most of the time, when something like a QueryError is raised, you want to report the execution step as failed. The proposed approach would mark it as succeeded, but with a signal value of FAILURE.

A further issue with using the signal value as control flow is that it directly contradicts the way the system currently proposes to do control flow most of the time, which is handling this with multiple outputs, allowing one to fire solids conditionally.
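For reference, a minimal sketch of that multiple-outputs pattern (assuming the legacy solid/OutputDefinition/Output APIs; the solid names are illustrative): a solid declares two optional outputs and yields only one of them, so only the downstream solid wired to that output fires.

from dagster import Output, OutputDefinition, pipeline, solid

@solid(
    output_defs=[
        OutputDefinition(name="succeeded", is_required=False),
        OutputDefinition(name="failed", is_required=False),
    ]
)
def run_query(context):
    try:
        # ... execute the query here ...
        yield Output(None, "succeeded")
    except Exception:
        context.log.error("Query failed")
        yield Output(None, "failed")

@solid
def on_success(_, signal):
    pass  # only runs if the "succeeded" output was emitted

@solid
def on_failure(_, signal):
    pass  # only runs if the "failed" output was emitted

@pipeline
def conditional_pipeline():
    succeeded, failed = run_query()
    on_success(succeeded)
    on_failure(failed)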

schrockn commented 4 years ago

I think the most productive way to think about is to think about an input as a precondition or an expectation that must be true in order for the computation to succeed.

Any solid that does something is changing something. Either it is doing a computation on an in-memory value and (in any multiprocess case) persisting that value in the intermediate store, or it is changing the external state of the world, or else it should not exist. A downstream solid depends on that changed external state (let's just group persistence into intermediate store as "external state change" for now for simplification), else the dependency should not exist.

With that mental framework, let's dig into the SQL example.

"A more realistic scenario is: "I want to run an sql query which creates a table, and then run another sql table which joins that created table to another table in order to create a rolled up view that I can use to answer a question"."

The output signifies that a table has been created and that anything that depends on that table existing can now execute. (The body of the type check could be an existence check to be strict). The input on the downstream solid signifies that the table must exist in order for the computation to proceed and succeed.

In terms of having to assign a name to an input all the time, we can make a simple change: make the input name optional and then force the use of kwargs in the pipeline DSL for named parameters. We would autogenerate parameter names such as "_1" for our internal representations.


TableFoo = define_table('Foo')
TableBar = define_table('Bar')

@solid
def create_table_foo(context) -> TableFoo:
    ...  # execute a query

@solid(input_defs=[InputDefinition(dagster_type=TableFoo)])
def create_table_bar(context) -> TableBar:
    ...  # execute some query

@pipeline
def tables():
    create_table_bar(create_table_foo())

You could change type or parameter names as you desire if you prefer more "event-oriented" names.
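As a hedged illustration of what such a define_table helper might look like (define_table is not an existing dagster API; this sketch assumes DagsterType and TypeCheck, with the existence check stubbed out):

from dagster import DagsterType, TypeCheck

def define_table(table_name):
    def _type_check(_context, _value):
        # A real implementation would query the database (e.g. via a resource)
        # and verify that table_name actually exists.
        exists = True  # placeholder
        return TypeCheck(
            success=exists,
            description="checked that table {} exists".format(table_name),
        )

    return DagsterType(name="Table{}".format(table_name), type_check_fn=_type_check)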

asingh16 commented 4 years ago

hmmm good point. That def solves the table issue.

For more pythonic solids, can we at least have something better than Nothing? Like Success() or something.

RE @helloworld's point. You are right, this can get confusing if input definitions don't match up with parameters. I think one solution here is just adding a dependency field to the SolidDefinition. The dependency field is exactly the same as input_defs (you can union those lists by doing input_defs += data_dependencies behind the scenes), but it just makes explicit that we have data dependencies and not input/output dependencies. @schrockn what do you think?

@solid(
    input_defs=[
        InputDefinition("number_one", Int),
        InputDefinition("number_two", Int),
    ],
    data_dependencies=[
        InputDefinition("order_dependency_1", Success),
        InputDefinition("order_dependency_2", Success),
    ]
)
def add(context, number_one, number_two):
    context.log.info("number_one {}".format(number_one))
    context.log.info("number two {}".format(number_two))
    return number_one + number_two

schrockn commented 4 years ago

Adding a dependency field to SolidDefinition is similar to adding a caller field to a function declaration. It doesn't make too much sense.

I'm open to renaming Nothing. However this presumes that all Nothings signify "Success". There's any number of things a Nothing can represent.

asingh16 commented 4 years ago

RE Nothing: let's ignore this for now so that we can go back to the main thing. I will throw up a diff and we can talk about it then. I think we are at implementation details at this point.

RE dependency fields: I don't know what you mean by that. My real point is this. You claim:

an input as a precondition or an expectation that must be true in order for the computation to succeed.

So I think the main issue here is the difference between "input" and "dependency". Now this subtle difference seems trivial, but I think this is the root of all confusion here. (Hold onto your butts).

The definition of an input[I] is what is put in, taken in, or operated on by any process or system.

The definition of a dependency[D] (dependent) is something that is contingent on or determined by something else.

Contextualized, I am going to claim that dependencies are {Materializations, Inputs}. Moreover, python also has a physical meaning for a function input, subject to linting and interpreter rules; let's call it a parameter[P] to disambiguate it from an "input", which is a value that is taken in and operated on. (All parameters are inputs, but not all inputs are parameters.)

The problem with claiming that Dependencies, Parameters, and Inputs are the same is that you get some unclear semantics in the programming model (exemplified by @helloworld's example above, where you have 4 input definitions but 2 function parameters in the function definition). Moreover, we might end up doing a lot of stitching in the SolidDefinition as we start supporting more and more complex dependencies (how do you express materialization dependencies today?).

While this def seems grafted on (ala caller field), what I am trying to express is a need for a lower level/clearer API for expressing dependencies so that we can better roll them up into parameters.

asingh16 commented 4 years ago

Food for thought! This is def very academic and I would like to see more real-world examples/data points before proposing anything real, so I am perfectly happy with tabling this.

basilvetas commented 4 years ago

I've run into some implementation challenges related to this issue -- spoke with some of your team in Slack and was encouraged to provide my use case as an example here.

So I'm building an ELT pipeline, and as a convention have set up my pipeline to have three stages: extract, load, transform. I'm using Nothing passed between them to create the dependency structure. My pipeline looks like this:

@pipeline(
    mode_defs=mode_defs(),
    preset_defs=preset_defs(),
)
def example_pipeline():
    transform(depends_on=[
        load(depends_on=[
            extract()
        ])
    ])

In general, "extract" means extract data from original source into an S3 staging bucket, "load" means copy data from S3 staging bucket into Redshift within my "raw data" schema, and "transform" means move data from "raw data" schema into "production ready" schema while performing necessary cleaning, filtering, transforms along the way. I'm primarily using dbt models via dagster-dbt in the transform stage.

Each of the three solids - extract, load, transform - are defined as a composite_solid, allowing me to break these stages down into child solids within each. (I've established these conventions for consistency because I'll eventually be building more pipelines for different data sources/other developers will be building pipelines etc)

I didn't want to have to map the Nothing input into my child solids because many of my child solids are reusable utils, and I didn't want to have to change the solid input definition just to map Nothing for this specific use case. So, as a workaround, I wrote this solid to map my Nothing input within each composite_solid:

@solid(input_defs=[InputDefinition(name='depends_on', dagster_type=Nothing)])
def do_nothing(context) -> Nothing:
    return

My three stages basically look like this:

@composite_solid(
    input_defs=[InputDefinition(name='depends_on', dagster_type=Nothing)]
)
def extract(depends_on: Nothing) -> Nothing:

    # child solids extract stuff...

    return do_nothing(depends_on=depends_on)

@composite_solid(
    input_defs=[InputDefinition(name='depends_on', dagster_type=Nothing)]
)
def load(depends_on: Nothing) -> Nothing:

    # child solids load stuff...

    return do_nothing(depends_on=depends_on)

@composite_solid(
    input_defs=[InputDefinition(name='depends_on', dagster_type=Nothing)]
)
def transform(depends_on: Nothing) -> Nothing:

    # child solids transform stuff...

    return do_nothing(depends_on=depends_on)

The challenge that I'm still running into is that because I haven't mapped the Nothing input to my child solids, the dependency structure is only enforced for the do_nothing solids, but isn't enforced for the child solids that actually do stuff (i.e. my dbt model in transform starts executing before load has finished copying data). It sounds like I could go back and map Nothing into my custom util child solids to enforce the dependency structure, however, in the case of my dbt models I'm not sure what to do because when using the create_dbt_run_solid from dagster-dbt I can't change the solid definition in order to map the Nothing input.
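For the custom util solids, that mapping would look roughly like this (a hedged sketch; the solid names are illustrative, and it doesn't help with the dagster-dbt-generated solids):

from dagster import InputDefinition, Nothing, composite_solid, solid

@solid(input_defs=[InputDefinition("start", Nothing)])
def copy_to_redshift(context) -> Nothing:
    context.log.info("copying from the S3 staging bucket into Redshift")

@composite_solid(input_defs=[InputDefinition("depends_on", Nothing)])
def load(depends_on: Nothing) -> Nothing:
    # Thread the composite's Nothing input through to the child solid that does
    # the work, so the ordering is enforced where it matters.
    return copy_to_redshift(start=depends_on)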

I'm still learning Dagster so please let me know if my understanding of things is incorrect/incomplete -- there may be simple solutions that I've missed, and I may be abusing the system a bit. If that is the case please let me know how you guys would suggest to resolve these issues.

In general though, as a user, I would ideally like a way to establish my dependency structure between my composite solids, and be guaranteed that the structure will be enforced across the child solids without having to do this extra layer of Nothing mapping to child solids. Additionally, my do_nothing solid feels hacky and long term I'd love a way to do away with that.

I can't speak to the implementation challenges in your system, but as a user I think something more semantic than passing Nothing for establishing dependency structures would definitely be welcome. In my specific use case, it seems like the notion of my "extract" stage being "done" or my "load" stage being "done" really corresponds to "some asset exists in S3" or "some table has been populated in Redshift" etc and these are the real notions that should be used to establish dependencies/preconditions. Just thinking out loud now. Hope my example is helpful in brainstorming solutions and please let me know if there is anything I can do to help. Have really enjoyed using Dagster thus far!

sephib commented 4 years ago

In general though, as a user, I would ideally like a way to establish my dependency structure between my composite solids, and be guaranteed that the structure will be enforced across the child solids without having to do this extra layer of Nothing mapping to child solids.

I think this is the key issue

MeganBeckett commented 1 year ago

Hi there, is there a solution to this?

I have this exact issue where I want to order dependencies between ops but ins={"start": In(Nothing)} doesn't suffice as described here: https://docs.dagster.io/concepts/ops-jobs-graphs/graphs#defining-nothing-dependencies

The reason is that one op has to happen after another op but also relies on the output of a separate op.

To try to explain, here is some pseudocode:

from dagster import In, Nothing, graph, op

@op
def pull_rawdata_1():
    ...  # pulls data from API endpoint 1

@op
def process_rawdata_1(df):
    ...  # inserts rawdata_1 df into DB

@op
def pull_rawdata_2():
    ...  # pulls data from API endpoint 2

@op(ins={"start": In(Nothing)})
def process_rawdata_2(df):
    # processes and inserts rawdata_2 df into the DB, which requires creating foreign
    # key relations to rawdata_1, hence rawdata_1 needs to be in the DB first
    ...

@graph
def process_data():
    # Do I need this line or can I just include it in the start argument below?
    process_rawdata_1(df=pull_rawdata_1())

    # If I try this, using the start argument in addition to a data dependency on a different op, it doesn't work
    process_rawdata_2(df=pull_rawdata_2(), start=process_rawdata_1(df=pull_rawdata_1()))

The error I get from the above is Invalid dependencies: solid "process_rawdata_2" does not have input "start". Available inputs: ['df']

How can I do this? I.e., I want to define the order dependency explicitly both where there are data dependencies and where there are not.

I have also tried using tags, for example:

@op(
    required_resource_keys={"database"},
    tags = {
        "dagster/priority": 5
    })

And giving the ops different dagster/priority tag numbers, from negative to positive, to try to create a relative ordering, but this doesn't work.

sryza commented 1 year ago

@MeganBeckett - I would expect that to work, so if it doesn't, it's a bug.

This just ran successfully for me:

from dagster import op, graph, In, Nothing

@op
def pull_rawdata_1():
    ...

@op
def process_rawdata_1(df):
    ...

@op
def pull_rawdata_2():
    ...

@op(ins={"start": In(Nothing)})
def process_rawdata_2(df):
    ...

@graph
def process_data():
    # If I try this, using the start argument in addition to a data dependency on a different op, it doesn't work
    process_rawdata_2(df=pull_rawdata_2(), start=process_rawdata_1(df=pull_rawdata_1()))

process_data.execute_in_process()

Does it error for you?

MeganBeckett commented 1 year ago

Thanks for the reply @sryza, it does work - I must have another issue in my code, as the error I get is Invalid dependencies: solid "process_rawdata_2" does not have input "start". Available inputs: ['df']