aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

IncompatibleBrokerVersion: Kafka broker does not support the 'CreateTopicsRequest_v0' Kafka protocol. #1008

Open mjunaidca opened 1 month ago

mjunaidca commented 1 month ago

Describe the bug

When trying to create a topic using AIOKafkaAdminClient, the following error is encountered:

IncompatibleBrokerVersion: Kafka broker does not support the 'CreateTopicsRequest_v0' Kafka protocol.

Expected behaviour

The topic should be created successfully without throwing an IncompatibleBrokerVersion error.

Environment (please complete the following information):

Here's the compose.yaml file: https://github.com/mjunaidca/kafka-playground/blob/main/python-kafka/compose.yml

Reproducible example

import logging
from aiokafka.admin import AIOKafkaAdminClient, NewTopic
from aiokafka.errors import KafkaError, TopicAlreadyExistsError

# Kafka settings (replace with actual values)
KAFKA_BOOTSTRAP_SERVER = 'broker:19092'

# Logging configuration
logging.basicConfig(level=logging.INFO)

async def startup_topic_event():
    admin_client = AIOKafkaAdminClient(bootstrap_servers=KAFKA_BOOTSTRAP_SERVER)
    try:
        topic1 = NewTopic(
            name="PET",
            num_partitions=3,
            replication_factor=1,
        )
        # Create topic if it doesn't exist
        await admin_client.create_topics(
            new_topics=[topic1], validate_only=False
        )
        logging.info("Topic 'PET' created successfully.")
    except TopicAlreadyExistsError:
        logging.info("Topic 'PET' already exists.")
    except KafkaError as e:
        logging.error(f"Failed to create topic 'PET': {e}")
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
    finally:
        await admin_client.close()

# Note: This function is not used in the current implementation due to the compatibility issue.
# The good thing is that the producer/consumer auto-creates the topic.

Additional context

The auto-creation of topics by producers/consumers works as expected. However, explicit creation using the admin client fails due to the broker version compatibility issue. This was tested with the Kafka broker version specified in the environment section.

ods commented 1 month ago

AIOKafkaAdminClient must be bootstrapped before use. The preferred way is:

async with AIOKafkaAdminClient(...) as client:
    ...
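
For comparison with the reproducible example, here is a minimal sketch of the same flow with the client started explicitly before `create_topics` is called. It assumes that `start()` is the bootstrapping step the comment refers to, and that the error in the report comes from calling `create_topics` on a client that was never started. `create_topics_started` is a hypothetical helper name, not part of aiokafka, and the broker address and topic settings are copied from the report.

```python
import asyncio
import logging

logging.basicConfig(level=logging.INFO)


async def create_topics_started(admin_client, new_topics):
    """Start the admin client, create the topics, and always close the client.

    Hypothetical helper: admin_client is expected to expose the async
    start(), create_topics() and close() methods of AIOKafkaAdminClient.
    """
    try:
        # Bootstrap the client before issuing any admin request.
        await admin_client.start()
        return await admin_client.create_topics(
            new_topics=new_topics, validate_only=False
        )
    finally:
        # Runs whether or not start() or create_topics() raised.
        await admin_client.close()


async def main():
    # Imported here so the helper above does not require aiokafka at import time.
    from aiokafka.admin import AIOKafkaAdminClient, NewTopic

    admin_client = AIOKafkaAdminClient(bootstrap_servers="broker:19092")
    topic = NewTopic(name="PET", num_partitions=3, replication_factor=1)
    response = await create_topics_started(admin_client, [topic])
    logging.info("create_topics response: %s", response)
```

Running `asyncio.run(main())` against the broker from the compose file should exercise the started client. The `async with` form shown above is the shorter way to get the same start/close pairing.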