WordPress / openverse

Openverse is a search engine for openly-licensed media. This monorepo includes all application code.
https://openverse.org
MIT License

Replace `XCOM_PULL_TEMPLATE.format` calls with use of TaskFlow's `.output` parameter instead #2295

Open AetherUnbound opened 1 year ago

AetherUnbound commented 1 year ago

Description

Historically we've used str.format calls on the XCOM_PULL_TEMPLATE constant to define templated XComs for Airflow:

https://github.com/WordPress/openverse/blob/a5fb3ea5f5671085f0239641ddc1c637a6193763/catalog/dags/common/constants.py#L30-L29

Example usage:

https://github.com/WordPress/openverse/blob/a5fb3ea5f5671085f0239641ddc1c637a6193763/catalog/dags/common/ingestion_server.py#L125

With the addition of the TaskFlow API to Airflow, operators now have a .output value which is functionally equivalent: https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html#consuming-xcoms-between-decorated-and-traditional-tasks

For instance, in the example above:

        endpoint=XCOM_PULL_TEMPLATE.format(task_trigger.task_id, "return_value"),

would be replaced by

        endpoint=task_trigger.output,

For DAGs which use the pull template extensively (e.g. the Provider DAG factory), this would significantly reduce LOC and simplify the operator setup.
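To make the comparison concrete, here is a minimal sketch of what the `str.format` approach produces. The template string below is an assumption about how `XCOM_PULL_TEMPLATE` is defined (the real constant lives in `catalog/dags/common/constants.py`), and `xcom_pull_expression` and the `trigger_task` ID are hypothetical names used only for illustration.

```python
# Assumed definition of the constant; check catalog/dags/common/constants.py
# for the real one. The doubled braces survive str.format as literal "{{"/"}}".
XCOM_PULL_TEMPLATE = "{{{{ ti.xcom_pull(task_ids='{}', key='{}') }}}}"


def xcom_pull_expression(task_id: str, key: str = "return_value") -> str:
    """Build the Jinja expression that Airflow renders at task run time.

    Hypothetical helper, shown only to make the string output visible.
    """
    return XCOM_PULL_TEMPLATE.format(task_id, key)


# The task ID has to be repeated as a string wherever the XCom is needed.
rendered = xcom_pull_expression("trigger_task")
print(rendered)
```

With `.output`, the operator object itself supplies the reference, so neither the task ID nor the `"return_value"` key has to be spelled out at each call site.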

Alternatives

This would supersede/replace #1488, as it would no longer be necessary.

sarayourfriend commented 1 year ago

@AetherUnbound in the example you shared, would the new version still work if the task_trigger wasn't called inside an @dag context? Or is this TaskFlow API available regardless?

AetherUnbound commented 1 year ago

I believe the TaskFlow API is available regardless! Ultimately the functions will be used within the context of a DAG at some point in the call stack, so they don't need to be inside a @dag-decorated function (we've used it for DAGs set up using with DAG(...) as dag context managers).

sarayourfriend commented 1 year ago

So I did a bit of digging, because I realised you're right that this doesn't depend on the @dag context. @dag is just a convenience for calling with DAG(...), as you said. However, my understanding is that @task functions return an XComArg rather than an operator instance. That leaves three options: task_trigger must only ever be an operator instance; the function checks whether it received an XComArg; or, because .output on an operator just returns the XComArg, we make the input for that particular function the XComArg rather than the trigger task.

The XComArg model's documentation is also helpful here: https://github.com/apache/airflow/blob/a81ac70b33a589c58b59864df931d3293fada382/airflow/models/xcom_arg.py#L50

As far as I know the current usage wouldn't cause issues here, but if we tried to pass the return value of a @task function to wait_for_task (which I might be tempted to do, naive as I am about Airflow), the function would break. That is to say, it would probably be an easier-to-use API if we changed it to take the XComArg as the argument rather than the operator instance.
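A rough sketch of the "accept either" option, for comparison. To keep it runnable without Airflow installed, `StandInXComArg` and `StandInOperator` are hypothetical stand-ins for `airflow.models.xcom_arg.XComArg` and an operator with an `.output` property, and `as_xcom_arg` is an illustrative helper name, not something that exists in the codebase.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StandInXComArg:
    """Stand-in for Airflow's XComArg: a reference to one task's XCom value."""

    task_id: str
    key: str = "return_value"


@dataclass
class StandInOperator:
    """Stand-in for a traditional operator, which exposes `.output`."""

    task_id: str

    @property
    def output(self) -> StandInXComArg:
        # Mirrors the idea that `.output` just returns the XComArg.
        return StandInXComArg(self.task_id)


def as_xcom_arg(task_or_arg):
    """Normalise an operator or an XComArg to an XComArg (hypothetical helper)."""
    if isinstance(task_or_arg, StandInXComArg):
        # Already a reference, e.g. the return value of a @task call.
        return task_or_arg
    # Otherwise assume an operator instance and use its `.output`.
    return task_or_arg.output


op = StandInOperator("trigger_task")
from_operator = as_xcom_arg(op)
from_xcom_arg = as_xcom_arg(op.output)
```

Taking the XComArg directly as the parameter would make the `isinstance` check unnecessary, since callers holding an operator can pass `operator.output` themselves.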

AetherUnbound commented 1 year ago

Thanks for doing that digging! Yes, in the wait_for_task function we'd need to change that argument to be more agnostic about what it receives.