aws / aws-msk-iam-auth

Enables developers to use AWS Identity and Access Management (IAM) to connect to their Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters.
Apache License 2.0

Connector is not working as expected using AWS IAM AUTH #142

Closed ns-se-ops closed 7 months ago

ns-se-ops commented 7 months ago

I have two jars in my plugin zip file:

https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector/2.1.0
https://github.com/aws/aws-msk-iam-auth/releases
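
Because the plugin is a zip of jars, it can help to confirm exactly which jars ended up inside it before uploading. The following is a minimal sketch using Python's standard `zipfile` module; the helper name `list_jars` and the entry names in the example archive are hypothetical and stand in for the real plugin contents.

```python
import io
import zipfile

def list_jars(zip_source):
    """Return the sorted names of all .jar entries in a zip file or file-like object."""
    with zipfile.ZipFile(zip_source) as archive:
        return sorted(
            name for name in archive.namelist() if name.lower().endswith(".jar")
        )

# Build a small in-memory archive with made-up entry names for illustration.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("lib/connector-example.jar", b"")
    archive.writestr("lib/auth-example.jar", b"")
    archive.writestr("README.txt", b"not a jar")
buffer.seek(0)

print(list_jars(buffer))
```

In practice `list_jars` would be called with the path to the actual plugin zip instead of the in-memory buffer.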

I created the connector using the configuration below and selected AWS MSK with IAM authentication as the source:

connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
snowflake.topic2table.map=snowflake-topic:kafka_table
tasks.max=1
topics=snowflake-topic
buffer.flush.time=60
snowflake.url.name=<removed>.snowflakecomputing.com:443
snowflake.database.name=kafka_db
snowflake.schema.name=kafka_schema
buffer.count.records=1
snowflake.user.name=kafka_connector_user
snowflake.private.key=<removed>
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
buffer.size.bytes=5000000

The connector showed as running in the console, but in the backend I got the errors below:

1700071730000,"[Worker-0f5c0100d321988e8] [2023-11-15 18:08:50,626] INFO [snowflake-connector|task-0] [SF_KAFKA_CONNECTOR] task started, execution time: 0 milliseconds (com.snowflake.kafka.connector.SnowflakeSinkTask:46)"
1700071730000,"[Worker-0f5c0100d321988e8] [2023-11-15 18:08:50,626] INFO [snowflake-connector|task-0] WorkerSinkTask{id=snowflake-connector-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:309)"
1700071730000,"[Worker-0f5c0100d321988e8] [2023-11-15 18:08:50,629] INFO [snowflake-connector|task-0] [Consumer clientId=connector-consumer-snowflake-connector-0, groupId=connect-snowflake-connector] Failed to create channel due to  (org.apache.kafka.common.network.SaslChannelBuilder:227)"
1700071730000,[Worker-0f5c0100d321988e8] org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
1700071730000,[Worker-0f5c0100d321988e8] Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism AWS_MSK_IAM
1700071730000,"[Worker-0f5c0100d321988e8] [2023-11-15 18:08:50,630] WARN [snowflake-connector|task-0] [Consumer clientId=connector-consumer-snowflake-connector-0, groupId=connect-snowflake-connector] Error connecting to node b-1.msk.xxxxx.xx.kafka.us-east-1.amazonaws.com:9098 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:1011)"
1700071730000,[Worker-0f5c0100d321988e8] java.io.IOException: Channel could not be created for socket java.nio.channels.SocketChannel[closed]

1700071730000,[Worker-0f5c0100d321988e8] Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
1700071730000,[Worker-0f5c0100d321988e8]    at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:228)
1700071730000,[Worker-0f5c0100d321988e8]    at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
1700071730000,[Worker-0f5c0100d321988e8]    ... 24 more
1700071730000,[Worker-0f5c0100d321988e8] Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
1700071730000,[Worker-0f5c0100d321988e8] Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism AWS_MSK_IAM
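
The log above repeats the same exception chain several times, so it can be easier to reduce it to the innermost `Caused by:` entry. The following is a minimal sketch, assuming the logs are exported as `epoch,[Worker-id] message` lines as shown above; the function name `innermost_cause` and both regular expressions are hypothetical helpers, not part of any AWS or Kafka tooling.

```python
import re

# Strips the "epoch,[Worker-...]" prefix (with an optional opening quote)
# that precedes every exported log line.
PREFIX_RE = re.compile(r'^\d+,"?\[Worker-[^\]]+\]\s+')
CAUSE_RE = re.compile(r"^Caused by: (.+)$")

def innermost_cause(lines):
    """Return the text of the last 'Caused by:' entry, or None if there is none."""
    cause = None
    for line in lines:
        body = PREFIX_RE.sub("", line.strip())
        match = CAUSE_RE.match(body)
        if match:
            cause = match.group(1).rstrip('"')
    return cause

sample = [
    "1700071730000,[Worker-0f5c0100d321988e8] java.io.IOException: Channel could not be created for socket java.nio.channels.SocketChannel[closed]",
    "1700071730000,[Worker-0f5c0100d321988e8]    at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:228)",
    "1700071730000,[Worker-0f5c0100d321988e8] Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator",
    "1700071730000,[Worker-0f5c0100d321988e8] Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism AWS_MSK_IAM",
]

print(innermost_cause(sample))
```

For the lines shown in this issue, the last entry in the chain is the `Failed to create SaslClient with mechanism AWS_MSK_IAM` exception, which is the message to search for first.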

At first, I thought this could be an issue with the SnowflakeSinkConnector, so I created another connector with the S3 sink: https://www.confluent.io/hub/confluentinc/kafka-connect-s3

connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=us-east-1
format.class=io.confluent.connect.s3.format.json.JsonFormat
flush.size=1
schema.compatibility=NONE
topics=s3sinktopic
tasks.max=1
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=<removed>

I see the same issue: the connector is running in the console, but in the backend I got these errors:

1700108453000,"[Worker-0c13cfeafdaec1993] [2023-11-16 04:20:53,001] INFO [s3sinkiamauth|task-0] Started S3 connector task with assigned partitions: [] (io.confluent.connect.s3.S3SinkTask:135)"
1700108453000,"[Worker-0c13cfeafdaec1993] [2023-11-16 04:20:53,001] INFO [s3sinkiamauth|task-0] WorkerSinkTask{id=s3sinkiamauth-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:309)"
1700108453000,"[Worker-0c13cfeafdaec1993] [2023-11-16 04:20:53,005] INFO [s3sinkiamauth|task-0] [Consumer clientId=connector-consumer-s3sinkiamauth-0, groupId=connect-s3sinkiamauth] Failed to create channel due to  (org.apache.kafka.common.network.SaslChannelBuilder:227)"
1700108453000,[Worker-0c13cfeafdaec1993] org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
1700108453000,[Worker-0c13cfeafdaec1993] Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism AWS_MSK_IAM
1700108453000,"[Worker-0c13cfeafdaec1993] [2023-11-16 04:20:53,006] WARN [s3sinkiamauth|task-0] [Consumer clientId=connector-consumer-s3sinkiamauth-0, groupId=connect-s3sinkiamauth] Error connecting to node b-1.msk.xxxxx.xx.kafka.us-east-1.amazonaws.com:9098 (id: -2 rack: null) (org.apache.kafka.clients.NetworkClient:1011)"
1700108453000,[Worker-0c13cfeafdaec1993] java.io.IOException: Channel could not be created for socket java.nio.channels.SocketChannel[closed]
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:348)
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329)
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.common.network.Selector.connect(Selector.java:256)
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:1006)
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:313)
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:498)
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245)
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480)
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257)
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226)
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:457)
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:324)
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
1700108453000,[Worker-0c13cfeafdaec1993]    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
1700108453000,[Worker-0c13cfeafdaec1993]    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
1700108453000,[Worker-0c13cfeafdaec1993]    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
1700108453000,[Worker-0c13cfeafdaec1993]    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
1700108453000,[Worker-0c13cfeafdaec1993]    at java.base/java.lang.Thread.run(Thread.java:829)
1700108453000,[Worker-0c13cfeafdaec1993] Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:228)
1700108453000,[Worker-0c13cfeafdaec1993]    at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
1700108453000,[Worker-0c13cfeafdaec1993]    ... 24 more
1700108453000,[Worker-0c13cfeafdaec1993] Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
1700108453000,[Worker-0c13cfeafdaec1993] Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism AWS_MSK_IAM

I am able to produce and consume using kafka-console-producer and kafka-console-consumer with the client.properties below:

security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
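
The four settings above can also be checked mechanically in any properties file. The following is a minimal sketch, assuming a plain `key=value` file with no line continuations or `:` separators; `parse_properties`, `missing_iam_settings` and `REQUIRED_IAM_SETTINGS` are hypothetical names, and the expected values are copied only from the client.properties shown above.

```python
# Expected values, taken verbatim from the client.properties in this issue.
REQUIRED_IAM_SETTINGS = {
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "AWS_MSK_IAM",
    "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
    "sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
}

def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props

def missing_iam_settings(props):
    """Return the sorted keys that are absent or differ from the expected values."""
    return sorted(
        key for key, value in REQUIRED_IAM_SETTINGS.items() if props.get(key) != value
    )

client_properties = """\
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
"""

print(missing_iam_settings(parse_properties(client_properties)))
```

An empty list means all four settings are present with the values used by the working console clients.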

If you are planning to tell me to add the same settings to the connector configuration, that should not be necessary: when IAM auth is selected for the connection, these are added automatically.

Could you please let me know what is wrong here?

Thanks,

github-actions[bot] commented 7 months ago

⚠️COMMENT VISIBILITY WARNING⚠️

Comments on closed issues are hard for our team to see. If you need more assistance, please either tag a team member or open a new issue that references this one. If you wish to keep having a conversation with other community members under this issue feel free to do so.

ns-se-ops commented 7 months ago

Resolution Steps for Creating AWS MSK Connect :
