confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0

Can't get allow.auto.create.topics=true to work #615

Open tobgu opened 3 years ago

tobgu commented 3 years ago

Description

After upgrading to v1.6.1 (from v1.1), topics are no longer auto-created when subscribing to a topic that does not already exist.

Previously, as part of test setups for example, I was relying on setting up a subscription and then waiting until a kafka.PartitionEOF was received from consumer.Poll to know that the consumer had received an assignment.

Due to the changes in https://github.com/edenhill/librdkafka/issues/1540 this no longer works, which is fine (though a tad backwards incompatible). Instead, a kafka.ErrUnknownTopicOrPart error is received.

I updated the consumer config map to include the new parameter allow.auto.create.topics set to true and expected things to work as before. They did not, however; I cannot see any difference whatsoever compared to not specifying the parameter. I still receive the same error with the same error code. When listing the topics from the broker using ./kafka-topics.sh --bootstrap-server localhost:9092 --list, the topic in question does not show up.

It could be that I'm misunderstanding something fundamental here or that I've made a silly mistake somewhere.

How to reproduce

Pseudo code for how to reproduce:

consumer, err := kafka.NewConsumer(&kafkaConfig)
if err != nil {
	panic(err)
}

rebalanceCallback := func(c *kafka.Consumer, event kafka.Event) error {
	log.Printf("Rebalanced: %s", event.String())
	return nil
}

if err := consumer.Subscribe(topic, rebalanceCallback); err != nil {
	panic(err)
}

for {
	ev := consumer.Poll(pollTimeoutMs)
	if ev == nil {
		continue // Poll timed out
	}
	switch e := ev.(type) {
	case kafka.PartitionEOF:
		// This never happens under v1.6.1; works fine with v1.4.2 for example.
		log.Printf("PartitionEOF: %s", e.String())
	case kafka.Error:
		log.Printf("Kafka error: %s, %s", e.Code(), e.String())
	default:
		log.Printf("Ignored kafka msg: %s", e.String())
	}
}


edenhill commented 3 years ago

Subscribed topics will not be auto-created, regardless of allow.auto.create.topics; the topic would be empty anyway. Topic auto-creation really only has value on the producer.

The recommendation is not to rely on auto topic creation at all, but to use the Admin API to create topics.

tobgu commented 3 years ago

OK, thanks. Isn't the documentation for the property terribly misleading then? https://github.com/edenhill/librdkafka/blame/master/CONFIGURATION.md#L120

It's stated to be a consumer property and the first sentence says: "Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics."

I tried using the admin client as a workaround, which worked fine, but got a bit stuck on #616 which caused some inconvenience.

john-larson commented 3 years ago

> OK, thanks. Isn't the documentation for the property terribly misleading then? https://github.com/edenhill/librdkafka/blame/master/CONFIGURATION.md#L120
>
> It's stated to be a consumer property and the first sentence says: "Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics."
>
> I tried using the admin client as a workaround, which worked fine, but got a bit stuck on #616 which caused some inconvenience.

True. @edenhill What would be the reason for this discrepancy between the documentation and the actual behavior?

meichengkeji commented 2 years ago

The same question: why disable this property?

duxing commented 2 years ago

Agreed with the above. The discrepancy between the documentation and the actual behavior still exists as of 06/2022.

Any chance this can be corrected? At least update the documentation so it's not misleading.

kwongpan commented 2 years ago

I have auto.create.topics.enable=true on the broker and allow.auto.create.topics=true on the consumer.

I think the bug is actually in the fetcher. I can see from the log below that the fetcher is removed for the partitions immediately after the topic is created.

[2022-06-09 08:43:10,276] INFO Sent auto-creation request for Set(test_topic_1654764189837) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
[2022-06-09 08:43:10,277] INFO [Controller 1] CreateTopics result(s): CreatableTopic(name='test_topic_1654764189837', numPartitions=2, replicationFactor=1, assignments=[], configs=[]): SUCCESS (org.apache.kafka.controller.ReplicationControlManager)
[2022-06-09 08:43:10,277] INFO [Controller 1] Created topic test_topic_1654764189837 with topic ID zj7lYO0rR6WQAj_ir-YNFg. (org.apache.kafka.controller.ReplicationControlManager)
[2022-06-09 08:43:10,277] INFO [Controller 1] Created partition test_topic_1654764189837-0 with topic ID zj7lYO0rR6WQAj_ir-YNFg and PartitionRegistration(replicas=[1], isr=[1], removingReplicas=[], addingReplicas=[], leader=1, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
[2022-06-09 08:43:10,278] INFO [Controller 1] Created partition test_topic_1654764189837-1 with topic ID zj7lYO0rR6WQAj_ir-YNFg and PartitionRegistration(replicas=[1], isr=[1], removingReplicas=[], addingReplicas=[], leader=1, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
[2022-06-09 08:43:10,304] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test_topic_1654764189837-0, test_topic_1654764189837-1) (kafka.server.ReplicaFetcherManager)
[2022-06-09 08:43:10,307] INFO [LogLoader partition=test_topic_1654764189837-0, dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
[2022-06-09 08:43:10,308] INFO Created log for partition test_topic_1654764189837-0 in /tmp/kraft-combined-logs/test_topic_1654764189837-0 with properties {} (kafka.log.LogManager)
[2022-06-09 08:43:10,309] INFO [Partition test_topic_1654764189837-0 broker=1] No checkpointed highwatermark is found for partition test_topic_1654764189837-0 (kafka.cluster.Partition)
[2022-06-09 08:43:10,310] INFO [Partition test_topic_1654764189837-0 broker=1] Log loaded for partition test_topic_1654764189837-0 with initial high watermark 0 (kafka.cluster.Partition)
[2022-06-09 08:43:10,316] INFO [LogLoader partition=test_topic_1654764189837-1, dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
[2022-06-09 08:43:10,317] INFO Created log for partition test_topic_1654764189837-1 in /tmp/kraft-combined-logs/test_topic_1654764189837-1 with properties {} (kafka.log.LogManager)
[2022-06-09 08:43:10,317] INFO [Partition test_topic_1654764189837-1 broker=1] No checkpointed highwatermark is found for partition test_topic_1654764189837-1 (kafka.cluster.Partition)
[2022-06-09 08:43:10,317] INFO [Partition test_topic_1654764189837-1 broker=1] Log loaded for partition test_topic_1654764189837-1 with initial high watermark 0 (kafka.cluster.Partition)

lipeicarta commented 3 weeks ago

Is there any update to this issue?