linkedin / cruise-control

Cruise-control is the first of its kind to fully automate the dynamic workload rebalance and self-healing of a Kafka cluster. It provides great value to Kafka users by simplifying the operation of Kafka clusters.
https://github.com/linkedin/cruise-control/tags
BSD 2-Clause "Simplified" License

Failed to describe Kafka cluster configs when using KafkaAdminTopicConfigProvider #2153

Open marcelloromani opened 6 months ago

marcelloromani commented 6 months ago

I am deploying Cruise Control (CC) 2.5.137 on EKS and trying to connect it to MSK.

Following the "run without ZooKeeper" instructions, I am using the Kafka admin topic config provider (instead of the non-admin one used in the ZooKeeper setup).

Cruise Control fails during startup with this exception:

13:49:28.985 [main] ERROR com.linkedin.kafka.cruisecontrol.KafkaCruiseControlMain -- Uncaught exception on thread Thread[main,5,main]
java.lang.RuntimeException: Failed to describe Kafka cluster configs.
        at com.linkedin.kafka.cruisecontrol.config.KafkaAdminTopicConfigProvider.configure(KafkaAdminTopicConfigProvider.java:174)
        at com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfigUtils.getConfiguredInstance(KafkaCruiseControlConfigUtils.java:49)
        at com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig.getConfiguredInstance(KafkaCruiseControlConfig.java:98)
        at com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor.<init>(LoadMonitor.java:156)
        at com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor.<init>(LoadMonitor.java:125)
        at com.linkedin.kafka.cruisecontrol.KafkaCruiseControl.<init>(KafkaCruiseControl.java:126)
        at com.linkedin.kafka.cruisecontrol.async.AsyncKafkaCruiseControl.<init>(AsyncKafkaCruiseControl.java:34)
        at com.linkedin.kafka.cruisecontrol.KafkaCruiseControlApp.<init>(KafkaCruiseControlApp.java:36)
        at com.linkedin.kafka.cruisecontrol.KafkaCruiseControlServletApp.<init>(KafkaCruiseControlServletApp.java:32)
        at com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.getCruiseControlApp(KafkaCruiseControlUtils.java:926)
        at com.linkedin.kafka.cruisecontrol.KafkaCruiseControlMain.main(KafkaCruiseControlMain.java:37)

I have been trying to debug this issue for quite some time, especially checking the IAM policies, to no avail.

sappusaketh commented 6 months ago

Can you share your properties file? I'm running against MSK with IAM auth and it works fine. I'm also using the config without ZooKeeper.

marcelloromani commented 5 months ago

bootstrap.servers=b-1.xxx..kafka.region.amazonaws.com:9098,b-2.xxx.kafka.region.amazonaws.com:9098
metric.sampler.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.PrometheusMetricSampler
prometheus.server.endpoint=cruise-control-prometheus.cruise-control.svc.cluster.local:80
sampling.allow.cpu.capacity.estimation=true
sample.store.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.KafkaSampleStore
partition.metric.sample.store.topic=KafkaCruiseControlPartitionMetricSamples
broker.metric.sample.store.topic=KafkaCruiseControlModelTrainingSamples
sample.store.topic.replication.factor=2
num.sample.loading.threads=8
metric.sampler.partition.assignor.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.DefaultMetricSamplerPartitionAssignor
metric.sampling.interval.ms=120000
partition.metrics.window.ms=300000
num.partition.metrics.windows=5
min.samples.per.partition.metrics.window=1
broker.metrics.window.ms=300000
num.broker.metrics.windows=20
min.samples.per.broker.metrics.window=1
capacity.config.file=config/capacity.json
default.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal
goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.BrokerSetAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerEvenRackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PreferredLeaderElectionGoal
intra.broker.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.IntraBrokerDiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.IntraBrokerDiskUsageDistributionGoal
hard.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal
min.valid.partition.ratio=0.95
cpu.balance.threshold=1.1
disk.balance.threshold=1.1
network.inbound.balance.threshold=1.1
network.outbound.balance.threshold=1.1
replica.count.balance.threshold=1.1
cpu.capacity.threshold=0.7
disk.capacity.threshold=0.8
network.inbound.capacity.threshold=0.8
network.outbound.capacity.threshold=0.8
cpu.low.utilization.threshold=0.0
disk.low.utilization.threshold=0.0
network.inbound.low.utilization.threshold=0.0
network.outbound.low.utilization.threshold=0.0
metric.anomaly.percentile.upper.threshold=90.0
metric.anomaly.percentile.lower.threshold=10.0
proposal.expiration.ms=60000
max.replicas.per.broker=10000
num.proposal.precompute.threads=1
topics.excluded.from.partition.movement=__consumer_offsets.*|__amazon_msk_canary.*|__amazon_msk_connect.*"
num.concurrent.partition.movements.per.broker=10
max.num.cluster.partition.movements=1250
num.concurrent.intra.broker.partition.movements=2
num.concurrent.leader.movements=1000
execution.progress.check.interval.ms=10000
anomaly.notifier.class=com.linkedin.kafka.cruisecontrol.detector.notifier.SelfHealingNotifier
metric.anomaly.finder.class=com.linkedin.kafka.cruisecontrol.detector.KafkaMetricAnomalyFinder
anomaly.detection.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal
metric.anomaly.analyzer.metrics=BROKER_PRODUCE_LOCAL_TIME_MS_50TH,BROKER_PRODUCE_LOCAL_TIME_MS_999TH,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_50TH,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_999TH,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_50TH,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_999TH,BROKER_LOG_FLUSH_TIME_MS_50TH,BROKER_LOG_FLUSH_TIME_MS_999TH
self.healing.exclude.recently.demoted.brokers=true
self.healing.exclude.recently.removed.brokers=true
failed.brokers.zk.path=/CruiseControlBrokerList
topic.config.provider.class=com.linkedin.kafka.cruisecontrol.config.KafkaAdminTopicConfigProvider
cluster.configs.file=config/clusterConfigs.json
completed.kafka.monitor.user.task.retention.time.ms=86400000
completed.cruise.control.monitor.user.task.retention.time.ms=86400000
completed.kafka.admin.user.task.retention.time.ms=604800000
completed.cruise.control.admin.user.task.retention.time.ms=604800000
completed.user.task.retention.time.ms=86400000
demotion.history.retention.time.ms=1209600000
removal.history.retention.time.ms=1209600000
max.cached.completed.kafka.monitor.user.tasks=20
max.cached.completed.cruise.control.monitor.user.tasks=20
max.cached.completed.kafka.admin.user.tasks=30
max.cached.completed.cruise.control.admin.user.tasks=30
max.cached.completed.user.tasks=25
max.active.user.tasks=20
self.healing.enabled=false
webserver.http.port=9091
webserver.http.address=0.0.0.0
webserver.http.cors.enabled=true
webserver.http.cors.origin=*
webserver.http.cors.allowmethods=OPTIONS,GET,POST
webserver.http.cors.exposeheaders=User-Task-ID,Content-Type
webserver.api.urlprefix=/kafkacruisecontrol/*
webserver.ui.diskpath=./cruise-control-ui/dist/
webserver.ui.urlprefix=/kafkacruisecontrol-ui/*
webserver.request.maxBlockTimeMs=10000
webserver.session.maxExpiryTimeMs=60000
webserver.session.path=/
webserver.accesslog.enabled=false
two.step.verification.enabled=true
two.step.purgatory.retention.time.ms=1209600000
two.step.purgatory.max.requests=25
vertx.enabled=false
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsDebugCreds=true;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
client.security.protocol=SASL_SSL
client.sasl.mechanism=AWS_MSK_IAM
client.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
kafka.broker.failure.detection.enable=true
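
A properties file this long is easy to misread, so a quick lint pass may help before looking elsewhere. The sketch below is only a heuristic check, not part of Cruise Control: the helper names `parse_properties` and `lint_properties` are hypothetical, and the two rules (unbalanced double quotes, empty labels in `bootstrap.servers` host names) are assumptions about what a typo might look like. The sample lines are copied from the file above; whether either finding relates to the startup failure is not established in this thread.

```python
def parse_properties(text):
    """Parse simple key=value lines; skips blanks and # or ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        # Split on the first "=" only, so values may contain "=" themselves.
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def lint_properties(props):
    """Return (key, message) pairs for values that look malformed."""
    findings = []
    for key, value in props.items():
        # An odd number of double quotes suggests a stray quote character.
        if value.count('"') % 2 == 1:
            findings.append((key, "unbalanced double quote"))
    # An empty DNS label ("..") usually means a typo or a bad redaction.
    for server in props.get("bootstrap.servers", "").split(","):
        host = server.rsplit(":", 1)[0]
        if ".." in host:
            findings.append(("bootstrap.servers", "empty label in host " + host))
    return findings


# Three lines copied from the properties file posted in this comment.
sample = "\n".join([
    "bootstrap.servers=b-1.xxx..kafka.region.amazonaws.com:9098,b-2.xxx.kafka.region.amazonaws.com:9098",
    'topics.excluded.from.partition.movement=__consumer_offsets.*|__amazon_msk_canary.*|__amazon_msk_connect.*"',
    "sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsDebugCreds=true;",
])

for key, message in lint_properties(parse_properties(sample)):
    print(key + ": " + message)
```

On these sample lines the sketch reports the trailing `"` on `topics.excluded.from.partition.movement` and the `..` in the first bootstrap host. The latter may simply come from redacting the real host name.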

sappusaketh commented 5 months ago

The configuration looks fine to me. Have you verified if your Cruise Control IAM role can connect to your MSK? I tried running your configuration against my MSK, and it worked.

marcelloromani commented 5 months ago

These are the MSK-related permissions attached to the IAM role used by the pod where Cruise Control is running:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "kafka:RebootBroker",
                "kafka:ListScramSecrets",
                "kafka:ListNodes",
                "kafka:ListKafkaVersions",
                "kafka:ListConfigurations",
                "kafka:ListConfigurationRevisions",
                "kafka:GetBootstrapBrokers",
                "kafka:DescribeConfiguration",
                "kafka:DescribeCluster",
                "kafka-cluster:WriteDataIdempotently",
                "kafka-cluster:DescribeCluster",
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:kafka:*:<aws_account>:cluster/<msk_cluster_name>/<msk_cluster_id>",
            "Sid": "AllowMskAccessCluster"
        },
        {
            "Action": [
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTransactionalId",
                "kafka-cluster:DescribeTopicDynamicConfiguration",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:DeleteTopic",
                "kafka-cluster:CreateTopic",
                "kafka-cluster:AlterTransactionalId",
                "kafka-cluster:AlterTopicDynamicConfiguration",
                "kafka-cluster:AlterTopic"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:kafka:*:<aws_account>:topic/<msk_cluster_name>/<msk_cluster_id>/*",
                "arn:aws:kafka:*:<aws_account>:topic/<msk_cluster_name>/*"
            ],
            "Sid": "AllowMskAccessTopic"
        },
        {
            "Action": [
                "kafka-cluster:DescribeGroup",
                "kafka-cluster:DeleteGroup",
                "kafka-cluster:AlterGroup"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:kafka:*:<aws_account>:group/<msk_cluster_name>/<msk_cluster_id>/*",
            "Sid": "AllowMskAccessGroup"
        }
    ]
}
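
Since the exception says the cluster configs could not be described, one thing that may be worth checking is whether the policy grants every describe-related action. The sketch below compares a policy document against a list of actions. The `REQUIRED` list is an assumption, not taken from the Cruise Control docs. To my knowledge MSK IAM access control defines `kafka-cluster:DescribeClusterDynamicConfiguration`, but whether its absence explains this failure is unconfirmed. The helper names are hypothetical, and the check ignores `Resource` scoping and `Deny` statements.

```python
import json
from fnmatch import fnmatchcase


def allowed_actions(policy):
    """Collect action patterns from every Allow statement in a policy."""
    patterns = []
    statements = policy.get("Statement", [])
    if isinstance(statements, dict):
        statements = [statements]
    for statement in statements:
        if statement.get("Effect") != "Allow":
            continue
        actions = statement.get("Action", [])
        if isinstance(actions, str):
            actions = [actions]
        patterns.extend(actions)
    return patterns


def missing_actions(policy, required):
    """Return required actions that no Allow pattern matches.

    IAM action names are case-insensitive and may use * and ? wildcards,
    so both sides are lower-cased and compared with fnmatchcase.
    """
    patterns = [p.lower() for p in allowed_actions(policy)]
    return [
        action for action in required
        if not any(fnmatchcase(action.lower(), p) for p in patterns)
    ]


# Abbreviated copy of the policy posted above (kafka-cluster actions only).
POLICY_JSON = """
{
  "Version": "2012-10-17",
  "Statement": [
    {"Effect": "Allow",
     "Action": ["kafka-cluster:WriteDataIdempotently", "kafka-cluster:DescribeCluster",
                "kafka-cluster:Connect", "kafka-cluster:AlterCluster"],
     "Resource": "arn:aws:kafka:*:<aws_account>:cluster/<msk_cluster_name>/<msk_cluster_id>"},
    {"Effect": "Allow",
     "Action": ["kafka-cluster:DescribeTopicDynamicConfiguration", "kafka-cluster:DescribeTopic"],
     "Resource": "arn:aws:kafka:*:<aws_account>:topic/<msk_cluster_name>/<msk_cluster_id>/*"}
  ]
}
"""

# Assumption: a guess at the describe-related actions, not an official list.
REQUIRED = [
    "kafka-cluster:Connect",
    "kafka-cluster:DescribeCluster",
    "kafka-cluster:DescribeClusterDynamicConfiguration",
    "kafka-cluster:DescribeTopicDynamicConfiguration",
]

print(missing_actions(json.loads(POLICY_JSON), REQUIRED))
```

Against the abbreviated policy this prints `['kafka-cluster:DescribeClusterDynamicConfiguration']`, so adding that action to the cluster-level statement could be a cheap experiment.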

marcelloromani commented 5 months ago

Also in the Cruise Control pod logs I see:

18:52:59.443 [main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin -- Successfully logged in.
18:52:59.968 [main] INFO org.apache.kafka.common.utils.AppInfoParser -- Kafka version: 3.6.1

The MSK cluster is running Kafka 2.8.1. Maybe this Kafka version mismatch is causing issues?

marcelloromani commented 5 months ago

Related issues/docs:
https://github.com/linkedin/cruise-control/issues/1415
https://github.com/linkedin/cruise-control/wiki/Run-without-ZooKeeper

sappusaketh commented 5 months ago

I'm on Kafka 3.6.1. I'm unsure whether the Kafka version can be configured for CC anywhere.

marcelloromani commented 5 months ago

Thanks for confirming.