spring-cloud / spring-cloud-stream-binder-aws-kinesis

Spring Cloud Stream binder for AWS Kinesis
Apache License 2.0
99 stars 97 forks source link

Current KinesisProducerConfiguration does not work as expected #203

Closed kabennett closed 1 year ago

kabennett commented 1 year ago

In what version(s) of Spring Cloud Stream Binder for AWS Kinesis are you seeing this issue?

4.0.0

Describe the bug

Rather than repeat information already documented, please see the original problem and solution posted here.

artembilan commented 1 year ago

Well, I didn't dive into details, but sounds like you found a solution which may be incorporated back into this project to avoid the pain similar to yours in the future.

Thank you!

kabennett commented 1 year ago

You are welcome, Artem!

artembilan commented 1 year ago

I probably was not clear with my previous message, so let me fix that.

Are you willing to open a PR with the fix you propose in your SO thread? Or do you just leave it me to deal with your code as it is possible or appropriate?

kabennett commented 1 year ago

Artem, I can work on a PR for you.

artembilan commented 1 year ago

Hello, @kabennett !

Any chances to go an update from you? If you cannot work on PR, just post your suggestion here and we will incorporate it back to the project.

Thanks

artembilan commented 1 year ago

I have just pushed some fix on the matter. @kothapet, would be great if you can give a try 4.0.1-SNAPSHOT with your solution.

kothapet commented 1 year ago

@artembilan how do I pull the 4.0.1-SNAPSHOT into my project?

artembilan commented 1 year ago

Using https://repo.spring.io/snapshot as a Maven repository in your project. The mentioned snapshot is, essentially, here: https://repo.spring.io/ui/native/snapshot/org/springframework/cloud/spring-cloud-stream-binder-kinesis/4.0.1-SNAPSHOT/

kothapet commented 1 year ago

@artembilan, Yes its working fine.. Here is the output, and its using KPL from the logs. Thanks


2023-09-08T17:50:33.741-04:00  INFO 10396 --- [           main] c.g.k.scs.ScsKinesisProj1Application     : Started ScsKinesisProj1Application in 6.736 seconds (process running for 7.397)
2023-09-08T17:50:33.747-04:00 DEBUG 10396 --- [           main] o.s.b.a.ApplicationAvailabilityBean      : Application availability state LivenessState changed to CORRECT
2023-09-08T17:50:33.750-04:00 DEBUG 10396 --- [           main] o.s.b.a.ApplicationAvailabilityBean      : Application availability state ReadinessState changed to ACCEPTING_TRAFFIC
2023-09-08T17:50:33.809-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.800091] [0x000028d9][0x00007f246c5c5840] [info] [logging.cc:89] Set boost log level to info
2023-09-08T17:50:33.809-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.800153] [0x000028d9][0x00007f246c5c5840] [info] [logging.cc:179] Set AWS Log Level to INFO
2023-09-08T17:50:33.814-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.814053] [0x000028d9][0x00007f246c5c5840] [info] [AWS Log: WARN](ClientConfiguration)Retry Strategy will use the default max attempts.
2023-09-08T17:50:33.815-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.815286] [0x000028d9][0x00007f246c5c5840] [info] [main.cc:257] Region has been successfully set to us-east-1 from user's input configuration
2023-09-08T17:50:33.815-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.815515] [0x000028d9][0x00007f246c5c5840] [info] [main.cc:348] Setting CA path to /tmp/amazon-kinesis-producer-native-binaries/cacerts
2023-09-08T17:50:33.815-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.815544] [0x000028d9][0x00007f246c5c5840] [info] [main.cc:383] Starting up main producer
2023-09-08T17:50:33.815-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.815575] [0x000028d9][0x00007f246c5c5840] [info] [AWS Log: WARN](ClientConfiguration)Retry Strategy will use the default max attempts.
2023-09-08T17:50:33.816-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.815595] [0x000028d9][0x00007f246c5c5840] [info] [kinesis_producer.cc:111] Using Region: us-east-1
2023-09-08T17:50:33.816-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.815609] [0x000028d9][0x00007f246c5c5840] [info] [kinesis_producer.cc:134] Using per request threading model.
2023-09-08T17:50:33.816-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.815622] [0x000028d9][0x00007f246c5c5840] [info] [kinesis_producer.cc:62] Using default Kinesis endpoint
2023-09-08T17:50:33.816-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.816450] [0x000028d9][0x00007f246c5c5840] [info] [AWS Log: WARN](ClientConfiguration)Retry Strategy will use the default max attempts.
2023-09-08T17:50:33.817-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.816897] [0x000028d9][0x00007f246c5c5840] [info] [kinesis_producer.cc:111] Using Region: us-east-1
2023-09-08T17:50:33.817-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.817314] [0x000028d9][0x00007f246c5c5840] [info] [kinesis_producer.cc:134] Using per request threading model.
2023-09-08T17:50:33.817-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.817639] [0x000028d9][0x00007f246c5c5840] [info] [kinesis_producer.cc:62] Using default CloudWatch endpoint
2023-09-08T17:50:33.818-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.818307] [0x000028d9][0x00007f246c5c5840] [info] [AWS Log: WARN](ClientConfiguration)Retry Strategy will use the default max attempts.
2023-09-08T17:50:33.818-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.818779] [0x000028d9][0x00007f246c5c5840] [info] [kinesis_producer.cc:111] Using Region: us-east-1
2023-09-08T17:50:33.819-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.819092] [0x000028d9][0x00007f246c5c5840] [info] [kinesis_producer.cc:134] Using per request threading model.
2023-09-08T17:50:33.819-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.819398] [0x000028d9][0x00007f246c5c5840] [info] [kinesis_producer.cc:62] Using default STS endpoint
2023-09-08T17:50:33.820-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.820111] [0x000028d9][0x00007f246c5c5840] [info] [main.cc:394] Entering join
2023-09-08T17:50:33.820-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.820239] [0x000028d9][0x00007f24699fa6c0] [info] [kinesis_producer.cc:226] Created pipeline for stream "my-test-stream"
2023-09-08T17:50:33.925-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.924900] [0x000028d9][0x00007f24699fa6c0] [info] [pipeline.h:221] StreamARN "arn:aws:kinesis:us-east-1:xxxxxx:stream/my-test-stream" has been successfully configured, and will be used in requests including ListShards and PutRecords
2023-09-08T17:50:33.925-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.924979] [0x000028d9][0x00007f24699fa6c0] [info] [shard_map.cc:89] Updating shard map for stream "my-test-stream"
2023-09-08T17:50:34.041-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:34.040942] [0x000028d9][0x00007f243ffff6c0] [info] [shard_map.cc:151] Successfully updated shard map for stream "my-test-stream" (arn: "arn:aws:kinesis:us-east-1:xxxxxxxxx:stream/my-test-stream"). Found 4 shards
mySupplier creating payload 502
mySupplier creating payload 214
mySupplier creating payload 547
mySupplier creating payload 109
mySupplier creating payload 469
mySupplier creating payload 441
mySupplier creating payload 155
kothapet commented 1 year ago

@artembilan it works with AWS kinesis. However I am getting the same error when using localstack.. Am I missing something. Do you have a full example for https://github.com/localstack/localstack/issues/507 of working localstack version

2023-09-08T20:24:01.489-04:00  INFO 31372 --- [           main] c.g.k.scs.ScsKinesisProj1Application     : Started ScsKinesisProj1Application in 5.221 seconds (process running for 5.732)
2023-09-08T20:24:01.493-04:00 DEBUG 31372 --- [           main] o.s.b.a.ApplicationAvailabilityBean      : Application availability state LivenessState changed to CORRECT
2023-09-08T20:24:01.496-04:00 DEBUG 31372 --- [           main] o.s.b.a.ApplicationAvailabilityBean      : Application availability state ReadinessState changed to ACCEPTING_TRAFFIC
2023-09-08T20:24:01.548-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.537337] [0x00007ac8][0x00007f39f8bc0840] [info] [logging.cc:89] Set boost log level to info
2023-09-08T20:24:01.548-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.537408] [0x00007ac8][0x00007f39f8bc0840] [info] [logging.cc:179] Set AWS Log Level to INFO
2023-09-08T20:24:01.549-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.549447] [0x00007ac8][0x00007f39f8bc0840] [info] [AWS Log: WARN](ClientConfiguration)Retry Strategy will use the default max attempts.
2023-09-08T20:24:01.552-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.550532] [0x00007ac8][0x00007f39f8bc0840] [info] [main.cc:257] Region has been successfully set to us-east-1 from user's input configuration
2023-09-08T20:24:01.552-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.550711] [0x00007ac8][0x00007f39f8bc0840] [info] [main.cc:348] Setting CA path to /tmp/amazon-kinesis-producer-native-binaries/cacerts
2023-09-08T20:24:01.553-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.550727] [0x00007ac8][0x00007f39f8bc0840] [info] [main.cc:383] Starting up main producer
2023-09-08T20:24:01.553-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.550748] [0x00007ac8][0x00007f39f8bc0840] [info] [AWS Log: WARN](ClientConfiguration)Retry Strategy will use the default max attempts.
2023-09-08T20:24:01.553-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.550765] [0x00007ac8][0x00007f39f8bc0840] [info] [kinesis_producer.cc:111] Using Region: us-east-1
2023-09-08T20:24:01.553-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.550776] [0x00007ac8][0x00007f39f8bc0840] [info] [kinesis_producer.cc:134] Using per request threading model.
2023-09-08T20:24:01.553-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.550788] [0x00007ac8][0x00007f39f8bc0840] [info] [kinesis_producer.cc:185] Using Kinesis endpoint localhost:4567
2023-09-08T20:24:01.553-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.551624] [0x00007ac8][0x00007f39f8bc0840] [info] [AWS Log: WARN](ClientConfiguration)Retry Strategy will use the default max attempts.
2023-09-08T20:24:01.553-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.551645] [0x00007ac8][0x00007f39f8bc0840] [info] [kinesis_producer.cc:111] Using Region: us-east-1
2023-09-08T20:24:01.554-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.551655] [0x00007ac8][0x00007f39f8bc0840] [info] [kinesis_producer.cc:134] Using per request threading model.
2023-09-08T20:24:01.554-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.551667] [0x00007ac8][0x00007f39f8bc0840] [info] [kinesis_producer.cc:200] Using CloudWatch endpoint localhost:4567
2023-09-08T20:24:01.554-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.552020] [0x00007ac8][0x00007f39f8bc0840] [info] [AWS Log: WARN](ClientConfiguration)Retry Strategy will use the default max attempts.
2023-09-08T20:24:01.554-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.552061] [0x00007ac8][0x00007f39f8bc0840] [info] [kinesis_producer.cc:111] Using Region: us-east-1
2023-09-08T20:24:01.554-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.552076] [0x00007ac8][0x00007f39f8bc0840] [info] [kinesis_producer.cc:134] Using per request threading model.
2023-09-08T20:24:01.554-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.552091] [0x00007ac8][0x00007f39f8bc0840] [info] [kinesis_producer.cc:62] Using default STS endpoint
2023-09-08T20:24:01.554-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.552736] [0x00007ac8][0x00007f39f8bc0840] [info] [main.cc:394] Entering join
2023-09-08T20:24:01.557-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.556309] [0x00007ac8][0x00007f39f5ffa6c0] [info] [kinesis_producer.cc:226] Created pipeline for stream "my-test-stream"
2023-09-08T20:24:01.661-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.661036] [0x00007ac8][0x00007f39f5ffa6c0] [info] [AWS Log: WARN](AWSErrorMarshaller)Encountered AWSError 'InvalidClientTokenId': The security token included in the request is invalid.
2023-09-08T20:24:01.661-04:00  WARN 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.661072] [0x00007ac8][0x00007f39f5ffa6c0] [warning] [AWS Log: ERROR](AWSXmlClient)HTTP response code: 403
Resolved remote host IP address: 67.220.245.43
Request ID: adeca5d6-70ab-438a-8d93-01906ea5eaf4
Exception name: InvalidClientTokenId
Error message: The security token included in the request is invalid.
4 response headers:
content-length : 306
content-type : text/xml
date : Sat, 09 Sep 2023 00:24:01 GMT
x-amzn-requestid : adeca5d6-70ab-438a-8d93-01906ea5eaf4
2023-09-08T20:24:01.661-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.661131] [0x00007ac8][0x00007f39f5ffa6c0] [info] [AWS Log: WARN](AWSClient)If the signature check failed. This could be because of a time skew. Attempting to adjust the signer.
2023-09-08T20:24:01.662-04:00 ERROR 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.661181] [0x00007ac8][0x00007f39f5ffa6c0] [error] [pipeline.h:228] Failed to get StreamARN using STS GetCallerIdentity | Code: InvalidClientTokenId | Message: The security token included in the request is invalid. | Request was: Action=GetCallerIdentity&Version=2011-06-15
2023-09-08T20:24:01.662-04:00  WARN 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : terminate called after throwing an instance of 'boost::wrapexcept<boost::exception_detail::error_info_injector<boost::log::v2s_mt_posix::system_error> >'
2023-09-08T20:24:01.662-04:00  WARN 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    :   what():  Failed to set TLS value: Invalid argument
mySupplier creating payload 311
2023-09-08T20:24:02.518-04:00 ERROR 31372 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aws.outbound.KplMessageHandler@6694a721], failedMessage=GenericMessage [payload=byte[3], headers={contentType=application/json, id=a41b8fa8-9bab-6e1a-9343-dc9936275ee0, timestamp=1694219042514}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1185)
kabennett commented 1 year ago

@artembilan I apologize about not submitting a PR to you. I have been sick lately and am just now getting back to feeling somewhat normal again. In fact, today I was working on a PR for you when I saw you had made progress on it. Again, I apologize for not working on this sooner.

kabennett commented 1 year ago

I just submitted a PR for you with the changes I made to get the solution working with STS for our team.

artembilan commented 1 year ago

Sorry, we don't support here AWS KPL project, neither Localstack. It is better to ask such a question in their community. I only can speculate on what I've seen so far in the mentioned https://github.com/localstack/localstack/issues/507:

So now setting .setVerifyCertificate(false) on the KLP is enough.

Therefore I suggest you to have some fake bean in the test configuration to disable that option on our auto-configured KinesisProducerConfiguration:

@Bean 
String kinesisProducerConfigurationAdjuster(KinesisProducerConfiguration kinesisProducerConfiguration) {
    kinesisProducerConfiguration.setVerifyCertificate(false);
    return null;
}

Of course, you need to be sure to use the latest Localstack.

You also can look into our https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/main/spring-cloud-stream-binder-kinesis/src/test/java/org/springframework/cloud/stream/binder/kinesis/LocalstackContainerTest.java to see how we expose Localstack ENV into auto-configuration properties.

kothapet commented 1 year ago

Yes I did follow the instructions on the other thread and did the .setVerifyCertificate(false) like below and using the latest localstack, but the issue persists. I raised an issue here at localstack repo https://github.com/localstack/localstack/issues/9110 and https://github.com/awslabs/amazon-kinesis-producer/issues/532

    @Bean
    public KinesisProducerConfiguration kinesisProducerConfiguration()  {
            return new KinesisProducerConfiguration()
                    .setCredentialsProvider(new DefaultAWSCredentialsProviderChain())
                    .setRegion("us-east-1")
                    .setStsEndpoint("localhost")
                    .setStsPort(4567)
                    .setKinesisEndpoint("localhost")
                    .setKinesisPort(4567)
                    .setCloudwatchEndpoint("localhost")
                    .setCloudwatchPort(4567)
                    .setVerifyCertificate(false);
    }   
kothapet commented 1 year ago

Actually I figured it out. Instead of bean returning the KinesisProducer, return the KinesisProducerConfiguration. Hopefully this helps someone in the future.

This works

    @Bean
    public KinesisProducerConfiguration kinesisProducerConfiguration() {
        System.out.println("**** KinesisProducerConfiguration ***");
        return new KinesisProducerConfiguration().setCredentialsProvider(getCredentialsProvider())
                .setKinesisEndpoint("localhost").setKinesisPort(4566)
                .setStsEndpoint("localhost").setStsPort(4566)
                .setCloudwatchEndpoint("localhost").setCloudwatchPort(4566)
                .setRegion(region).setVerifyCertificate(false);
    }