geosolutions-it / evo-odas

Code Repository for the EVO-ODAS
https://waffle.io/geosolutions-it/evo-odas
MIT License

Move XCom boilerplate code out of operators definition #132

Open: randomorder opened this issue 6 years ago

randomorder commented 6 years ago

See point 4 in the email on the EVO-ODAS mailing list ("Airflow Ingestion Review", below) ... XCom handling: it should be done outside of the operators (so basically in the DAG). gdal_addo has already been refactored, but a lot of boilerplate code is still required for each individual operator. Can we create something that makes this easier? Maybe something like an "XComBaseOperator" that our operators can extend? Or an "XComBaseObject" that we can pass to operators? ...
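One possible shape for the "XComBaseOperator" idea is a mixin that owns the XCom plumbing, so each operator only implements its own processing step. This is a minimal sketch, not existing project code: the names `XComInputsMixin`, `get_inputs_from` and `process()` are hypothetical. It assumes Airflow's behaviour that a value returned from `execute()` is pushed automatically and that `xcom_pull(task_ids=...)` defaults to the `return_value` key. It avoids importing Airflow so it can be exercised with a stub context.

```python
class XComInputsMixin(object):
    """Hypothetical mixin that moves XCom handling out of operator logic.

    Intended to be combined with an Airflow operator, e.g.
    ``class MyOperator(XComInputsMixin, BaseOperator)``. Subclasses
    implement ``process()`` and never call xcom_pull()/xcom_push() directly.
    """

    # task_id of the upstream task whose return value feeds this task
    get_inputs_from = None

    def execute(self, context):
        inputs = None
        if self.get_inputs_from is not None:
            # No key given, so this reads the upstream task's return value
            ti = context["task_instance"]
            inputs = ti.xcom_pull(task_ids=self.get_inputs_from)
        # Whatever process() returns is pushed to XCom by Airflow itself
        return self.process(inputs, context)

    def process(self, inputs, context):
        # Operator-specific work goes here in subclasses
        raise NotImplementedError
```

Because `execute()` only touches `context["task_instance"]`, an operator built this way can be exercised with a stub object that provides `xcom_pull()`, without a running Airflow instance.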

ricardogsilva commented 6 years ago

@randomorder @simboss

This post follows up on the discussion in #161.

As per Airflow's XCom docs (emphasis mine):

Tasks can push XComs at any time by calling the xcom_push() method. In addition, if a task returns a value (either from its Operator’s execute() method, or from a PythonOperator’s python_callable function), then an XCom containing that value is automatically pushed.

Tasks call xcom_pull() to retrieve XComs, optionally applying filters based on criteria like key, source task_ids, and source dag_id. By default, xcom_pull() filters for the keys that are automatically given to XComs when they are pushed by being returned from execute functions (as opposed to XComs that are pushed manually).

And also in the task instance API reference:

key (string) – A key for the XCom. If provided, only XComs with matching keys will be returned. The default key is ‘return_value’, also available as a constant XCOM_RETURN_KEY. This key is automatically given to XComs returned by tasks (as opposed to being pushed manually). To remove the filter, pass key=None.
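To make the quoted semantics concrete, here is a toy in-memory model of them. It is not Airflow code: `InMemoryXComStore` and `run_task()` are hypothetical stand-ins for the XCom table and the task runner. It reproduces three behaviours from the docs: a returned value is pushed under the default key `return_value`, `xcom_pull()` filters on that key unless told otherwise, and `key=None` removes the filter. As in Airflow, a `None` return value is not pushed.

```python
XCOM_RETURN_KEY = "return_value"  # same default key name Airflow uses


class InMemoryXComStore(object):
    """Toy stand-in for Airflow's XCom storage (illustration only)."""

    def __init__(self):
        # (task_id, key, value) rows, kept in push order
        self._rows = []

    def xcom_push(self, task_id, key, value):
        # Manual push with an explicit key
        self._rows.append((task_id, key, value))

    def run_task(self, task_id, callable_):
        # Mimic Airflow: a non-None return value is pushed automatically
        result = callable_()
        if result is not None:
            self.xcom_push(task_id, XCOM_RETURN_KEY, result)
        return result

    def xcom_pull(self, task_ids, key=XCOM_RETURN_KEY):
        # key=None disables key filtering, as described in the docs
        matches = [
            value
            for tid, k, value in self._rows
            if tid == task_ids and (key is None or k == key)
        ]
        # Most recently pushed match, or None if nothing matches
        return matches[-1] if matches else None
```

With this model, a downstream task that only knows the upstream `task_id` gets exactly the upstream return value, while manually pushed values stay out of the way unless their key is requested.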

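The quotes above mean two nice things in our context:

1. An operator does not need to call xcom_push() explicitly: returning a value from execute() is enough to make it available to other tasks.
2. A downstream task only needs the upstream task's task_id to retrieve that value, because xcom_pull() filters on the default 'return_value' key.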

With this in mind, in the context of #161, I've been implementing the following strategy:


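# in the plugin module, where the custom operator is defined
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SomeCustomOperator(BaseOperator):

    @apply_defaults
    def __init__(self, get_inputs_from, *args, **kwargs):
        # let BaseOperator handle task_id, dag, etc.
        super(SomeCustomOperator, self).__init__(*args, **kwargs)
        # keep track of the id of the task that will be providing inputs to this one
        self.get_inputs_from = get_inputs_from
        # Other init stuff ...

    def execute(self, context):
        # the context is passed in as an argument; with no explicit key,
        # xcom_pull() returns the upstream task's return value
        inputs = context["task_instance"].xcom_pull(task_ids=self.get_inputs_from)
        # Now do stuff with inputs ...
        result = inputs  # placeholder: replace with the actual processing
        # Finally, by returning a result we are automatically making it
        # available to be xcom_pulled by other tasks
        return result

# in the DAG module
my_task = SomeCustomOperator(
    task_id="my_task",
    get_inputs_from=previous_task.task_id,
    dag=dag,
    # more arguments may be present here
)
another_task = SomeOtherCustomOperator(
    task_id="another_task",
    get_inputs_from=my_task.task_id,
    dag=dag,
)
# the producer must run before the consumer pulls its result
previous_task >> my_task >> another_task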

This seems like a clean approach. It should also keep working if/when we eventually move from custom operators to the more standard PythonOperator, since a PythonOperator's python_callable return value is pushed to XCom automatically in the same way.
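As a rough sketch of how the same strategy could look with PythonOperator, the callable below receives the upstream task id as a keyword argument and pulls its input from the task instance in the context. The function name and the `get_inputs_from` keyword are hypothetical. The wiring assumed here is Airflow 1.x style, something like `PythonOperator(task_id="another_task", python_callable=double_inputs, op_kwargs={"get_inputs_from": my_task.task_id}, provide_context=True, dag=dag)`, where `provide_context=True` merges the context (including `ti`) with `op_kwargs`.

```python
def double_inputs(**context):
    """Hypothetical python_callable following the get_inputs_from strategy."""
    # Assumed to be supplied through op_kwargs when the task is defined
    upstream_id = context["get_inputs_from"]
    # 'ti' is the TaskInstance Airflow puts in the context
    inputs = context["ti"].xcom_pull(task_ids=upstream_id)
    # The return value is pushed automatically under "return_value"
    return [value * 2 for value in inputs]
```

The callable is a plain function of its keyword arguments, so it can be called directly with a stub `ti` object for testing.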

Your thoughts?