awslabs/amazon-kinesis-connector-flink

This is a fork of the Apache Flink Kinesis connector adding Enhanced Fanout support for Flink 1.8/1.11 on KDA.
Apache License 2.0

Validation exception for shardIteratorType being used with Ddb Stream in Amzn-Kinesis-Connector-Flink #9

Closed: dbadami closed this issue 3 years ago

dbadami commented 3 years ago

Hi,

I'm deploying a Flink application (1.11.1) using a JAR of the amazon-kinesis-connector-flink (pulled and built on December 7th, 2020). The application fails to start because of a validation exception from DDB: DynamoDB Streams does not support AT_TIMESTAMP in its GetShardIterator API, and the stack trace shows the connector calling that API with this shard iterator type.

I'm using the FlinkDynamoDBStreamsConsumer in my application, which is deployed to KDA using CDK. The configuration passed to the consumer is the ARN of the DDB stream and "flink.stream.initpos": "LATEST". I also have a FlinkKinesisConsumer, but from what I can tell it is not the cause of this issue (at least I don't expect it to call the DDB API for a shard iterator). A rough sketch of the setup follows.
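This is only a minimal sketch of that wiring, not the actual application code; the region, stream ARN, and class name below are placeholders:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import software.amazon.kinesis.connectors.flink.FlinkDynamoDBStreamsConsumer;

public class DdbStreamJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Consumer configuration: region plus the starting position key quoted above.
        Properties config = new Properties();
        config.setProperty("aws.region", "us-east-1");        // placeholder region
        config.setProperty("flink.stream.initpos", "LATEST"); // the setting that triggers the failure

        // The "stream name" passed to FlinkDynamoDBStreamsConsumer is the DynamoDB stream ARN (placeholder).
        DataStream<String> records = env.addSource(new FlinkDynamoDBStreamsConsumer<>(
                "arn:aws:dynamodb:us-east-1:123456789012:table/MyTable/stream/2020-12-07T00:00:00.000",
                new SimpleStringSchema(),
                config));

        records.print();
        env.execute("ddb-stream-consumer");
    }
}
```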

I need to build the JAR from mainline because the 1.0.4 release doesn't have this change, whose absence was causing my deployments to fail earlier.
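(Building from mainline was just a standard Maven build, i.e. cloning the repo and running `mvn clean package` from the project root; I'm assuming no special build profile is needed.)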

I've attached the exception that I'm seeing. Please let me know if you need any more information from me.


```
software.amazon.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: 1 validation error detected: Value 'AT_TIMESTAMP' at 'shardIteratorType' failed to satisfy constraint: Member must satisfy enum value set: [AFTER_SEQUENCE_NUMBER, LATEST, AT_SEQUENCE_NUMBER, TRIM_HORIZON] (Service: AmazonDynamoDBStreams; Status Code: 400; Error Code: ValidationException; Request ID: K8VO2NDEVCTH3TP4NAHK7OJMA3VV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1811)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1395)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1371)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
    at software.amazon.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.doInvoke(AmazonDynamoDBStreamsClient.java:686)
    at software.amazon.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.invoke(AmazonDynamoDBStreamsClient.java:653)
    at software.amazon.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.invoke(AmazonDynamoDBStreamsClient.java:642)
    at software.amazon.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.executeGetShardIterator(AmazonDynamoDBStreamsClient.java:544)
    at software.amazon.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.getShardIterator(AmazonDynamoDBStreamsClient.java:515)
    at software.amazon.kinesis.shaded.com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient.getShardIterator(AmazonDynamoDBStreamsAdapterClient.java:355)
    at software.amazon.kinesis.connectors.flink.proxy.KinesisProxy.getShardIterator(KinesisProxy.java:313)
    at software.amazon.kinesis.connectors.flink.proxy.KinesisProxy.getShardIterator(KinesisProxy.java:304)
    at software.amazon.kinesis.connectors.flink.internals.publisher.polling.PollingRecordPublisher.getShardIterator(PollingRecordPublisher.java:175)
    at software.amazon.kinesis.connectors.flink.internals.publisher.polling.PollingRecordPublisher.<init>(PollingRecordPublisher.java:95)
    at software.amazon.kinesis.connectors.flink.internals.publisher.polling.PollingRecordPublisherFactory.create(PollingRecordPublisherFactory.java:88)
    at software.amazon.kinesis.connectors.flink.internals.publisher.polling.PollingRecordPublisherFactory.create(PollingRecordPublisherFactory.java:39)
    at software.amazon.kinesis.connectors.flink.internals.KinesisDataFetcher.createRecordPublisher(KinesisDataFetcher.java:501)
    at software.amazon.kinesis.connectors.flink.internals.DynamoDBStreamsDataFetcher.createShardConsumer(DynamoDBStreamsDataFetcher.java:110)
    at software.amazon.kinesis.connectors.flink.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:569)
    at software.amazon.kinesis.connectors.flink.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:350)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
```
dannycranmer commented 3 years ago

Thank you for reporting this issue. It is caused by a recent change to reduce starting-position drift when consuming from multiple Kinesis shards, which also impacts DDB consumers. We will patch this issue in a 1.0.5 release and include the fix in 2.0.0. Since you are building from source, you can unblock yourself in the meantime by removing this if condition:

Before:

```java
if (sequenceNumber.equals(SENTINEL_LATEST_SEQUENCE_NUM.get())) {
    // LATEST starting positions are translated to AT_TIMESTAMP starting positions. This is to prevent data loss
    // in the situation where the first read times out and is re-attempted. Consider the following scenario:
    // 1. Consume from LATEST
    // 2. No records are consumed and Record Publisher throws retryable error
    // 3. Restart consumption from LATEST
    // Any records sent between steps 1 and 3 are lost. Using the timestamp of step 1 allows the consumer to
    // restart from shard position of step 1, and hence no records are lost.
    return StartingPosition.fromTimestamp(new Date());
} else if (SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get().equals(sequenceNumber)) {
    Date timestamp = KinesisConfigUtil.parseStreamTimestampStartingPosition(configProps);
    return StartingPosition.fromTimestamp(timestamp);
} else {
    return StartingPosition.restartFromSequenceNumber(sequenceNumber);
}
```

After:

```java
if (SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get().equals(sequenceNumber)) {
    Date timestamp = KinesisConfigUtil.parseStreamTimestampStartingPosition(configProps);
    return StartingPosition.fromTimestamp(timestamp);
} else {
    return StartingPosition.restartFromSequenceNumber(sequenceNumber);
}
```
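To be clear on why this unblocks DDB: with the first branch removed, the LATEST sentinel is no longer translated into an AT_TIMESTAMP starting position, so the connector never requests the AT_TIMESTAMP shard iterator type that DynamoDB Streams rejects. The trade-off is that you lose the data-loss protection described in the code comment, so treat this as a stopgap until 1.0.5 / 2.0.0.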
dbadami commented 3 years ago

Thanks for the quick response and remediation steps. I will try them out. Is there a timeline for the launch of 2.0.0?

dannycranmer commented 3 years ago

I cannot commit to any dates. We are currently testing the release internally and hope to finalise it within 2 weeks. I hope this works for you. Thanks.

dannycranmer commented 3 years ago

This is fixed in v1.1.0 and v2.0.0
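Both release lines should be available from Maven Central under the software.amazon.kinesis group as amazon-kinesis-connector-flink, with 1.x targeting Flink 1.8 and 2.x targeting Flink 1.11.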