apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Allow template resolution of an upstream string passed via XComArg #26016

Open JeremyDOwens opened 2 years ago

JeremyDOwens commented 2 years ago

Edit: The original issue (kept verbatim under the separator) described a feature request toward dynamically mapped tasks specifically, but this non-rendering behaviour is not specific to dynamic task mapping, as explained in https://github.com/apache/airflow/issues/26016#issuecomment-1229364541 below. Please keep in mind this feature is technically orthogonal to dynamic task-mapping when discussing.


Description

Consider the following pseudocode:

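from airflow.decorators import task
from airflow.operators.bash import BashOperator

@task
def get_templated_echos():
    # Each returned string contains Jinja placeholders
    return [
        'echo "The date is {{ds}}"',
        'echo "The time is {{ts}}"'
    ]

BashOperator.partial(task_id="echo_task").expand(bash_command=get_templated_echos())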

This will result in generating two BashOperator tasks that execute echo statements, but the templates won't be rendered, despite bash_command being a template field.

The expected result is that templates are rendered for templated keyword arguments even when the values are supplied via expand().

Use case/motivation

While the above example can be trivially solved by accessing the dag run context inside the function, let's talk about this use case.

I want to let users provide any number of '.sql' files containing template strings, so that semi-technical users can drop their SELECT statements into a folder as files and have them run as part of a standard routine (yes, there's lots of protection against injection). These files would internally rely on templating.

Much simplified example.

file contents: (assume there are a few dozen of these doing different things)

SELECT '{{ts_nodash}}' as dag_run_timestamp, 'file1' as filename;

Dag works kinda like this:

import os

@task
def get_filenames_from_directory(directory):
    # List the files in the directory at run time
    return os.listdir(directory)

SnowflakeOperator.partial(task_id='run_sql_files').expand(sql=get_filenames_from_directory('/opt/airflow/dags/sql/mysubdir'))

In fact, we're passing this into a subclass of SnowflakeOperator that validates and cleans the SQL and uses it as part of a CTAS. We're working around this by just running the listdir function on dag load, but would prefer to do it dynamically at runtime.

Related issues

No response

JeremyDOwens commented 2 years ago

One potential workaround in the meantime would be to apply templating to the result of the initial task and then expand on the map task, but I think it would be better to have this happen as part of the call.
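The intermediate step described above could look roughly like the following. This is a minimal stdlib-only sketch, not Airflow's implementation: `render_string` and `render_all` are hypothetical names, the context values are made up, and a small regex stands in for the Jinja rendering that Airflow actually performs.

```python
import re

# Matches simple "{{ name }}" placeholders; real Jinja supports far more.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_string(template: str, context: dict) -> str:
    """Replace each {{ name }} with context[name]; leave unknown names untouched."""
    def substitute(match):
        name = match.group(1)
        return str(context[name]) if name in context else match.group(0)
    return _PLACEHOLDER.sub(substitute, template)


def render_all(templates: list, context: dict) -> list:
    """Intermediate step: render every string returned by the upstream task."""
    return [render_string(template, context) for template in templates]


# Values the upstream task would return (taken from the example in the issue).
upstream_result = ['echo "The date is {{ds}}"', 'echo "The time is {{ts}}"']
# Hypothetical values standing in for the real DAG run context.
context = {"ds": "2022-08-28", "ts": "2022-08-28T00:00:00+00:00"}

rendered = render_all(upstream_result, context)
print(rendered)
```

In a DAG, a step like `render_all` would sit between the task that produces the strings and the mapped operator, which is the extra hop this workaround introduces.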

uranusjr commented 2 years ago

This is actually intentionally done to match the current behaviour without task-mapping. For example:

@task
def get_templated_echo():
    return 'echo "The date is {{ ds }}"'

BashOperator(task_id="echo_task", bash_command=get_templated_echo())

The task echo_task will echo The date is {{ ds }} without template resolution.

You could argue that the current behaviour (regardless of task mapping) is unexpected, but it is an established behaviour (taskflow has existed for a while now), and the change needs to be made for all cases rather than differing between dynamically mapped and non-mapped tasks.

I’m modifying the title to reflect the actual scope of the feature request.

JeremyDOwens commented 2 years ago

Thanks, I hadn't realized that it was the case for all strings passed as XComArg. I would argue that this is unexpected behaviour and should be updated. I'm happy to start working on it if others agree.

ashb commented 2 years ago

Nothing about XCom has ever been templated. In TP's example you can do this:

@task
def get_templated_echo(ds):
    return f'echo "The date is {ds}"'

Looking at the original ask, I'm not sure how the return of dir list needs to be templated? Can you expand on that a bit please?

Oh, list returns .sql files, and as part of the templating process the attribute is "templated" to be the contents of the file, right?

JeremyDOwens commented 2 years ago

> Oh, list returns .sql files, and as part of the templating process the attribute is "templated" to be the contents of the file, right?

Yes. My intent was to pass in the filenames with a template extension and then process the file contents using that method. I was thinking more on this, and there's a way to do something additive instead of manipulative. I could potentially write a decorator similar to the @task decorator, but to extend it in a way that it renders results.

Concept

# Generate a single task that renders template strings, along with arrays or dictionaries
# of template strings
@rendered_task(template_ext=['.sql'])
def return_templates():
    return [
        'This is a template {{ ds }}',
        'this_is_a_template.sql'
    ]

A different option would be to just wrap it

# Create one task
@task
@render(template_ext=['.sql'])
def return_templates():
    return [
        'This is a template {{ ds }}',
        'this_is_a_template.sql'
    ]

or

# Create two tasks, one that processes the result of the first and returns
# applied templates
@render(template_ext=['.sql'])
@task
def return_templates():
    return [
        'This is a template {{ ds }}',
        'this_is_a_template.sql'
    ]

By doing this, we could provide options for easily applying templates to results in an extensible way without needing to change the existing behavior of XComArg.

Thoughts and preferences on adding something like this?
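To make the wrapping idea concrete, here is a minimal sketch of what such a `render` decorator might do. Everything in it is an assumption for illustration: the `template_dir` and `context` parameters are hypothetical (in Airflow the context would only be available at run time), a regex stands in for Jinja, and nothing here is an existing Airflow API.

```python
import functools
import re
import tempfile
from pathlib import Path

_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def _render_text(text, context):
    # Regex stand-in for Jinja: substitute {{ name }} when the name is known.
    return _PLACEHOLDER.sub(lambda m: str(context.get(m.group(1), m.group(0))), text)


def render(template_ext=(), template_dir=".", context=None):
    """Hypothetical decorator factory: render whatever the wrapped function returns."""
    context = context or {}
    extensions = tuple(template_ext)

    def render_value(value):
        if isinstance(value, str):
            # A string ending in a template extension names a file to load first.
            if extensions and value.endswith(extensions):
                value = (Path(template_dir) / value).read_text()
            return _render_text(value, context)
        if isinstance(value, (list, tuple)):
            return type(value)(render_value(item) for item in value)
        if isinstance(value, dict):
            return {key: render_value(item) for key, item in value.items()}
        return value  # non-string leaves pass through unchanged

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return render_value(func(*args, **kwargs))
        return wrapper

    return decorator


with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "this_is_a_template.sql").write_text(
        "SELECT '{{ ts_nodash }}' AS dag_run_timestamp;"
    )

    @render(
        template_ext=[".sql"],
        template_dir=tmp,
        context={"ds": "2022-08-28", "ts_nodash": "20220828T000000"},  # made-up values
    )
    def return_templates():
        return ["This is a template {{ ds }}", "this_is_a_template.sql"]

    results = return_templates()

print(results)
```

Stacking this under `@task` would correspond to the "one task" variant; applying it on top of `@task` would need the decorator to create a second task, which this sketch does not attempt.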

uranusjr commented 2 years ago

In 2.4.0 we’ll roll out some helper methods to transform XCom values; specifically, there’s map(), which can be used to iterate through the list of values. So if we provide a helper function to render a template file, you could write something like

# Illustrative only, actual module and function to be implemented.
from airflow.utils.templates import render_as_template

@task
def return_templates():
    return [
        'This is a template {{ ds }}',
        'this_is_a_template.sql'
    ]

# expand() only accepts keyword arguments, so the mapped field is named explicitly
SnowflakeOperator.partial(task_id='run_sql_files').expand(
    sql=return_templates().map(render_as_template)
)

Would this make sense?

JeremyDOwens commented 2 years ago

Yeah, I love that. It really fits with the concept of expanding a functional interface. Happy to contribute

uranusjr commented 2 years ago

The most difficult part is actually to find a good place to host the functions.

potiuk commented 2 years ago

> The most difficult part is actually to find a good place to host the functions.

Following the "common.sql" approach, we could create a common.taskflow provider. This would have the benefit of being able to evolve independently of Airflow, but it also has a number of quirks when it comes to compatibility between packages. I treated common.sql as a playground, and the last cross-compatibility issue (I hope) is the one we solved in https://github.com/apache/airflow/pull/26051. We would also have to add some test harness for cross-compatibility.

potiuk commented 2 years ago

And we will have to maintain strict backwards compatibility for that common code for as long as possible.

It will be extremely difficult to make a breaking change there if multiple providers depend on it (we discussed all those scenarios while implementing common.sql; it's a very similar case). The main problem is that introducing backwards-incompatible code in common code has a very strong "coupling" effect on the providers using it. Provider versions that rely on breaking changes from a new version of the common provider cannot be installed together with providers that rely on the behaviour from before those changes.

This basically means that anything we have in the common code SHOULD be backwards compatible, and we can only add features for a very long time (possibly until the end of the Airflow 2 release line), unless we introduce handling of both pre- and post-change versions in the providers using the common code.
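One way a provider could handle both versions of the common code is feature detection at call time. The sketch below only illustrates that idea: `execute`, `run_query`, and `fetch_rows` are hypothetical names, and `SimpleNamespace` objects stand in for two releases of an imaginary common package.

```python
from types import SimpleNamespace


def fetch_rows(common_module, query):
    """Use the newer entry point when it exists, otherwise fall back to the older one."""
    new_api = getattr(common_module, "run_query", None)
    if callable(new_api):
        return new_api(query=query)
    return common_module.execute(query)


# Stand-ins for an older and a newer release of the hypothetical common package.
old_common = SimpleNamespace(execute=lambda query: f"old:{query}")
new_common = SimpleNamespace(
    execute=lambda query: f"old:{query}",
    run_query=lambda query: f"new:{query}",
)

print(fetch_rows(old_common, "SELECT 1"))
print(fetch_rows(new_common, "SELECT 1"))
```

The cost of this approach is that every provider carries the branching logic until the older common release is no longer supported.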

But at least we should be able to add new features much faster without waiting for Airflow upgrades.

JeremyDOwens commented 2 years ago

I think there's a further design decision to be made when we think about how helper functions may need to be extensible or themselves be function generators that accept arguments. For example, in the specific use case here, being able to manipulate the Jinja environment via kwargs is important.

The example above might need to be called like this:

SnowflakeOperator.partial(task_id='run_sql_files').expand(
    sql=return_templates().map(render_as_template(template_ext=['.sql']))
)

where render_as_template returns a function customized by the kwargs.
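A function-generator version of the helper might be shaped like this. It is a sketch under stated assumptions, not the eventual Airflow helper: the `template_dir` and `context` parameters are invented for the demo, a regex stands in for Jinja, and the built-in `map()` stands in for the `.map()` call on the upstream result.

```python
import re
import tempfile
from pathlib import Path

_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_as_template(template_ext=(), template_dir=".", context=None):
    """Hypothetical factory: build a one-argument renderer configured by kwargs."""
    extensions = tuple(template_ext)
    values = dict(context or {})

    def render_one(value):
        # Strings ending in a configured extension are treated as file names.
        if extensions and value.endswith(extensions):
            value = (Path(template_dir) / value).read_text()
        # Regex stand-in for Jinja: unknown names are left as written.
        return _PLACEHOLDER.sub(
            lambda m: str(values.get(m.group(1), m.group(0))), value
        )

    return render_one


with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "file1.sql").write_text(
        "SELECT '{{ts_nodash}}' as dag_run_timestamp, 'file1' as filename;"
    )
    renderer = render_as_template(
        template_ext=[".sql"],
        template_dir=tmp,
        context={"ts_nodash": "20220828T000000"},  # made-up value
    )
    # Each element of the upstream list is passed through the configured renderer.
    statements = list(map(renderer, ["file1.sql", "-- ran at {{ ts_nodash }}"]))

print(statements)
```

Because the configuration is captured in the closure, the callable handed to the mapping step still takes a single argument, which keeps the call site the same as in the unparameterized version.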

Is this something that folks would like to see happen, or does that make the interface clunky?

potiuk commented 2 years ago

I personally have no opinion on the interface here. I think @uranusjr is better placed to propose a "good" interface. However, I think that packing this (and possibly a few other functions) into a common.taskflow package, and making the packages that need it depend on it, is a good idea.