linkedin / cruise-control

Cruise-control is the first of its kind to fully automate the dynamic workload rebalance and self-healing of a Kafka cluster. It provides great value to Kafka users by simplifying the operation of Kafka clusters.

Dynamically reload metricsreporter keystore (and truststore) #1148

Open Daan12345678 opened 4 years ago

Daan12345678 commented 4 years ago

We use short-lived certificates to authenticate to Kafka. Is it possible to tell the metricsreporter to refresh its certificates (keystore/truststore) without stopping the Kafka process? We have tried the following:

/opt/kafka/bin/kafka-configs.sh --bootstrap-server ${bootstrap-server} --command-config /opt/kafka/config/kafka-client.properties --entity-type brokers --entity-name $(grep broker.id /data/kafka/meta.properties | awk -F "=" '{print$2}') --alter --add-config listener.name.ssl.truststore.location=/opt/kafka/config/kafka-keystore.jks,listener.name.ssl.keystore.location=/opt/kafka/config/kafka-keystore.jks,cruise.control.metrics.reporter.ssl.keystore.location=/opt/kafka/config/kafka-keystore.jks,cruise.control.metrics.reporter.ssl.truststore.location=/opt/kafka/config/kafka-keystore.jks

Logs:

[2020-03-17 10:10:35,126] ERROR [Producer clientId=CruiseControlMetricsReporter] Connection to node 1026 (${broker}/${broker_ip}:9093) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)

This does work for the Kafka broker, but not for the metricsreporter. Any idea on how to do this?

Lincong commented 3 years ago

Hi @Daan12345678, I just created a PR addressing this issue: https://github.com/linkedin/cruise-control/pull/1386

The basic idea is that when producer.send(...) throws an SSL authentication exception, the metrics reporter creates a new producer with the same producer config, and this new producer should read the new certificates when it is created. I also added two configs: one controls whether this logic is triggered at all, and the other caps how many times in total it can be triggered.
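
To make the idea concrete, here is a minimal, Kafka-free sketch of that recreate-on-auth-failure logic. MetricsClient, AuthFailedException, RecreatingSender and the two constructor parameters are hypothetical stand-ins chosen for illustration; they are not the class or config names used in the PR. The sketch also assumes the failed record is simply dropped rather than retried.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for the producer; only what this sketch needs. */
interface MetricsClient {
  void send(String record); // may throw AuthFailedException
  void close();
}

/** Hypothetical stand-in for an SSL authentication failure. */
class AuthFailedException extends RuntimeException {
  AuthFailedException(String message) {
    super(message);
  }
}

/** Recreates the client after an authentication failure, up to a configured cap. */
class RecreatingSender {
  private final Supplier<MetricsClient> factory; // builds a client from the same config each time
  private final boolean recreateEnabled;         // first switch: is the logic active at all
  private final int maxRecreations;              // second switch: total recreations allowed
  private MetricsClient client;
  private int recreations;

  RecreatingSender(Supplier<MetricsClient> factory, boolean recreateEnabled, int maxRecreations) {
    this.factory = factory;
    this.recreateEnabled = recreateEnabled;
    this.maxRecreations = maxRecreations;
    this.client = factory.get();
  }

  /** Returns true if the client accepted the record, false if the send failed authentication. */
  boolean send(String record) {
    try {
      client.send(record);
      return true;
    } catch (AuthFailedException e) {
      if (recreateEnabled && recreations < maxRecreations) {
        client.close();
        client = factory.get(); // a freshly built client loads its key material again
        recreations++;
      }
      return false; // assumption: the failed record is dropped, not retried
    }
  }

  int recreations() {
    return recreations;
  }
}
```

The cap matters because a certificate that is genuinely invalid would otherwise cause a new client to be built on every failed send.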

Let me know if you think this implementation makes sense. Thanks!

Daan12345678 commented 3 years ago

Hi @Lincong ,

This looks like it could work for us! Looking forward to seeing this in the next release. We will test it as soon as it is released.

Thank you for this contribution!

Ownercz commented 1 week ago

Hi, is there any progress on SSL certificate reload? We have hit this issue as well, and at our scale of 100+ Kafka clusters running SSL server/client authentication, the only way to get it working right now seems to be a rolling restart of each cluster. We do have automatic certificate reload on the brokers, but the CCMR component does not pick up the new certificate.

Thank you for any info!

countableSet commented 1 day ago

Hi 👋🏻 I'm running into this issue as well with dynamic SSL configuration updates. I did a little digging, and it seems Cruise Control could support this through the Reconfigurable interface, which is extended by MetricsReporter and therefore implemented by CruiseControlMetricsReporter 🎉

I also tested this by adding basic implementations of reconfigurableConfigs and reconfigure:

diff --git a/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporter.java b/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporter.java
index 58ef2932..ba77ea93 100644
--- a/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporter.java
+++ b/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporter.java
@@ -204,6 +204,18 @@ public class CruiseControlMetricsReporter implements MetricsReporter, Runnable {
     }
   }

+  @Override
+  public Set<String> reconfigurableConfigs() {
+    return CruiseControlMetricsReporterConfig.configNames();
+  }
+
+  @Override
+  public void reconfigure(Map<String, ?> configs) {
+    for (Map.Entry<String, ?> entry : configs.entrySet()) {
+      LOG.info("Reconfiguring {} to {}", entry.getKey(), entry.getValue());
+    }
+  }
+
   /**
    * Starting with Kafka 3.3.0 a new class, "org.apache.kafka.server.metrics.KafkaYammerMetrics", provides the default Metrics Registry.
    *
diff --git a/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterConfig.java b/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterConfig.java
index e3d11946..b926b753 100644
--- a/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterConfig.java
+++ b/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterConfig.java
@@ -4,10 +4,7 @@

 package com.linkedin.kafka.cruisecontrol.metricsreporter;

-import java.util.HashSet;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -158,6 +155,12 @@ public class CruiseControlMetricsReporterConfig extends AbstractConfig {
                 CRUISE_CONTROL_METRICS_REPORTER_BATCH_SIZE_DOC);
   }

+  public static Set<String> configNames() {
+    Set<String> l = new HashSet<>(CONFIG.names());
+    l.addAll(CONFIGS);
+    return l;
+  }
+
   /**
    * @param baseConfigName Base config name.
    * @return Cruise Control metrics reporter config name.

I then compiled the jar, dropped it into a Kafka Docker container, and ran the alter-configs command as an example:

❯ kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --alter --add-config cruise.control.metrics.reporter.ssl.truststore.location=/opt/kafka/config/kafka-keystore.jks

and saw the reconfigure logs come through 🎉

kafka-1      | [2024-10-10 00:11:58,283] INFO Processing override for entityPath: brokers/1 with config: HashMap(cruise.control.metrics.reporter.ssl.truststore.location -> /opt/kafka/config/kafka-keystore.jks) (kafka.server.DynamicConfigManager)
[...]
kafka-1      | [2024-10-10 00:11:58,287] INFO Reconfiguring kafka.server.DynamicMetricsReporters@7d914ee3, updated configs: Set(cruise.control.metrics.reporter.ssl.truststore.location) custom configs: {conf.file=/etc/kafka/server.properties, cruise.control.metrics.reporter.ssl.truststore.location=/opt/kafka/config/kafka-keystore.jks} (kafka.server.DynamicBrokerConfig)
kafka-1      | [2024-10-10 00:11:58,289] INFO Reconfiguring log.cleaner.min.compaction.lag.ms to 0 (com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter)
kafka-1      | [2024-10-10 00:11:58,289] INFO Reconfiguring offsets.topic.num.partitions to 50 (com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter)
[...]

Does this seem like a more acceptable solution than the original PR?
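
One thing the logs above suggest is that reconfigure is handed broker-wide settings (log.cleaner.min.compaction.lag.ms, offsets.topic.num.partitions) alongside the reporter's own, so a real implementation would probably want to filter first. Below is a rough, stdlib-only sketch of that filtering step. ReporterConfigFilter and its two methods are hypothetical helpers, not part of Cruise Control or Kafka, and stripping the prefix to get producer-style keys is an assumption about how the result would be used.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: picks the reporter's own settings out of a reconfigure() call. */
class ReporterConfigFilter {
  // Prefix taken from the config names used earlier in this thread.
  static final String PREFIX = "cruise.control.metrics.reporter.";

  /** Keeps only reporter-prefixed entries and strips the prefix from their keys. */
  static Map<String, Object> reporterOverrides(Map<String, ?> configs) {
    Map<String, Object> overrides = new HashMap<>();
    for (Map.Entry<String, ?> entry : configs.entrySet()) {
      if (entry.getKey().startsWith(PREFIX)) {
        overrides.put(entry.getKey().substring(PREFIX.length()), entry.getValue());
      }
    }
    return overrides;
  }

  /** True if any stripped key is an SSL setting, i.e. the producer would need rebuilding. */
  static boolean touchesSsl(Map<String, ?> overrides) {
    return overrides.keySet().stream().anyMatch(key -> key.startsWith("ssl."));
  }
}
```

With something like this, reconfigure could ignore unrelated broker settings and only rebuild the producer when touchesSsl returns true.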