aws / sagemaker-python-sdk

A library for training and deploying machine learning models on Amazon SageMaker
https://sagemaker.readthedocs.io/
Apache License 2.0

ScriptProcessor unable to handle millions of output files to be saved to S3 #1619

Open maslick opened 4 years ago

maslick commented 4 years ago

Describe the bug
I am having trouble with data post-processing in AWS SageMaker: I need to split one large text file of predictions (~2-10 GB) into millions of small files (one file per user, ~3-10 KB each).

I've been able to process a small dataset (32 MB, 13540 records) without issues. When I try 1.2 million records (2.2 GB), ScriptProcessor successfully processes the input file and writes the output files to /opt/ml/processing/output, but the job then fails while uploading them to S3 with an error.

To reproduce
Jupyter notebook:

import boto3
from sagemaker import get_execution_role
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput, NetworkConfig

role = get_execution_role()
instance_type = 'ml.m4.4xlarge'
ecr_image_full_name = '0123456789.dkr.ecr.eu-central-1.amazonaws.com/maslick-sagemaker-processing-image:latest'

input_file = 'input.csv'
input_object = 's3://my-awesome-dataset/input.csv'
output_object = 's3://my-awesome-results'

network_config = NetworkConfig(enable_network_isolation=False,
                               subnets=["subnet-12345", "subnet-67890"],
                               security_group_ids=["sg-0123456789"])

script_processor = ScriptProcessor(role=role,
                                   image_uri=ecr_image_full_name,
                                   command=['python3'],
                                   instance_count=1,
                                   instance_type=instance_type,
                                   network_config=network_config)

input = ProcessingInput(source=input_object, destination='/opt/ml/processing/input')
output = ProcessingOutput(source='/opt/ml/processing/output', destination=output_object)

script_processor.run(code='callable.py', inputs=[input], outputs=[output], arguments=[input_file])
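
A note on the ProcessingOutput above: by default SageMaker uploads everything under the source path to S3 only at the end of the job. Newer versions of the SDK also expose an s3_upload_mode parameter on ProcessingOutput (the S3UploadMode of the underlying CreateProcessingJob API); a hedged sketch of switching it to continuous upload, with the caveat that whether this copes better with millions of small files is an assumption, not something verified here:

# Sketch only: upload files as they are written instead of all at once
# at the end of the job; requires an SDK version that exposes this parameter.
output = ProcessingOutput(source='/opt/ml/processing/output',
                          destination=output_object,
                          s3_upload_mode='Continuous')  # default is 'EndOfJob'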

callable.py:

import hashlib
import json
import sys
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
import pandas as pd

def saveFilesMultiProcesses(items):
    # Write the files in parallel; keep the futures so any worker exception
    # surfaces here instead of being silently swallowed.
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(saveFile, item) for item in items]
    for future in futures:
        future.result()

def readCsv(input_file):
    # The processing input is mounted under /opt/ml/processing/input.
    colnames = ['id', 'article', 'type', 'rank']
    df = pd.read_csv('/opt/ml/processing/input/{}'.format(input_file), sep='|', names=colnames)
    return df

def processCsv(df):
    # Turn each row into a plain dict (plain dicts are picklable, so they can
    # be sent to the worker processes).
    dicts = []
    for row in df.itertuples():
        record = {
            "id": row.id,
            "article": row.article,
            "type": row.type,
            "rank": row.rank,
        }
        dicts.append(record)

    return dicts

def saveFile(item):
    # Prefix the file name with a short hash of the id to spread the keys.
    hashed_prefix = hashlib.md5(str(item['id']).encode('utf-8')).hexdigest()
    short = hashed_prefix[:5]

    file_name = short + "_" + str(item['id']) + "_latest.json"
    outfile = Path('/opt/ml/processing/output', file_name)
    with open(outfile, 'w') as json_file:
        json.dump(item, json_file)

if __name__ == '__main__':
    input_file = sys.argv[1]
    df = readCsv(input_file)
    list_of_dicts = processCsv(df)
    saveFilesMultiProcesses(list_of_dicts)
    print("Done. Wait until all files are saved to S3")

Dockerfile:

FROM python:3.7-slim-buster
RUN pip3 install pandas==0.25.3
ENV PYTHONUNBUFFERED=TRUE

Expected behavior
All files that I save to /opt/ml/processing/output should be saved to S3.

Screenshots or logs

---------------------------------------------------------------------------
UnexpectedStatusException                 Traceback (most recent call last)
<ipython-input-66-48dccaef0bee> in <module>()
----> 1 script_processor.run(code='callable.py', inputs=[input], outputs=[output], arguments=[input_file])

~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/processing.py in run(self, code, inputs, outputs, arguments, wait, logs, job_name, experiment_config)
    402         self.jobs.append(self.latest_job)
    403         if wait:
--> 404             self.latest_job.wait(logs=logs)
    405 
    406     def _get_user_code_name(self, code):

~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/processing.py in wait(self, logs)
    726         """
    727         if logs:
--> 728             self.sagemaker_session.logs_for_processing_job(self.job_name, wait=True)
    729         else:
    730             self.sagemaker_session.wait_for_processing_job(self.job_name)

~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/session.py in logs_for_processing_job(self, job_name, wait, poll)
   3132 
   3133         if wait:
-> 3134             self._check_job_status(job_name, description, "ProcessingJobStatus")
   3135             if dot:
   3136                 print()

~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/session.py in _check_job_status(self, job, desc, status_key_name)
   2636                 ),
   2637                 allowed_statuses=["Completed", "Stopped"],
-> 2638                 actual_status=status,
   2639             )
   2640 

UnexpectedStatusException: Error for Processing job maslick-sagemaker-processing-image-2020-06-11-15-42-34-593: Failed. Reason: InternalServerError: We encountered an internal error.  Please try again.

System information

Additional context
See my Stack Overflow question for more details.

RoelantStegmann commented 3 years ago

Any ideas? It's rather annoying: for the moment we're just uploading inside the container, but that removes part of the "ease" of SageMaker Processing.
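
For anyone hitting the same limit, a minimal sketch of that workaround (uploading each file directly from the processing script with boto3 instead of relying on the end-of-job ProcessingOutput upload; the bucket name and key prefix here are assumptions, and the job role needs s3:PutObject on the bucket):

import boto3
from pathlib import Path

s3 = boto3.client('s3')
bucket = 'my-awesome-results'  # assumed output bucket
prefix = 'latest'              # assumed key prefix

# Upload each generated JSON file as soon as it exists, bypassing the
# end-of-job upload entirely.
for path in Path('/opt/ml/processing/output').glob('*.json'):
    s3.upload_file(str(path), bucket, prefix + '/' + path.name)

Note that the image would also need boto3 installed (the Dockerfile above only installs pandas) and network access to S3 from inside the configured VPC.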