apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

dag_hash changed when using the XComArgs feature #35998

Closed zhangw closed 11 months ago

zhangw commented 11 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

Airflow version 2.1.2

The DAG file is simple but uses the XComArgs feature, and I noticed that the dag_hash changes every time the DAG is parsed and serialized. I thought the hash should be stable in this case.

The DAG file used for testing:

import logging

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, get_current_context
from airflow.utils.dates import days_ago

log = logging.getLogger(__name__)

def generate_value():
    """Dummy function"""
    return "Bring me a shrubbery!"

@task()
def print_value(value):
    """Dummy function"""
    ctx = get_current_context()
    log.info("The knights of Ni say: %s (at %s)", value, ctx['ts'])

with DAG(
    dag_id='example_xcom_args',
    default_args={'owner': 'airflow'},
    start_date=days_ago(2),
    schedule_interval=None,
    tags=['example'],
) as dag:
    task1 = PythonOperator(
        task_id='generate_value',
        python_callable=generate_value,
    )

    print_value(task1.output)

with DAG(
    "example_xcom_args_with_operators",
    default_args={'owner': 'airflow'},
    start_date=days_ago(2),
    schedule_interval=None,
    tags=['example'],
) as dag2:
    bash_op1 = BashOperator(task_id="c", bash_command="echo c")
    bash_op2 = BashOperator(task_id="d", bash_command="echo c")
    xcom_args_a = print_value("first!")
    xcom_args_b = print_value("second!")

    bash_op1 >> xcom_args_a >> xcom_args_b >> bash_op2

One of the serialized payloads:

{"dag": {"tags": ["example"], "tasks": [{"pool": "default_pool", "label": "generate_value", "owner": "airflow", "_inlets": [], "op_args": [], "task_id": "generate_value", "_outlets": [], "ui_color": "#ffefeb", "_is_dummy": false, "op_kwargs": {}, "_task_type": "PythonOperator", "ui_fgcolor": "#000", "_task_module": "airflow.operators.python", "template_fields": ["templates_dict", "op_args", "op_kwargs"], "_downstream_task_ids": ["print_value"], "template_fields_renderers": {"op_args": "py", "op_kwargs": "py", "templates_dict": "json"}}, {"pool": "default_pool", "label": "print_value", "owner": "airflow", "doc_md": "Dummy function", "_inlets": [], "op_args": "(<airflow.models.xcom_arg.XComArg object at 0x107415d30>,)", "task_id": "print_value", "_outlets": [], "ui_color": "#ffefeb", "_is_dummy": false, "op_kwargs": {}, "_task_type": "_PythonDecoratedOperator", "ui_fgcolor": "#000", "_task_module": "airflow.decorators.python", "template_fields": ["op_args", "op_kwargs"], "_downstream_task_ids": [], "template_fields_renderers": {"op_args": "py", "op_kwargs": "py"}}], "_dag_id": "example_xcom_args", "fileloc": "/Users/vincent/Codes/Workspace/webull/airflow/airflow/airflow/example_dags/example_xcomargs.py", "timezone": "UTC", "edge_info": {}, "start_date": 1701216000.0, "_task_group": {"tooltip": "", "children": {"print_value": ["operator", "print_value"], "generate_value": ["operator", "generate_value"]}, "ui_color": "CornflowerBlue", "_group_id": null, "ui_fgcolor": "#000", "prefix_group_id": true, "upstream_task_ids": [], "upstream_group_ids": [], "downstream_task_ids": [], "downstream_group_ids": []}, "default_args": {"__var": {"owner": "airflow"}, "__type": "dict"}, "dag_dependencies": [], "schedule_interval": null}, "__version": 1}

Another serialized payload:

{"dag": {"tags": ["example"], "tasks": [{"pool": "default_pool", "label": "generate_value", "owner": "airflow", "_inlets": [], "op_args": [], "task_id": "generate_value", "_outlets": [], "ui_color": "#ffefeb", "_is_dummy": false, "op_kwargs": {}, "_task_type": "PythonOperator", "ui_fgcolor": "#000", "_task_module": "airflow.operators.python", "template_fields": ["templates_dict", "op_args", "op_kwargs"], "_downstream_task_ids": ["print_value"], "template_fields_renderers": {"op_args": "py", "op_kwargs": "py", "templates_dict": "json"}}, {"pool": "default_pool", "label": "print_value", "owner": "airflow", "doc_md": "Dummy function", "_inlets": [], "op_args": "(<airflow.models.xcom_arg.XComArg object at 0x112a51d60>,)", "task_id": "print_value", "_outlets": [], "ui_color": "#ffefeb", "_is_dummy": false, "op_kwargs": {}, "_task_type": "_PythonDecoratedOperator", "ui_fgcolor": "#000", "_task_module": "airflow.decorators.python", "template_fields": ["op_args", "op_kwargs"], "_downstream_task_ids": [], "template_fields_renderers": {"op_args": "py", "op_kwargs": "py"}}], "_dag_id": "example_xcom_args", "fileloc": "/Users/vincent/Codes/Workspace/webull/airflow/airflow/airflow/example_dags/example_xcomargs.py", "timezone": "UTC", "edge_info": {}, "start_date": 1701216000.0, "_task_group": {"tooltip": "", "children": {"print_value": ["operator", "print_value"], "generate_value": ["operator", "generate_value"]}, "ui_color": "CornflowerBlue", "_group_id": null, "ui_fgcolor": "#000", "prefix_group_id": true, "upstream_task_ids": [], "upstream_group_ids": [], "downstream_task_ids": [], "downstream_group_ids": []}, "default_args": {"__var": {"owner": "airflow"}, "__type": "dict"}, "dag_dependencies": [], "schedule_interval": null}, "__version": 1}

The only difference between them is the value of op_args.

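A minimal standalone sketch (not Airflow's actual implementation) of why an op_args value serialized via the object's default repr, which embeds a memory address, makes any hash computed over the serialized payload change on every parse:

    import hashlib
    import json

    class XComArgLike:
        """Stand-in for an object whose default repr embeds its memory address."""
        pass

    def fake_serialize(op_args):
        # Mimic the payload above, where op_args is stored as its string repr.
        payload = {"tasks": [{"task_id": "print_value", "op_args": str(op_args)}]}
        return json.dumps(payload, sort_keys=True).encode("utf-8")

    # Each parse of the DAG file creates a fresh XComArg-like object, so the repr
    # (and therefore any hash over the payload) differs between parses.
    args_first_parse = (XComArgLike(),)
    args_second_parse = (XComArgLike(),)
    hash_a = hashlib.md5(fake_serialize(args_first_parse)).hexdigest()
    hash_b = hashlib.md5(fake_serialize(args_second_parse)).hexdigest()
    print(hash_a == hash_b)  # False: the embedded memory address differs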

What you think should happen instead

No response

How to reproduce

  1. Copy the above DAG file into your Airflow environment and make sure the scheduler is running.
  2. Run the SQL below several times at roughly 30-second intervals (the exact interval depends on your min_serialized_dag_update_interval configuration setting):
select `dag_id`, `dag_hash`, `last_updated`, `data` from `serialized_dag` where `dag_id` = 'example_xcom_args';

Then compare the rows returned by each execution.

Operating System

macOS 14.1.1 (23B81), Apple M1 chipset

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 11 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

potiuk commented 11 months ago

Don't use days_ago. It's a bad practice from Airflow 1.10 which we have since fixed in (I believe) all of our examples and documentation. days_ago effectively calculates a new start_date for the DAG every time the DAG is parsed, which means that yes, the DAG is different every time.

Generally, when you create a DAG you should decide WHEN its life should start (a fixed date) rather than keep moving the start date over and over again, which is effectively what days_ago does.
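For reference, a minimal sketch of the recommended pattern: a fixed, timezone-aware start_date (via pendulum, as in current Airflow examples) instead of days_ago. The dag_id here is hypothetical and only for illustration:

    import pendulum
    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="example_fixed_start_date",  # hypothetical id, for illustration only
        start_date=pendulum.datetime(2023, 11, 1, tz="UTC"),  # fixed date: identical on every parse
        schedule_interval=None,
        tags=["example"],
    ) as dag:
        BashOperator(task_id="hello", bash_command="echo hello")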

zhangw commented 11 months ago

> Don't use days_ago. It's a bad practice from Airflow 1.10 which we have since fixed in (I believe) all of our examples and documentation. days_ago effectively calculates a new start_date for the DAG every time the DAG is parsed, which means that yes, the DAG is different every time.
>
> Generally, when you create a DAG you should decide WHEN its life should start (a fixed date) rather than keep moving the start date over and over again, which is effectively what days_ago does.

In my case, the start_date value calculated by days_ago did not change; what changed is the value of op_args. Please advise further.


potiuk commented 11 months ago

What you see is likely an effect of how serialization was implemented 2.5 years ago, with thousands of bug fixes released since, including tens of them in the 2.1 line you are using (2.1.3 and 2.1.4).

The hash calculation for Airflow only uses these fields:

    _comps = {
        "task_id",
        "dag_id",
        "owner",
        "email",
        "email_on_retry",
        "retry_delay",
        "retry_exponential_backoff",
        "max_retry_delay",
        "start_date",
        "end_date",
        "depends_on_past",
        "wait_for_downstream",
        "priority_weight",
        "sla",
        "execution_timeout",
        "on_execute_callback",
        "on_failure_callback",
        "on_success_callback",
        "on_retry_callback",
        "do_xcom_push",
    }

But there have probably been tens of changes in this area since 2.1.2.
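To illustrate the idea (a simplified sketch, not Airflow's actual implementation): a hash computed only from a whitelisted set of attributes cannot be affected by fields outside that set, such as op_args:

    # Simplified sketch of hashing only a whitelisted set of attributes.
    _comps = {"task_id", "dag_id", "owner", "start_date"}  # abbreviated whitelist for the sketch

    def comps_hash(task) -> int:
        # Only whitelisted fields participate; everything else is ignored.
        return hash(tuple(sorted((field, str(getattr(task, field, None))) for field in _comps)))

    class FakeTask:
        def __init__(self, op_args):
            self.task_id = "print_value"
            self.dag_id = "example_xcom_args"
            self.owner = "airflow"
            self.start_date = "2023-11-29"
            self.op_args = op_args  # not in _comps, so it can never change the hash

    print(comps_hash(FakeTask(("first!",))) == comps_hash(FakeTask(("second!",))))  # True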

> Please advise further.

Upgrade to the latest version of Airflow. This is the fastest way. If you want to check whether there are any bug fixes related to serialization, to be sure it is worth it, you can go through the release notes https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html and review the few thousand fixes since then, but I strongly advise you to just upgrade. Even if there was a bug in the hash implementation back then, the only way to fix it is to upgrade anyway, so you can save a lot of time by upgrading instead of searching.

If you see similar problems after upgrading, please report them here.

zhangw commented 11 months ago

> What you see is likely an effect of how serialization was implemented 2.5 years ago, with thousands of bug fixes released since, including tens of them in the 2.1 line you are using (2.1.3 and 2.1.4). [...]
>
> Upgrade to the latest version of Airflow. This is the fastest way. [...] Even if there was a bug in the hash implementation back then, the only way to fix it is to upgrade anyway, so you can save a lot of time by upgrading instead of searching.
>
> If you see similar problems after upgrading, please report them here.

Thanks for your suggestions!