apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

expand_kwargs does not preserve templates #38745

Open alexdrydew opened 5 months ago

alexdrydew commented 5 months ago

Apache Airflow version

2.8.4

If "Other Airflow 2 version" selected, which one?

No response

What happened?

It looks like expand_kwargs does not preserve template strings that are stored in XCom: instead of being passed through unchanged, they get rendered.

What you think should happen instead?

I believe templates should not be rendered in this case, because not rendering them is the behaviour of the basic PlainXComArg.resolve and of plain .expand as well.

How to reproduce

import datetime

from airflow.decorators import dag, task

@task
def some_task():
    # The XCom value contains a literal, template-like string.
    return [{"value": "{{ foo }}"}]

@task
def print_task(value):
    print(value)

@dag(
    schedule=None,
    start_date=datetime.datetime(2024, 1, 19),
    catchup=False,
)
def pipeline():
    # Expected (per this report) to print "{{ foo }}" instead of rendering it.
    print_task.expand_kwargs(some_task())

pipeline().test()

This fails with:

jinja2.exceptions.UndefinedError: 'foo' is undefined
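To see why the reproduction fails where plain .expand would not, here is a toy model that runs without Airflow. It is a simplification, not Airflow's implementation: `render` is a hypothetical stand-in for Jinja that fails on unknown names (the UndefinedError above suggests strict undefined handling), and `expand_like` / `expand_kwargs_like` are hypothetical models of the two behaviours described in this thread, pass-through versus rendering string values.

```python
import re

# Matches simple "{{ name }}" expressions only; real Jinja supports far more.
_EXPR = re.compile(r"\{\{\s*(\w+)\s*\}\}")


class UndefinedError(Exception):
    """Toy counterpart of jinja2.exceptions.UndefinedError."""


def render(text: str, context: dict) -> str:
    """Replace each {{ name }} with context[name]; unknown names raise."""
    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in context:
            raise UndefinedError(f"'{name}' is undefined")
        return str(context[name])

    return _EXPR.sub(substitute, text)


def expand_like(resolved: list, context: dict) -> list:
    """Model of .expand(): mapped values reach the task unrendered.
    (Takes the same list-of-dicts input as expand_kwargs_like for comparison.)"""
    return [dict(kwargs) for kwargs in resolved]


def expand_kwargs_like(resolved: list, context: dict) -> list:
    """Model of .expand_kwargs() since 2.7.1: string values are rendered first."""
    return [
        {key: render(value, context) if isinstance(value, str) else value
         for key, value in kwargs.items()}
        for kwargs in resolved
    ]


xcom_value = [{"value": "{{ foo }}"}]  # what some_task() returns
context = {"ds": "2024-01-19"}         # "foo" is not in the template context

print(expand_like(xcom_value, context))  # [{'value': '{{ foo }}'}]
try:
    expand_kwargs_like(xcom_value, context)
except UndefinedError as exc:
    print(exc)                           # 'foo' is undefined
```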

Operating System

macOS 14.2

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 5 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

Taragolis commented 5 months ago

> return [{"value": "{{ foo }}"}]

return [{"value": "{{ 'foo' }}"}]

Taragolis commented 5 months ago

Oh... you mean it should remain unchanged. That is interesting behaviour, TBH.

hussein-awala commented 5 months ago

For more context, I changed the behavior of expand_kwargs in 2.7.1 with https://github.com/apache/airflow/pull/32272 to fix https://github.com/apache/airflow/issues/32260.

If we don't do this, we won't be able to use templates in mapped tasks, which is a big limitation for this great feature. So if it's not the same behavior as expand, then we need to fix the expand method; I'll take a look at it when I have some free time.

alexdrydew commented 5 months ago

> If we don't do this, we won't be able to use templates in mapped tasks, which is a big limitation for this great feature

In my opinion, the main problem with templates being rendered in this case is that we now need to sanitize user data when processing it in expanded tasks (I didn't think of that at the beginning, but it is probably also a security issue in this case). When using plain tasks, you generally don't need to worry about whether the data you are processing can be harmful outside the user code inside the decorated Python function. Personally, I think it would be unfortunate if this became the default behaviour for expand as well.

potiuk commented 5 months ago

> (I didn't think of that at the beginning, but it is probably also a security issue in this case)

I don't think there are any significant security implications here.

The only code that can actually be executed via the templates in question comes from DAG authors. As far as I know, no executable code can be supplied by anyone else (i.e. UI users, except those users that have "Connection Editing" capabilities). And this is all within the Airflow Security Model: https://airflow.apache.org/docs/apache-airflow/stable/security/security_model.html. DAG authors already have the capability to execute anything they want, and there is no way or intention to block them from doing so (this can be controlled and verified through code review / static checks on DAGs outside of Airflow).

Unless there is a scenario in which other Airflow users can provide code that is executed in the context of workers, I do not see how template rendering could go outside the security model we already have and know.

alexdrydew commented 5 months ago

My concern was primarily about processing untrusted data from external sources in DAGs: it seems malicious data can be used to steal secrets in some cases:

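@dag(...)  # schedule, start_date, etc. omitted
def pipeline():
    # download_parameters_from_s3, transform and upload_to_s3 are @task functions
    data = download_parameters_from_s3()  # untrusted list of dicts from outside
    transformed_data = transform.expand_kwargs(data)  # string values get rendered
    upload_to_s3(transformed_data)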

In this case the data author could include a {{ var.value.get('SOME_SECRET') }} template and get access to the variable if the target storage is available to them. I understand that this case is probably outside the scope of the Airflow security model, but the way plain TaskFlow-style tasks communicate via XCom makes it possible to process untrusted data like this.
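One defensive option against the scenario above is to reject untrusted records that contain Jinja start delimiters before they ever reach expand_kwargs. A minimal sketch, assuming Jinja's default delimiters ({{, {% and {#) and records shaped like the list of dicts that expand_kwargs takes; `find_template_like_values` is a hypothetical helper, not an Airflow API.

```python
from typing import Any, Iterator

# Jinja's default variable, block and comment start delimiters.
_JINJA_STARTS = ("{{", "{%", "{#")


def find_template_like_values(value: Any, path: str = "") -> Iterator[str]:
    """Yield the path of every string (nested in dicts, lists or tuples)
    that contains a Jinja start delimiter."""
    if isinstance(value, str):
        if any(marker in value for marker in _JINJA_STARTS):
            yield path or "<root>"
    elif isinstance(value, dict):
        for key, item in value.items():
            child = f"{path}.{key}" if path else str(key)
            yield from find_template_like_values(item, child)
    elif isinstance(value, (list, tuple)):
        for index, item in enumerate(value):
            yield from find_template_like_values(item, f"{path}[{index}]")


records = [
    {"name": "ok", "note": "plain text"},
    {"name": "bad", "note": "{{ var.value.get('SOME_SECRET') }}"},
]
print(list(find_template_like_values(records)))  # ['[1].note']
```

A task that downloads external data could run this check and raise if anything is found, so nothing template-like is handed to expand_kwargs.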

But, not to shift the focus: my main concern is that even if we never return processed untrusted data to a potentially malicious user, we still need to sanitize inputs specifically for expand_kwargs so that tasks don't fail on data that happens to contain template-like syntax (e.g. a parsed web page).
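While the current rendering behaviour stays, one stopgap for the sanitizing problem is to escape untrusted strings so that a rendering pass reproduces them verbatim. A minimal sketch, assuming the values are rendered exactly once, with Jinja's default delimiters and a non-native environment; `escape_for_jinja` is a hypothetical helper, not an Airflow API. It relies on the Jinja expression {{ '{' }} outputting a literal {.

```python
from typing import Any

# "{{ '{' }}" is a Jinja expression whose output is a literal "{".
# Replacing every "{" this way leaves no "{{", "{%" or "{#" to be interpreted.
# (Jinja's default environment also drops a single trailing newline, which
# this escaping does not address.)
_ESCAPED_BRACE = "{{ '{' }}"


def escape_for_jinja(value: Any) -> Any:
    """Recursively escape every string inside dicts, lists and tuples."""
    if isinstance(value, str):
        # str.replace makes a single pass, so inserted braces are not re-escaped.
        return value.replace("{", _ESCAPED_BRACE)
    if isinstance(value, dict):
        return {key: escape_for_jinja(item) for key, item in value.items()}
    if isinstance(value, list):
        return [escape_for_jinja(item) for item in value]
    if isinstance(value, tuple):
        return tuple(escape_for_jinja(item) for item in value)
    return value


print(escape_for_jinja([{"value": "{{ foo }}", "count": 3}]))
```

A producing task would return escape_for_jinja(records) instead of records. If expand_kwargs stops rendering XCom values, the escaped text would reach the task as-is, so this only works around the current behaviour.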

potiuk commented 5 months ago

> In this case the data author could include a {{ var.value.get('SOME_SECRET') }} template and get access to the variable if the target storage is available to them

Yes. But you could do that with any code: a DAG author could call self.render_template(EXTERNAL DATA, context) and store the result somewhere as well. I think even the name of the method is pretty explicit that it is about expanding kwargs of a task. The signature of the method is:

def expand_kwargs(self, kwargs: OperatorExpandKwargsArgument, *, ...).

I don't see a danger of accidentally passing untrusted data there (but maybe I am missing something?)

alexdrydew commented 4 months ago

> I think even the name of the method is pretty explicit that it is about expanding kwargs of a task.

But "expanding kwargs" says nothing about whether templates will be rendered if some of the passed keyword arguments happen to be template strings. One could argue that this is simply how Airflow treats task arguments by default, but that is actually not the case (at least for regular TaskFlow tasks).

Case 1, template is defined during DAG import:

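import datetime

from airflow.decorators import dag, task

@task
def my_task(dag_id: str):
    # op_kwargs is a template field, so the literal template is rendered.
    assert dag_id == 'my_dag'

@dag(
    schedule=None,
    start_date=datetime.datetime(2024, 1, 19),
    catchup=False,
)
def my_dag():
    my_task(dag_id='{{ dag.dag_id }}')

my_dag()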

Case 2, template string is retrieved from XComArg:

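import datetime

from airflow.decorators import dag, task

@task
def construct_template() -> str:
    return '{{ dag.dag_id }}'

@task
def my_task(dag_id: str):
    # The XComArg is resolved at run time; its value is not rendered again.
    assert dag_id == '{{ dag.dag_id }}'

@dag(
    schedule=None,
    start_date=datetime.datetime(2024, 1, 19),
    catchup=False,
)
def my_dag():
    my_task(dag_id=construct_template())

my_dag()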
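The two cases can be modelled as an ordering rule: templated arguments are rendered, but an XComArg is resolved at run time and its value is returned without a further rendering pass. The sketch below is a simplified, hypothetical model of that rule (`XComRef`, `lookup` and `render_field` are illustrative names, and the context uses plain dicts where Airflow passes a DAG object). It is not Airflow's code, although Airflow 2.x's templater appears to treat XComArg values in a similar way.

```python
import re
from dataclasses import dataclass
from typing import Any

# Matches "{{ dotted.name }}" expressions only; a toy stand-in for Jinja.
_EXPR = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


@dataclass
class XComRef:
    """Toy stand-in for an XComArg: a reference resolved at run time."""
    task_id: str


def lookup(path: str, context: dict) -> Any:
    """Follow a dotted path such as 'dag.dag_id' through nested dicts."""
    value: Any = context
    for part in path.split("."):
        value = value[part]
    return value


def render_field(value: Any, context: dict, xcoms: dict) -> Any:
    """Render a templated argument; XComRef values are resolved, not rendered."""
    if isinstance(value, XComRef):
        return xcoms[value.task_id]  # resolved value is returned as-is
    if isinstance(value, str):
        return _EXPR.sub(lambda m: str(lookup(m.group(1), context)), value)
    if isinstance(value, dict):
        return {k: render_field(v, context, xcoms) for k, v in value.items()}
    if isinstance(value, list):
        return [render_field(v, context, xcoms) for v in value]
    return value


context = {"dag": {"dag_id": "my_dag"}}
xcoms = {"construct_template": "{{ dag.dag_id }}"}

# Case 1: the template is a literal argument written by the DAG author.
print(render_field({"dag_id": "{{ dag.dag_id }}"}, context, xcoms))
# {'dag_id': 'my_dag'}

# Case 2: the same string arrives through an XCom reference.
print(render_field({"dag_id": XComRef("construct_template")}, context, xcoms))
# {'dag_id': '{{ dag.dag_id }}'}
```

Under this model, the expand_kwargs behaviour discussed in this issue amounts to running the string branch on values that came out of the XComRef branch.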

In our experience, the fact that expand_kwargs renders templates, in addition to unpacking the XComArg value into kwargs and mapping it over a dynamic number of task instances, was confusing and counterintuitive.

gnudiff commented 2 months ago

Is this the same issue as the one where, when a task is created via dynamic task mapping, references to {{ var }} and {{ params }} get lost during templating, or should I open a new issue?

E.g., I have a custom operator created as per the Airflow guide:

from airflow.models import BaseOperator

class CreateXMLFromTemplate(BaseOperator):
    # xml_file is rendered with Jinja; a value ending in .xml/.html is treated
    # as a template file name, and the file's contents are rendered.
    template_fields = ('xml_file',)
    template_ext = ('.xml', '.html')

    def __init__(
        self,
        *,
        xml_file,
        output_file,
        parameters=None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.xml_file = xml_file
        self.parameters = parameters
        self.output_file = output_file

    def execute(self, context):
        # At this point self.xml_file holds the rendered template contents.
        with open(self.output_file, 'w') as f:
            f.write(self.xml_file)
        return self.output_file
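For context on how xml_file='test.xml' is treated: with template_ext set, a template field value ending in one of the listed extensions is loaded as a template file, and the file contents (not the file name) are rendered. A minimal, hypothetical sketch of just that loading decision, assuming a single search folder; Airflow itself loads the file through its Jinja environment, whose search path (as far as I know) includes the DAG file's folder and any template_searchpath.

```python
import os
import tempfile

# Mirrors template_ext = ('.xml', '.html') on CreateXMLFromTemplate.
TEMPLATE_EXT = (".xml", ".html")


def load_template_source(value: str, search_path: str) -> str:
    """Return the file's contents if value looks like a template file name,
    otherwise return value itself as an inline template."""
    if value.endswith(TEMPLATE_EXT):  # str.endswith accepts a tuple
        with open(os.path.join(search_path, value), encoding="utf-8") as handle:
            return handle.read()
    return value


with tempfile.TemporaryDirectory() as folder:
    with open(os.path.join(folder, "test.xml"), "w", encoding="utf-8") as handle:
        handle.write("<value>{{ params.field }}</value>")
    # 'test.xml' matches template_ext, so the file contents become the template.
    print(load_template_source("test.xml", folder))
    # A value without a matching extension is used as an inline template.
    print(load_template_source("inline {{ ds }}", folder))
```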

When the operator is instantiated as a regular task, xml_file renders all {{ var.json .... }} and {{ params.field }} references as expected:

    render_xml = CreateXMLFromTemplate(
        task_id='test_xml',
        xml_file='test.xml',
        output_file='/tmp/output.xml',
        parameters={'cargo': {'prep_date': '123123'}},
    )

When called via dynamic task mapping, it loses access to both {{ var }} and {{ params }}:

@task
def output_dynamic_task_data():
    results = []
    for x in data:
        single_result = {'prep_date': xxxx}
        params = ....  # construct params dict
        single_result['parameters'] = params
        results.append(single_result)
    return results

multi_output = output_dynamic_task_data()

create_xml_task = CreateXMLFromTemplate.partial(
    task_id='data_to_xml', xml_file='test.xml'
).expand_kwargs(multi_output)

Now the same CreateXMLFromTemplate operator, rendering the same XML file, produces empty values for {{ var }} and {{ params }}.