Closed Ashish123gs closed 7 years ago
Yes, we can add something to allow you to revert to the old method. However, if you are running multiple instances under the same consumer group name, each KM instance will not receive all the messages from the offsets topic. That is why this was changed: each instance now gets a unique consumer group name.
I understand.
We have multiple instances of Kafka Manager, each serving a unique cluster. If KM has a single, fixed consumer group name, it is operationally easy to provide ACLs while provisioning new clusters. It will also help if we move KM from one host to another (we don't have to worry about ACLs for the new KM host).
We could allow you to provide a consumer name in config and if none is provided, it defaults to the one we have now.
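A minimal sketch of the fallback proposed above, assuming the configured name arrives as an `Option[String]`. The object `GroupIdResolver` and the method `resolveGroupId` are hypothetical names for illustration and are not part of Kafka Manager:

```scala
object GroupIdResolver {
  // Hypothetical helper: prefer a configured consumer group name,
  // otherwise fall back to the hostname-based default from 1.3.3.7.
  def resolveGroupId(configured: Option[String], hostname: String): String =
    configured
      .map(_.trim)
      .filter(_.nonEmpty) // treat a blank setting as "not provided"
      .getOrElse(s"KMOffsetCache-$hostname")
}
```

With this shape, a deployment that pre-provisions ACLs for a fixed group would pass `Some("KafkaManagerOffsetCache")`, while everyone else keeps the per-host default.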
That will help, thanks very much.
I just noticed you can override the group.id by setting it in the consumer.properties file. So the functionality is already there.
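To illustrate why the override works, here is a rough sketch. It assumes the user-supplied consumer properties are copied over the defaults after the defaults are set (the `consumerProperties.foreach` call in the snippet quoted in this issue is truncated, so this is an assumption). The object `GroupIdOverrideSketch`, the method `buildConsumerProps`, and the `Map` parameter are hypothetical stand-ins, not Kafka Manager code:

```scala
import java.util.Properties

object GroupIdOverrideSketch {
  // Hypothetical helper: set the defaults first, then apply user overrides,
  // so a user-supplied "group.id" replaces the hostname-based default.
  def buildConsumerProps(hostname: String, overrides: Map[String, String]): Properties = {
    val props = new Properties()
    props.setProperty("group.id", s"KMOffsetCache-$hostname")
    props.setProperty("enable.auto.commit", "false")
    // Entries loaded from consumer.properties would be applied here (assumed).
    overrides.foreach { case (key, value) => props.setProperty(key, value) }
    props
  }
}
```

Under that assumption, a `consumer.properties` file containing `group.id=KafkaManagerOffsetCache` would restore the group name that existing ACLs were granted for.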
Awesome, thanks a lot.
In the new 1.3.3.7 release, I see we are using KMOffsetCache-$hostname as the group.id. This becomes challenging if the cluster is secured and we have to provide ACLs for the group used by Kafka Manager. If there are multiple Kafka Manager instances running on different hosts, we have to grant ACLs based on the hostname.
Also, this is a breaking change: users who have already granted ACLs for the group 'KafkaManagerOffsetCache' will have issues.
Can we revert this change and use the previous group id name?
KafkaStateActor.scala
//..//
private[this] def createKafkaConsumer(): Consumer[Array[Byte], Array[Byte]] = {
  val hostname = InetAddress.getLocalHost.getHostName
  val props: Properties = new Properties()
  props.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, s"KMOffsetCache-$hostname")
  props.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapBrokerList.list.map(bi => s"${bi.host}:${bi.port}").mkString(","))
  props.put(org.apache.kafka.clients.consumer.ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false")
  props.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
  props.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
  consumerProperties.foreach {
//..//