apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

GlueJobOperator has no effect on the number of workers executing the Glue job #41121

Open noamha opened 1 month ago

noamha commented 1 month ago

Apache Airflow Provider(s)

amazon

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.24.0

Apache Airflow version

2.8.1

Operating System

Ubuntu 22

Deployment

Amazon (AWS) MWAA

Deployment details

Using a local Docker to run the MWAA local runner: https://github.com/aws/aws-mwaa-local-runner

What happened

Running an AWS GlueJobOperator to execute a job has no effect on the number of workers actually allocated to the job run. The run uses the default number of workers defined in the job setup.

Here is an example of the task code:

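    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

    # `client` and `dag` are defined elsewhere in the DAG file
    product = GlueJobOperator(
        task_id='product',
        job_name="product",
        job_desc='product',
        create_job_kwargs={"GlueVersion": "4.0", "NumberOfWorkers": 2, "WorkerType": 'G.1X'},
        region_name=client.region,
        iam_role_name='my_iam',
        dag=dag,
    )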

Starting the same job through boto3, with the same region, AWS credentials, and configuration, runs the AWS Glue job with the requested number of workers:

    import boto3

    # Initialize a session using Amazon Glue
    glue = boto3.client('glue', 'eu-central-1')

    # Define the job name
    job_name = "product"

    # Start the Glue job
    response = glue.start_job_run(
        JobName=job_name,
        WorkerType='G.1X', 
        NumberOfWorkers=2
    )

    # Print the job run ID
    print("Started Glue job with run ID:", response['JobRunId'])
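
To confirm how many workers a given run actually received, the run metadata can be read back with boto3's `get_job_run`. The following is a minimal sketch and not part of the original report: the helper name `describe_run_capacity`, the stub client, and the run ID `jr_example` are hypothetical, and the stub only exists so the snippet runs without AWS access.

```python
def describe_run_capacity(glue_client, job_name, run_id):
    """Return the worker settings Glue recorded for one job run."""
    job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
    return {
        "WorkerType": job_run.get("WorkerType"),
        "NumberOfWorkers": job_run.get("NumberOfWorkers"),
    }


class StubGlueClient:
    """Hypothetical stand-in for boto3.client('glue'); for offline use only."""

    def get_job_run(self, JobName, RunId):
        # Shaped like the relevant part of a real get_job_run response.
        return {
            "JobRun": {
                "Id": RunId,
                "JobName": JobName,
                "WorkerType": "G.1X",
                "NumberOfWorkers": 2,
            }
        }


if __name__ == "__main__":
    print(describe_run_capacity(StubGlueClient(), "product", "jr_example"))
```

Against a real account, pass `boto3.client('glue', 'eu-central-1')` and the `JobRunId` returned by `start_job_run` instead of the stub.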

I also tried setting the num_of_dpus parameter (without create_job_kwargs), and that did not work either.

Thanks.

What you think should happen instead

Since the boto3 call works, I assume the operator is not passing the configuration to the Glue API correctly.

How to reproduce

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 1 month ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

noamha commented 1 month ago

Debugging the code, I found out that when a job name is supplied, the create_job_kwargs param is skipped and not used:

    def get_or_create_glue_job(self) -> str | None:
        """
        Get (or creates) and returns the Job name.

        .. seealso::
            - :external+boto3:py:meth:`Glue.Client.create_job`

        :return:Name of the Job
        """
        if self.has_job(self.job_name):
            return self.job_name

        config = self.create_glue_job_config()
        self.log.info("Creating job: %s", self.job_name)
        self.conn.create_job(**config)

        return self.job_name
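
The effect of the early return above can be reproduced with a small in-memory model. This is a simplified sketch, not the provider's code: `FakeGlue` and `get_or_create` are hypothetical names, and the existing worker count of 10 is made up for illustration.

```python
class FakeGlue:
    """In-memory stand-in for the Glue API (hypothetical, for illustration)."""

    def __init__(self):
        self.jobs = {}

    def has_job(self, name):
        return name in self.jobs

    def create_job(self, **config):
        self.jobs[config["Name"]] = config


def get_or_create(glue, job_name, create_job_kwargs):
    # Same shape as the hook method: an existing job is returned untouched,
    # so create_job_kwargs is never applied to it.
    if glue.has_job(job_name):
        return job_name
    glue.create_job(Name=job_name, **create_job_kwargs)
    return job_name


glue = FakeGlue()
# Job created earlier (for example in the console) with 10 workers.
glue.create_job(Name="product", NumberOfWorkers=10, WorkerType="G.1X")

get_or_create(glue, "product", {"NumberOfWorkers": 2, "WorkerType": "G.1X"})
print(glue.jobs["product"]["NumberOfWorkers"])  # 10: the requested 2 is ignored
```

Only a job name that does not exist yet reaches `create_job`, which is the single place where the creation kwargs are used.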

o-nikolas commented 1 month ago

@noamha Does the glue job "product" already exist? I assume so since you're not using a unique name for it (e.g. something with a date or hash). If you are running an already created job it is just fetched and used. If you'd like to update such an existing job with new configuration you must specify update_config=True.

noamha commented 1 month ago

@o-nikolas Indeed this is an already existing job. Using the update_config=True as you suggested triggers an error that requires supplying the script_location param. In our case we create the jobs manually using the AWS Glue interface and they do not exist as a separate file.

This limits the operator by forcing you to work in a specific way, unlike the boto3 example, which lets you trigger an existing AWS Glue job without the need to maintain the script as a separate file.

I would expect the operator to give you the flexibility to manage the job in the AWS Glue interface while still allowing you to change the worker configuration.
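
One option that is not discussed in this thread is the operator's `run_job_kwargs` parameter, which the provider documents as extra arguments for the Glue job run. The sketch below assumes those arguments are forwarded to `start_job_run` in the installed provider version, which should be verified before relying on it. The names `RUN_OVERRIDES` and `build_product_task` are hypothetical, and the region is taken from the boto3 example.

```python
# Run-level overrides, using the same keys as boto3's start_job_run.
RUN_OVERRIDES = {"WorkerType": "G.1X", "NumberOfWorkers": 2}


def build_product_task():
    """Build the task; requires apache-airflow-providers-amazon."""
    # Imported lazily so this module loads even where Airflow is absent.
    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

    return GlueJobOperator(
        task_id="product",
        job_name="product",            # existing job managed in the Glue console
        region_name="eu-central-1",
        iam_role_name="my_iam",
        run_job_kwargs=RUN_OVERRIDES,  # assumption: forwarded to start_job_run
    )
```

If this assumption holds, the job definition stays untouched and no script_location is needed, since only the individual run is configured.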

o-nikolas commented 1 month ago

Hey @noamha I'll look further into it. Can you please include the parameters you're providing and the output (the error about the script location) when using update_config=True? The traceback/error message will help with debugging the issue.

github-actions[bot] commented 2 days ago

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.