ozangunalp / kafka-native

Kafka broker compiled to native using Quarkus and GraalVM.
Apache License 2.0
103 stars 13 forks source link

Controller-only mode support #88

Closed robobario closed 1 year ago

robobario commented 1 year ago

Hi,

We are interested in using the native image to test a cluster with some controller-only nodes, so process.roles=controller. We have:

  1. node-0=broker,controller
  2. node-1:controller
  3. node-2:controller

Here's our server.properties for one of the controller-only containers:

node.id=1
offsets.topic.num.partitions=1
metrics.jmx.exclude=.*
listeners=CONTROLLER\://0.0.0.0\:9091
transaction.state.log.replication.factor=1
group.initial.rebalance.delay.ms=0
offsets.topic.replication.factor=1
transaction.state.log.min.isr=1
controller.quorum.voters=0@//broker-0\:9091,1@//broker-1\:9091,2@//broker-2\:9091
controller.listener.names=CONTROLLER
process.roles=controller
broker.id=1
early.start.listeners=CONTROLLER
listener.security.protocol.map=CONTROLLER\:PLAINTEXT

When we start it up we see a failure like:

docker run -it --rm -p 19092:9091 \
   -v $(pwd):/conf \
   -e SERVER_PROPERTIES_FILE=/conf/server.properties \
   quay.io/ogunalp/kafka-native:latest
2023-08-24 02:21:49,407 ERROR [io.qua.run.Application] (main) Failed to start application (with profile [prod]): java.lang.IllegalArgumentException: No security protocol defined for listener PLAINTEXT
    at kafka.cluster.EndPoint$.$anonfun$createEndPoint$2(EndPoint.scala:49)
    at scala.collection.immutable.Map$Map1.getOrElse(Map.scala:248)
    at kafka.cluster.EndPoint$.securityProtocol$1(EndPoint.scala:49)
    at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:54)
    at kafka.utils.CoreUtils$.$anonfun$listenerListToEndPoints$6(CoreUtils.scala:270)
    at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
    at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
    at scala.collection.mutable.ArraySeq.map(ArraySeq.scala:37)
    at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:270)
    at kafka.server.KafkaConfig.effectiveAdvertisedListeners(KafkaConfig.scala:2034)
    at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:2148)
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:2107)
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1534)
    at kafka.server.KafkaConfig.fromProps(KafkaConfig.scala:1457)
    at com.ozangunalp.kafka.server.EmbeddedKafkaBroker.start(EmbeddedKafkaBroker.java:182)
    at com.ozangunalp.kafka.server.Startup.startup(Startup.java:36)
    at com.ozangunalp.kafka.server.Startup_Observer_startup_ba142133319fbb85a5c3c9a901d8261b72ab2d8b.notify(Unknown Source)

It looks like advertised.listeners is being defaulted here if we don't specify it, using PLAINTEXT as listener name by default. So it fails when it tries to lookup the security protocol.

I tried setting advertised listeners to match listeners via the env var, and then get a failure:

docker run -it --rm -p 19092:9091 \
    -v $(pwd):/conf  \
    -e KAFKA_ADVERTISED_LISTENERS="CONTROLLER://0.0.0.0:9091" \
    -e SERVER_PROPERTIES_FILE=/conf/server.properties \
    quay.io/ogunalp/kafka-native:latest
2023-08-24 02:23:05,459 ERROR [io.qua.run.Application] (main) Failed to start application (with profile [prod]): java.lang.IllegalArgumentException: requirement failed: The advertised.listeners config must be empty when process.roles=controller
    at scala.Predef$.require(Predef.scala:337)
    at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:2222)
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:2107)
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1534)
    at kafka.server.KafkaConfig.fromProps(KafkaConfig.scala:1457)
    at com.ozangunalp.kafka.server.EmbeddedKafkaBroker.start(EmbeddedKafkaBroker.java:182)
    at com.ozangunalp.kafka.server.Startup.startup(Startup.java:36)
    at com.ozangunalp.kafka.server.Startup_Observer_startup_ba142133319fbb85a5c3c9a901d8261b72ab2d8b.notify(Unknown Source)

And I couldn't see a way to unset it, smallrye complains if you set the env var to an empty string.

docker run -it --rm -p 19092:9091 \
    -v $(pwd):/conf  \
    -e KAFKA_ADVERTISED_LISTENERS="" \
    -e SERVER_PROPERTIES_FILE=/conf/server.properties \
    quay.io/ogunalp/kafka-native:latest
2023-08-24 02:10:12,680 ERROR [io.qua.run.Application] (main) Failed to start application (with profile [prod]): java.util.NoSuchElementException: SRCFG00040: The config property kafka.advertised.listeners is defined as the empty String ("") which the following Converter considered to be null: io.smallrye.config.Converters$BuiltInConverter
    at io.smallrye.config.SmallRyeConfig.convertValue(SmallRyeConfig.java:300)
    at io.smallrye.config.SmallRyeConfig.getValue(SmallRyeConfig.java:242)
    at io.smallrye.config.SmallRyeConfig.getValue(SmallRyeConfig.java:168)
    at com.ozangunalp.kafka.server.BrokerConfig.providedConfig(BrokerConfig.java:190)

So it appears advertised.listeners is incompatible with controller only mode, would it make sense to add a check here to make it not set up the default advertised.listeners if the only role is controller?

robobario commented 1 year ago

FYI I'm Happy to contribute this.

I wonder if there should also be a flag to disable the convenient overriding of properties and exclusively use the properties file.

ozangunalp commented 1 year ago

That'd be cool! I'd add an option to ServerConfig and use it in https://github.com/ozangunalp/kafka-native/blob/4bfe084759cf26cb6553b445b06ecc56c2eceabc/kafka-server/src/main/java/com/ozangunalp/kafka/server/EmbeddedKafkaBroker.java#L175-L177 to disable all three config options to use the server.properties only.