apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Dynamically creating tasks during run-time #289

Closed. martinxsliu closed this issue 4 years ago

martinxsliu commented 9 years ago

This pertains to @jlowin's second point in #170: having the ability to dynamically create tasks based on the outputs of earlier tasks in the DAG. I think this broader question deserves its own discussion, separate from that issue's focus on piping one task's output to another task's input.

Our use case for this is in creating ETL pipelines where the overall job is chunked into sub-jobs based on a set data size per sub-job. The number of sub-jobs, or tasks, is strictly unknown prior to the first task of chunking up the total data set. This is in line with the examples @jlowin provided.

I would love for Airflow to support this type of dynamic dependency; it is a major blocker to our adopting it for all our data pipelining needs.

jlowin commented 9 years ago

@martinxsliu after some conversations with @mistercrunch, I increasingly think that completely dynamic DAGs are outside the scope of Airflow. Too much is premised on the idea that an entire DAG is available for analysis at the time that the DAG is fired. I think allowing DAGs to reconfigure themselves while in flight would require a total rethinking of the architecture.

However, there are a couple of possible directions to go:

  1. Add dynamics inside an operator. PythonOperators can execute arbitrary code with as few or as many "chunks" as required (see the sketch at the end of this comment).
  2. The PythonBranchOperator can implement conditional paths within a DAG, so long as those paths are known in advance. Right now, I think it's mostly useful for "short circuiting" paths that you don't want to run, but once #261 is merged, it will add a lot of versatility.
  3. Lastly -- most experimentally -- I'm exploring an idea for a restricted subset of dynamic DAGs in which the tasks are mostly the same but could vary regularly. For example, let's say I have a DAG that has a task for processing each of N files, where N could change day to day (probably becoming N+1). In this case, I know exactly what tasks I'm going to need prior to each time I run my DAG (since I can count the files). So in theory, I could generate the appropriate DAG for every run. In other words, I don't need to be dynamic during the run, only before it.

    The problem is, Airflow only refreshes DAGs periodically (workers seem to do it more frequently, the web server less often, and then only if a scheduler is running). @mistercrunch has commented on this in #116 and #46. So the trick is finding a way to refresh the DAG across the system. One way to do this is to manually expire the DAG when you are finished with it [1]. This means that the next time a worker/server/process tries to load the DAG, it will refresh it because it sees that the current version is obsolete. But this might be expensive or infeasible with large DAGs.

    I emphasize that I'm experimenting with this and have not explored any side effects yet. For example, this could potentially introduce/remove tasks and create UI issues. I don't know yet and don't want to go as far as to endorse this -- just sparking conversation.

[1]: I'm doing this manually right now, but it could be an interesting Operator... put it at the end of a DAG... but I have a feeling @mistercrunch doesn't want to make this too easy ;)
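
For illustration, a minimal sketch of option 1 above, assuming an Airflow 2.x-style API (in 1.x the import path is airflow.operators.python_operator); the dag_id and the chunking stub are illustrative:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def process_all_chunks():
    # Stand-in chunking logic: a real pipeline would enumerate the data landed
    # since the last run and split it into fixed-size pieces.
    chunks = [list(range(i, i + 10)) for i in range(0, 100, 10)]
    for chunk in chunks:
        # All per-chunk work happens inside this single task, so the DAG's
        # shape stays fixed even though the number of chunks varies per run.
        print('processing chunk of size', len(chunk))

with DAG(dag_id='chunked_etl', start_date=datetime(2021, 1, 1), schedule_interval='@daily') as dag:
    PythonOperator(task_id='process_all_chunks', python_callable=process_all_chunks)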

mistercrunch commented 9 years ago

Airflow currently thinks of a workflow as a mostly static, or slowly changing, DAG of tasks. Different run instances are scheduled based on a fixed schedule.

Having a fixed unit of work over time and a fixed schedule brings a lot of clarity (and some constraints) to workflows. For instance, it allows for the Tree, Duration and Landing Time views. It also allows for things like looking at resource consumption metadata over time and understanding how things are changing.

It's all about how you define your unit of work, and you can pivot into doing things the Airflow way. Say you receive an unpredictable stream of files: you can pivot into having a task that batches whatever landed in that folder since the last run and labels the output based on the schedule. Once you've pivoted to a time-based unit of work, things from that point on are hourly jobs and align properly. At many companies, pivoting to a fixed-schedule batch approach happens far upstream in their data pipelines.
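
As a rough sketch of that pivot, assuming an Airflow 2.x-style API (where the callable can receive the run's ds date stamp from the context) and with illustrative paths and dag_id:

import glob
import os
import shutil
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def batch_landed_files(ds):
    # Batch whatever landed since the last run and label the output with the
    # run's date stamp, so the unit of work becomes time-based.
    target = '/data/batched/%s' % ds
    os.makedirs(target, exist_ok=True)
    for path in glob.glob('/data/incoming/*'):
        shutil.move(path, target)

with DAG(dag_id='hourly_file_batcher', start_date=datetime(2021, 1, 1), schedule_interval='@hourly') as dag:
    PythonOperator(task_id='batch_landed_files', python_callable=batch_landed_files)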

Now, you might want to instantiate a DAG for each file and change the workflow based on the content or nature of the file; that's reasonable and how programmers think in general.

The static-ish workflow approach is different. It assumes you already know all the routes that can be taken, and you wire the data down the pipes. So if you have 5 types of files that require different processing, maybe you need 5 sub-paths in your workflow that each treat a subset of the files.

But hey, I do see tons of value for "singleton" DAGs that are triggered by events; not all workflows are on a schedule! The whole "analytics as a service" space is filled with "run just one time, or run on demand" use cases. I think it makes sense for Airflow to grow into allowing that as a first-class use case. It's very easy to hack this together by dynamically generating DAGs (each with a different dag_id) and forcing a dummy single start_date and end_date. I'd be curious to see how much mileage you'd get out of this approach. Feel free to give it a spin; all you have to do is stamp the different DAGs as objects in the global namespace of your pipeline file, as in:

from datetime import datetime
from airflow import DAG

for i in range(10):
    dag_id = 'my_dynamic_dag_number_' + str(i)
    globals()[dag_id] = DAG(dag_id=dag_id, start_date=datetime(2015, 1, 1))

These 10 DAGs should get discovered as any other would.

mylons commented 9 years ago

This sounds relevant to my use case. Currently we're using Sun Grid Engine (qstat, qsub, and the rest of the q<cmd>s) for resource management (http://www.drmaa.org/). I like the idea of Airflow, and it seems possible to add hooks to DRMAA in order to make our workflows cluster agnostic (and more hooks for whatever we may replace DRMAA with in the future).

It seems possible that in my case the operator could simply qsub jobs that may or may not qsub more jobs as new parts of the DAG? Or is that basically what @martinxsliu is asking?

georgebdavis commented 9 years ago

@mistercrunch, I definitely think that many dynamic-workflow use cases could be met with on-demand execution of a dag. In my experience, there often exist entire "plates" of tasks that you'd like to repeat for a series of triggers, while using dag logic like Airflow's to resolve dependencies within each plate. Airflow's DAGs represent blocks of known dependencies that are symmetric when unrolled over time; these "plates" are just the same thing unrolled over some axis other than time.

Letting us define a DAG once but then schedule it manually would maintain the invariant that the entire structure is known between runs. Since task lists would be invariant, the cool analytics across instances would still work. (IMO these analytics are a killer feature for Airflow, and unfortunately the "dynamically generate a new DAG for each run" approach above would lose that.) If we could stamp an arbitrary label on these runs/instances, and/or have another way of passing some metadata into them that would be accessible to operators in the context variable, my life would be complete!

mistercrunch commented 9 years ago

@georgebdavis I totally agree. This should become the next big item on the roadmap.

"Analytics as a service" where for example a data scientist would order prepared data set based on specific metadata is a use case we want to support as a first class citizen. Or simply batching your DAG execution based on events instead of fixed time is totally reasonable and should be supported. Many companies run workflows whenever a batch shows up and instantiate a dag for each batch.

I think there are potentially two new things here: "externally triggered DAGs", which assume a similar DAG structure across instances, and "singleton DAGs", which may have a very different shape across instances, so much so that we may want different dag_ids to represent them.

The "externally triggered DAGs" could be implemented fairly easily by:

georgebdavis commented 9 years ago

Thanks @mistercrunch. I'd be happy to contribute to the dev on that. Maybe an initial step would be creating that new metadata table and the call that writes to it?

One thought for that "...": if the scheduler is discovering the next run time from the metadata table, we need to make sure any write to that table (e.g. the trigger call) wakes up the scheduler to find out if its next move has changed.

jlowin commented 9 years ago

@georgebdavis @mistercrunch exciting ideas, and definitely useful ones. I have a manual "Run DAG" button on my wishlist :)

Could an externally-triggered DAG be executed like a backfill for a single dttm? For example: airflow execute <DAG> <DTTM>, roughly equivalent to airflow backfill <DAG> -s <DTTM> -e <DTTM>.

I think this would make sensors much more powerful, since the analytics would be more closely aligned with the DAG (event-based rather than interval-based).

@georgebdavis I think this is still a little different from your question about letting tasks define other tasks and add them to an (in-flight) DAG. Would more powerful ad-hoc DAGs solve your use case there? My initial interpretation of your request was that you had a task which loaded some parameters or metadata and then used those to add downstream tasks to the DAG. Maybe you would actually generate an entirely new DAG on-demand and trigger it, rather than adding tasks to the current DAG. Just typing out loud...

georgebdavis commented 9 years ago

@jlowin, what I realized while reading this thread is that my use cases are all composed of chunks of tasks that are known in advance. For reference, here are the cases I've seen from myself or others:

In each of these cases, I know the work I want to do at deployment time, just not exactly when / how many times / on what. Enumerating those repeated chunks as separate DAGs is actually a great feature, because it lets Airflow align the repeated steps (tasks) across runs for analytics, and perform localized management operations on e.g. all tasks upstream of a certain point within one run of one chunk.

So, my problem is not really with static DAGs but with the fact that they can be only triggered by time. If I could schedule a DAG run on demand, ideally with a wee bit of metadata so that tasks know what triggered the run, I could have all of the features that depend on the static DAG and still deal with the irregularities in my schedule.
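
For what it's worth, that wish roughly corresponds to what later Airflow releases expose as dag_run.conf on externally triggered runs. A minimal sketch of a task callable reading such metadata (the 'source_file' key is illustrative), which would be wired into a PythonOperator with the context passed in:

def report_trigger_metadata(**context):
    # dag_run.conf carries whatever small payload the external trigger supplied.
    dag_run = context.get('dag_run')
    conf = (dag_run.conf or {}) if dag_run else {}
    print('this run was triggered for:', conf.get('source_file'))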

I can't think of a realistic use case where a truly dynamic DAG would be preferable to dynamically triggering static chunks. Extremely complex workflows might require tons of DAGs triggering each other - but if you find yourself in that situation, maybe you're using a workflow scheduler to do flow control that would be better implemented as program logic?

jlowin commented 9 years ago

That sounds right -- I also discovered that most of my "dynamic" DAGs were actually just static DAGs being used repeatedly but at non-regular times. I also learned that Airflow handles "slowly changing" DAGs very well -- where "slowly changing" means adding or removing a task after multiple runs.

mistercrunch commented 9 years ago

@georgebdavis said "maybe you're using a workflow scheduler to do flow control that would be better implemented as program logic?" Amen to that! I think I need to squeeze a section into the FAQ that clarifies how much dynamism actually makes sense.

Another way to put it is to relate workflows and tasks to databases and tables. If you write code that alters your table structures based on the program's logic, you're probably doing it wrong. The number of tasks in a data processing workflow is often directly correlated with the number of data structures (tables). Sometimes it makes sense to break down your unit of work to a subset of a table, and to partition your tables to play along with that. For example, our A/B testing framework's pipeline is partitioned and "tasked" at the data source level. When a new data source gets added, the corresponding tasks and partitions get created dynamically. Individual sources can get processed independently as they land, and experiments that pick smaller or more reliable sources are likely to land earlier in the day.
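
As a rough sketch of that per-source layout (the dag_id, source names, and processing stub are illustrative, and the code assumes an Airflow 2.x-style API), the DAG-definition script can generate one task per configured source, so adding a source adds a task the next time the file is parsed:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Illustrative source list; in practice this would likely come from configuration.
DATA_SOURCES = ['web_logs', 'mobile_logs', 'payments']

def process_source(source):
    # Stand-in for the real per-source processing.
    print('processing data source:', source)

with DAG(dag_id='ab_testing_pipeline', start_date=datetime(2021, 1, 1), schedule_interval='@daily') as dag:
    for source in DATA_SOURCES:
        # One independent task per data source, so sources can be processed as they land.
        PythonOperator(
            task_id='process_' + source,
            python_callable=process_source,
            op_kwargs={'source': source},
        )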

georgebdavis commented 9 years ago

I really like the DB table analogy -- it conveys that workflows might change with business needs, but probably shouldn't change during operation-as-usual. You also have some good language in a few threads about reorienting workflows to, for example, operate on all files rather than doing one task per file, and about picking up occasional changes in a workflow by making the script that declares the DAG itself dynamic. Probably good stuff for that FAQ section!

The catch that led me to this thread is wanting to use Airflow's failure-management features to resume flows that fail during these corner cases. If the "per-file" work is complex, I don't want a failure on one file to block the whole task and require repeating all of it. And if the DAG-generation script has to pull a schedule off of a failure-prone external service, I don't want Airflow to potentially barf when loading it. When you (/we?) get dynamic triggering going, I can take care of both kinds of risk by making a task that triggers a separate, pre-declared DAG. Then a failure in pulling down an external schedule just blocks that triggering task, and a failure in the "per-file" DAG only holds up that run.

(Now I'll officially stop belaboring how excited I am about that feature :) )

mistercrunch commented 8 years ago

Somewhat related reference: it's now possible to create external DAG runs with the TriggerDagRunOperator, http://pythonhosted.org/airflow/code.html#airflow.operators.TriggerDagRunOperator
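
A minimal sketch of triggering a separately defined DAG from another one, assuming the Airflow 2.x import path (in 1.x the operator lives in airflow.operators.dagrun_operator); the dag_ids 'controller_dag' and 'per_batch_dag' are illustrative:

from datetime import datetime

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(dag_id='controller_dag', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    # Fires a run of the separately defined 'per_batch_dag' whenever this
    # controller DAG runs, e.g. when a batch of input lands.
    TriggerDagRunOperator(
        task_id='trigger_per_batch_dag',
        trigger_dag_id='per_batch_dag',
    )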

trnl commented 8 years ago

@mistercrunch Does it make sense to remove the following line from README.md?

Dynamic: Airflow pipelines are configuration as code (Python), allowing for dynamic pipeline generation. This allows for writing code that instantiates pipelines dynamically.