apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Task Instance DAGRun is null inside Mutation Hook Task Policy #35575

Open ginwakeup opened 1 year ago

ginwakeup commented 1 year ago

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

I am using task_instance_mutation_hook to change a task instance at run-time but the task_instance.dag_run property is None.

I can see this is executed on the WebServer, and the dag_run property is None in all these cases:

What you think should happen instead

The dag_run property should not be None, so that I can fetch properties such as params from it in task_instance_mutation_hook.

How to reproduce

Install Airflow 2.5.3 using Helm Chart on Kubernetes

Edit the airflowLocalSettings section to use the following policy:

  from airflow.models import TaskInstance

  def task_instance_mutation_hook(task_instance: TaskInstance):
      print("Entering mutation hook")
      try:
          print(task_instance)
          print(dir(task_instance))

          # dag_run is None here, so the .conf access below raises AttributeError.
          dag_run = task_instance.dag_run
          conf = dag_run.conf
          if conf.get("queue"):
              task_instance.queue = conf.get("queue")
      except Exception as error:
          print(error)

Run Airflow

Create the following DAG:

from datetime import datetime
from airflow.models import Param
from airflow.decorators import task, dag

@task.python(task_id="hello_world")
def helloWorld(**kwargs):
    print("Hello World!")

@dag(
    dag_id='reroute_queue',
    description='Test Reroute Queue Task',
    start_date=datetime.now(),
    catchup=False,
    tags=["example"],
    params={
        "queue": Param("")
    }
)
def rerouteQueue():
    helloWorld(
        queue="{{ params.queue }}"
    )

dag = rerouteQueue()

Trigger the DAG from the webserver UI or through an API call.

Observe the webserver logs, where the task instance's dag_run property is None:

Entering mutation hook
<TaskInstance: reroute_queue.hello_world manual__2023-11-10T15:40:39.507555+00:00 [None]>
['_AssociationProxy_dag_run_140498322044432_inst', '_AssociationProxy_task_instance_note_140498322044688_inst', '_AssociationProxy_trigger_140498322043984_inst', '__abstract__', '__annotations__', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__mapper__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__table__', '__table_args__', '__tablename__', '__weakref__', '_date_or_empty', '_defer_task', '_execute_task', '_execute_task_with_callbacks', '_handle_reschedule', '_log', '_log_state', '_record_task_map_for_downstreams', '_register_dataset_changes', '_run_execute_callback', '_run_finished_callback', '_run_raw_task', '_sa_class_manager', '_sa_instance_state', '_sa_registry', '_set_context', '_try_number', 'are_dependencies_met', 'are_dependents_done', 'check_and_change_state_before_execution', 'clear_db_references', 'clear_next_method_args', 'clear_xcom_data', 'command_as_list', 'current_state', 'dag_id', 'dag_model', 'dag_run', 'dry_run', 'duration', 'email_alert', 'end_date', 'error', 'execution_date', 'executor_config', 'external_executor_id', 'filter_for_tis', 'generate_command', 'get_dagrun', 'get_email_subject_content', 'get_failed_dep_statuses', 'get_num_running_task_instances', 'get_previous_dagrun', 'get_previous_execution_date', 'get_previous_start_date', 'get_previous_ti', 'get_relevant_upstream_map_indexes', 'get_rendered_k8s_spec', 'get_rendered_template_fields', 'get_template_context', 'get_truncated_error_traceback', 'handle_failure', 'hostname', 'init_on_load', 'init_run_context', 'insert_mapping', 'is_eligible_to_retry', 'is_premature', 'job_id', 'key', 'log', 'log_url', 'map_index', 'mark_success_url', 'max_tries', 'metadata', 'next_kwargs', 'next_method', 'next_retry_datetime', 'next_try_number', 'note', 
'operator', 'overwrite_params_with_dag_run_conf', 'pid', 'pool', 'pool_slots', 'prev_attempted_tries', 'previous_start_date_success', 'previous_ti', 'previous_ti_success', 'priority_weight', 'queue', 'queued_by_job', 'queued_by_job_id', 'queued_dttm', 'raw', 'ready_for_retry', 'refresh_from_db', 'refresh_from_task', 'registry', 'render_k8s_pod_yaml', 'render_templates', 'rendered_task_instance_fields', 'run', 'run_as_user', 'run_id', 'schedule_downstream_tasks', 'set_duration', 'set_state', 'start_date', 'state', 'task', 'task_id', 'task_instance_note', 'test_mode', 'ti_selector_condition', 'trigger', 'trigger_id', 'trigger_timeout', 'triggerer_job', 'try_number', 'unixname', 'updated_at', 'xcom_pull', 'xcom_push']
'NoneType' object has no attribute 'conf'
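
As a stopgap, the exception itself can be avoided by guarding against a missing dag_run. The sketch below is only an assumption about how such a guard might look. It uses stand-in objects rather than real Airflow models, and it does not fix the underlying problem: when dag_run is None, the task simply keeps its original queue.

```python
from types import SimpleNamespace


def task_instance_mutation_hook(task_instance):
    """Reroute the task to the queue named in dag_run.conf, if one is available."""
    # dag_run can be None when the hook fires (as in the log above), so guard first.
    dag_run = getattr(task_instance, "dag_run", None)
    if dag_run is None:
        return
    # conf may also be None when the run was triggered without a payload.
    conf = dag_run.conf or {}
    queue = conf.get("queue")
    if queue:
        task_instance.queue = queue


# Stand-in objects (hypothetical, not real Airflow models) to exercise both paths.
ti_without_run = SimpleNamespace(dag_run=None, queue="default")
task_instance_mutation_hook(ti_without_run)

ti_with_run = SimpleNamespace(dag_run=SimpleNamespace(conf={"queue": "gpu"}), queue="default")
task_instance_mutation_hook(ti_with_run)
```

With this guard the hook returns quietly instead of printing the 'NoneType' error, which at least makes it easier to see in which calls dag_run is actually populated.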

Operating System

Ubuntu 20.04

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

Helm chart in Kubernetes

Anything else

Happens every time

prernadubey commented 10 months ago

Hi,

I am also looking to change task queues dynamically. Did you find any solution?

ginwakeup commented 10 months ago

@prernadubey I started implementing a solution for this, but it's quite hard. I got something working by setting a __queue attribute inside dag_run.conf[airflow], which is evaluated to decide which queue to use, but I am struggling to make it work with the UI so that the queue is displayed correctly on the webserver. The solution still has some issues; for now it's more of a hack than anything else.
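
To make the idea a bit more concrete, here is a rough sketch of how the queue lookup might be written. The layout of the payload (an "airflow" section holding a "__queue" key) is my reading of the description above, and queue_from_conf is a hypothetical helper name, not part of Airflow.

```python
from typing import Any, Mapping, Optional


def queue_from_conf(conf: Optional[Mapping[str, Any]], default: str) -> str:
    """Return the queue named under conf["airflow"]["__queue"], else the default."""
    # Assumed layout: {"airflow": {"__queue": "<queue name>"}}.
    section = (conf or {}).get("airflow")
    if not isinstance(section, Mapping):
        return default
    queue = section.get("__queue")
    # Only accept a non-empty string; anything else falls back to the default.
    if isinstance(queue, str) and queue:
        return queue
    return default


chosen = queue_from_conf({"airflow": {"__queue": "gpu"}}, default="default")
```

Inside a mutation hook this could be called as task_instance.queue = queue_from_conf(dag_run.conf, task_instance.queue), but only once dag_run is actually available there.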