confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python
Other
84 stars 892 forks source link

Consumer from unknown topic stuck #1748

Closed HenriqueMAlmeida closed 4 months ago

HenriqueMAlmeida commented 4 months ago

Description

I am trying to create a consumer subscribed to more than one topic.

Some of the topics may not exist which appears to create an issue with the remaining topics. In the example below, I have subscribed to an existing topic (already with messages in the queue) and another topic that does not exist.

I have enabled the "allow.auto.create.topics" option, which I expected it would allow the consumer to create the topic.

However, the inexistent topic does not appear to be created and the consumer sends an error message saying it found an unknown topic. The consumer also stays stuck in the polling, without reading the messages from the other topic.

Changing the order of subscription, subscribing to the existing topic last, appears to change the behavior of the consumer, and all the messages from the existing topic are read.

Is this an expected behavior, should I change any more settings to allow the consumer to create the topic himself?

Thank you!

How to reproduce

import uuid

from confluent_kafka.cimpl import KafkaError, Consumer

kafka_config = {
    "allow.auto.create.topics": True,
    "auto.offset.reset": "earliest",
    "bootstrap.servers": "servers",
    "enable.auto.commit": False,
    "group.id": str(uuid.uuid4()),
}

consumer = Consumer({**kafka_config})

consumer.subscribe(["existing-topic"])
consumer.subscribe(["inexistent-topic"])

while True:
    msg = consumer.poll(timeout=1.0)
    print(msg)

    if msg is None:
        continue

    if msg.error():
        print(msg.error())

    print(f"Received message: {msg.value()}")`

python: 3.10 version: ('2.4.0', 33816576) libversion: ('2.4.0', 33816831) Broker version: 3.0.1

Checklist

Please provide the following information: