twmb / franz-go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.7+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License
1.78k stars 182 forks source link

topic creation configs #688

Closed yakirSalt closed 7 months ago

yakirSalt commented 7 months ago

Hi,

I'm trying to create a topic with specific configs such as cleanup.policy, but get the error INVALID_CONFIG: Configuration is invalid. what am I doing wrong? this is a code example:

client, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
if err != nil {
  panic(err)
}
admClient := kadm.NewClient(client)
m := map[string]*string{
  "cleanup.policy":  pointy("delete"),
  "retention.bytes": pointy("1000000000"),
  "retention.ms":    pointy("1000000"),
}
res, err := admClient.CreateTopic(ctx, -1, -1, m, topicName)

thanks for the help

twmb commented 7 months ago

Are you talking to Kafka, or something else (Redpanda, AWS MSK, ...)? Is there some policy clamp on the broker side of things? Do you have access to broker logs and, if so, do they indicate the problem?

That looks fine so it should work imo.

yakirSalt commented 7 months ago

Hey, I'm talking about Kafka. I try to run it locally with docker, this is the error I see in the broker logs:

2024-03-07 14:00:41 [2024-03-07 12:00:41,767] INFO [Admin Manager on Broker 1]: Error processing create topic request CreatableTopic(name='my-dlq', numPartitions=-1, replicationFactor=-1, assignments=[], configs=[CreateableTopicConfig(name='delete.retention.ms', value='5368709120'), CreateableTopicConfig(name='cleanup.policy', value='5368709120'), CreateableTopicConfig(name='retention.ms', value='5368709120'), CreateableTopicConfig(name='retention.bytes', value='5368709120')]) (kafka.server.ZkAdminManager)
2024-03-07 14:00:41 org.apache.kafka.common.config.ConfigException: Invalid value 5368709120 for configuration cleanup.policy: String must be one of: compact, delete
2024-03-07 14:00:41     at org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:964)
2024-03-07 14:00:41     at org.apache.kafka.common.config.ConfigDef$ValidList.ensureValid(ConfigDef.java:940)
2024-03-07 14:00:41     at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:502)
2024-03-07 14:00:41     at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:486)
2024-03-07 14:00:41     at org.apache.kafka.storage.internals.log.LogConfig.validate(LogConfig.java:632)
2024-03-07 14:00:41     at kafka.zk.AdminZkClient.validateTopicCreate(AdminZkClient.scala:165)
2024-03-07 14:00:41     at kafka.server.ZkAdminManager.$anonfun$createTopics$1(ZkAdminManager.scala:202)
2024-03-07 14:00:41     at scala.collection.Iterator$$anon$9.next(Iterator.scala:584)
2024-03-07 14:00:41     at scala.collection.immutable.List.prependedAll(List.scala:153)
2024-03-07 14:00:41     at scala.collection.immutable.List$.from(List.scala:684)
2024-03-07 14:00:41     at scala.collection.immutable.List$.from(List.scala:681)
2024-03-07 14:00:41     at scala.collection.IterableFactory$Delegate.from(Factory.scala:288)
2024-03-07 14:00:41     at scala.collection.immutable.Iterable$.from(Iterable.scala:35)
2024-03-07 14:00:41     at scala.collection.immutable.Iterable$.from(Iterable.scala:32)
2024-03-07 14:00:41     at scala.collection.IterableFactory$Delegate.from(Factory.scala:288)
2024-03-07 14:00:41     at scala.collection.IterableOps.map(Iterable.scala:682)
2024-03-07 14:00:41     at scala.collection.IterableOps.map$(Iterable.scala:682)
2024-03-07 14:00:41     at scala.collection.AbstractIterable.map(Iterable.scala:933)
2024-03-07 14:00:41     at kafka.server.ZkAdminManager.createTopics(ZkAdminManager.scala:166)
2024-03-07 14:00:41     at kafka.server.KafkaApis.handleCreateTopicsRequest(KafkaApis.scala:1987)
2024-03-07 14:00:41     at kafka.server.KafkaApis.$anonfun$handle$12(KafkaApis.scala:199)
2024-03-07 14:00:41     at kafka.server.KafkaApis.$anonfun$handle$12$adapted(KafkaApis.scala:199)
2024-03-07 14:00:41     at kafka.server.MetadataSupport.maybeForward(MetadataSupport.scala:69)
2024-03-07 14:00:41     at kafka.server.MetadataSupport.maybeForward$(MetadataSupport.scala:61)
2024-03-07 14:00:41     at kafka.server.ZkSupport.maybeForward(MetadataSupport.scala:74)
2024-03-07 14:00:41     at kafka.server.KafkaApis.handle(KafkaApis.scala:139)
2024-03-07 14:00:41     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:153)
2024-03-07 14:00:41     at java.base/java.lang.Thread.run(Thread.java:829)
yakirSalt commented 7 months ago

nvm, found the issue. it works now. I accidentally passed the wrong values due to pointer issues in Go