apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Dynamically Change Tasks Queues Using DAG Parameters #35542

Open ginwakeup opened 1 year ago

ginwakeup commented 1 year ago

Description

It is not currently possible to dynamically re-route tasks based on the DAG Parameters.

An example:

from datetime import datetime
from airflow.decorators import dag, task

# Placeholder: the original DEFAULT_ARGS are not shown in the issue.
DEFAULT_ARGS = {}

@task
def helloWorld():
    print("Hello World!")

@dag(
    default_args=DEFAULT_ARGS,
    dag_id='python_task',
    description='Test Python Task',
    start_date=datetime.now(),
    catchup=False,
    params={
        "helloWorldQueue": "my-queue"
    },
)
def rsgPythonTask():
    # Intended: route the task to the queue named in the DAG params. This does
    # not work today: "queue" is not a templated field, so the literal string
    # "{{ params.helloWorldQueue }}" ends up being used as the queue name.
    helloWorld.override(queue="{{ params.helloWorldQueue }}")()

rsgPythonTask()

This is quite an important feature for a task scheduler, and I currently cannot find another way of doing it.

I've also tried to use cluster policies, but task_policy does not allow changing the queue, and task_instance_mutation_hook somehow hangs my entire Airflow setup and does not work correctly (Airflow 2.5.3).
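
For reference, a minimal sketch of the mutation-hook approach mentioned above, assuming an airflow_local_settings.py on the scheduler's PYTHONPATH; task_instance_mutation_hook is the real cluster-policy hook name, while the task id and queue name here are hypothetical examples:

# airflow_local_settings.py
def task_instance_mutation_hook(task_instance):
    # Airflow calls this cluster-policy hook for each task instance, and
    # attributes are mutated in place. A static re-route like this works, but
    # the hook has no reliable access to the DAG run params, which is the gap
    # this issue is about.
    if task_instance.task_id == "helloWorld":  # hypothetical task id
        task_instance.queue = "my-queue"       # hypothetical queue name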

Use case/motivation

We have a core Airflow instance with localized workers (India, UK, etc.). We run Airflow on top of a job automation system that creates jobs based on custom Configurations and finally triggers a DAG linked to the Configuration.

Some tasks must run in a localized way, and we want to be able to re-route tasks to the localized worker automatically using the Configuration. The Configuration can supply the worker's location via the "queue" parameter, so each task can be re-routed to a specific queue in a specific location.

I am not entirely sure why this is not possible today. Having to hardcode queue names for workers in the DAG code is neither sustainable nor scalable. Unless I am missing something and this is currently doable somehow.

Related issues

No response


jscheffl commented 1 year ago

I had the same observation some time ago, which went into discussion https://github.com/apache/airflow/discussions/32471

I proposed to implement an improvement because we have the same demand.

Nevertheless, my attempt in the main codebase (must have been 2.6 or early 2.7) was successful. Interestingly, I found an inconsistency: the initial routing to the queue is possible via cluster policies, but the UI shows it incorrectly.

Can you check the discussion? If so, we might close this report as a duplicate, and I will try to raise this task to a higher level in my stack, knowing that another person has the same demand.

ginwakeup commented 1 year ago

Yes, I will, thank you. I am currently trying with cluster policies as well, but they are really not reliable.

potiuk commented 1 year ago

Just to repeat what was said there: this is not possible by definition. Jinja-templated fields are resolved after the task has started, which is after the queue was already used to route the task in the first place. So a new feature would have to be implemented for this, for example specifying which DAG run parameter should be used as the queue. But it should not be done with Jinja, also because the scheduler should not execute code. It would have to be a declarative way of mapping a param to a queue (without any pre-processing or imperative code).

Maybe a good idea for PR.
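
To make that concrete, a purely hypothetical sketch of such a declarative mapping; the queue_from_param option does not exist in Airflow today and is shown only to illustrate the idea:

@task(queue_from_param="helloWorldQueue")  # hypothetical option, not a real Airflow parameter
def helloWorld():
    print("Hello World!")

The scheduler could then resolve the queue with a plain dictionary lookup in the DAG run conf, with no Jinja rendering and no user code executed.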

ginwakeup commented 1 year ago

@potiuk is there any other way this can be done at the moment?

It seems from the documentation that I should be able to use task_instance_mutation_hook in airflow_local_settings.py, but as I mentioned in this issue: https://github.com/apache/airflow/issues/35575, I can't make it work: the dag_run property of the task_instance in the hook is None, so I cannot access the DAG run parameters. Is that expected behavior? Is there any way I could get to the DagRun from the task instance in that mutation hook?

potiuk commented 1 year ago

Not that I am aware of. But you are welcome to make a PR.

Also, I would appreciate it if you did not try to hijack one thread for other things. Hopefully someone will answer on the other thread.