apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Can one operator output to other operator input? #170

Closed griffinqiu closed 9 years ago

kwent commented 9 years ago

+1

oren-yowza commented 9 years ago

+1

Susanne-ywz commented 9 years ago

+1

seibert commented 9 years ago

This would shift Airflow DAGs from dependency graphs to dataflow graphs, which brings some interesting issues. Output data would need to be serialized to the Airflow database to ensure execution could continue if workers need to be restarted.

r39132 commented 9 years ago

+1

mistercrunch commented 9 years ago

I need a few use cases to understand what everyone is trying to achieve with this feature.

It seems fairly straightforward to publish a message for a downstream task and have that downstream task pick it up and bring it into its context (making it available in the operator's execute method as well as in the templates).

It's also safe to assume that the message could be a "pickleable" python object of limited size (probably smaller than 1MB). Right?

The question is how would the message be published? Is it an operator feature or a task feature? By that I mean is it baked into an operator, or is it baked when creating the task?

Let's assume it's a task-level feature. What if we had a parameter of BaseOperator called drop_message that expects a callable and calls it after executing the task, passing it the context. Whatever is returned by that callable gets serialized, put into the database, and associated with that task instance. Downstream tasks can specify a read_message, most likely as a list of references to the upstream tasks they're expecting messages from. These messages are made available in the context. Would that work for your use cases?
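
Roughly, the flow being proposed (a self-contained sketch; drop_message, read_message, and the dict-backed store below are hypothetical stand-ins, not real Airflow API):

```python
# Purely illustrative sketch of the drop_message / read_message proposal;
# nothing here is real Airflow API, and the dict stands in for the metadata db.
import pickle

message_store = {}          # stand-in for the Airflow metadata database


def drop_message(context):
    # Called after the upstream task executes; whatever it returns is
    # serialized and associated with that task instance.
    return {'files': ['a.csv', 'b.csv']}


def run_task_with_drop_message(task_id, context):
    payload = drop_message(context)
    message_store[task_id] = pickle.dumps(payload)      # small, pickleable payload


def run_task_with_read_message(read_message, context):
    # read_message: list of upstream task ids this task expects messages from;
    # their payloads get surfaced in the downstream task's context.
    context['messages'] = {
        task_id: pickle.loads(message_store[task_id]) for task_id in read_message
    }
    print(context['messages']['list_files']['files'])


run_task_with_drop_message('list_files', context={})
run_task_with_read_message(read_message=['list_files'], context={})
```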

griffinqiu commented 9 years ago

My use case: I have a mounted folder. I need to load the data into a database. My steps:

  1. Use BashOperator to list the data
  2. Use HiveOperator to create tables and load data

So, I want to pass the BashOperator output into the HiveOperator params.
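
For reference, a minimal sketch of those two steps as plain Airflow tasks (import paths, the folder path, and the hql are assumptions; the missing piece, feeding the ls output into the HiveOperator params, is exactly what this issue asks for):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator    # path varies by version
from airflow.operators.hive_operator import HiveOperator    # path varies by version

dag = DAG('load_mounted_folder', start_date=datetime(2015, 1, 1))

# Step 1: list the data sitting in the mounted folder.
list_files = BashOperator(
    task_id='list_files',
    bash_command='ls /mnt/incoming',
    dag=dag,
)

# Step 2: create tables and load the data. The filenames listed in step 1
# are not available here -- that handoff is the feature being requested.
load_data = HiveOperator(
    task_id='load_data',
    hql="CREATE TABLE IF NOT EXISTS staging (line STRING); "
        "LOAD DATA INPATH '/mnt/incoming/...' INTO TABLE staging",
    dag=dag,
)

list_files.set_downstream(load_data)
```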

mistercrunch commented 9 years ago

And there's no way for you to know ahead of time what the filenames are going to be? Are all the files targeting the same table?

Maybe you could write a FolderToHiveTransfer operator, or just a PythonOperator that does that?
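
A rough sketch of what such a PythonOperator could look like (the folder path, connection id, filename-to-table convention, and placeholder schema are all assumptions; import paths and the HiveCliHook API vary across Airflow versions):

```python
import os
from collections import OrderedDict
from datetime import datetime

from airflow import DAG
from airflow.hooks.hive_hooks import HiveCliHook             # path varies by version
from airflow.operators.python_operator import PythonOperator

INCOMING = '/mnt/incoming'

dag = DAG('folder_to_hive', start_date=datetime(2015, 1, 1))


def load_folder_to_hive():
    hive = HiveCliHook(hive_cli_conn_id='hive_cli_default')
    for filename in os.listdir(INCOMING):
        table = os.path.splitext(filename)[0]                # e.g. users.csv -> users
        hive.load_file(
            filepath=os.path.join(INCOMING, filename),
            table=table,
            field_dict=OrderedDict([('line', 'STRING')]),    # placeholder schema
            create=True,                                     # create table if missing
        )


load_files = PythonOperator(
    task_id='load_folder_to_hive',
    python_callable=load_folder_to_hive,
    dag=dag,
)
```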

griffinqiu commented 9 years ago

Yep, the files are for different tables. I don't know how many there are or which tables they belong to. I will try using a PythonOperator to do that.

mistercrunch commented 9 years ago

I think it's a somewhat legit use case, though it'd be nice to have a more defined "contract" with whoever drops those files. If you could have predictable filenames/folders and a time interval, it really simplifies ETL. Not only does it simplify the ETL logic, it also simplifies the way the metadata and matching logs are organized.

But I understand that sometimes that's just the way it is and you have to deal with it...

mistercrunch commented 9 years ago

Keeping open as this feature is still needed. I'd like to hear as many use cases as possible before designing a solution.

kwent commented 9 years ago

Hi,

My scenario is:

I could do this in the same job but I'd like to separate them.

Regards,

mistercrunch commented 9 years ago

Seems like a very small unit of work; you could easily squeeze both into the same PythonOperator.

Are you concerned about minimizing the number of hits on the first endpoint?

jlowin commented 9 years ago

I think the broader issue here is the difficulty of building run-time dynamic pipelines, where the tasks and dependencies vary based on factors discovered during the run. I don't see a great way to handle that in Airflow. Here are some rambling thoughts...

This manifests in two ways:

  1. a task whose parameters depend on information discovered by earlier tasks (as described by @griffinqiu and @kwent)
  2. a task whose existence depends on information discovered by earlier tasks (haven't seen anyone mention this yet, but I think it's the larger issue -- basically a superset of the first one)

Issue 1) could be solved with a convenience function for serializing data to the database. That could be injected into the context, as @mistercrunch wrote, or simply persisted in a table (it seems like Variables, or something like Variables, are the right approach here. Using callbacks and context requires users to first store objects in context, then write a callback... it seems like the logic could be wrapped up nicely in a function and sanitized/checked for compatibility in the process. I will open a separate issue to discuss this.) I think, broadly speaking, that it's healthy to split up any logical task into small pieces even if the unit of work is small, if only because it leads to better monitoring of progress and errors.
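
A minimal sketch of the Variable-based handoff described in (1) (the key name, payload, and DAG wiring are assumptions; import paths vary by Airflow version):

```python
import json
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

dag = DAG('variable_handoff', start_date=datetime(2015, 1, 1))


def discover():
    files = ['a.csv', 'b.csv']                               # discovered at run time
    Variable.set('discovered_files', json.dumps(files))      # persisted in the metadata db


def consume():
    files = json.loads(Variable.get('discovered_files', default_var='[]'))
    for name in files:
        print('processing', name)


discover_task = PythonOperator(task_id='discover', python_callable=discover, dag=dag)
consume_task = PythonOperator(task_id='consume', python_callable=consume, dag=dag)
discover_task.set_downstream(consume_task)
```

One caveat: Variables are global key-value pairs rather than being scoped to a task instance or DAG run, so concurrent runs would overwrite each other; that is part of what a per-task-instance message store would address.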

Issue 2) is much harder and @griffinqiu's issue could be viewed as a variant of it. Basically, we don't know in advance what tasks we need to run. I think the most common variant of this is "vertical" scaling (vertical in the context of the web UI), where I know what tasks I will run, but I don't know how many or with what parameters (i.e. how many vertical rows I'll see in the web UI).

For example:

Airflow is built around the concept of DAG discovery taking place entirely separately from running the DAGs, so this is hard to reconcile with the idea of dynamic DAG building. But perhaps a DAGOperator could generate a DAG at runtime, run it ad hoc, then delete it. However, I'm not sure how it would play with the interface (in terms of displaying progress, and also knowing whether tasks have already been run on subsequent runs).
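
For completeness, the usual workaround when the fan-out is known at DAG-parse time is to generate tasks in a loop in the DAG file; a rough sketch, with an assumed partition list, is below. It sidesteps rather than solves the run-time discovery case described above.

```python
# Sketch of "vertical" fan-out computed when the DAG file is parsed;
# the partition list and task names are assumptions.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('fan_out_example', start_date=datetime(2015, 1, 1))

partitions = ['us', 'eu', 'apac']        # known at DAG-parse time, not at run time

start = BashOperator(task_id='start', bash_command='echo start', dag=dag)
for partition in partitions:
    load = BashOperator(
        task_id='load_{}'.format(partition),
        bash_command='echo loading {}'.format(partition),
        dag=dag,
    )
    start.set_downstream(load)
```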

Food for thought... this is a problem we deal with in our pipeline so I will continue to think about it and would love any other perspective.

azaroth42 commented 9 years ago

For what it's worth, +1. This is a showstopper for our [Stanford] adoption, as much as we otherwise like Airflow. For use cases and the like, consider Apache Spark's docs, particularly: http://spark.apache.org/docs/latest/streaming-programming-guide.html

Regarding @jlowin's excellent analysis, 1 is a must-have and 2 a nice-to-have (we can call a bunch of no-op operators that return straight away when not appropriate, but not creating/calling them at all would be better!).

mistercrunch commented 9 years ago

We're currently sketching a solution for this; please join the PR and comment on the implementation details as it shapes up: https://github.com/airbnb/airflow/pull/232. I think it's taking shape nicely, with a very integrated model.

Now keep in mind that Airflow is a batch workflow engine, not a real-time stream processing engine like Storm or Spark Streaming, and that the current model works extremely well, at scale, for what it is intended to do. A similar system at Facebook that inspired some of the design decisions behind Airflow is used by hundreds of data people every day to satisfy a very large array of use cases. I'm convinced there would be a way to solve your use cases with Airflow as it is today, but I agree that cross-task communication could be more elegant in many cases.

@azaroth42, we use Airflow to trigger Spark, Cascading, Hive, MR, ML, Ruby, Python, MySQL, and all sorts of other tasks at Airbnb. Airflow is the orchestra conductor; it glues all of these systems together into a nice symphony.

For the chunks of pipelines that you'd want to be very dynamic and shape-shifting (I'd still like more specific use cases to understand what you are all up to!), maybe something external to Airflow, like a platform, program, or service, could satisfy these use cases. Assuming that this thing works in batch, Airflow would just coordinate this service with the others.

mistercrunch commented 9 years ago

http://pythonhosted.org/airflow/concepts.html#xcoms
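
For anyone landing here later: the link above documents XComs, the cross-task communication mechanism that came out of this discussion. A minimal sketch along the lines of the original use case (the paths, table name, and hql are assumptions; the xcom_push flag and import paths vary across Airflow versions):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator    # path varies by version
from airflow.operators.hive_operator import HiveOperator    # path varies by version

dag = DAG('xcom_handoff', start_date=datetime(2015, 1, 1))

# With xcom_push=True, the last line the command writes to stdout is pushed
# as an XCom for this task instance.
list_files = BashOperator(
    task_id='list_files',
    bash_command='ls /mnt/incoming | head -n 1',
    xcom_push=True,
    dag=dag,
)

# Templated fields can pull that value back out of the metadata database.
load_data = HiveOperator(
    task_id='load_data',
    hql="LOAD DATA INPATH '/mnt/incoming/{{ ti.xcom_pull(task_ids='list_files') }}' "
        "INTO TABLE staging",
    dag=dag,
)

list_files.set_downstream(load_data)
```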