aws / aws-step-functions-data-science-sdk-python

Step Functions Data Science SDK for building machine learning (ML) workflows and pipelines on AWS
Apache License 2.0

feat: Add support for nested Aws StepFunctions service integration #166

Closed: ca-nguyen closed this pull request 3 years ago

ca-nguyen commented 3 years ago

Description

Fixes #125

Why is the change necessary?

This adds support for the AWS Step Functions service integration by adding a StartExecution step that starts an execution of another (nested) state machine.

Solution

This PR supports 3 integration patterns. The step constructor takes an IntegrationPattern enum that determines how the integrated service is called, and the pattern is used to build the state's Resource ARN:

| IntegrationPattern | Resource | Doc |
|---|---|---|
| WaitForCompletion | arn:aws:states:::states:startExecution.sync:2 | Run a Job |
| WaitForTaskToken | arn:aws:states:::states:startExecution.waitForTaskToken | Wait for a Callback with the Task Token |
| CallAndContinue | arn:aws:states:::states:startExecution | Request Response |

See Service Integration Patterns in the AWS Step Functions Developer Guide for more details.
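As a rough illustration of how the enum in the table above can select the Resource ARN, here is a minimal, self-contained sketch. The IntegrationPattern enum and the start_execution_resource helper are hypothetical stand-ins written for this example, not the SDK's implementation; only the ARN strings come from the table. Defaulting to WaitForCompletion is an assumption based on Test1, where the default and WaitForCompletion steps behaved the same.

```python
from enum import Enum


class IntegrationPattern(Enum):
    """Hypothetical stand-in for the SDK enum (member names match this PR)."""
    WaitForCompletion = "WaitForCompletion"
    WaitForTaskToken = "WaitForTaskToken"
    CallAndContinue = "CallAndContinue"


# Resource ARNs from the table above, keyed by integration pattern
_START_EXECUTION_RESOURCES = {
    IntegrationPattern.WaitForCompletion: "arn:aws:states:::states:startExecution.sync:2",
    IntegrationPattern.WaitForTaskToken: "arn:aws:states:::states:startExecution.waitForTaskToken",
    IntegrationPattern.CallAndContinue: "arn:aws:states:::states:startExecution",
}


def start_execution_resource(pattern=IntegrationPattern.WaitForCompletion):
    """Return the Task state's Resource ARN for the given pattern.

    Raises ValueError for anything that is not a supported pattern.
    """
    try:
        return _START_EXECUTION_RESOURCES[pattern]
    except KeyError:
        raise ValueError(f"Unsupported integration pattern: {pattern!r}") from None
```

Keeping the mapping in one table makes an unsupported pattern fail loudly at construction time rather than producing a malformed ARN.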

This PR omits support for the "arn:aws:states:::states:startExecution.sync" resource, which returns the nested execution's output as a string instead of JSON. CDK does not expose a property to configure the response format, and there has been no demand for one. If customers ask for a string-formatted response, we can easily add that option in the future.
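To make the difference concrete: with ".sync" the Output field of the nested execution's result arrives as an escaped JSON string, while ".sync:2" returns it as parsed JSON. A consumer that has to cope with both could normalize the field as in this sketch. normalize_output is a hypothetical helper, and the sample payloads are made up for illustration.

```python
import json


def normalize_output(result):
    """Return a copy of a StartExecution result with Output as parsed JSON.

    With ".sync", Output is a JSON string and is decoded here; with ".sync:2"
    it is already parsed and is left unchanged. This heuristic assumes the
    nested output is a JSON object or array, not a bare JSON string.
    """
    normalized = dict(result)  # shallow copy so the caller's dict is untouched
    output = normalized.get("Output")
    if isinstance(output, str):
        normalized["Output"] = json.loads(output)
    return normalized


# Hypothetical results from the two resource variants
sync_result = {"Status": "SUCCEEDED", "Output": '{"answer": 42}'}
sync2_result = {"Status": "SUCCEEDED", "Output": {"answer": 42}}
```

With ".sync:2", which this PR uses for WaitForCompletion, no such decoding step is needed.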

This PR also renames IntegrationPattern.RequestResponse to IntegrationPattern.CallAndContinue.

Note

Changing the existing IntegrationPattern is technically backwards incompatible, but this is acceptable due to the following:

  1. IntegrationPattern was not documented or exposed to any "public" interface
  2. We received feedback from our customers that RequestResponse was a confusing term: it sounded synchronous, and they were not sure what it meant. After lengthy discussion with the team, we settled on CallAndContinue, which clearly indicates that the state does not wait for the started execution to finish before continuing to the next state.

After the next release, IntegrationPattern will be something we need to protect from backwards incompatible changes since it is part of the step constructor args.

Trade-offs

Future considerations

To provide customers with an intuitive and consistent experience when using service integrations, I propose that we follow this pattern for all future integrations, including the service integrations that have not been released yet (EKS and Glue DataBrew).

What about existing services?

Once we are ready for a major update, let's change existing service integrations to use the enum pattern to provide a consistent customer experience throughout.
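One way the enum pattern could be applied uniformly is for every integration step to declare which patterns its service supports and build its Resource ARN from a shared helper. The sketch below is hypothetical: build_resource, the enum values (used here as ARN suffixes), and the supported-set convention are assumptions for illustration, not SDK code.

```python
from enum import Enum


class IntegrationPattern(Enum):
    """Hypothetical shared enum; each value is the ARN suffix it maps to."""
    WaitForCompletion = "sync"
    WaitForTaskToken = "waitForTaskToken"
    CallAndContinue = ""


def build_resource(service, api, pattern, supported):
    """Build a Task Resource ARN after checking the pattern is supported.

    `service`/`api` are e.g. "states"/"startExecution". Service-specific
    version suffixes (such as ":2") are left to the caller in this sketch.
    """
    if pattern not in supported:
        names = ", ".join(sorted(p.name for p in supported))
        raise ValueError(f"{pattern.name} is not supported; use one of: {names}")
    base = f"arn:aws:states:::{service}:{api}"
    # CallAndContinue has no suffix, so it maps to the plain API ARN
    return f"{base}.{pattern.value}" if pattern.value else base
```

A step class would then pass its own supported set, so an unsupported pattern is rejected with the same error message across all integrations.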

Testing

Test1: WaitForCompletion

Validated that the state machine succeeded only after the nested state machine execution succeeded (after the 60 s wait).

# Import paths assumed; workflow_execution_role is an IAM role ARN defined elsewhere
from sagemaker.utils import unique_name_from_base
from stepfunctions.steps import StepFunctionsStartExecutionStep, Wait
from stepfunctions.steps.integration_resources import IntegrationPattern
from stepfunctions.workflow import Workflow

# Created state machine with a Wait step
wait_workflow = Workflow(
    name="WaitWF",
    definition=Wait("Wait State", seconds=60),
    role=workflow_execution_role
)
wait_workflow_arn = wait_workflow.create()

# Parent state machine definitions (test cases "StartExecutionDefault" and "StartExecutionSync")
# that start the nested state machine using the default and WaitForCompletion integration patterns
start_execution_step_default = StepFunctionsStartExecutionStep(
    "SFN Start Execution", parameters={
        "StateMachineArn": wait_workflow_arn,
        "Name": unique_name_from_base("StartExecution1")
    })

start_execution_step_sync = StepFunctionsStartExecutionStep(
    "SFN Start Execution - Sync", integration_pattern=IntegrationPattern.WaitForCompletion,
    parameters={
        "StateMachineArn": wait_workflow_arn,
        "Name": unique_name_from_base("StartExecutionSync")
    })
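The validation in these tests was done manually. One way to automate the "state machine succeeded" check is to poll DescribeExecution until the execution leaves the RUNNING state. This is an assumption, not part of the PR: wait_for_status is a hypothetical helper that accepts any client exposing boto3's describe_execution(executionArn=...), so it can be exercised without AWS.

```python
import time


def wait_for_status(client, execution_arn, timeout_s=300, poll_s=5, sleep=time.sleep):
    """Poll DescribeExecution until the execution is no longer RUNNING.

    `client` is expected to behave like boto3.client("stepfunctions").
    Returns the final status string, e.g. "SUCCEEDED" or "FAILED".
    Raises TimeoutError if the execution is still running after timeout_s.
    """
    waited = 0
    while True:
        status = client.describe_execution(executionArn=execution_arn)["status"]
        if status != "RUNNING":
            return status
        if waited >= timeout_s:
            raise TimeoutError(f"{execution_arn} still RUNNING after {timeout_s}s")
        sleep(poll_s)
        waited += poll_s
```

For the WaitForCompletion cases, the parent would be expected to reach SUCCEEDED only after the nested 60 s Wait state completes.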

Test2: CallAndContinue

Validated that the state machine succeeded without waiting for the nested state machine execution to complete.

# Using same wait_workflow defined in Test1
# Parent state machine definition (test case "StartExecutionCallAndContinue") that starts
# the nested state machine using the CallAndContinue integration pattern
start_execution_step_call_and_continue = StepFunctionsStartExecutionStep(
    "SFN Start Execution", integration_pattern=IntegrationPattern.CallAndContinue, parameters={
        "StateMachineArn": wait_workflow_arn,
        "Name": unique_name_from_base("StartExecutionCallAndContinue")
    })
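For CallAndContinue, the property being validated is that the parent finishes before the nested execution does. A sketch of that check, assuming DescribeExecution-style dicts for both executions (boto3 returns stopDate as a datetime, and only once an execution has stopped). parent_finished_first and the sample data are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def parent_finished_first(parent_desc, child_desc):
    """Return True if the parent execution stopped before the child did.

    "stopDate" is only present once an execution has stopped, so a child
    without one is still running and therefore finishes after the parent.
    """
    child_stop = child_desc.get("stopDate")
    return child_stop is None or parent_desc["stopDate"] < child_stop


# Hypothetical descriptions: the parent stops about 1 s after starting the
# child, while the child spends 60 s in its Wait state.
start = datetime(2021, 1, 1, tzinfo=timezone.utc)
parent_example = {"status": "SUCCEEDED", "stopDate": start + timedelta(seconds=1)}
child_example = {"status": "SUCCEEDED", "stopDate": start + timedelta(seconds=61)}
```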

Test3: WaitForTaskToken

Validated that the state machine succeeded after the token was returned

from stepfunctions.steps import LambdaStep

# Created state machine with a LambdaStep that invokes `SendTaskSuccessFunction` and passes it the task token as input
# Note: the `SendTaskSuccessFunction` Lambda function calls SendTaskSuccess with the token provided as input

lambda_params = {
    "FunctionName": "SendTaskSuccessFunction",  
    "Payload.$": "$"
}

send_success_workflow = Workflow(
    name="SendTaskSuccessWF",
    definition=LambdaStep("Lambda Step", parameters=lambda_params),
    role=workflow_execution_role
)

send_success_workflow_arn = send_success_workflow.create()
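The body of SendTaskSuccessFunction is not part of this PR. A minimal sketch of what it might look like, assuming the event is the nested execution's input ({"token": ...}, forwarded by "Payload.$": "$"). The handler and the optional sfn_client parameter are hypothetical; send_task_success(taskToken=..., output=...) is the boto3 Step Functions call, and output must be a JSON string.

```python
import json


def lambda_handler(event, context, sfn_client=None):
    """Hypothetical body for the SendTaskSuccessFunction Lambda.

    `sfn_client` can be injected for testing; otherwise a boto3 client is used.
    """
    if sfn_client is None:
        import boto3  # imported lazily so the handler can be tested without AWS
        sfn_client = boto3.client("stepfunctions")
    # Report success to the parent task waiting on this token
    sfn_client.send_task_success(
        taskToken=event["token"],
        output=json.dumps({"status": "done"}),
    )
    return {"sent": True}
```

Once SendTaskSuccess is called, the parent's WaitForTaskToken state resumes with the given output.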

# Parent state machine definition (test case "StartExecutionWaitForTaskToken") that starts
# the nested state machine using the WaitForTaskToken integration pattern

start_execution_step_wait_for_task_token = StepFunctionsStartExecutionStep(
    "SFN Start Execution - Wait for Callback", integration_pattern=IntegrationPattern.WaitForTaskToken, parameters={
        "Input": {
            # "$$.Task.Token" reads the task token from the context object
            "token.$": "$$.Task.Token"
        },
        "StateMachineArn": send_success_workflow_arn,
        "Name": unique_name_from_base("StartExecutionCallback")
    })
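The "token.$": "$$.Task.Token" parameter relies on the Amazon States Language convention that a Parameters key ending in ".$" takes a path, where "$$" addresses the context object (which holds the task token) and "$" addresses the state input. The toy resolver below illustrates that convention for simple dotted paths and nested objects only. It is a hypothetical sketch, not how Step Functions evaluates JSONPath.

```python
def resolve_parameters(params, state_input, context):
    """Resolve ASL-style Parameters against the state input and context object.

    Keys ending in ".$" are replaced by the value at their path (with the
    suffix dropped); nested objects are resolved recursively; other values
    are copied as-is. Lists and full JSONPath syntax are not supported.
    """
    resolved = {}
    for key, value in params.items():
        if isinstance(value, dict):
            resolved[key] = resolve_parameters(value, state_input, context)
        elif key.endswith(".$"):
            resolved[key[:-2]] = _lookup(value, state_input, context)
        else:
            resolved[key] = value
    return resolved


def _lookup(path, state_input, context):
    """Follow a dotted path such as "$$.Task.Token" or "$"."""
    if path.startswith("$$"):
        node, rest = context, path[2:]
    else:
        node, rest = state_input, path[1:]
    for part in filter(None, rest.split(".")):
        node = node[part]
    return node
```

The same convention explains the Lambda step's "Payload.$": "$", which forwards the entire state input (here, {"token": ...}) as the payload.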

Pull Request Checklist

Please check all boxes (including N/A items)

Testing

Documentation

Title and description


By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license.

shivlaks commented 3 years ago

not directly related to this PR, but since we are trying to address the suggestion in #74 - any reason we shouldn't follow the same pattern for un-released service integrations we've added recently in #156 and #151 (not in this PR)

I feel we should establish that in all service integrations we support, but especially ones we have not released yet.

thoughts? - @wong-a @ca-nguyen

ca-nguyen commented 3 years ago

> not directly related to this PR, but since we are trying to address the suggestion in #74 - any reason we shouldn't follow the same pattern for un-released service integrations we've added recently in #156 and #151 (not in this PR)

Since these service integrations have not been released yet, it makes sense to follow the same patterns that provide a clearer solution to our customers.

> I feel we should establish that in all service integrations we support, but especially ones we have not released yet.

I agree. Ideally, we are consistent across all service integrations.

ca-nguyen commented 3 years ago

> I don't think this is a design decision at all and really the only tradeoff worth documenting is that we are opting into inconsistency for simplicity. The future consideration worth documenting is what we will do with existing service integrations as well as all future integrations. Will other service integrations now follow the enum pattern or fall back to past patterns?

I have updated the CR summary to only reflect that tradeoff and discuss future considerations

ca-nguyen commented 3 years ago

> But no integ tests are in the diff. Did you forget to commit them? What kind of manual testing did you do?

I added details on the manual tests that were performed to the Testing section of the PR summary. No integration tests were added because testing this integration requires an external Lambda function. The integration test item was checked because it was N/A, and to reflect that no tasks were left. The template says:

> Please check all boxes (including N/A items)

I added a note next to the checked item to avoid confusion.

ca-nguyen commented 3 years ago

Unit tests are failing due to the PyYAML upgrade to v6.0.0. PR to fix the failing tests: https://github.com/aws/aws-step-functions-data-science-sdk-python/pull/172
