apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Spark distributed computing using SagemakerProcessingOperator #38822

Open dhana-sekhar opened 2 months ago

dhana-sekhar commented 2 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.4.3

What happened?

We are trying to create a cluster with SageMakerProcessingOperator, setting "InstanceCount": 2 and passing a custom Docker image that contains my Spark code. When I run the Spark code in the container on this cluster, it does not run in a distributed way, even though I can see from CloudWatch that SageMaker is able to spin up 2 instances.

[!IMPORTANT] Here is the code I am using to run the SageMakerProcessingOperator:

from typing import Dict, List
from uuid import uuid4

import pendulum
from airflow.providers.amazon.aws.operators.sagemaker import SageMakerProcessingOperator

# `C` is an internal constants module; get_aws_account_id(), get_start_date(),
# get_end_date(), get_security_group_ids() and get_subnets() are internal helpers.


# Build the CreateProcessingJob config for a 2-instance processing cluster.
def create_sagemaker_processingjob_config(
    name_prefix: str,
    image_uri: str,
    iam_role: str,
    entrypoint_cmd: List[str],
    instance_type: C.InstanceType,
) -> Dict:

    now = pendulum.now(C.timezone)
    id_from_time = now.strftime("%Y-%m-%d-%H%M%S-") + str(uuid4())[:8]
    conf = {
        "ProcessingJobName": f"{name_prefix}-{id_from_time}",
        "ProcessingResources": {
            "ClusterConfig": {
                "InstanceCount": 2,
                "InstanceType": instance_type.value,
                "VolumeSizeInGB": 60,
            }
        },
        "StoppingCondition": {"MaxRuntimeInSeconds": "432000"},
        "AppSpecification": {
            "ImageUri": image_uri,
            "ContainerEntrypoint": entrypoint_cmd,
        },
        "Environment": {
            "AWS_ACCOUNT_ID": get_aws_account_id(),
            "startdate": get_start_date(),
            "enddate": get_end_date(),
        },
        "RoleArn": iam_role,
        "NetworkConfig": {
            "VpcConfig": {
                "SecurityGroupIds": get_security_group_ids(),
                "Subnets": get_subnets(),
            },
        },
    }
    return conf

# Wrap the config in a SageMakerProcessingOperator task.
def create_sagemaker_processing_operator(
    account_id: C.AccountID,
    job_name: C.JobName,
    instance_type: C.InstanceType,
) -> SageMakerProcessingOperator:
    env_name = C.AccountID(account_id).name
    iam_role = C.IAMRole[env_name].value
    ecr_path = "dkr.ecr.ap-southeast-1.amazonaws.com"
    image_uri = f'{account_id}.{ecr_path}/{C.ImageName.image_name}'
    cmd = C.ENTRYPOINT_COMMAND[job_name]
    out = SageMakerProcessingOperator(
        config=create_sagemaker_processingjob_config(
            name_prefix=job_name,
            image_uri=image_uri,
            iam_role=iam_role,
            entrypoint_cmd=cmd,
            instance_type=instance_type,
        ),
        task_id=job_name,
    )

    return out
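
For context, a minimal sketch of how these helpers might be wired into a DAG; the DAG id, schedule, and the specific C.AccountID / C.JobName / C.InstanceType members below are hypothetical placeholders, not taken from the issue:

# Hypothetical DAG wiring; dag_id and enum members are placeholders.
from airflow import DAG

with DAG(
    dag_id="spark_sagemaker_processing",              # placeholder
    start_date=pendulum.datetime(2024, 1, 1, tz=C.timezone),
    schedule=None,
) as dag:
    spark_job = create_sagemaker_processing_operator(
        account_id=C.AccountID.dev,                   # placeholder enum member
        job_name=C.JobName.my_spark_job,              # placeholder enum member
        instance_type=C.InstanceType.ml_m5_4xlarge,   # placeholder enum member
    )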

What you think should happen instead?

When I run my code with the SageMakerProcessingOperator with "InstanceCount": 2, it should run in distributed mode.

[!NOTE] When I run the job with one instance it takes 1 hr to finish; my expectation is that with 2 instances of the same instance type it should finish in less than 1 hr.

How to reproduce

You can use the code shown above with any Docker image that has Spark installed, and run some PySpark code within the container with an InstanceCount of 2 or more.

Operating System

Amazon Linux

Versions of Apache Airflow Providers

MWAA version 2.4.3

Deployment

Amazon (AWS) MWAA

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 2 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

ferruzzi commented 4 weeks ago

Looking at the API docs [here], there isn't any way to set "distributed mode".

But it does have this:

S3DataDistributionType (string) –

Whether to distribute the data from Amazon S3 to all processing instances with FullyReplicated, or whether the data from Amazon S3 is shared by Amazon S3 key, downloading one shard of data to each processing instance.

so setting either ProcessingInputs["S3Input"]["S3DataDistributionType"] or ProcessingInputs["DatasetDefinition"]["DataDistributionType"] to "ShardedByS3Key" in the config may get the result you are looking for? But you aren't using any ProcessingInputs in the config at all, so I'm not sure how this works.
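
For illustration, a minimal sketch of what such an entry could look like in the config dict above (note that ProcessingInputs is a list in the CreateProcessingJob API; the S3 URI and local path below are hypothetical placeholders):

"ProcessingInputs": [
    {
        "InputName": "input-data",                       # placeholder name
        "S3Input": {
            "S3Uri": "s3://my-bucket/input/",            # placeholder S3 prefix
            "LocalPath": "/opt/ml/processing/input",     # where SageMaker places the data in the container
            "S3DataType": "S3Prefix",
            "S3InputMode": "File",
            "S3DataDistributionType": "ShardedByS3Key",  # one shard of S3 objects per instance
        },
    }
],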

github-actions[bot] commented 1 week ago

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.