awslabs / dynamodb-streams-kinesis-adapter

The Amazon DynamoDB Streams Adapter implements the Amazon Kinesis interface so that your application can use KCL to consume and process data from a DynamoDB stream.
Apache License 2.0
97 stars 37 forks source link

Not able to get this to work with assumeRole credentials #45

Closed jimbobhickville closed 3 years ago

jimbobhickville commented 3 years ago

Perhaps this is by design, but the shard sync background thread traps all exceptions and issues log warnings, but does not provide any mechanism for handling errors or bubble the exceptions up to any calling code. We have a situation where we need the same process to talk to dynamodb endpoints in multiple AWS accounts, so the dynamodb client we pass to the streams adapter uses a token generated by sts AssumeRoleRequest. However, those credentials expire after a time and the streams kinesis adapter just goes into an infinite loop of logging error messages.

We tried creating a custom AwsCredentialsProvider that will detect the expired credentials and issue a new AssumeRoleRequest whenever getCredentials() is called, but it seems that that code is never called by the background thread (we see only 1 logging instance of it being called when we first initialize the stream).

We're using version 1.5.1 of this project with version 1.11.828 of the AWS SDK.

Is there some way to force it to refresh the credentials or at a minimum bubble up the exception so we can trap it and restart the stream with new credentials?

Stacktrace (if helpful):

com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: The security token included in the request is expired (Service: AmazonDynamoDBStreams; Status Code: 400; Error Code: ExpiredTokenException; Request ID: G762UUV7S4ASDEQISM6U4CET2VVV4KQNSO5AEMVJF66Q9ASUAAJG)
       at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
       at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
       at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
       at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
       at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
       at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
       at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
       at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
       at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
       at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
       at com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.doInvoke(AmazonDynamoDBStreamsClient.java:686)
       at com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.invoke(AmazonDynamoDBStreamsClient.java:653)
       at com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.invoke(AmazonDynamoDBStreamsClient.java:642)
       at com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.executeDescribeStream(AmazonDynamoDBStreamsClient.java:361)
       at com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.describeStream(AmazonDynamoDBStreamsClient.java:332)
       at com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient.describeStream(AmazonDynamoDBStreamsAdapterClient.java:250)
       at com.amazonaws.services.dynamodbv2.streamsadapter.DynamoDBStreamsProxy.getStreamInfo(DynamoDBStreamsProxy.java:166)
       at com.amazonaws.services.dynamodbv2.streamsadapter.DynamoDBStreamsProxy.buildShardGraphSnapshot(DynamoDBStreamsProxy.java:279)
       at com.amazonaws.services.dynamodbv2.streamsadapter.DynamoDBStreamsProxy.getShardList(DynamoDBStreamsProxy.java:221)
       at com.amazonaws.services.dynamodbv2.streamsadapter.DynamoDBStreamsShardSyncer.getShardList(DynamoDBStreamsShardSyncer.java:320)
       at com.amazonaws.services.dynamodbv2.streamsadapter.DynamoDBStreamsShardSyncer.syncShardLeases(DynamoDBStreamsShardSyncer.java:124)
       at com.amazonaws.services.dynamodbv2.streamsadapter.DynamoDBStreamsShardSyncer.checkAndCreateLeasesForNewShards(DynamoDBStreamsShardSyncer.java:100)
       at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncer.checkAndCreateLeasesForNewShards(ShardSyncer.java:41)
       at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask.call(ShardSyncTask.java:84)
       at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
       at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTaskManager.lambda$checkAndSubmitNextTask$0(ShardSyncTaskManager.java:119)
jimbobhickville commented 3 years ago

This might be a logic error in my credential provider. I'm debugging it. Will close if so.

jimbobhickville commented 3 years ago

Yep, I had the logic wrong for determining if the credentials were expired.