apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Testing tasks from CLI when they need dependencies from other tasks #28287

Open bletvaska opened 1 year ago

bletvaska commented 1 year ago

UPDATE: The title of the issue was updated (and its type was changed to a feature) to reflect the discussion: we need a feature that lets us supply dependencies from other tasks to a task run via the airflow tasks test CLI command.

Apache Airflow version

2.5.0

What happened

Consider the following DAG with a task named echo:

import logging

import pendulum
from airflow.decorators import dag, task

@dag(start_date=pendulum.now())
def playground():

    @task
    def echo(message: str):
        logging.info(f'Received message: "{message}"')
        return message

    echo("hello world")

playground()

I wanted to test the task from the CLI. Following the help and the documentation, I created a JSON object with the function's required parameter and ran the following command:

$ airflow tasks test playground echo -t '{"message": "this is airflow"}'

The output contains the following lines:

[2022-12-10 21:15:19,874] {playground.py:10} INFO - Received message: "hello world"
[2022-12-10 21:15:19,874] {python.py:177} INFO - Done. Returned value was: hello world

In my opinion, this is wrong behavior.

What you think should happen instead

If I provide a value for the message parameter, it should be used: instead of "hello world", the message "this is airflow" should be logged and returned. Instead, the task appears to have been called with the argument defined inside the DAG.

If I change the task this way:

@task
def echo(message: str, *args, **kwargs):
    print(kwargs['params'])
    logging.info(f'Received message: "{message}"')
    return message

The output shows that the message value from the test command really was passed to the task, but only inside params; the message argument itself still received "hello world":

{'message': 'this is airflow'}
[2022-12-10 21:26:10,495] {playground.py:11} INFO - Received message: "hello world"
[2022-12-10 21:26:10,495] {python.py:177} INFO - Done. Returned value was: hello world
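The behaviour in the output above can be modelled in a few lines of plain Python (no Airflow required). This is a simplified sketch, assuming that -t (short for --task-params) only fills the params entry of the task context while the function's own argument keeps the value bound in the DAG file; run_task is a hypothetical helper, not Airflow's implementation.

```python
import json
import logging


def echo(message: str, *args, **kwargs):
    # Same shape as the task in the issue: params arrives via kwargs.
    print(kwargs["params"])
    logging.info('Received message: "%s"', message)
    return message


def run_task(func, bound_args, task_params_json):
    # Hypothetical model: the CLI JSON becomes the "params" context entry only;
    # the call itself still uses the arguments bound in the DAG file.
    params = json.loads(task_params_json)
    return func(*bound_args, params=params)


result = run_task(echo, ("hello world",), '{"message": "this is airflow"}')
print(result)
```

Running it prints the params dict followed by hello world, mirroring the log lines reported in the issue.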

How to reproduce

  1. Write a simple task with the TaskFlow API, such as this one:

    import logging
    from airflow.decorators import task, dag
    import pendulum
    
    @dag(start_date=pendulum.now())
    def playground():
       @task
       def echo(message: str, *args, **kwargs):
           print(kwargs['params'])
           logging.info(f'Received message: "{message}"')
           return message
    
       echo("hello world")
    
    playground()
  2. Test the task from the CLI:

    $ airflow tasks test playground echo -t '{"message": "this is airflow"}'

Operating System

Fedora Linux 37

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 1 year ago

Thanks for opening your first issue here! Be sure to follow the issue template!

potiuk commented 1 year ago

Yep. It would be nice to fix it.

vemikhaylov commented 1 year ago

Hey @potiuk! I'd like to work on this task, so feel free to assign me to it!

Also, I'd highly appreciate some guidance on a potential solution here. As per the discussion in #28667, it seems that the observed behaviour around params might be correct? Should we keep it, then, and add another flag to pass kwargs to the task (e.g., --task-kwargs)?
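A rough plain-Python sketch of the --task-kwargs idea: the JSON given on the command line is merged over the arguments bound in the DAG file, so the CLI value wins. The flag is only the name proposed in the comment above, not an existing Airflow option, and call_with_overrides is a hypothetical helper; a real implementation would have to hook into how Airflow resolves the task's arguments.

```python
import argparse
import inspect
import json


def echo(message: str):
    return message


def call_with_overrides(func, dag_args, dag_kwargs, overrides):
    """Call func with the DAG-bound arguments, letting CLI-supplied values win."""
    sig = inspect.signature(func)
    bound = sig.bind_partial(*dag_args, **dag_kwargs)
    # Reject names the function does not accept instead of silently dropping them.
    unknown = set(overrides) - set(sig.parameters)
    if unknown:
        raise TypeError(f"unexpected task kwargs: {sorted(unknown)}")
    bound.arguments.update(overrides)
    return func(*bound.args, **bound.kwargs)


parser = argparse.ArgumentParser(prog="tasks-test-sketch")
# Hypothetical flag named after the suggestion above.
parser.add_argument("--task-kwargs", type=json.loads, default={})
ns = parser.parse_args(["--task-kwargs", '{"message": "this is airflow"}'])
print(call_with_overrides(echo, ("hello world",), {}, ns.task_kwargs))  # this is airflow
```

The explicit check for unknown keys matters because BoundArguments.args and BoundArguments.kwargs ignore entries that do not correspond to a parameter, so a misspelled key would otherwise be dropped without any error.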

uranusjr commented 1 year ago

Arguments of a TaskFlow function are supplied by dependencies, so I think a better solution would be to solve the more general problem of supplying the XComs a task depends on. This also applies to a classic operator:

from airflow.decorators import task
from airflow.operators.bash import BashOperator

@task
def get_command():
    return "ls -l"

# How can you test this?
BashOperator(task_id="run", bash_command=get_command())
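To make the point concrete, here is a plain-Python model (no Airflow required) of what "arguments supplied by dependencies" means: the operator argument holds a reference to an upstream task's result, and a test harness could resolve it from values handed to it rather than from the metadata database. XComRef and resolve are hypothetical names; the default key "return_value" mirrors the key Airflow uses for a task's return value, but this is not Airflow's own resolution code.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Mapping


@dataclass(frozen=True)
class XComRef:
    """Hypothetical stand-in for a reference to an upstream task's result."""
    task_id: str
    key: str = "return_value"


def resolve(value: Any, xcoms: Mapping[tuple[str, str], Any]) -> Any:
    # Replace references (also nested in lists and dicts) with supplied values.
    if isinstance(value, XComRef):
        try:
            return xcoms[(value.task_id, value.key)]
        except KeyError:
            raise LookupError(
                f"missing XCom {value.key!r} from task {value.task_id!r}"
            ) from None
    if isinstance(value, list):
        return [resolve(v, xcoms) for v in value]
    if isinstance(value, dict):
        return {k: resolve(v, xcoms) for k, v in value.items()}
    return value


# bash_command=get_command() is conceptually a reference like this one.
operator_kwargs = {"task_id": "run", "bash_command": XComRef("get_command")}
# A test harness could inject the upstream value directly.
resolved = resolve(operator_kwargs, {("get_command", "return_value"): "ls -l"})
print(resolved["bash_command"])  # ls -l
```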

potiuk commented 1 year ago

Yes. I think the way tasks test works, with its current "no DB access" assumption, is very limited, as you cannot really test tasks that rely on dependencies.

One way of solving this would be to allow passing all the extra dependencies a task needs (XComs and Params are the important ones, I think) as command-line parameters, and injecting them whenever the task retrieves them. We could even go as far as failing such tasks when those dependencies are missing, with an error that suggests the right command-line options to pass to the "tasks test" command.
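A minimal sketch of that proposal in plain Python, assuming hypothetical --xcom TASK_ID=JSON and --param KEY=JSON options (neither exists in Airflow today) and a hypothetical my_dag DAG; parse_pairs and pull_xcom are illustrative helpers. When a needed XCom was not supplied, the lookup fails with a hint showing the command-line option to add.

```python
import argparse
import json


def parse_pairs(pairs):
    """Turn ["NAME=<json>", ...] into {NAME: decoded_value, ...}."""
    values = {}
    for pair in pairs:
        name, sep, raw = pair.partition("=")
        if not sep:
            raise ValueError(f"expected NAME=JSON, got {pair!r}")
        values[name] = json.loads(raw)
    return values


def build_parser():
    # Hypothetical flags: --xcom and --param are not real Airflow options.
    parser = argparse.ArgumentParser(prog="airflow tasks test")
    parser.add_argument("dag_id")
    parser.add_argument("task_id")
    parser.add_argument("--xcom", action="append", metavar="TASK_ID=JSON")
    parser.add_argument("--param", action="append", metavar="KEY=JSON")
    return parser


def pull_xcom(xcoms, dag_id, task_id, upstream_id):
    """Return an injected XCom, or fail with a hint showing how to supply it."""
    if upstream_id in xcoms:
        return xcoms[upstream_id]
    hint = f"airflow tasks test {dag_id} {task_id} --xcom {upstream_id}='<json>'"
    raise LookupError(
        f"task {task_id!r} needs the XCom returned by {upstream_id!r}; try: {hint}"
    )


args = build_parser().parse_args(
    ["my_dag", "run", "--xcom", 'get_command="ls -l"', "--param", 'message="hi"']
)
xcoms = parse_pairs(args.xcom or [])
params = parse_pairs(args.param or [])
print(pull_xcom(xcoms, args.dag_id, args.task_id, "get_command"))  # ls -l
print(params)  # {'message': 'hi'}
```

Encoding each value as JSON keeps strings, numbers, lists, and dicts unambiguous on the command line, at the cost of some shell quoting.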

potiuk commented 1 year ago

I converted the issue from bug to a feature and updated the subject to reflect that.