apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

map_index_template not computed when a task is manually marked as success #39092

Open raphaelauv opened 6 months ago

raphaelauv commented 6 months ago

Apache Airflow version

2.9.0

What happened?

If I manually mark a task as success while it is running, only the mapped task instances that had already succeeded have a rendered map_index_template.

Screenshot from 2024-04-17 18-13-16

What you think should happen instead?

map_index_template should be computed in all cases.

How to reproduce

from airflow.operators.python import PythonOperator

from pendulum import today
from airflow import DAG

dag_1 = DAG(
    dag_id="dag_1",
    schedule_interval=None,
    max_active_tasks=2,
    start_date=today("UTC").add(days=-1)
)

with dag_1:
    def generate():
        return [{'date': '2024-01-01'}, {'date': '2024-01-02'}, {'date': '2024-01-03'}, {'date': '2024-01-04'},
                {'date': '2024-01-05'}, {'date': '2024-01-06'}, {'date': '2024-01-07'}]

    def toto(date, arg_2=None):
        import time
        print(date)
        time.sleep(10)

    a = PythonOperator(
        task_id="a",
        python_callable=generate)
    b = PythonOperator.partial(
        task_id="b",
        map_index_template="{{ task.op_kwargs['date'] }}",
        python_callable=toto).expand(op_kwargs=a.output)

    a >> b

Trigger a run, wait for one mapped task instance to succeed, then mark the state of task b as success.
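The reported behaviour can be pictured with a toy model. This is only a sketch, not Airflow's code: `MappedTI`, `run_on_worker`, and `mark_success_manually` are hypothetical names, and the fallback to the numeric map index when nothing was rendered is an assumption about how the UI labels such instances.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class MappedTI:
    """Hypothetical stand-in for one mapped task instance of task "b"."""
    map_index: int
    op_kwargs: dict
    state: str = "queued"
    rendered_map_index: Optional[str] = None


def run_on_worker(ti: MappedTI) -> None:
    # Stand-in for rendering "{{ task.op_kwargs['date'] }}" during execution.
    ti.rendered_map_index = ti.op_kwargs["date"]
    ti.state = "success"


def mark_success_manually(tis: list) -> None:
    # Only the state changes on this path; nothing is rendered.
    for ti in tis:
        if ti.state != "success":
            ti.state = "success"


tis = [MappedTI(i, {"date": f"2024-01-0{i + 1}"}) for i in range(7)]

# Two instances finish normally, then the whole task is marked as success.
run_on_worker(tis[0])
run_on_worker(tis[1])
mark_success_manually(tis)

# Assumed fallback: show the numeric map index when no label was rendered.
labels = [ti.rendered_map_index or str(ti.map_index) for ti in tis]
print(labels)
```

In this model only the first two instances carry a date label; the other five end up in the success state with no rendered value, which matches the symptom described above.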

RNHTTR commented 6 months ago

This might be fixed by #38902. It might require adding some logic to render mapped tasks in set_task_instance_state, which I believe is called when manually setting a task instance's state.

karenbraganz commented 6 months ago

I would like to be assigned this issue.

raphaelauv commented 3 months ago

@karenbraganz are you still working on this issue? Thanks.

karenbraganz commented 3 months ago

I created a draft PR but was informed that rendering the Jinja template on the webserver is not viable. @uranusjr could you please clarify why this is the case? I accidentally closed the PR before I could get a complete explanation from you. Here is the PR for reference: https://github.com/apache/airflow/pull/39505

raphaelauv commented 3 months ago

"accidentally closed the PR": can't you re-open it?

karenbraganz commented 3 months ago

I did not re-open it because it looks like the functionality is not viable. The only thing left was for @uranusjr to provide an explanation of why they think it's not viable, which is why I have tagged them here for an explanation.

raphaelauv commented 3 months ago

Could you please tell @RNHTTR if it's okay to unassign you?

uranusjr commented 3 months ago

Rendering Jinja templates in the webserver means injecting custom user logic into protected Airflow services, which is not acceptable. Rendering must happen in the worker. A fix along the lines of Ryan's comment above is more likely correct, although I am not sure (and never checked) where exactly the rendering should happen.
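To illustrate the concern about user logic, here is a minimal sketch that uses Python's built-in `str.format` as a stand-in for Jinja (it is not how Airflow renders templates). `NoisyKwargs` and `FakeTask` are hypothetical names. The point is only that evaluating a template walks user-supplied objects, so user-defined code runs in whichever process does the rendering.

```python
side_effects = []


class NoisyKwargs(dict):
    """Hypothetical user-supplied mapping that runs code whenever it is read."""

    def __getitem__(self, key):
        # Arbitrary user logic could live here (network calls, file access, ...).
        side_effects.append(f"user code ran for key {key!r}")
        return super().__getitem__(key)


class FakeTask:
    """Hypothetical stand-in for the `task` object exposed to the template."""

    def __init__(self, op_kwargs):
        self.op_kwargs = op_kwargs


# str.format analogue of "{{ task.op_kwargs['date'] }}".
template = "{task.op_kwargs[date]}"
task = FakeTask(NoisyKwargs(date="2024-01-01"))

label = template.format(task=task)
print(label)
print(side_effects)
```

Rendering the label triggers `NoisyKwargs.__getitem__`, so whichever service renders the template also executes that method. Jinja templates can additionally call user-defined macros and filters, which is why the rendering is expected to stay on the worker.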

karenbraganz commented 3 months ago

I got the error referenced in my PR after adding my rendering logic to set_task_instance_state() in models/dag.py. I just tried adding the logic to _set_state() (from models/taskinstance.py) which is called by set_task_instance_state() in www/views.py and am getting the same error. I'm not sure how to get the worker to execute the code at the moment, so I will unassign myself.