
EmrServerlessStartJobOperator causes dag load failure when using XComArg #40103

Open · fjmacagno opened this issue 4 weeks ago

fjmacagno commented 4 weeks ago

Apache Airflow Provider(s)

amazon

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.23.0

Apache Airflow version

2.9.1

Operating System

Debian GNU/Linux 11 (bullseye)

Deployment

Astronomer

Deployment details

No response

What happened

We get the following DAG load error:

Broken DAG: [/usr/local/airflow/dags/governance/scrub/parquet/parquet_scrub.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/providers/amazon/aws/operators/emr.py", line 1321, in operator_extra_links
    if operator_class.is_monitoring_in_job_override(
  File "/usr/local/lib/python3.10/site-packages/airflow/providers/amazon/aws/operators/emr.py", line 1535, in is_monitoring_in_job_override
    monitoring_config = (job_override or {}).get("monitoringConfiguration")
AttributeError: 'PlainXComArg' object has no attribute 'get'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 1540, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/usr/local/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 1453, in serialize_dag
    raise SerializationError(f"Failed to serialize DAG {dag.dag_id!r}: {e}")
airflow.exceptions.SerializationError: Failed to serialize DAG 'parquet-scrub': 'PlainXComArg' object has no attribute 'get'
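
For context: at DAG-parse time, create_app.output is still a PlainXComArg placeholder, not the dict it resolves to once the upstream task runs, so any dict method called on it (here .get()) raises AttributeError. A minimal sketch showing the placeholder type at parse time (standalone, outside any DAG):

from airflow.models.xcom_arg import XComArg
from airflow.operators.empty import EmptyOperator

t = EmptyOperator(task_id="t")
# .output is an XComArg subclass (PlainXComArg), resolved only at run time;
# it supports no dict methods until then.
assert isinstance(t.output, XComArg)
print(type(t.output))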

What you think should happen instead

It shouldn't error, and we should see the operator links in the UI.

How to reproduce

Pass an XCom result into the configuration_overrides parameter of the EmrServerlessStartJobOperator.
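
A minimal reproduction sketch (hypothetical application ID, role ARN, and S3 path; any XComArg passed as configuration_overrides triggers the parse-time failure):

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.providers.amazon.aws.operators.emr import EmrServerlessStartJobOperator

with DAG(dag_id="emr-serverless-xcomarg-repro", schedule=None, start_date=datetime(2021, 1, 1)):

    @task
    def build_overrides() -> dict:
        # A perfectly valid overrides dict -- but its type is only known
        # at run time, after templates are rendered.
        return {"monitoringConfiguration": {}}

    EmrServerlessStartJobOperator(
        task_id="start-job",
        application_id="application-id-placeholder",  # hypothetical
        execution_role_arn="arn:aws:iam::111122223333:role/placeholder",  # hypothetical
        job_driver={"sparkSubmit": {"entryPoint": "s3://example-bucket/job.jar"}},  # hypothetical
        configuration_overrides=build_overrides(),  # XComArg -> DAG fails to parse
    )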

Anything else

This happens every time. The issue appears to be that the operator-link function is called before templates are rendered.
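
One possible guard in the provider (a sketch only, with the signature inferred from the traceback; the actual fix is in the PR linked at the end of this thread) would be to skip the check while the value is still an unresolved XComArg. Shown as a free function for brevity; the real code is a method on EmrServerlessStartJobOperator:

from airflow.models.xcom_arg import XComArg

def is_monitoring_in_job_override(config_key: str, job_override) -> bool:
    # Sketch: an unresolved XComArg cannot be inspected at parse time,
    # so report "no monitoring configured" instead of raising AttributeError.
    if isinstance(job_override, XComArg):
        return False
    monitoring_config = (job_override or {}).get("monitoringConfiguration") or {}
    return config_key in monitoring_config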


vatsrahul1001 commented 4 weeks ago

@fjmacagno could you share the example DAG you used here?

fjmacagno commented 4 weeks ago
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrServerlessCreateApplicationOperator,
    EmrServerlessDeleteApplicationOperator, EmrServerlessStartJobOperator,
)

from foursquare_plugin.policies import FsqDagTag
from fsq.airflow.helpers.emr.serverless.application_config_builder import ServerlessApplicationConfigBuilder
from fsq.airflow.helpers.emr.serverless.job_driver_builder import ServerlessJobDriverBuilder
from fsq.airflow.team.aws_account_configs import NotebookInternalProtocol
from fsq.airflow.team.team import DEVELOPER_EXPERIENCE_TEAM

with DAG(
    dag_id="spark-pipeline-example",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    default_args={
        "owner": DEVELOPER_EXPERIENCE_TEAM.name(),
    },
    tags=[FsqDagTag.TEST_AT_WILL.name],
) as dag:
    create_app = EmrServerlessCreateApplicationOperator(
        task_id="create-spark-app",
        job_type="SPARK",
        release_label="emr-6.10.0",
        config=ServerlessApplicationConfigBuilder(NotebookInternalProtocol())
        .with_name("spark-pipeline-example")
        .build(),
        aws_conn_id=NotebookInternalProtocol().aws_conn_id(),
    )

    application_id: str = str(create_app.output)

    jar_submit_job = EmrServerlessStartJobOperator(
        task_id="join-chains-categories",
        config={"name": "join-chains-categories-test"},
        application_id=application_id,
        execution_role_arn=NotebookInternalProtocol().emr_serverless_role_arn(),
        job_driver=(
            ServerlessJobDriverBuilder()
            .with_entry_point("s3://4sq-dev/artifacts/fsq-graph-spark-examples_deploy.jar")
            .with_class("com.foursquare.spark.examples.S3ReadWriteExample")
            .build()
        ),
        wait_for_completion=True,
        deferrable=True,
        aws_conn_id=NotebookInternalProtocol().aws_conn_id(),
        configuration_overrides=create_app.output,  # <----- important line
    )

    delete_app = EmrServerlessDeleteApplicationOperator(
        task_id="delete_app",
        application_id=application_id,
        trigger_rule="all_done",
        aws_conn_id=NotebookInternalProtocol().aws_conn_id(),
    )

    (create_app >> jar_submit_job >> delete_app)

It works fine until I add the line labeled "important line".

vatsrahul1001 commented 3 weeks ago

@fjmacagno EmrServerlessCreateApplicationOperator returns an application ID, which you are already using in your DAG via application_id: str = str(create_app.output). configuration_overrides is of type dict, so why are we passing the XCom result of EmrServerlessCreateApplicationOperator, which would not be a dict in this case?

vatsrahul1001 commented 3 weeks ago

That said, I updated my DAG to use an XCom value that should return a dict; however, I am still getting the same parsing error:

    SPARK_CONFIGURATION_OVERRIDES = {
        "monitoringConfiguration": {"s3MonitoringConfiguration": {"logUri": f"s3://{bucket_name}/logs"}}
    }

    def test_xcom():
        return SPARK_CONFIGURATION_OVERRIDES

    @task()
    def test_log():
        print(f"data is {type(emr_serverless_app_id)}")
        print(f"data is {str(emr_serverless_app_id)}")
        return SPARK_CONFIGURATION_OVERRIDES

    t1 = PythonOperator(
        task_id="t1",
        python_callable=test_xcom,
    )

    # [START howto_sensor_emr_serverless_application]
    wait_for_app_creation = EmrServerlessApplicationSensor(
        task_id="wait_for_app_creation",
        application_id=emr_serverless_app_id,
    )
    # [END howto_sensor_emr_serverless_application]
    wait_for_app_creation.poke_interval = 1

    # [START howto_operator_emr_serverless_start_job]
    start_job = EmrServerlessStartJobOperator(
        task_id="start_emr_serverless_job",
        application_id=emr_serverless_app_id,
        execution_role_arn=role_arn,
        job_driver=SPARK_JOB_DRIVER,
        configuration_overrides=t1.output
    )

I verified that configuration_overrides is a templated field.
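
For reference, templated fields (XComArg values included) are only resolved when the task instance renders its templates during execution, which is why parse-time code such as operator_extra_links cannot assume the value is already a dict. Checking that the field is templated is a one-liner:

from airflow.providers.amazon.aws.operators.emr import EmrServerlessStartJobOperator

# configuration_overrides should appear in this tuple; rendering happens at
# execution time, not while the DAG file is being parsed and serialized.
print(EmrServerlessStartJobOperator.template_fields)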

o-nikolas commented 3 hours ago

@fjmacagno @vatsrahul1001 Can you have a look at #40627?