Topic deletions are asynchronous in Kafka; it may take some time (perhaps a long time) for the delete to propagate through the cluster, especially if any of the topic's partition replicas are unavailable.
It is generally recommended not to delete and re-create topics in Kafka, since there is no way for a client to know that the new topic is not the same as the old one (with regard to partition count, offsets, messages, etc.).
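For reference, delete_topics() returns one future per topic that you can wait on. A minimal sketch, assuming confluent-kafka-python and a placeholder broker at localhost:9092; note that the future completing only means the request was accepted, not that the deletion has fully propagated:

from confluent_kafka.admin import AdminClient

admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # placeholder broker address

# operation_timeout asks the broker to wait up to 30s for the deletion to
# propagate before answering; even so, this does not guarantee that every
# replica has finished removing the topic.
futures = admin.delete_topics(["my_topic"], operation_timeout=30)

for topic, future in futures.items():
    try:
        future.result()  # returns None on success, raises KafkaException on failure
        print(f"Deletion of {topic} accepted")
    except Exception as e:
        print(f"Failed to delete {topic}: {e}")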
@edenhill the use case is for testing a data pipeline: I need to be able to reset the scenario. Actually, I can think of many use cases for deleting a topic in a data pipeline where I don't care about the offsets, etc. So, if it isn't recommended, why is the method provided?
admin.delete_topics(TOPICS) works for me, but I put in a time.sleep(3) before recreating the deleted topics.
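Something along these lines, if it helps. This sketch assumes confluent-kafka-python; the topic names, partition settings, and broker address are all placeholders:

import time
from confluent_kafka.admin import AdminClient, NewTopic

TOPICS = ["pipeline-input", "pipeline-output"]  # placeholder test topics

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

# Request deletion and wait for each delete request to be acknowledged.
for topic, future in admin.delete_topics(TOPICS, operation_timeout=30).items():
    future.result()

# Give the cluster a moment before re-creating the topics with the same names.
time.sleep(3)

new_topics = [NewTopic(t, num_partitions=1, replication_factor=1) for t in TOPICS]
for topic, future in admin.create_topics(new_topics).items():
    future.result()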
For more robust tests it might be better to use topics with a random ID suffix, and then delete them at the end of the test.
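For example (the prefix, partition count, and replication factor are arbitrary):

import uuid
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # placeholder broker

# Use a unique topic name per test run instead of re-using a deleted name.
test_topic = f"pipeline-test-{uuid.uuid4().hex}"
admin.create_topics([NewTopic(test_topic, num_partitions=1, replication_factor=1)])[test_topic].result()

# ... exercise the data pipeline against test_topic ...

# Clean up at the end; no need to wait, since the name is never re-used.
admin.delete_topics([test_topic])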
@jfinzel
So, if it isn't recommended, why is the method provided?
It is the quick topic delete and re-create cycle that is not recommended.
@edenhill and @candlerb I have never quickly re-created the topic, and a 3-second sleep is nowhere near sufficient to actually show the topic as deleted. I have waited several minutes with the topic metadata still showing up.
It works for me, although I'm only testing small topics with a few hundred messages, on a one-node cluster (running confluentinc/cp-kafka:5.2.1).
@jfinzel That doesn't sound right. Are all the brokers in the cluster up and operational? How many partitions are there in the topic? How large are the partitions (approximate size in GB)? Can you show me your delete code?
@edenhill Do you mean that the deletion is fully asynchronous?
Does kafka-python provide a way to "wait" for this deletion anyway?
There's no rock-solid way for a client to know when a topic has been truly deleted throughout the cluster, so it is not recommended to delete and re-create topics with the same name.
KIP-516 sets out to address this, but it is not yet implemented.
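A best-effort workaround is to poll the cluster metadata until the topic no longer shows up. A sketch, with arbitrary timeouts and a placeholder topic name; this still cannot guarantee that every broker has completed the delete:

import time
from confluent_kafka.admin import AdminClient

admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # placeholder broker
admin.delete_topics(["my_topic"], operation_timeout=30)["my_topic"].result()

# Best-effort wait: poll cluster metadata until the topic is no longer listed.
deadline = time.time() + 60
while time.time() < deadline:
    if "my_topic" not in admin.list_topics(timeout=10).topics:
        break
    time.sleep(1)
else:
    raise TimeoutError("topic still present in metadata after 60s")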
Thank you @edenhill. It seems I've found a way to achieve this without getting an error :+1:
# kafka-python (not confluent-kafka-python)
from kafka import KafkaAdminClient, KafkaClient
from kafka.errors import KafkaError

client = KafkaClient(bootstrap_servers=kafka_servers)
# [...] getting broker_topics
admin_client = KafkaAdminClient(bootstrap_servers=kafka_servers)

if topic_name in broker_topics:
    deletion = admin_client.delete_topics([topic_name])
    try:
        # Force a cluster metadata refresh and block until it completes
        future = client.cluster.request_update()
        client.poll(future=future)
    except KafkaError as e:
        print(e)

admin_client.create_topics(new_topics=topic_list, validate_only=False)
These two lines seem to make it work:
future = client.cluster.request_update()
client.poll(future=future)
Is that expected, or do you think it's just a side effect that makes the code "wait", without actually waiting for the deletion?
That's kafka-python, not confluent-kafka-python.
@edenhill will support for KIP-516 be coming soon? Just noticed that it has been supported since Apache Kafka 2.8.0 in April this year.
Description
The delete_topics method does not work. After it has been run, you can still see the topic with the ccloud CLI. It is only by using the ccloud CLI that you can actually delete the topic.
How to reproduce
Create a Kafka topic and run KafkaAdminClient delete_topics with that topic name. The method will succeed. Check whether the topic still exists using the ccloud CLI; it is still there. A minimal reproduction sketch follows below.
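Sketch of the reproduction using confluent-kafka-python's AdminClient; the bootstrap address, credentials, topic name, and replication factor are placeholders:

from confluent_kafka.admin import AdminClient, NewTopic

# Placeholder config; a real Confluent Cloud cluster also needs API key/secret settings.
admin = AdminClient({"bootstrap.servers": "<bootstrap-servers>"})

# Create the topic and wait for the request to finish.
admin.create_topics([NewTopic("repro-topic", num_partitions=1, replication_factor=3)])["repro-topic"].result()

# Delete it; the call succeeds (the future completes without an error)...
admin.delete_topics(["repro-topic"])["repro-topic"].result()

# ...yet listing topics with the ccloud CLI afterwards still shows the topic.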
Checklist
Please provide the following information:
confluent-kafka-python and librdkafka versions (confluent_kafka.version() and confluent_kafka.libversion()):
Client configuration: {...}
Client logs (with 'debug': '..' as necessary):
Please let me know if you need any of the above information.