apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

More dynamic pod_mutation_hook, allowing access to task context and inputs #19673

Open hterik opened 3 years ago

hterik commented 3 years ago

Description

We have some use cases that require fairly complex and dynamic configuration of Kubernetes pods, for example choosing the nodeSelector VM size based on task inputs, as well as juggling volumes, in particular persistent volume claims. Imagine tasks allocating and preparing volumes that get passed on as input to other tasks.

The mechanism for this today in airflow is

Would it be possible to add a new, richer pod mutation function with the following additions:

Use case/motivation

No response

Related issues

No response

Are you willing to submit a PR?

Code of Conduct

potiuk commented 2 years ago

Aren't Cluster Policies the exact answer to your question?

https://airflow.apache.org/docs/apache-airflow/stable/concepts/cluster-policies.html

Specifically, task_instance_mutation_hook seems to be launched at the very time you want and has all the information needed from the context?
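
A minimal sketch of such a policy, assuming Airflow 2.x with an airflow_local_settings.py on the Python path (the task_id and queue name here are only illustrative):

    from airflow.models.taskinstance import TaskInstance


    def task_instance_mutation_hook(task_instance: TaskInstance):
        # Invoked by Airflow for each task instance; here we re-route one
        # (hypothetical) heavy task to a dedicated queue.
        if task_instance.task_id == "heavy_task":
            task_instance.queue = "large_node_queue"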

shiying2012 commented 2 years ago

We are in a similar situation with our pods. We are currently using the TaskFlow API and the KubernetesExecutor.

Any ideas?

potiuk commented 2 years ago

So just to suggest the possible solution again:

Aren't Cluster Policies the exact answer to your question?

https://airflow.apache.org/docs/apache-airflow/stable/concepts/cluster-policies.html

I am not 100% sure whether it will work and whether you will be able to modify both the pod and the queue this way, but I think there is no reason you should not be able to. Can you try it @shiying2012 ?

hterik commented 2 years ago

I've tried out task_instance_mutation_hook from Cluster Policies now. It sounds a bit strange to use something called policies for this use case, but if it works the way it's described it's probably good enough.

Unfortunately it does not seem to behave the way the documentation describes, e.g. "Called right before task execution." What happens instead is that the hook is called on all task instances whenever a dagrun is scheduled. Additionally, it's not called at all when manually triggering a DAG, only when it is triggered by schedule.

The following example demonstrates this:

    import time
    from airflow.decorators import task

    @task
    def pvc_provider():
        # Simulates preparing a volume and hands a PVC reference downstream.
        time.sleep(5)
        return "pvc://blablabla"

    @task
    def pvc_consumer(inparam):
        # Simulates work that consumes the PVC prepared by pvc_provider.
        time.sleep(5)
        return "build result"

    # Wire the two tasks together (inside the "pvc" DAG definition).
    pvc_consumer(pvc_provider())

The only thing my task_instance_mutation_hook does is log the task instances being passed into it. Observe the timestamps: in the log we see the mutation hook being called at the very same moment for both tasks, when they should be 5 seconds apart.
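
A rough reconstruction of that logging-only hook (the exact code was not posted; this assumes it lives in airflow_local_settings.py):

    import logging

    log = logging.getLogger(__name__)


    def task_instance_mutation_hook(task_instance):
        # Only log which task instance the hook receives and for which run,
        # so the call time can be compared with the actual task start times.
        log.info(
            "In the mutation hook for ti=%r, run_id=%r",
            task_instance.task_id,
            task_instance.run_id,
        )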

[2021-12-02 12:50:03,024] {airflow_local_settings.py:16} INFO - In the mutation hook for ti='pvc_provider', run_id='scheduled__2021-12-02T11:49:52.937626+00:00'
[2021-12-02 12:50:03,025] {airflow_local_settings.py:16} INFO - In the mutation hook for ti='pvc_consumer', run_id='scheduled__2021-12-02T11:49:52.937626+00:00'

The producer task starts shortly thereafter:

[2021-12-02 12:50:03,060] {dag.py:2928} INFO - Setting next_dagrun for pvc to 2021-12-02T11:50:02.937626+00:00
[2021-12-02 12:50:03,097] {scheduler_job.py:288} INFO - 1 tasks up for execution:
    <TaskInstance: pvc.pvc_provider scheduled__2021-12-02T11:49:52.937626+00:00 [scheduled]>
[2021-12-02 12:50:03,098] {scheduler_job.py:410} INFO - Setting the following tasks to queued state:
    <TaskInstance: pvc.pvc_provider scheduled__2021-12-02T11:49:52.937626+00:00 [scheduled]>
[2021-12-02 12:50:03,099] {scheduler_job.py:450} INFO - Sending TaskInstanceKey(dag_id='pvc', task_id='pvc_provider', run_id='scheduled__2021-12-02T11:49:52.937626+00:00', try_number=1) to executor with priority 2 and queue default
[2021-12-02 12:50:03,099] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'pvc', 'pvc_provider', 'scheduled__2021-12-02T11:49:52.937626+00:00', '--local', '--subdir', 'DAGS_FOLDER/dag_pvc.py']
[2021-12-02 12:50:03,105] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'pvc', 'pvc_provider', 'scheduled__2021-12-02T11:49:52.937626+00:00', '--local', '--subdir', 'DAGS_FOLDER/dag_pvc.py']
Running <TaskInstance: pvc.pvc_provider scheduled__2021-12-02T11:49:52.937626+00:00 [queued]> on host 
[2021-12-02 12:50:09,307] {scheduler_job.py:504} INFO - Executor reports execution of pvc.pvc_provider run_id=scheduled__2021-12-02T11:49:52.937626+00:00 exited with status success for try_number 1
[2021-12-02 12:50:09,311] {scheduler_job.py:547} INFO - TaskInstance Finished: dag_id=pvc, task_id=pvc_provider, run_id=scheduled__2021-12-02T11:49:52.937626+00:00, run_start_date=2021-12-02 11:50:04.024368+00:00, run_end_date=2021-12-02 11:50:09.107072+00:00, run_duration=5.082704, state=success, executor_state=success, try_number=1, max_tries=0, job_id=288, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator

Five seconds later the producer finishes and the consumer is scheduled. This is where the mutation is desired to happen:

[2021-12-02 12:50:09,458] {scheduler_job.py:288} INFO - 1 tasks up for execution:
    <TaskInstance: pvc.pvc_consumer scheduled__2021-12-02T11:49:52.937626+00:00 [scheduled]>
[2021-12-02 12:50:09,459] {scheduler_job.py:317} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 1 task instances ready to be queued
[2021-12-02 12:50:09,459] {scheduler_job.py:345} INFO - DAG pvc has 0/16 running and queued tasks
[2021-12-02 12:50:09,459] {scheduler_job.py:410} INFO - Setting the following tasks to queued state:
    <TaskInstance: pvc.pvc_consumer scheduled__2021-12-02T11:49:52.937626+00:00 [scheduled]>
[2021-12-02 12:50:09,460] {scheduler_job.py:450} INFO - Sending TaskInstanceKey(dag_id='pvc', task_id='pvc_consumer', run_id='scheduled__2021-12-02T11:49:52.937626+00:00', try_number=1) to executor with priority 1 and queue default
[2021-12-02 12:50:09,460] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'pvc', 'pvc_consumer', 'scheduled__2021-12-02T11:49:52.937626+00:00', '--local', '--subdir', 'DAGS_FOLDER/dag_pvc.py']
[2021-12-02 12:50:09,465] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'pvc', 'pvc_consumer', 'scheduled__2021-12-02T11:49:52.937626+00:00', '--local', '--subdir', 'DAGS_FOLDER/dag_pvc.py']
Running <TaskInstance: pvc.pvc_consumer scheduled__2021-12-02T11:49:52.937626+00:00 [queued]> on host

Five seconds later the consumer finishes:

[2021-12-02 12:50:15,675] {scheduler_job.py:504} INFO - Executor reports execution of pvc.pvc_consumer run_id=scheduled__2021-12-02T11:49:52.937626+00:00 exited with status success for try_number 1
[2021-12-02 12:50:15,679] {scheduler_job.py:547} INFO - TaskInstance Finished: dag_id=pvc, task_id=pvc_consumer, run_id=scheduled__2021-12-02T11:49:52.937626+00:00, run_start_date=2021-12-02 11:50:10.399690+00:00, run_end_date=2021-12-02 11:50:15.498334+00:00, run_duration=5.098644, state=success, executor_state=success, try_number=1, max_tries=0, job_id=289, pool=default_pool, queue=default, priority_weight=1, operator=_PythonDecoratedOperator
[2021-12-02 12:50:15,700] {manager.py:1051} INFO - Finding 'running' jobs without a recent heartbeat
[2021-12-02 12:50:15,700] {manager.py:1055} INFO - Failing jobs without heartbeat after 2021-12-02 11:45:15.700869+00:00
[2021-12-02 12:50:15,815] {airflow_local_settings.py:16} INFO - In the mutation hook for ti='pvc_provider', run_id='scheduled__2021-12-02T11:50:02.937626+00:00'
[2021-12-02 12:50:15,816] {airflow_local_settings.py:16} INFO - In the mutation hook for ti='pvc_consumer', run_id='scheduled__2021-12-02T11:50:02.937626+00:00'

This was tried out using the SequentialExecutor and SQLite; maybe other executors behave differently? Though I doubt it: skimming through DagRun.verify_integrity, where the hook is called, it appears to always apply to all task instances at the same time.

easontm commented 2 years ago

FYI @hterik, the pods should have some basic metadata attached to them in the labels section. I modify my pods' nodeSelector with the hook based on DAG and task information.

    labels = pod.metadata.labels
    dag_id = labels['dag_id']
    task_id = labels['task_id']

etc.
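
A minimal sketch of that approach in airflow_local_settings.py; the dag_id/task_id values match the example DAG earlier in the thread, while the node pool selector is made up for illustration:

    from kubernetes.client import models as k8s


    def pod_mutation_hook(pod: k8s.V1Pod):
        # Airflow attaches dag_id/task_id labels to worker pods, which can be
        # used to decide how to mutate the pod.
        labels = pod.metadata.labels or {}

        # Route a specific task to a bigger node pool via nodeSelector
        # (the selector key/value below are only an example).
        if labels.get("dag_id") == "pvc" and labels.get("task_id") == "pvc_consumer":
            pod.spec.node_selector = {"cloud.google.com/gke-nodepool": "large-pool"}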