aws / aws-cdk

The AWS Cloud Development Kit is a framework for defining cloud infrastructure in code
https://aws.amazon.com/cdk
Apache License 2.0
11.51k stars 3.85k forks source link

lambda.add_event_source: Unable to add trigger to existing Stream enabled DynamoDB table #24731

Open PinchofLogic opened 1 year ago

PinchofLogic commented 1 year ago

Describe the bug

If we add an existing DynamoDB stream-enabled table using "dynamodb.Table.from_table_arn".

The lambda add_event_source using "DynamoEventSource" throws a runtime error "RuntimeError: Error: DynamoDB Streams must be enabled on the table".

Expected Behavior

Should be able to add the event trigger for the DynamoDB table.

Current Behavior

Not able to add the event trigger.

Reproduction Steps

Below CDK resources should be able to deploy:

Stream enabled DynamoDB table

    existing_table = _dynamodb.Table.from_table_arn(
        self, "my_table", **TABLE_ARN**
    )

Simple lambda

  trigger_lambda = _lambda.Function(
        self,
        id="trigger",
        runtime=_lambda.Runtime.PYTHON_3_9,
        handler="lambda_trigger.lambda_handler",
        code=_lambda.Code.from_asset("lambda_functions/trigger")
    )

Lambda Trigger

trigger_lambda.add_event_source( DynamoEventSource( existing_table, starting_position=_lambda.StartingPosition.LATEST, batch_size=1, retry_attempts=3, filters=[ _lambda.FilterCriteria.filter( {"new_notification": _lambda.FilterRule.is_equal("INSERT")} ) ], ) )

Possible Solution

No response

Additional Information/Context

No response

CDK CLI Version

2.69.0 (build 60a5b2a

Framework Version

2.69.0

Node.js Version

16.17.0

OS

Mac OS 13.2.1

Language

Python

Language Version

3.10.0

Other information

No response

khushail commented 1 year ago

Hi @PinchofLogic , thanks for reporting and sharing the repro code. I am able to reproduce the same issue you are facing.

Currently I am marking it as P2 which means our team won't be able to work on it immediately. Although if you would like to contribute by submitting a PR , you can refer to these guidelines.

We use +1s to help prioritize our work, and are happy to re-evaluate this issue based on community feedback. You can reach out to the cdk.dev community on Slack to solicit support for re-prioritization.

antoniordz96 commented 1 year ago

@PinchofLogic have you been able to find a workaround for this? would it possible to import the table using fromTableAttributes?

antoniordz96 commented 1 year ago

For some background we have a construct that provisions a CFnGlobalTable. We've enabled dynamodbstreams on the globalDynmoDb but get the same issue as you when trying to add it as a trigger

antoniordz96 commented 1 year ago

Update we found a workaround.

We ended up using a custom resource to fetch the dynmoDB table arn.

    def _getTableStreamArn(self, deployment_region: str):
        awsSdkCall = AwsSdkCall(
            action="listStreams",
            service="DynamoDBStreams",
            region=deployment_region,
            parameters={"TableName": self._global_dynamodb_table_name},
            physical_resource_id=PhysicalResourceId.of(
                f"{self._global_dynamodb_table_name}-ListStreams"
            ),
        )
        res = AwsCustomResource(
            scope=self,
            id=f"{self._global_dynamodb_table_name}-AWSCustomResource-Stream",
            policy=AwsCustomResourcePolicy.from_statements(
                [PolicyStatement(actions=["dynamodb:*"], resources=["*"])]
            ),
            log_retention=RetentionDays.ONE_WEEK,
            on_create=awsSdkCall,
            on_update=awsSdkCall,
        )
        self._global_dynamodb_stream_arn = res.get_response_field(
            data_path="Streams.0.StreamArn"
        )

Since we use CfnTable we ended up creating another reference to the table using the Table.from_table_attributes method and passing in the table_name & table_stream_arn

example snippet:

        lambda_func.add_event_source(
            DynamoEventSource(
                Table.from_table_attributes(
                    self,
                    "fake-table",
                    table_name=dynamo_db_table_name,
                    table_stream_arn=res.get_response_field(
                        data_path="Streams.0.StreamArn"
                    ),
                ),
                starting_position=lambda_.StartingPosition.LATEST,
            )
        )
ikathuria commented 10 months ago

Hi, any update on this?

aplanos commented 9 months ago

I create another workaround, i was not able to create the trigger by updating the lamda cdk script, so what i did was the opposite. I passed a parameter to my dynamoDB cdk script with the lambda arn. Something like this:

if(lambdaArnTrigger != "") {

    const lambdaFunction = lambda.Function.fromFunctionArn(this, `Lambda-${lambdaArnTrigger}`, lambdaArnTrigger);
    dynamotable.grantStream(lambdaFunction);

    lambdaFunction.addEventSourceMapping(`EventSourceMapping-TableStream`, {
        eventSourceArn: dynamotable.tableStreamArn,
        maxBatchingWindow: Duration.seconds(1),
        startingPosition: StartingPosition.LATEST,
        batchSize: 1
    });
}

Before executing again the dynamoDB script i was required to update the Lambda policy with these permissions: 'dynamodb:DescribeStream', 'dynamodb:GetRecords', 'dynamodb:GetShardIterator', 'dynamodb:ListStreams'

Hope this could help to anyone

AleksandrBorkun commented 7 months ago

Hi, any updates on this?

LukaASoban commented 3 months ago

still getting this issue myself. An update would be great

just4give commented 2 months ago

Update we found a workaround.

We ended up using a custom resource to fetch the dynmoDB table arn.

    def _getTableStreamArn(self, deployment_region: str):
        awsSdkCall = AwsSdkCall(
            action="listStreams",
            service="DynamoDBStreams",
            region=deployment_region,
            parameters={"TableName": self._global_dynamodb_table_name},
            physical_resource_id=PhysicalResourceId.of(
                f"{self._global_dynamodb_table_name}-ListStreams"
            ),
        )
        res = AwsCustomResource(
            scope=self,
            id=f"{self._global_dynamodb_table_name}-AWSCustomResource-Stream",
            policy=AwsCustomResourcePolicy.from_statements(
                [PolicyStatement(actions=["dynamodb:*"], resources=["*"])]
            ),
            log_retention=RetentionDays.ONE_WEEK,
            on_create=awsSdkCall,
            on_update=awsSdkCall,
        )
        self._global_dynamodb_stream_arn = res.get_response_field(
            data_path="Streams.0.StreamArn"
        )

Since we use CfnTable we ended up creating another reference to the table using the Table.from_table_attributes method and passing in the table_name & table_stream_arn

example snippet:

        lambda_func.add_event_source(
            DynamoEventSource(
                Table.from_table_attributes(
                    self,
                    "fake-table",
                    table_name=dynamo_db_table_name,
                    table_stream_arn=res.get_response_field(
                        data_path="Streams.0.StreamArn"
                    ),
                ),
                starting_position=lambda_.StartingPosition.LATEST,
            )
        )

@antoniordz96 your work around worked for me as well - thank you! Instead of fromTable , I had to use fromTableAttributes passing table name and stream arn.

    const tableEntries = cdk.aws_dynamodb.Table.fromTableAttributes(this, "Entries", {
      tableName: "Entries",
      tableStreamArn: "steam_arn",
    }); 

dynamoStreamLambda.addEventSource(
      new DynamoEventSource(tableEntries, {
        startingPosition: lambda.StartingPosition.LATEST,
        batchSize: 1,
      })
    );

Cheers!