aws / amazon-sagemaker-examples

Example 📓 Jupyter notebooks that demonstrate how to build, train, and deploy machine learning models using 🧠 Amazon SageMaker.
https://sagemaker-examples.readthedocs.io
Apache License 2.0

[Example Request] - Distributed Processing SKLEARN : AWS Sagemaker #2852

Open anansrivastava opened 3 years ago

anansrivastava commented 3 years ago

I have a few raw .csv files in my S3 bucket. How can I process them in parallel to reduce run time? See the comments in the code below for where I need a little help. I am using SKLearnProcessor with s3_data_distribution_type='ShardedByS3Key'.

%%writefile preprocessing/preprocessing_sklearn.py

import pandas as pd
import argparse
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import os

def process(input_data_path):
    df = pd.read_csv(input_data_path)
    # drop the first column (Unnamed: 0)
    df = df.iloc[:, 1:]

    features = df.iloc[:, 1:]
    headers = features.columns
    labels = df.iloc[:, 0]

    scaler = StandardScaler()

    normalized_x_train = scaler.fit_transform(features)

    # write
    pd.DataFrame(normalized_x_train).to_csv(os.path.join('/opt/ml/processing/output/train', 'train_features.csv'), header=False, index=False)
    pd.DataFrame(labels).to_csv(os.path.join('/opt/ml/processing/output/train', 'train_labels.csv'), header=False, index=False)

if __name__ == '__main__':
    # HOW DO I MAKE THIS DYNAMIC? CHUNK_1.CSV, CHUNK_2.CSV ETC
    input_data_path = os.path.join("/opt/ml/processing/input", "train-data-with-header.csv")  
    process(input_data_path)

My calling function:

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
import timeit

start = timeit.default_timer()
# WHAT SHOULD BE MY SOURCE?
source = "s3://sagemaker-end-to-end/data_tuning/train/chunk_0.csv" 
source2 = "s3://sagemaker-end-to-end/data_tuning/train/"

sklearn_processor = SKLearnProcessor(framework_version='0.23-1',
                                     role=role,
                                     instance_type='ml.m5.xlarge',
                                     instance_count=2,
                                     base_job_name = 'preprocess-sklearn'
                                    )

sklearn_processor.run(
    code='preprocessing/preprocessing_sklearn.py',
    inputs=[
        ProcessingInput(
            source=source2,
            s3_data_distribution_type='ShardedByS3Key',
            destination='/opt/ml/processing/input')
    ],

    outputs=[
        ProcessingOutput(
          source='/opt/ml/processing/output/train', 
          destination= make_url(store_bucket, "preprocess_sklearn", "train")
        ),
        ProcessingOutput(
            source='/opt/ml/processing/output/test',
            destination= make_url(store_bucket, "preprocess_sklearn", "test")
        )
    ]

)

stop = timeit.default_timer()

print('Time: ', stop - start)

rthamman commented 3 years ago

Hi, you don't have to specify the file names in the script. Depending on the instance count, the files will be distributed behind the scenes. In your example you have 8 files and an instance count of 2, so each instance will get 4 files to process.

anansrivastava commented 3 years ago

Hi, could you provide sample code as well? I did that and it throws an error saying the source is invalid.

AvivTahar commented 2 years ago

Hi, did you get a chance to find an example that uses the SKLearn SageMaker processor for parallel processing? I would appreciate it if you could point me to example code resources. Thank you.

marckarp commented 2 years ago

In your ProcessingInput's source parameter you just need to pass the S3 prefix that contains all the files. In your case, I assume you uploaded all the chunk_*.csv files to s3://sagemaker-end-to-end/data_tuning/train/.

Then, in your preprocessing_sklearn.py, you can list the files that are available to the instance and use that list as input for your processing logic. For example:

import os
files = list(filter(None, os.popen('ls /opt/ml/processing/input | grep chunk').read().split("\n")))

This will return the list of chunk_*.csv files on the instance that need to be processed.

You can then iterate over them with:

for file in files:
    print(file)
    #Add your processing logic here
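
Putting this together with the script from the question, a minimal sketch of what the __main__ block of preprocessing_sklearn.py could look like (the glob pattern and the reuse of the process() function from the question are assumptions, not tested code):

import glob
import os

if __name__ == '__main__':
    input_dir = '/opt/ml/processing/input'
    # Each instance only receives the subset of chunk_*.csv files that
    # ShardedByS3Key assigned to it, so process whatever is present locally.
    for input_data_path in sorted(glob.glob(os.path.join(input_dir, 'chunk_*.csv'))):
        print('processing', input_data_path)
        process(input_data_path)

Note that process() as posted always writes train_features.csv and train_labels.csv, so if an instance receives more than one chunk you would also want to derive per-chunk output names (for example from os.path.basename(input_data_path)) to avoid overwriting.
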
joe-webb commented 2 years ago

In hopes this will help someone who stumbles across this post in the future:

%%writefile preprocess.py
import pandas as pd  # used for handling the dataset
import boto3
import glob
import os

# s3 settings
bucket = 'bucket'
output_prefix = 'users/name/output'

def main(input_parquet):
    """Parse the parquet, remove white space from the text, and write the result to the processing output directory so it can be uploaded to S3."""
    try:
        #load in the parquet
        new_df = pd.read_parquet(input_parquet, engine='auto', columns = ["index", "fake text"])

        new_df['cleaned_text'] = new_df['fake text'].apply(lambda x: "".join(x.split()))

        #name the processed file (drop the input directory and extension)
        processed_file_name = '{}_processed.parquet'.format(os.path.splitext(os.path.basename(input_parquet))[0])
        #write it to the processing output directory so it is uploaded at the end of the job
        new_df.to_parquet(os.path.join('/opt/ml/processing/processed_data', processed_file_name))

        #let the user know it was successfully processed
        print("The final output can be downloaded from: s3://{}/{}/{}".format(bucket, output_prefix, processed_file_name))
        print("finished processing {}".format(input_parquet))

        return True, None

    except Exception as e:
        print(e)
        return False, e

if __name__ == "__main__":
    # os.system("pip install package_here")  # add any system installs for packages you need

    #get a list of the input files that are copied onto the instance in the input folder
    print("The files we found were:")
    print("\n")
    files = glob.glob("/opt/ml/processing/input_data/*.parquet")
    print(files)
    # each instance receives roughly total_input_files / instance_count of the files
    for index, file in enumerate(files):

        #run our main function
        main(file)
        print("successfully processed", file)

Code to actually launch the processing containers:


from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role

# name your processing job
base_job_name='dev-name-testing'

script_processor = ScriptProcessor(command=['python3'],
                image_uri="763104351884.dkr.ecr.us-east-1.amazonaws.com/huggingface-pytorch-trcomp-training:1.9.0-transformers4.11.0-gpu-py38-cu111-ubuntu20.04",
                role=get_execution_role(),
                base_job_name=base_job_name,
#                 instance_type='ml.g4dn.xlarge',
                instance_type="ml.m5.large",
                instance_count=20)

script_processor.run(code='preprocess.py',
                    inputs=[ProcessingInput(
                        source='s3://bucket/users/name/input/',
                        destination='/opt/ml/processing/input_data',
                        s3_data_distribution_type='ShardedByS3Key',
                        s3_data_type='S3Prefix'),
                        ],
                    outputs=[ProcessingOutput(
                        source='/opt/ml/processing/processed_data',
                        destination='s3://bucket/users/name/output',
                        s3_upload_mode="EndOfJob")],
                    )
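
A note on the sharding math: with s3_data_distribution_type='ShardedByS3Key' and instance_count=20, the objects under the input prefix are split across the 20 instances, and each instance's /opt/ml/processing/processed_data directory is uploaded to the output destination at the end of the job. As a quick sanity check (a sketch only; the bucket and prefix are the placeholders from the snippet above), you can count the input objects to estimate how many files each instance will see:

import boto3

s3 = boto3.client("s3")
# placeholder bucket/prefix from the launch code above
response = s3.list_objects_v2(Bucket="bucket", Prefix="users/name/input/")
n_files = len(response.get("Contents", []))
print("{} input files -> roughly {:.1f} files per instance".format(n_files, n_files / 20))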