aws / aws-cdk

The AWS Cloud Development Kit is a framework for defining cloud infrastructure in code
https://aws.amazon.com/cdk
Apache License 2.0

(stepfunctions): support as-you-go definition construction #15354

Closed knihit closed 3 years ago

knihit commented 3 years ago

If the state machine is created before the complete Chain has been defined, its definition string does not contain the task states.

Reproduction Steps

Below are two code samples: the first creates the definition string correctly, but the second does not. Notice that in the first sample, the state machine is created as the last step.

Working sample

    const isLangAvailable = new sfn.Choice(this, 'LanguageAvailable?', {
        comment: 'Check if the language is available',
        inputPath: '$'
    });

    isLangAvailable.when(sfn.Condition.isNotPresent('$.feed.lang'), detectLangTask);
    isLangAvailable.otherwise(imageAnalysisTask);
    detectLangTask.next(imageAnalysisTask);
    imageAnalysisTask.next(parallel)
    .next(new sfn.Pass(this, 'MergeJson', {
        parameters: {
            'account_name.$': '$[1].account_name',
            'platform.$': '$[1].platform',
            'moderation_labels_in_imgs.$' : '$[0]',
        }
    }))
    .next(publishEventsTask.stepFunctionTask)
    .next(new sfn.Succeed(this, 'Success'));

    // end of creating workflow

    // The state machine is created only after the whole graph has been wired up.
    const workflowChain = sfn.Chain.start(isLangAvailable);
    this._stMachine = new StateMachine(this, 'WorkflowEngine', {
        definition: workflowChain,
        role: new Role(this, 'StateMachineRole', {
            assumedBy: new ServicePrincipal(`states.${Aws.REGION}.amazonaws.com`),
        }),
    });

Second sample

    const isLangAvailable = new sfn.Choice(this, 'LanguageAvailable?', {
        comment: 'Check if the language is available',
        inputPath: '$'
    });

    // The state machine is created here, before the Choice branches are wired up.
    const workflowChain = sfn.Chain.start(isLangAvailable);
    this._stMachine = new StateMachine(this, 'WorkflowEngine', {
        definition: workflowChain,
        role: new Role(this, 'StateMachineRole', {
            assumedBy: new ServicePrincipal(`states.${Aws.REGION}.amazonaws.com`),
        }),
    });

    isLangAvailable.when(sfn.Condition.isNotPresent('$.feed.lang'), detectLangTask);
    isLangAvailable.otherwise(imageAnalysisTask);
    detectLangTask.next(imageAnalysisTask);
    imageAnalysisTask.next(parallel)
    .next(new sfn.Pass(this, 'MergeJson', {
        parameters: {
            'account_name.$': '$[1].account_name',
            'platform.$': '$[1].platform',
            'moderation_labels_in_imgs.$' : '$[0]',
        }
    }))
    .next(publishEventsTask.stepFunctionTask)
    .next(new sfn.Succeed(this, 'Success'));

    // end of creating workflow

What did you expect to happen?

Both code samples should generate the same definition string. If that is not possible, CDK should throw an error, because the order of these statements should not matter.

What actually happened?

The definition string generated by the second sample could not be deployed, and CloudFormation returned this error:

Resource handler returned message: "Invalid State Machine Definition: 'SCHEMA_VALIDATION_FAILED: These fields are required: [Choices] at /States/LanguageAvailable?, MISSING_END_STATE: Workflow has no terminal state at null' 
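Both failures in that message follow from the truncated definition, and they can be checked locally before deploying. The following is a minimal sketch, assuming only the two rules named in the error: a Choice state needs a non-empty `Choices` array, and the workflow needs at least one terminal state (a state with `End: true`, or of type `Succeed` or `Fail`). It is not the full Amazon States Language schema validation, and `findDefinitionProblems` is a hypothetical helper name.

```typescript
// Minimal subset of an Amazon States Language state: only the fields checked here.
interface AslState {
  Type: string;
  End?: boolean;
  Choices?: unknown[];
  [key: string]: unknown;
}

interface AslDefinition {
  StartAt: string;
  States: Record<string, AslState>;
}

// Reports the two problems CloudFormation flagged for the second sample.
function findDefinitionProblems(definitionString: string): string[] {
  const def = JSON.parse(definitionString) as AslDefinition;
  const problems: string[] = [];

  // A Choice state must carry a non-empty Choices array.
  for (const [name, state] of Object.entries(def.States)) {
    if (state.Type === 'Choice' && (!Array.isArray(state.Choices) || state.Choices.length === 0)) {
      problems.push(`Choice state '${name}' has no Choices`);
    }
  }

  // A state is terminal if it sets End: true or is a Succeed/Fail state.
  const hasTerminal = Object.values(def.States).some(
    (s) => s.End === true || s.Type === 'Succeed' || s.Type === 'Fail',
  );
  if (!hasTerminal) {
    problems.push('Workflow has no terminal state');
  }
  return problems;
}

// The (unescaped) definition string produced by the second sample.
const secondSample =
  '{"StartAt":"LanguageAvailable?","States":{"LanguageAvailable?":{"Type":"Choice","Comment":"Check if the language is available","InputPath":"$"}}}';

console.log(findDefinitionProblems(secondSample));
```

Run against the second sample, the sketch reports both problems; run against a definition like the first one, it would report none.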

Environment

Other

Definition string from the first code sample.

"DefinitionString": {
          "Fn::Join": [
            "",
            [
              "{\"StartAt\":\"LanguageAvailable?\",\"States\":{\"LanguageAvailable?\":{\"Type\":\"Choice\",\"Comment\":\"Check if the language is available\",\"InputPath\":\"$\",\"Choices\":[{\"Variable\":\"$.feed.lang\",\"IsPresent\":false,\"Next\":\"DetectLangSendMessage\"},{\"Variable\":\"$.feed.lang\",\"StringEquals\":\"und\",\"Next\":\"DetectLangSendMessage\"}],\"Default\":\"ImageAnalysisSendMessage\"},\"ImageAnalysisSendMessage\":{\"Next\":\"ImageAnalysisTaskComplete\",\"Retry\":[{\"ErrorEquals\":[\"States.ALL\"],\"IntervalSeconds\":3,\"MaxAttempts\":5,\"BackoffRate\":1.2}],\"Catch\":[{\"ErrorEquals\":[\"States.ALL\"],\"Next\":\"ImageAnalysisTaskFailed\"}],\"Type\":\"Task\",\"HeartbeatSeconds\":900,\"OutputPath\":\"$.Payload\",\"Resource\":\"arn:",
              {
                "Ref": "AWS::Partition"
              },
              ":states:::sqs:sendMessage.waitForTaskToken\",\"Parameters\":{\"QueueUrl\":\"",
              {
                "Ref": "TextWfEngineImageAnalysisTaskqueue33EB53E2"
              },
              "\",\"MessageBody\":{\"input.$\":\"$\",\"token.$\":\"$$.Task.Token\"}}},\"DetectLangTaskComplete\":{\"Type\":\"Pass\",\"Next\":\"ImageAnalysisSendMessage\"},\"DetectLangSendMessage\":{\"Next\":\"DetectLangTaskComplete\",\"Retry\":[{\"ErrorEquals\":[\"States.ALL\"],\"IntervalSeconds\":3,\"MaxAttempts\":5,\"BackoffRate\":1.2}],\"Catch\":[{\"ErrorEquals\":[\"States.ALL\"],\"Next\":\"DetectLangTaskFailed\"}],\"Type\":\"Task\",\"HeartbeatSeconds\":900,\"OutputPath\":\"$.Payload\",\"Resource\":\"arn:",
              {
                "Ref": "AWS::Partition"
              },
              ":states:::sqs:sendMessage.waitForTaskToken\",\"Parameters\":{\"QueueUrl\":\"",
              {
                "Ref": "TextWfEngineDetectLangTaskqueue3D68D883"
              },
              "\",\"MessageBody\":{\"input.$\":\"$\",\"token.$\":\"$$.Task.Token\"}}},\"DetectLangTaskFailed\":{\"Type\":\"Fail\",\"Cause\":\"$.cause\"},\"ImageAnalysisTaskComplete\":{\"Type\":\"Pass\",\"Next\":\"TextImageSplitProcess\"},\"TextImageSplitProcess\":{\"Type\":\"Parallel\",\"Comment\":\"Parallely process Text and Image\",\"Next\":\"MergeJson\",\"InputPath\":\"$\",\"OutputPath\":\"$\",\"Branches\":[{\"StartAt\":\"DetectModerationLabels\",\"States\":{\"DetectModerationLabels\":{\"End\":true,\"Retry\":[{\"ErrorEquals\":[\"Lambda.ServiceException\",\"Lambda.AWSLambdaException\",\"Lambda.SdkClientException\"],\"IntervalSeconds\":2,\"MaxAttempts\":6,\"BackoffRate\":2}],\"Type\":\"Task\",\"HeartbeatSeconds\":900,\"OutputPath\":\"$.Payload.moderation_labels_in_imgs\",\"Resource\":\"arn:",
              {
                "Ref": "AWS::Partition"
              },
              ":states:::lambda:invoke\",\"Parameters\":{\"FunctionName\":\"",
              {
                "Fn::GetAtt": [
                  "TextWfEngineModerationLabelsLambdaFunction0AF70AF5",
                  "Arn"
                ]
              },
              "\",\"Payload.$\":\"$\"}}}},{\"StartAt\":\"Translate\",\"States\":{\"Translate\":{\"Next\":\"AnalyzeText\",\"Retry\":[{\"ErrorEquals\":[\"Lambda.ServiceException\",\"Lambda.AWSLambdaException\",\"Lambda.SdkClientException\"],\"IntervalSeconds\":2,\"MaxAttempts\":6,\"BackoffRate\":2}],\"Type\":\"Task\",\"HeartbeatSeconds\":900,\"OutputPath\":\"$.Payload\",\"Resource\":\"arn:",
              {
                "Ref": "AWS::Partition"
              },
              ":states:::lambda:invoke\",\"Parameters\":{\"FunctionName\":\"",
              {
                "Fn::GetAtt": [
                  "TextWfEngineTranslateLambdaFunction06FA2EE3",
                  "Arn"
                ]
              },
              "\",\"Payload.$\":\"$\"}},\"AnalyzeText\":{\"End\":true,\"Retry\":[{\"ErrorEquals\":[\"Lambda.ServiceException\",\"Lambda.AWSLambdaException\",\"Lambda.SdkClientException\"],\"IntervalSeconds\":2,\"MaxAttempts\":6,\"BackoffRate\":2}],\"Type\":\"Task\",\"HeartbeatSeconds\":900,\"OutputPath\":\"$.Payload\",\"Resource\":\"arn:",
              {
                "Ref": "AWS::Partition"
              },
              ":states:::lambda:invoke\",\"Parameters\":{\"FunctionName\":\"",
              {
                "Fn::GetAtt": [
                  "TextWfEngineTextAnalysisLambdaFunction36D6F05C",
                  "Arn"
                ]
              },
              "\",\"Payload.$\":\"$\"}}}}]},\"MergeJson\":{\"Type\":\"Pass\",\"Parameters\":{\"account_name.$\":\"$[1].account_name\",\"platform.$\":\"$[1].platform\",\"search_query.$\":\"$[1].search_query\",\"feed.$\":\"$[1].feed\",\"Sentiment.$\":\"$[1].Sentiment\",\"SentimentScore.$\":\"$[1].SentimentScore\",\"KeyPhrases.$\":\"$[1].KeyPhrases\",\"Entities.$\":\"$[1].Entities\",\"moderation_labels_in_imgs.$\":\"$[0]\",\"text_in_images.$\":\"$[1].text_in_images\"},\"Next\":\"PublishEvent\"},\"PublishEvent\":{\"Next\":\"Success\",\"Retry\":[{\"ErrorEquals\":[\"Lambda.ServiceException\",\"Lambda.AWSLambdaException\",\"Lambda.SdkClientException\"],\"IntervalSeconds\":2,\"MaxAttempts\":6,\"BackoffRate\":2}],\"Type\":\"Task\",\"HeartbeatSeconds\":900,\"OutputPath\":\"$.Payload\",\"Resource\":\"arn:",
              {
                "Ref": "AWS::Partition"
              },
              ":states:::lambda:invoke\",\"Parameters\":{\"FunctionName\":\"",
              {
                "Fn::GetAtt": [
                  "TextWfEnginePublishEventsLambdaFunctionF22DF4EC",
                  "Arn"
                ]
              },
              "\",\"Payload.$\":\"$\"}},\"Success\":{\"Type\":\"Succeed\"},\"ImageAnalysisTaskFailed\":{\"Type\":\"Fail\",\"Cause\":\"$.cause\"}}}"
            ]
          ]
        },

Definition string from the second code sample

"DefinitionString": "{\"StartAt\":\"LanguageAvailable?\",\"States\":{\"LanguageAvailable?\":{\"Type\":\"Choice\",\"Comment\":\"Check if the language is available\",\"InputPath\":\"$\"}}}",

This is :bug: Bug Report

BenChaimberg commented 3 years ago

When a StateMachine is constructed, it uses the state graph both to render the definition and to assign the appropriate permissions. Due to the way synthesis occurs (bottom-up), I don't believe it's possible to support "as-you-go" state machine definition, even with a major reworking of how we currently go about it. Is there a particular reason why you want to construct the state machine before the definition has been finalized?
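The effect described above can be reproduced with a toy model in which the machine walks the state graph once, in its constructor, and keeps the result. This is a simplified illustration, not CDK's actual implementation; `ToyState`, `ToyStateMachine`, `setNext` and the shortened state names are hypothetical, and only linear chains are handled.

```typescript
// Toy model for illustration only; this is not CDK's implementation.
class ToyState {
  readonly name: string;
  next?: ToyState;

  constructor(name: string) {
    this.name = name;
  }

  // Wire this state to its successor and return the successor, so calls chain.
  setNext(successor: ToyState): ToyState {
    this.next = successor;
    return successor;
  }
}

class ToyStateMachine {
  readonly definition: string[];

  // The graph is walked exactly once, here; wiring added later is never seen.
  // (Linear chains only: a cycle would make this loop forever.)
  constructor(start: ToyState) {
    const names: string[] = [];
    for (let s: ToyState | undefined = start; s !== undefined; s = s.next) {
      names.push(s.name);
    }
    this.definition = names;
  }
}

// Order of the working sample: wire the states, then create the machine.
const start1 = new ToyState('LanguageAvailable?');
start1.setNext(new ToyState('ImageAnalysis')).setNext(new ToyState('Success'));
const complete = new ToyStateMachine(start1);

// Order of the second sample: create the machine, then wire the states.
const start2 = new ToyState('LanguageAvailable?');
const partial = new ToyStateMachine(start2);
start2.setNext(new ToyState('ImageAnalysis')).setNext(new ToyState('Success'));

console.log(complete.definition.join(' -> ')); // LanguageAvailable? -> ImageAnalysis -> Success
console.log(partial.definition.join(' -> '));  // LanguageAvailable?
```

As in the reported output, the machine built first only ever contains the start state, because the snapshot was taken before the rest of the chain existed.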

knihit commented 3 years ago

My use case is SqsSendMessage. As part of it, I have an SQS queue and a Lambda function. The Lambda function needs permission for [ 'states:SendTaskSuccess', 'states:SendTaskFailure', 'states:SendTaskHeartbeat' ] so that it can tell the state machine that the task is complete. I wanted to scope the policy's resources down to the state machine, but if the machine does not exist yet, the resource has to be '*'. Defining the machine at the beginning would allow the state machine ARN to be specified as the resource in the Lambda policy. I understand your viewpoint. I am trying to think how to rearrange my construct structure so that I can attach that policy later, once the definition and state machine are created. I appreciate your help.
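The difference between the two options is only the `Resource` field of the IAM statement, which can be scoped only once the state machine ARN is known. A minimal sketch with plain policy-document objects (no CDK dependency); `taskResponseStatement`, `policyDocument` and the example ARN are hypothetical. Recent CDK versions also expose `StateMachine.grantTaskResponse(...)`, which grants these three actions on the machine's ARN; whether it is available depends on the CDK version in use.

```typescript
// Plain IAM policy-statement shape; no CDK dependency.
interface PolicyStatement {
  Effect: 'Allow';
  Action: string[];
  Resource: string | string[];
}

// The three callback actions the Lambda consumer needs.
const TASK_RESPONSE_ACTIONS = [
  'states:SendTaskSuccess',
  'states:SendTaskFailure',
  'states:SendTaskHeartbeat',
];

// Without an ARN the only option is '*'; with one, the grant is scoped to that machine.
function taskResponseStatement(stateMachineArn?: string): PolicyStatement {
  return {
    Effect: 'Allow',
    Action: [...TASK_RESPONSE_ACTIONS],
    Resource: stateMachineArn ?? '*',
  };
}

// Wraps a statement in a standard IAM policy document.
function policyDocument(statement: PolicyStatement) {
  return { Version: '2012-10-17', Statement: [statement] };
}

// Hypothetical ARN, for illustration only.
const arn = 'arn:aws:states:us-east-1:123456789012:stateMachine:WorkflowEngine';
console.log(JSON.stringify(policyDocument(taskResponseStatement()), null, 2));
console.log(JSON.stringify(policyDocument(taskResponseStatement(arn)), null, 2));
```

Because the scoped variant needs the ARN as an input, the policy can only be built after the state machine has been created, which is the ordering constraint discussed in this thread.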

peterwoodworth commented 3 years ago

Hey @knihit, have you made any progress here?

I am trying to think how to rearrange my construct structure so that I can attach that policy later once the definition and state machine is created

Do you have any additional questions?

knihit commented 3 years ago

Hi @peterwoodworth, for now I have decided to use '*' as the resource in the policy. For a future refactoring, I plan to add a method to the construct class I created, which I can invoke after the state machine is created. Since I am using the Step Functions callback integration pattern, where the task is published to SQS with a task token, I have the flexibility to define the IAM policy of the Lambda function (the consumer of the SQS queue) for SendTaskSuccess/SendTaskFailure/SendTaskHeartbeat once the Step Functions workflow is instantiated. If CloudFormation does not resolve the dependencies automatically for any reason, I can add 'DependsOn' clauses to make it work. This is how I plan to solve it. If there are any other suggestions, please let me know.
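On the consumer side of this callback pattern, the Lambda reads the task token from the SQS message and answers with SendTaskSuccess or SendTaskFailure. In the first definition string, the message body template is `{"input.$":"$","token.$":"$$.Task.Token"}`, so at run time the body carries `input` and `token` fields. The sketch below only builds the request objects (using the `taskToken`, `output`, `error` and `cause` parameter names of those two API calls) and does not call the AWS SDK; `processInput`, `toCallbackRequest` and the `ProcessingError` error name are hypothetical, and SendTaskHeartbeat is omitted for brevity.

```typescript
// Runtime shape of the SQS body produced by {"input.$":"$","token.$":"$$.Task.Token"}.
interface CallbackMessage {
  input: unknown;
  token: string;
}

// Request shapes mirroring the SendTaskSuccess / SendTaskFailure parameters.
type CallbackRequest =
  | { action: 'SendTaskSuccess'; taskToken: string; output: string }
  | { action: 'SendTaskFailure'; taskToken: string; error: string; cause: string };

// Hypothetical stand-in for the real work the consumer Lambda performs.
function processInput(input: unknown): unknown {
  if (input === null || typeof input !== 'object') {
    throw new Error('input must be a JSON object');
  }
  return { ...(input as Record<string, unknown>), processed: true };
}

// Turns one SQS message body into the callback the consumer should send.
function toCallbackRequest(body: string): CallbackRequest {
  const msg = JSON.parse(body) as CallbackMessage;
  try {
    const result = processInput(msg.input);
    // SendTaskSuccess expects the task output as a JSON string.
    return { action: 'SendTaskSuccess', taskToken: msg.token, output: JSON.stringify(result) };
  } catch (err) {
    return {
      action: 'SendTaskFailure',
      taskToken: msg.token,
      error: 'ProcessingError',
      cause: err instanceof Error ? err.message : String(err),
    };
  }
}

console.log(toCallbackRequest('{"input":{"feed":{"lang":"en"}},"token":"example-token"}'));
console.log(toCallbackRequest('{"input":"not an object","token":"example-token"}'));
```

Nothing in this handler depends on the state machine's ARN; only its IAM policy does, which is why the policy can be attached after the state machine is created.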

peterwoodworth commented 3 years ago

Let me know if that works out for you, and if not, feel free to ask any questions.

github-actions[bot] commented 3 years ago

⚠️COMMENT VISIBILITY WARNING⚠️

Comments on closed issues are hard for our team to see. If you need more assistance, please either tag a team member or open a new issue that references this one. If you wish to keep having a conversation with other community members under this issue feel free to do so.