aws / aws-cdk

The AWS Cloud Development Kit is a framework for defining cloud infrastructure in code
https://aws.amazon.com/cdk
Apache License 2.0

In stepfunctions, GlueStartJobRun ignores "integration_pattern" argument #8599

Closed (psolaimani closed this issue 4 years ago)

psolaimani commented 4 years ago

When creating a Step Functions state machine that runs Glue jobs sequentially, the integration_pattern argument is ignored. This happens with multiple CDK versions, including 1.45.0 (the current latest).

Reproduction Steps

from aws_cdk import (
    core,
    aws_iam,
    aws_glue,
    aws_stepfunctions_tasks as tasks,
    aws_stepfunctions as sf
)

class CdkSfStack(core.Stack):

    def __init__(self, scope: core.Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # Role that the Glue job assumes.
        glue_role = aws_iam.Role(
            scope=self,
            id="Role__id",
            assumed_by=aws_iam.ServicePrincipal("glue.amazonaws.com"),
        )

        glue_job = aws_glue.CfnJob(
            scope=self,
            id="CfnJob__id",
            name="CfnJob__name",
            description="CfnJob_description",
            role=glue_role.role_arn,
            glue_version="1.0",
            command=aws_glue.CfnJob.JobCommandProperty(
                name="glueetl",
                python_version="3",
                script_location="CfnJob_command__script_location.py",
            ),
        )

        # Two Glue tasks chained one after the other. integration_pattern is commented out;
        # REQUEST_RESPONSE is also the default, so the Resource gets no .sync suffix either way.
        glue1_task = tasks.GlueStartJobRun(
            scope=self,
            id="GlueStartJobRun__id1",
            glue_job_name="CfnJob__name1",
            # integration_pattern=sf.IntegrationPattern.REQUEST_RESPONSE
        )

        glue2_task = tasks.GlueStartJobRun(
            scope=self,
            id="GlueStartJobRun__id2",
            glue_job_name="CfnJob__name2",
            # integration_pattern=sf.IntegrationPattern.REQUEST_RESPONSE
        )

        definition = glue1_task.next(glue2_task)

        sm = sf.StateMachine(
            scope=self,
            id="StateMachine__id",
            state_machine_name="StateMachine__state_machine_name",
            definition=definition,
            timeout=core.Duration.hours(2),
        )

Error Log

Relevant part of the synthesized CloudFormation template (YAML):

  StateMachineidRoleDefaultPolicy54557824:
    Type: AWS::IAM::Policy
    Properties:
      PolicyDocument:
        Statement:
          - Action: glue:StartJobRun
            Effect: Allow
            Resource:
              Fn::Join:
                - ""
                - - "arn:"
                  - Ref: AWS::Partition
                  - :glue:eu-west-1:account1:job/CfnJob__name1
          - Action: glue:StartJobRun
            Effect: Allow
            Resource:
              Fn::Join:
                - ""
                - - "arn:"
                  - Ref: AWS::Partition
                  - :glue:eu-west-1:account1:job/CfnJob__name2
        Version: "2012-10-17"
      PolicyName: StateMachineidRoleDefaultPolicy54557824
      Roles:
        - Ref: StateMachineidRoleAA791458
    Metadata:
      aws:cdk:path: cdk-sf/StateMachine__id/Role/DefaultPolicy/Resource
  StateMachineid19F73298:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      RoleArn:
        Fn::GetAtt:
          - StateMachineidRoleAA791458
          - Arn
      DefinitionString:
        Fn::Join:
          - ""
          - - '{"StartAt":"GlueStartJobRun__id1","States":{"GlueStartJobRun__id1":{"Next":"GlueStartJobRun__id2","Type":"Task","Resource":"arn:'
            - Ref: AWS::Partition
            - ':states:::glue:startJobRun","Parameters":{"JobName":"CfnJob__name1"}},"GlueStartJobRun__id2":{"End":true,"Type":"Task","Resource":"arn:'
            - Ref: AWS::Partition
            - :states:::glue:startJobRun","Parameters":{"JobName":"CfnJob__name2"}}},"TimeoutSeconds":7200}
      StateMachineName: StateMachine__state_machine_name
    DependsOn:
      - StateMachineidRoleDefaultPolicy54557824
      - StateMachineidRoleAA791458
    Metadata:
      aws:cdk:path: cdk-sf/StateMachine__id/Resource
  CDKMetadata:
    Type: AWS::CDK::Metadata
    Properties:
      Modules: aws-cdk=1.45.0,@aws-cdk/assets=1.45.0,@aws-cdk/aws-apigateway=1.45.0,@aws-cdk/aws-applicationautoscaling=1.45.0,@aws-cdk/aws-autoscaling=1.45.0,@aws-cdk/aws-autoscaling-common=1.45.0,@aws-cdk/aws-autoscaling-hooktargets=1.45.0,@aws-cdk/aws-batch=1.45.0,@aws-cdk/aws-certificatemanager=1.45.0,@aws-cdk/aws-cloudformation=1.45.0,@aws-cdk/aws-cloudfront=1.45.0,@aws-cdk/aws-cloudwatch=1.45.0,@aws-cdk/aws-cognito=1.45.0,@aws-cdk/aws-ec2=1.45.0,@aws-cdk/aws-ecr=1.45.0,@aws-cdk/aws-ecr-assets=1.45.0,@aws-cdk/aws-ecs=1.45.0,@aws-cdk/aws-elasticloadbalancing=1.45.0,@aws-cdk/aws-elasticloadbalancingv2=1.45.0,@aws-cdk/aws-events=1.45.0,@aws-cdk/aws-glue=1.45.0,@aws-cdk/aws-iam=1.45.0,@aws-cdk/aws-kms=1.45.0,@aws-cdk/aws-lambda=1.45.0,@aws-cdk/aws-logs=1.45.0,@aws-cdk/aws-route53=1.45.0,@aws-cdk/aws-route53-targets=1.45.0,@aws-cdk/aws-s3=1.45.0,@aws-cdk/aws-s3-assets=1.45.0,@aws-cdk/aws-sam=1.45.0,@aws-cdk/aws-secretsmanager=1.45.0,@aws-cdk/aws-servicediscovery=1.45.0,@aws-cdk/aws-sns=1.45.0,@aws-cdk/aws-sns-subscriptions=1.45.0,@aws-cdk/aws-sqs=1.45.0,@aws-cdk/aws-ssm=1.45.0,@aws-cdk/aws-stepfunctions=1.45.0,@aws-cdk/aws-stepfunctions-tasks=1.45.0,@aws-cdk/cdk-assets-schema=1.45.0,@aws-cdk/cloud-assembly-schema=1.45.0,@aws-cdk/core=1.45.0,@aws-cdk/custom-resources=1.45.0,@aws-cdk/cx-api=1.45.0,@aws-cdk/region-info=1.45.0,jsii-runtime=Python/3.6.10

Environment

Other

The same happens in CodeBuild (amazonlinux2-x86_64-standard:3.0, Python 3.7, CDK 1.45.0).


This is a 🐛 Bug Report

shivlaks commented 4 years ago

@psolaimani, the REQUEST_RESPONSE integration pattern should not add any suffix to the Resource, and based on your template it doesn't. Keep in mind that request-response means your state machine calls the API and moves on as soon as it gets a response, not necessarily when the job finishes running.

If you want your state machine to wait until the Glue job itself finishes running, then you would want to use RUN_JOB. In this case, the resource would have a suffix of .sync.
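To make the pattern-to-suffix mapping concrete, here is a minimal Python sketch that builds the Resource ARN a Glue startJobRun Task state would carry. The enum is a local stand-in that only mirrors the names of the CDK's IntegrationPattern values, and glue_start_job_run_resource is a hypothetical helper, not part of the CDK. It assumes the Step Functions conventions of no suffix for request-response and .sync for run-a-job. It rejects WAIT_FOR_TASK_TOKEN because, as far as I know, Glue's optimized integration (and the CDK's GlueStartJobRun) only supports the first two.

```python
from enum import Enum


class IntegrationPattern(Enum):
    """Local stand-in that mirrors the names of the CDK's IntegrationPattern values."""
    REQUEST_RESPONSE = "REQUEST_RESPONSE"
    RUN_JOB = "RUN_JOB"
    WAIT_FOR_TASK_TOKEN = "WAIT_FOR_TASK_TOKEN"


# Resource suffix per pattern for glue:startJobRun (assumed from Step Functions conventions).
# WAIT_FOR_TASK_TOKEN is deliberately absent: Glue does not offer the callback pattern.
_GLUE_SUFFIXES = {
    IntegrationPattern.REQUEST_RESPONSE: "",  # move on as soon as StartJobRun responds
    IntegrationPattern.RUN_JOB: ".sync",      # wait until the job run finishes
}


def glue_start_job_run_resource(pattern: IntegrationPattern, partition: str = "aws") -> str:
    """Hypothetical helper: the Resource ARN a Glue startJobRun Task state would use."""
    if pattern not in _GLUE_SUFFIXES:
        raise ValueError(f"{pattern.name} is not supported for glue:startJobRun")
    return f"arn:{partition}:states:::glue:startJobRun{_GLUE_SUFFIXES[pattern]}"


print(glue_start_job_run_resource(IntegrationPattern.REQUEST_RESPONSE))
# arn:aws:states:::glue:startJobRun
print(glue_start_job_run_resource(IntegrationPattern.RUN_JOB))
# arn:aws:states:::glue:startJobRun.sync
```

So if the synthesized definition shows glue:startJobRun without .sync, the task was built with the request-response pattern.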

It sounds like the integration pattern you want is RUN_JOB. Have you tried it and found that it has no effect?

A minimal repro that captures the expected behaviour as well as the observed behaviour would help.

shivlaks commented 4 years ago

Marking this as a guidance issue for now; we can add the bug label back once we have repro steps.

github-actions[bot] commented 4 years ago

This issue has not received a response in a while. If you want to keep this issue open, please leave a comment below and auto-close will be canceled.

jacido commented 11 months ago

Greetings, I deployed this solution in my dev environment and was wondering how I can validate that the task is synchronous.

As you may know, Step Functions is an orchestration service: if a use case involves orchestrating or managing multiple tasks in sequence or in parallel, Step Functions can help with that.

In this scenario, we start a Glue job and periodically check the job status by polling Glue. The state machine waits for Glue to finish the job run before proceeding to the next state in its definition.
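Conceptually, the start-then-poll behaviour described above (what a .sync task does) is a loop. Step Functions performs this loop internally, so you never write it yourself. The Python sketch below only illustrates the idea. wait_for_job_run and its get_job_run_state callable are hypothetical stand-ins for Glue's GetJobRun call. The set of terminal states is an assumption based on Glue's JobRunState values.

```python
import time
from typing import Callable

# Assumed terminal values of Glue's JobRunState; anything else counts as "still in progress".
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR", "EXPIRED"}


def wait_for_job_run(
    get_job_run_state: Callable[[], str],
    poll_seconds: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the job run reaches a terminal state, then return that state.

    get_job_run_state is a hypothetical callable standing in for a GetJobRun call.
    """
    while True:
        state = get_job_run_state()
        if state in TERMINAL_STATES:
            return state
        sleep(poll_seconds)  # still STARTING/RUNNING/...: check again later


# Usage with a fake job that is STARTING, then RUNNING, then SUCCEEDED.
states = iter(["STARTING", "RUNNING", "SUCCEEDED"])
print(wait_for_job_run(lambda: next(states), sleep=lambda _: None))  # SUCCEEDED
```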

After digging through the documentation, I read that this is possible in this use case through the .sync integration. In CDK, setting the integration pattern to IntegrationPattern.RUN_JOB makes that state synchronous.
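One way to validate this is to inspect the deployed definition. A Glue task that waits for the job has a Resource ending in glue:startJobRun.sync. The Python sketch below scans a definition JSON string for Glue startJobRun Task states that lack the .sync suffix. One source for that string is the definition field returned by aws stepfunctions describe-state-machine. The function name is hypothetical. It only inspects top-level States, so states nested inside Parallel or Map branches are not checked.

```python
import json
from typing import List


def glue_tasks_without_sync(definition_json: str) -> List[str]:
    """Return names of top-level Task states that call glue:startJobRun without .sync."""
    states = json.loads(definition_json)["States"]
    missing = []
    for name, state in states.items():
        resource = state.get("Resource", "")
        if state.get("Type") == "Task" and resource.endswith(":glue:startJobRun"):
            missing.append(name)  # request-response: moves on once the run has started
    return missing


definition = json.dumps({
    "StartAt": "Job1",
    "States": {
        "Job1": {"Type": "Task", "Resource": "arn:aws:states:::glue:startJobRun",
                 "Parameters": {"JobName": "job1"}, "Next": "Job2"},
        "Job2": {"Type": "Task", "Resource": "arn:aws:states:::glue:startJobRun.sync",
                 "Parameters": {"JobName": "job2"}, "End": True},
    },
})
print(glue_tasks_without_sync(definition))  # ['Job1']
```

An empty list means every top-level Glue task in the definition waits for its job run to finish.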

lauragalera commented 5 months ago

I've added the integration pattern RUN_JOB, but the .sync suffix doesn't appear when I check the state machine in the console.

Here is how I set up my state machine:

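// Imports this snippet relies on (aws-cdk-lib v2)
import { Duration } from 'aws-cdk-lib';
import { DefinitionBody, Fail, IntegrationPattern, StateMachine } from 'aws-cdk-lib/aws-stepfunctions';
import { GlueStartJobRun } from 'aws-cdk-lib/aws-stepfunctions-tasks';

        // Inside the stack constructor:
        const jobIngestionMixpanel = new GlueStartJobRun(this, 'StartJobIngestionMixpanel', {
            glueJobName: 'job1',
            // GlueStartJobRunProps names this prop integrationPattern; under the misspelled
            // key below RUN_JOB is not picked up, so the task keeps the REQUEST_RESPONSE default.
            integretaionPatter: IntegrationPattern.RUN_JOB
        })

        // Retry a failed task twice, waiting a constant 5 minutes before each retry.
        jobIngestionMixpanel.addRetry({
            errors: ['States.TaskFailed'],
            backoffRate: 1,
            maxAttempts: 2,
            interval: Duration.minutes(5)
        })

        // Route any error not resolved by the retries to a Fail state.
        jobIngestionMixpanel.addCatch(new Fail(this, 'JobFailed', {
            error: 'Glue job failed after retries'
        }))

        const jobDictionaryMixpanel = new GlueStartJobRun(this, 'StartJobDictMixpanel', {
            glueJobName: 'job2',
            integretaionPatter: IntegrationPattern.RUN_JOB // same misspelling as above
        })

        // Chain the second task after the first.
        const definition = jobIngestionMixpanel.next(jobDictionaryMixpanel)

        new StateMachine(this, 'StateMachineJobs', {
            definitionBody: DefinitionBody.fromChainable(definition),
        })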
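For reference, the retry settings above render as IntervalSeconds 300, MaxAttempts 2 and BackoffRate 1 in the definition. Step Functions waits IntervalSeconds × BackoffRate^(n-1) before retry n, so this configuration waits 300 seconds before each of the two retries. The Python sketch below computes that schedule. retry_delays is a hypothetical helper that ignores newer Retry fields such as MaxDelaySeconds and JitterStrategy. Note that a States.TaskFailed retry only reacts to a failed Glue job when the task waits for the job (RUN_JOB, .sync). With request-response, the task succeeds as soon as StartJobRun returns.

```python
from typing import List


def retry_delays(interval_seconds: float, max_attempts: int, backoff_rate: float) -> List[float]:
    """Seconds waited before each retry: interval * backoff_rate ** (n - 1) for retry n."""
    return [interval_seconds * backoff_rate ** (n - 1) for n in range(1, max_attempts + 1)]


print(retry_delays(300, 2, 1))  # [300, 300]
print(retry_delays(300, 3, 2))  # [300, 600, 1200]
```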

And here is the CloudFormation template:

    "StateMachineJobsB3D6E122": {
      "Type": "AWS::StepFunctions::StateMachine",
      "Properties": {
        "DefinitionString": {
          "Fn::Join": [
            "",
            [
              "{\"StartAt\":\"StartJobIngestionMixpanel\",\"States\":{\"StartJobIngestionMixpanel\":{\"Next\":\"StartJobDictMixpanel\",\"Retry\":[{\"ErrorEquals\":[\"States.TaskFailed\"],\"IntervalSeconds\":300,\"MaxAttempts\":2,\"BackoffRate\":1}],\"Catch\":[{\"ErrorEquals\":[\"States.ALL\"],\"Next\":\"JobFailed\"}],\"Type\":\"Task\",\"Resource\":\"arn:",
              {
                "Ref": "AWS::Partition"
              },
              ":states:::glue:startJobRun\",\"Parameters\":{\"JobName\":\"job2\"}},\"StartJobDictMixpanel\":{\"End\":true,\"Type\":\"Task\",\"Resource\":\"arn:",
              {
                "Ref": "AWS::Partition"
              },
              ":states:::glue:startJobRun\",\"Parameters\":{\"JobName\":\"job1\"}},\"JobFailed\":{\"Type\":\"Fail\",\"Error\":\"Glue job failed after retries\"}}}"
            ]
          ]
        }
      }
    }

Before using CDK, I tested by creating the state machine directly in the console, and the jobs ran synchronously when I added the .sync suffix. However, when I use RUN_JOB in CDK, it doesn't work.

My desired state machine:

{
  "Comment": "A state machine that starts a Glue job, retries upon failure, and proceeds differently upon success",
  "StartAt": "GlueStartJobRun",
  "States": {
    "GlueStartJobRun": {
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters": {
        "JobName": "job1"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "IntervalSeconds": 300,
          "MaxAttempts": 2,
          "BackoffRate": 1
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "JobFailed"
        }
      ],
      "Next": "Glue StartJobRun"
    },
    "Glue StartJobRun": {
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters": {
        "JobName": "job2"
      },
      "End": true
    },
    "JobFailed": {
      "Type": "Fail",
      "Error": "Glue job failed after retries."
    }
  }
}

What the CDK code above produced:

{
  "StartAt": "StartJobIngestionMixpanel",
  "States": {
    "StartJobIngestionMixpanel": {
      "Next": "StartJobDictMixpanel",
      "Retry": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "IntervalSeconds": 300,
          "MaxAttempts": 2,
          "BackoffRate": 1
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "JobFailed"
        }
      ],
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun",
      "Parameters": {
        "JobName": "job1"
      }
    },
    "StartJobDictMixpanel": {
      "End": true,
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun",
      "Parameters": {
        "JobName": "job2"
      }
    },
    "JobFailed": {
      "Type": "Fail",
      "Error": "Glue job failed after retries"
    }
  }
}
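Comparing two definitions like these is easier once the state names are set aside. The Python sketch below walks a definition from StartAt along its Next links and collects the Resource of every Task state in execution order. task_resources_in_order and two_glue_jobs are hypothetical helpers. The walk follows only Next, not Catch targets, Choice rules, or Parallel/Map branches. The two inputs are minimal two-job chains that differ only in the .sync suffix, mirroring the desired and produced definitions.

```python
import json
from typing import List


def task_resources_in_order(definition_json: str) -> List[str]:
    """Follow StartAt/Next and return each Task state's Resource in execution order."""
    definition = json.loads(definition_json)
    states = definition["States"]
    resources = []
    seen = set()
    name = definition["StartAt"]
    while name is not None and name not in seen:  # guard against Next cycles
        seen.add(name)
        state = states[name]
        if state.get("Type") == "Task":
            resources.append(state["Resource"])
        name = state.get("Next")  # None once a state ends the chain
    return resources


def two_glue_jobs(suffix: str) -> str:
    """Hypothetical minimal two-task Glue chain, for illustration only."""
    arn = "arn:aws:states:::glue:startJobRun" + suffix
    return json.dumps({
        "StartAt": "First",
        "States": {
            "First": {"Type": "Task", "Resource": arn, "Next": "Second"},
            "Second": {"Type": "Task", "Resource": arn, "End": True},
        },
    })


desired, produced = two_glue_jobs(".sync"), two_glue_jobs("")
for want, got in zip(task_resources_in_order(desired), task_resources_in_order(produced)):
    print("same" if want == got else "DIFF", want, got)
```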