aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0
1.08k stars 224 forks source link

admin client - failure to create topics (error code 41) #995

Closed kbhatiya999 closed 2 months ago

kbhatiya999 commented 3 months ago

Describe the bug Creating topic fails with topic error 41

Expected behaviour A topic should be created

Environment (please complete the following information):

Reproducible example

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka1:
    image: confluentinc/cp-kafka:latest
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka1:29092,OUTSIDE://localhost:9092 
      KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE 
    depends_on:
      - zookeeper

  kafka2:
    image: confluentinc/cp-kafka:latest
    hostname: kafka2
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka2:29093,OUTSIDE://localhost:9093
      KAFKA_LISTENERS: INSIDE://0.0.0.0:29093,OUTSIDE://0.0.0.0:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE 
    depends_on:
      - zookeeper

  kafka3:
    image: confluentinc/cp-kafka:latest
    hostname: kafka3
    ports:
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka3:29094,OUTSIDE://localhost:9094
      KAFKA_LISTENERS: INSIDE://0.0.0.0:29094,OUTSIDE://0.0.0.0:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE 
    depends_on:
      - zookeeper
import asyncio
import uuid

from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic

async def main():
    random_topic_name = "topic-" + str(uuid.uuid4())
    # create a new topic
    admin = AIOKafkaAdminClient(bootstrap_servers="localhost:9092")
    await admin.start()
    iteration = 0
    while True:
        response = await admin.create_topics(
            [NewTopic(name=random_topic_name, num_partitions=1, replication_factor=1)]
        )
        print(f"----------{iteration=}-----------")
        # if error_code is not 0, then topic creation failed
        if any(
            error_code for (topic, error_code, error_message) in response.topic_errors
        ):
            print(
                f"Error creating topic as request sent to non-controller Node :: error_code='{response.topic_errors[0][1]}' error_message='{response.topic_errors[0][2]}'"
            )
            print("Retrying topic creation until successful...")

        # if error_code is 0, then topic creation was successful
        if not any(
            error_code for (topic, error_code, error_message) in response.topic_errors
        ):
            print(
                f"Topic '{random_topic_name}' created successfully. Request sent to controller Node."
            )
            break
        iteration += 1

    await asyncio.sleep(5)
    # delete the topic
    response = await admin.delete_topics([random_topic_name])
    print(f"Response from delete_topics: {response}")

async def check_version():
    admin: AIOKafkaAdminClient = AIOKafkaAdminClient(
        bootstrap_servers="localhost:9092",
    )
    await admin.start()
    version = await admin._get_cluster_metadata()
    print(version)

# asyncio.run(check_version())
asyncio.run(main())
ods commented 3 months ago

Hi @kbhatiya999, thank you very much for reproducing the problem! I'll definitely find time to dive into. There can be some delay though, as things are too busy today.

ods commented 2 months ago

Hi @kbhatiya999, could you please check this pull request? It's no finished yet: we also have to identify other methods to apply the same approach.

kbhatiya999 commented 2 months ago

Yes let me take a look