aws / aws-msk-iam-auth

Enables developers to use AWS Identity and Access Management (IAM) to connect to their Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters.
Apache License 2.0

Getting `Timed out waiting for a node assignment. Call: listTopics` with simple setup #47

Closed by talanta 3 years ago

talanta commented 3 years ago

Hi,

Starting from the https://kafka.apache.org/quickstart binaries, I tried to connect to a simple MSK cluster with IAM authentication enabled.

I ran the following command:

CLASSPATH=${PWD}/aws-msk-iam-auth-1.1.1-all.jar bin/kafka-topics.sh --list \
  --bootstrap-server=b-1.xxxx:9098 \
  --command-config=cli.properties

I end up with this error:

Error while executing topic command : Timed out waiting for a node assignment. Call: listTopics
[2021-11-10 07:37:35,675] ERROR org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listTopics
 (kafka.admin.TopicCommand$)

Here is the content of the cli.properties:

bootstrap.servers=b-2.xxxx:9098,b-3.xxxx:9098,b-1.xxxx:9098
sasl.mechanism=AWS_MSK_IAM
security.protocol=SASL_SSL
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsDebugCreds=true;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
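As a quick sanity check, a client properties file like the one above can be validated before running any Kafka command. This is only a sketch (the helper name is hypothetical); the key names and expected values are the ones this issue's config uses:

```python
# Sketch: check that a Kafka client properties file contains the
# settings MSK IAM auth needs. Key names match cli.properties above;
# check_iam_properties is a hypothetical helper, not part of any tool.

REQUIRED = {
    "sasl.mechanism": "AWS_MSK_IAM",
    "security.protocol": "SASL_SSL",
    "sasl.client.callback.handler.class":
        "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
}

def check_iam_properties(path):
    """Return a list of missing or mismatched keys (empty means OK)."""
    props = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith("#") and "=" in line:
                key, _, value = line.partition("=")
                props[key.strip()] = value.strip()
    problems = [k for k, v in REQUIRED.items() if props.get(k) != v]
    if "sasl.jaas.config" not in props:
        problems.append("sasl.jaas.config")
    return problems
```

Running it against the file above should return an empty list; a file still set to `PLAINTEXT`, for example, would be flagged.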

The EC2 instance has a role containing the following policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*",
                "kafka-cluster:DescribeCluster",
                "kafka-cluster:AlterCluster"
            ],
            "Resource": "arn:aws:kafka:*:<#accountid#>:cluster/*/*"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": "*"
        },
        {
            "Sid": "VisualEditor2",
            "Effect": "Allow",
            "Action": "*",
            "Resource": "*"
        }
    ]
}

I tested with SASL/SCRAM, and the command worked, so I am not sure what I am doing wrong. This might also be related to https://github.com/aws/aws-msk-iam-auth/issues/22. In that case, could someone please provide a basic example with truststore generation that makes it work?

sayantacC commented 3 years ago

@talanta Could you set the log level to DEBUG, run the command again, and provide the debug log? It will help us find out what is actually happening.

talanta commented 3 years ago

@sayantacC I am getting the same output. FYI, here is the log4j.properties file. It is what comes out of the box; I simply changed the first line from INFO to DEBUG:

log4j.rootLogger=DEBUG, stdout, kafkaAppender

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

# Change the line below to adjust ZK client logging
log4j.logger.org.apache.zookeeper=INFO

# Change the two lines below to adjust the general broker logging level (output to server.log and stdout)
log4j.logger.kafka=INFO
log4j.logger.org.apache.kafka=INFO

# Change to DEBUG or TRACE to enable request logging
log4j.logger.kafka.request.logger=WARN, requestAppender
log4j.additivity.kafka.request.logger=false

# Uncomment the lines below and change log4j.logger.kafka.network.RequestChannel$ to TRACE for additional output
# related to the handling of requests
#log4j.logger.kafka.network.Processor=TRACE, requestAppender
#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
#log4j.additivity.kafka.server.KafkaApis=false
log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
log4j.additivity.kafka.network.RequestChannel$=false

log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=false

log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
log4j.additivity.kafka.log.LogCleaner=false

log4j.logger.state.change.logger=INFO, stateChangeAppender
log4j.additivity.state.change.logger=false

# Access denials are logged at INFO level, change to DEBUG to also log allowed accesses
log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false
sayantacC commented 3 years ago

Is the new log4j.properties file being picked up by kafka-topics while running? I normally specify the path to the log4j.properties file in the KAFKA_OPTS environment variable:

export KAFKA_OPTS="-Dlog4j.configuration=file:/<path-to-log4j.properties file>log4j.properties"
talanta commented 3 years ago

Thank you @sayantacC for the hint. Here are the logs I got:

[2021-11-10 19:20:49,352] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2021-11-10 19:20:49,742] INFO AdminClientConfig values:
        bootstrap.servers = [b-2.xxxxxxxxxxxxx.us-west-2.amazonaws.com:9098]
        client.dns.lookup = use_all_dns_ips
        client.id =
        connections.max.idle.ms = 300000
        default.api.timeout.ms = 60000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = class software.amazon.msk.auth.iam.IAMClientCallbackHandler
        sasl.jaas.config = [hidden]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = AWS_MSK_IAM
        security.protocol = SASL_SSL
        security.providers = null
        send.buffer.bytes = 131072
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
 (org.apache.kafka.clients.admin.AdminClientConfig)
[2021-11-10 19:20:49,805] DEBUG Number of options to configure credential provider 1 (software.amazon.msk.auth.iam.internals.MSKCredentialProvider)
[2021-11-10 19:20:50,084] DEBUG IAMLoginModule initialized (software.amazon.msk.auth.iam.IAMLoginModule)
[2021-11-10 19:20:50,085] INFO Successfully logged in. (org.apache.kafka.common.security.authenticator.AbstractLogin)
[2021-11-10 19:20:50,269] INFO Kafka version: 3.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2021-11-10 19:20:50,269] INFO Kafka commitId: 8cb0a5e9d3441962 (org.apache.kafka.common.utils.AppInfoParser)
[2021-11-10 19:20:50,269] INFO Kafka startTimeMs: 1636572050266 (org.apache.kafka.common.utils.AppInfoParser)
[2021-11-10 19:20:50,335] DEBUG Setting SASL/AWS_MSK_IAM client state to SEND_CLIENT_FIRST_MESSAGE (software.amazon.msk.auth.iam.internals.IAMSaslClient)
[2021-11-10 19:20:59,186] DEBUG Setting SASL/AWS_MSK_IAM client state to SEND_CLIENT_FIRST_MESSAGE (software.amazon.msk.auth.iam.internals.IAMSaslClient)
[2021-11-10 19:21:20,276] INFO [AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager)
org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: fetchMetadata
[2021-11-10 19:21:21,484] DEBUG Setting SASL/AWS_MSK_IAM client state to SEND_CLIENT_FIRST_MESSAGE (software.amazon.msk.auth.iam.internals.IAMSaslClient)
[2021-11-10 19:21:48,970] DEBUG Setting SASL/AWS_MSK_IAM client state to SEND_CLIENT_FIRST_MESSAGE (software.amazon.msk.auth.iam.internals.IAMSaslClient)
[2021-11-10 19:21:50,278] INFO [AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager)
org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: fetchMetadata
Error while executing topic command : Timed out waiting for a node assignment. Call: listTopics
[2021-11-10 19:21:50,280] ERROR org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listTopics
 (kafka.admin.TopicCommand$)
[2021-11-10 19:21:50,281] INFO App info kafka.admin.client for adminclient-1 unregistered (org.apache.kafka.common.utils.AppInfoParser)
[2021-11-10 19:21:50,281] INFO [AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager)
org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: fetchMetadata
[2021-11-10 19:21:50,282] INFO [AdminClient clientId=adminclient-1] Timed out 1 remaining operation(s) during close. (org.apache.kafka.clients.admin.KafkaAdminClient)
[2021-11-10 19:21:50,294] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
[2021-11-10 19:21:50,294] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
[2021-11-10 19:21:50,294] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
talanta commented 3 years ago

Never mind. I figured out that there was a network ACL restriction on the subnets and port 9098 was not explicitly authorized, meaning the brokers were not reachable. After adding a rule for that port, it is working well now. Thank you @sayantacC for your help!
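For anyone hitting the same symptom: since this timeout usually means the broker port is simply unreachable, a plain TCP connect can rule that in or out before debugging the Kafka client at all. A minimal sketch (not part of any tool; the helper name is hypothetical):

```python
import socket

def port_reachable(host, port, timeout=5.0):
    """Attempt a plain TCP connection; True if something accepted it.

    This only proves network reachability (security groups, NACLs,
    routing); it says nothing about SASL/IAM authentication.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# e.g. port_reachable("b-1.xxxx", 9098) must be True before
# the Kafka client can even start the TLS/SASL handshake.
```

If this returns False for the broker hostnames on 9098, the fix is in security groups or network ACLs, not in the client configuration.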

arung777 commented 1 week ago

> Never mind. I figured out that there was a network ACL restriction on the subnets and port 9098 was not explicitly authorized, meaning the brokers were not reachable. After adding a rule for that port, it is working well now. Thank you @sayantacC for your help!

I am also getting the same error while listing topics in a containerized tool. How did you resolve that?