awslabs / amazon-kinesis-connector-flink

This is a fork of the Apache Flink Kinesis connector adding Enhanced Fanout support for Flink 1.8/1.11 on KDA.
Apache License 2.0
20 stars 11 forks source link

AmazonKinesisException when authenticating Kinesis through AWS SSO (profile) #50

Open mberchon opened 2 years ago

mberchon commented 2 years ago

Hello,

We are experiencing issues while using flink with AWS SSO authent to consume a kinesis stream (we don't have problem when using directly KCL, KPL or KDS SDK native library through SSO and we don't have problem too if flink is configured with basic authentication)

Some details about our context:

Authenticating using basic credentials (setting AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) works properly. Authenticating with SSO raize the below exception.

Caused by: software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException: The security token included in the request is invalid (Service: AmazonKinesis; Status Code: 400; Error Code: UnrecognizedClientException; Request ID: e2873f40-b9d8-7649-b885-c3d801979544; Proxy: null)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
    at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
    at software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2893)
    at software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2860)
    at software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2849)
    at software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1590)
    at software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1559)
    at software.amazon.kinesis.connectors.flink.proxy.KinesisProxy.listShards(KinesisProxy.java:430)
    at software.amazon.kinesis.connectors.flink.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:380)
    at software.amazon.kinesis.connectors.flink.proxy.KinesisProxy.getShardList(KinesisProxy.java:273)
    at software.amazon.kinesis.connectors.flink.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:855)
    at software.amazon.kinesis.connectors.flink.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:292)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

Here is a git repo for reproduction

Following code is used when setting up SSO:

@Test
    public void test() throws Exception {

        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.setParallelism(1);

        Properties kinesisConsumerConfig = new Properties();
        kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
        kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, AWSConfigConstants.CredentialProvider.AUTO.toString());

        DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>(
                "flink-test",
                new SimpleStringSchema(),
                kinesisConsumerConfig));

        kinesis.print();

        see.execute();
    }

Attached following debug logs (credentials purged):