aws / aws-step-functions-data-science-sdk-python

Step Functions Data Science SDK for building machine learning (ML) workflows and pipelines on AWS
Apache License 2.0

No support for intrinsic functions #79

Open xpaulnim opened 4 years ago

xpaulnim commented 4 years ago

The library does not appear to support Intrinsic Functions. Here is the list of intrinsic functions available in the Amazon States Language.

yoodan93 commented 3 years ago

Hi @xpaulnim, we are planning on supporting changes made to the Amazon States Language which includes Intrinsic Functions by 10/31.

We will keep you updated once Intrinsic Functions become available in the SDK.

matiassciencenow commented 3 years ago

any updates on this?

xpaulnim commented 3 years ago

I personally ended up using a workaround whereby I edit the step parameters directly. Something like:

from stepfunctions.steps.sagemaker import ProcessingStep

step = ProcessingStep(...)

# Swap the static job name for a dynamic one built by an intrinsic function.
# The '.$' suffix on the key tells Step Functions to evaluate the value.
step_params = step.parameters
step_params.pop('ProcessingJobName')
step_params['ProcessingJobName.$'] = "States.Format('NewStepName-{}', $$.Execution.Input['execution_id'])"
step.update_parameters(step_params)

wong-a commented 3 years ago

Sorry for the delay on this one. Our current priority is a v2 release (https://github.com/aws/aws-step-functions-data-science-sdk-python/issues/108) but this is still on our radar.

As @xpaulnim mentioned, you are not blocked from using intrinsic functions in the current version. You can pass an intrinsic function wherever you provide parameters. Beyond this, are there any utilities or APIs for intrinsic functions that you would like to see supported in the SDK?

Here's another example with a Pass state:

from stepfunctions.steps import Chain, Pass
from stepfunctions.workflow import Workflow

hello = Pass(
    state_id="hello",
    parameters={
        "param.$": "States.Format('Hello {}', name)"
    }
)

hello_world = Workflow(
    name="MyWorkflow_123",
    definition=Chain([hello]),
    role='arn:aws:iam::12345678912:role/dummy'
)
print(hello_world.definition.to_json(pretty=True))

Which produces the following:

{
    "StartAt": "hello",
    "States": {
        "hello": {
            "Parameters": {
                "param.$": "States.Format('Hello {}', name)"
            },
            "Type": "Pass",
            "End": true
        }
    }
}

a13zen commented 3 years ago

Any new updates on this?

shivlaks commented 3 years ago

@a13zen - have you tried passing intrinsic functions as parameters?

we have not gotten into design of utilities/APIs just yet, but are looking to prioritize filling in Amazon States Language gaps that are not yet supported in the data science SDK.

What APIs or utility functions would simplify working with intrinsic functions for you?

a13zen commented 3 years ago

@shivlaks, one common ask is for ML pipelines to store all inputs/outputs for each step in the same S3 location, for lineage reasons.

In order to do that, we commonly use the following pattern:

bucket_uri = "s3://..."  # base S3 URI for pipeline artifacts

def step_output_path(*, suffix):
    return f"States.Format('{bucket_uri}/{{}}/{{}}/{suffix}', $$.Execution.Name, $$.State.Name)"

def execution_root_path(*, suffix):
    return f"States.Format('{bucket_uri}/{{}}/{suffix}', $$.Execution.Name)"
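For illustration, with an assumed placeholder bucket these helpers emit the literal States.Format expression that lands in the state machine definition; Step Functions then evaluates it at execution time:

```python
bucket_uri = "s3://example-bucket"  # assumed placeholder bucket

def step_output_path(*, suffix):
    # '{{}}' in the f-string escapes to a literal '{}' placeholder that
    # Step Functions fills in from the context object at runtime.
    return f"States.Format('{bucket_uri}/{{}}/{{}}/{suffix}', $$.Execution.Name, $$.State.Name)"

print(step_output_path(suffix="raw-data"))
# States.Format('s3://example-bucket/{}/{}/raw-data', $$.Execution.Name, $$.State.Name)
```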

Then we would use those functions to generate the dynamic input/output path for steps.

# Specifying outputs for data_load step (ProcessingStep)
...
data_load_outputs = [
    ProcessingOutput(
        source="/opt/ml/processing/output/",
        destination=step_output_path(suffix="raw-data"),
        output_name="raw_data",
    )
]
...

# Specify inputs for feature_engineering step (ProcessingStep)
inputs = [
    ProcessingInput(
        source=execution_root_path(suffix='data-load-step/raw-data/'),
        destination="/opt/ml/processing/input/data",
        input_name="input-raw-data",
    ),
]

This allows us to have inputs/outputs always live in s3://<bucket>/<step_function_execution_name>/<step_name>

However, since these inputs and outputs do not detect when their values contain an intrinsic function or $, the SDK doesn't append the .$ suffix to the key. We do this manually with a function based on @xpaulnim's code above.

def fix_dynamic_params(step):
    step_params = step.parameters
    for param_key, param_value in step_params.items():
        # Fix ProcessingSteps
        if param_key == 'ProcessingInputs':
            for pi in param_value:
                if '$' in pi['S3Input']['S3Uri']:
                    pi['S3Input']['S3Uri.$'] = pi['S3Input'].pop('S3Uri', None)
        if param_key == 'ProcessingOutputConfig':
            for po in param_value['Outputs']:
                if '$' in po['S3Output']['S3Uri']:
                    po['S3Output']['S3Uri.$'] = po['S3Output'].pop('S3Uri', None)
        # Fix TrainingSteps
        ...
        # Fix TuningSteps
        ...
        # Fix TransformSteps
        ...

We then simply call this function for each step:

data_load_step = ProcessingStep(....)
fix_dynamic_params(data_load_step)

We use these dynamic paths for ProcessingSteps, TrainingSteps, TransformSteps, LambdaSteps etc.

Really, any step that supports JSONPath substitutions or intrinsic functions should auto-fix the key like this out of the box.
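That auto-fix could be generalized as a recursive walk over the parameters dict that renames any key whose string value references the state data or an intrinsic function. A minimal sketch (hypothetical helper, not part of the SDK; note it would also rename keys whose values merely contain a literal '$'):

```python
def render_dynamic_keys(params):
    """Recursively append '.$' to keys whose string values contain '$',
    so Step Functions evaluates them as paths/intrinsic functions."""
    if isinstance(params, dict):
        return {
            (k + '.$' if isinstance(v, str) and '$' in v and not k.endswith('.$') else k):
            render_dynamic_keys(v)
            for k, v in params.items()
        }
    if isinstance(params, list):
        return [render_dynamic_keys(v) for v in params]
    return params

fixed = render_dynamic_keys({
    "ProcessingOutputConfig": {
        "Outputs": [{"S3Output": {
            "S3Uri": "States.Format('{}/out', $$.Execution.Name)",
            "LocalPath": "/opt/ml/processing/output/",
        }}]
    }
})
# The dynamic S3Uri key gets the '.$' suffix; static keys are untouched.
```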

a13zen commented 3 years ago

We also have a utility function for finding the trained model when using such dynamic paths:

def trained_model_path(*, suffix):
    return f"States.Format('{bucket_uri}/{{}}/{suffix}/{{}}/output/', $$.Execution.Name, $.training_job_name)"

This works since the training job name is usually specified in the ExecutionInput.

rodrick10 commented 2 years ago

Fields like the output_data_config_path in the TrainingStep do not map to a single, directly editable parameter, unlike the ProcessingJobName example illustrated by @xpaulnim.

There are workarounds for that as well, but the purpose of an SDK should be to simplify things, not to overcomplicate them.

I understand there are things with a higher priority, but this feature is badly needed.
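As a stopgap, the same parameter-editing trick works on nested fields too: pop the static key from the nested dict and re-add it under a .$ key. A sketch against a plain dict shaped like a TrainingStep's parameters (OutputDataConfig/S3OutputPath are the field names from the SageMaker CreateTrainingJob API; the bucket is a placeholder):

```python
# step_params would come from step.parameters on a real TrainingStep
step_params = {
    "OutputDataConfig": {"S3OutputPath": "s3://example-bucket/static-path"},
}

# Swap the static output path for one Step Functions evaluates at runtime.
out_cfg = step_params["OutputDataConfig"]
out_cfg.pop("S3OutputPath")
out_cfg["S3OutputPath.$"] = "States.Format('s3://example-bucket/{}/model/', $$.Execution.Name)"
```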

MorganWeiss commented 1 year ago

Any update on this? This is exactly what I need.

ZMarouani commented 6 months ago

Any updates?