apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

EMRServerlessStartJobOperator Expand/Expand Kwargs not Serializing properly #38005

Closed jliu0812 closed 7 months ago

jliu0812 commented 7 months ago

Apache Airflow Provider(s)

amazon

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.19.0

Apache Airflow version

2.8.2

Operating System

Debian GNU/Linux 12 (bookworm)

Deployment

Docker-Compose

Deployment details

Used breeze tool to deploy.

What happened

When the EmrServerlessStartJobOperator is used with Airflow's expand functionality, the DAG fails to serialize and the webserver shows a DAG import error. The cause is that EmrServerlessStartJobOperator.operator_extra_links is evaluated while the task is a MappedOperator, and MappedOperator does not have the EmrServerlessStartJobOperator.is_monitoring_in_job_override attribute.
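The failure mode can be reproduced outside Airflow. In this minimal sketch, all names are hypothetical: RealOperator stands in for EmrServerlessStartJobOperator and MappedStandIn stands in for MappedOperator. A property getter that calls an instance helper raises AttributeError when it is evaluated against an object of a different class, which is essentially what DAG serialization does with operator_extra_links on a mapped task:

```python
# Minimal sketch of the failure mode, outside Airflow. All names are
# hypothetical: RealOperator stands in for EmrServerlessStartJobOperator,
# MappedStandIn stands in for MappedOperator.

class RealOperator:
    def helper(self):
        """Instance helper, analogous to is_monitoring_in_job_override."""
        return True

    @property
    def extra_links(self):
        # Works only when `self` actually has a `helper` attribute.
        return ("console_link",) if self.helper() else ()


class MappedStandIn:
    """No `helper` attribute, like MappedOperator."""


# A real instance works fine:
assert RealOperator().extra_links == ("console_link",)

# But evaluating the property's getter against the stand-in -- the way
# serialization evaluates operator_extra_links on a MappedOperator -- fails:
try:
    RealOperator.extra_links.fget(MappedStandIn())
    raised = False
except AttributeError as exc:
    raised = True
    print(exc)  # 'MappedStandIn' object has no attribute 'helper'
assert raised
```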

What you think should happen instead

DAG should import successfully without any errors.
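One pattern that would allow the import to succeed (a sketch with hypothetical names, not necessarily the fix the provider will adopt) is to keep the extra-links getter free of instance-only helpers: make the check a @staticmethod and read the operator kwargs defensively via getattr, so the getter can be evaluated against a MappedOperator-like object during serialization:

```python
# Sketch of a defensive pattern (hypothetical names): the extra-links
# getter no longer depends on instance-only helpers, so it can be
# evaluated against a MappedOperator-like object during serialization.

class RealOperator:
    @staticmethod
    def is_monitoring_enabled(key, overrides):
        # Pure function of its arguments; needs no operator instance.
        return key in (overrides or {}).get("monitoringConfiguration", {})

    @property
    def extra_links(self):
        # getattr with a default tolerates objects that lack the field.
        overrides = getattr(self, "configuration_overrides", None)
        if RealOperator.is_monitoring_enabled("s3MonitoringConfiguration", overrides):
            return ("s3_logs_link",)
        return ()


class MappedStandIn:
    """MappedOperator-like object: has the kwargs but none of the helpers."""
    configuration_overrides = {
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {"logUri": "s3://test/logs"}
        }
    }


# The getter now works both for a real instance and for the stand-in:
assert RealOperator().extra_links == ()
assert RealOperator.extra_links.fget(MappedStandIn()) == ("s3_logs_link",)
```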

How to reproduce

The following plain (non-mapped) usage of EmrServerlessStartJobOperator works:

from datetime import datetime
from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrServerlessStartJobOperator,
)

DAG_ID = "example_emr_serverless"
emr_serverless_app_id = "01234abcd"
role_arn = "arn:test"

with DAG(
    dag_id=DAG_ID,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
):
    start_job = EmrServerlessStartJobOperator(
        task_id="start_emr_serverless_job",
        application_id=emr_serverless_app_id,
        execution_role_arn=role_arn,
        job_driver={
            "sparkSubmit": {
                "entryPoint": "test.jar",
                "entryPointArguments": ["--arg", "1"],
                "sparkSubmitParameters": "--conf sample",
            }
        },
        configuration_overrides={
            "monitoringConfiguration": {"s3MonitoringConfiguration": {"logUri": "s3://test/logs"}}
        },
    )

Whereas the following mapped usage of EmrServerlessStartJobOperator, via .partial().expand(), fails to serialize:

from datetime import datetime
from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrServerlessStartJobOperator,
)

DAG_ID = "example_emr_serverless"
emr_serverless_app_id = "01234abcd"
role_arn = "arn:test"

with DAG(
    dag_id=DAG_ID,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
):
    start_job = EmrServerlessStartJobOperator.partial(
        task_id="start_emr_serverless_job",
        application_id=emr_serverless_app_id,
        execution_role_arn=role_arn,
        configuration_overrides={
            "monitoringConfiguration": {"s3MonitoringConfiguration": {"logUri": "s3://test/logs"}}
        },
    ).expand(
        job_driver=[{
            "sparkSubmit": {
                "entryPoint": "test.jar",
                "entryPointArguments": ["--arg", "1"],
                "sparkSubmitParameters": "--conf sample",
            }
        },{
            "sparkSubmit": {
                "entryPoint": "test.jar",
                "entryPointArguments": ["--arg", "2"],
                "sparkSubmitParameters": "--conf sample",
            }
        }]
    )

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 7 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

jliu0812 commented 7 months ago

I will be working on this.

Full stack trace for reference:

[2024-03-09T00:45:58.280+0000] {logging_mixin.py:188} INFO - [2024-03-09T00:45:58.280+0000] {dagbag.py:540} INFO - Filling up the DagBag from /files/dags/example_mapped_emr_serverless.py
[2024-03-09T00:45:58.298+0000] {processor.py:840} INFO - DAG(s) 'example_emr_serverless' retrieved from /files/dags/example_mapped_emr_serverless.py
[2024-03-09T00:45:58.315+0000] {logging_mixin.py:188} INFO - [2024-03-09T00:45:58.310+0000] {dagbag.py:649} ERROR - Failed to write serialized DAG: /files/dags/example_mapped_emr_serverless.py
Traceback (most recent call last):
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1354, in serialize_dag
    serialized_dag["tasks"] = [cls.serialize(task) for _, task in dag.task_dict.items()]
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1354, in <listcomp>
    serialized_dag["tasks"] = [cls.serialize(task) for _, task in dag.task_dict.items()]
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 462, in serialize
    return SerializedBaseOperator.serialize_mapped_operator(var)
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 857, in serialize_mapped_operator
    serialized_op = cls._serialize_node(op, include_deps=op.deps != MappedOperator.deps_for(BaseOperator))
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 899, in _serialize_node
    op.operator_extra_links.get(op)
  File "/opt/airflow/airflow/providers/amazon/aws/operators/emr.py", line 1273, in operator_extra_links
    if self.is_monitoring_in_job_override("s3MonitoringConfiguration", configuration_overrides):
AttributeError: 'MappedOperator' object has no attribute 'is_monitoring_in_job_override'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/airflow/airflow/models/dagbag.py", line 637, in _serialize_dag_capturing_errors
    dag_was_updated = SerializedDagModel.write_dag(
  File "/opt/airflow/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/models/serialized_dag.py", line 166, in write_dag
    new_serialized_dag = cls(dag, processor_subdir)
  File "<string>", line 4, in __init__
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/state.py", line 481, in _initialize_instance
    with util.safe_reraise():
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/state.py", line 479, in _initialize_instance
    return manager.original_init(*mixed[1:], **kwargs)
  File "/opt/airflow/airflow/models/serialized_dag.py", line 113, in __init__
    dag_data = SerializedDAG.to_dict(dag)
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1463, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1378, in serialize_dag
    raise SerializationError(f"Failed to serialize DAG {dag.dag_id!r}: {e}")
airflow.exceptions.SerializationError: Failed to serialize DAG 'example_emr_serverless': 'MappedOperator' object has no attribute 'is_monitoring_in_job_override'
[2024-03-09T00:45:58.316+0000] {logging_mixin.py:188} INFO - [2024-03-09T00:45:58.316+0000] {dag.py:3068} INFO - Sync 1 DAGs
[2024-03-09T00:45:58.326+0000] {logging_mixin.py:188} INFO - [2024-03-09T00:45:58.326+0000] {dag.py:3912} INFO - Setting next_dagrun for example_emr_serverless to 2021-01-01 00:00:00+00:00, run_after=2021-01-01 00:00:00+00:00
[2024-03-09T00:45:58.345+0000] {processor.py:183} INFO - Processing /files/dags/example_mapped_emr_serverless.py took 0.073 seconds