Description
We have developed an admin script using the confluent-kafka-python AdminClient API. Our tool sets the desired properties via the set_config() method of ConfigResource and applies them with AdminClient.alter_configs().
According to the documentation, any configuration properties that are not included in the alter request are reverted to their default values (Kafka's original defaults).
So in our tool we use describe_configs() to retrieve the current configuration, and then re-set the other non-default properties to preserve them across the update operation.
We use the is_default attribute of each ConfigEntry to check whether a property holds its default value. This is False for properties that were set either at the server (broker) level or at the topic level. As a result, we end up setting server-level configured values at the topic level (when the property is not already set at the topic level).
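For reference, a minimal standalone sketch of what describe_configs() reports for a topic; the broker address and topic name below are placeholders for illustration only:

from confluent_kafka.admin import AdminClient, ConfigResource

# Placeholder connection details, for illustration only.
ac = AdminClient({"bootstrap.servers": "localhost:9092"})

resource = ConfigResource(ConfigResource.Type.TOPIC, "my-topic")
remote_config = ac.describe_configs([resource])[resource].result()

for name, entry in remote_config.items():
    # is_default is False for both broker-level and topic-level values;
    # entry.source reports where the value actually comes from (e.g.
    # STATIC_BROKER_CONFIG vs. DYNAMIC_TOPIC_CONFIG).
    print(name, entry.value, entry.is_default, entry.source)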
Example
Let's say the following properties were already configured in the broker's server.properties file:

segment.bytes = 10000
cleanup.policy = compact
retention.bytes = 1000

And the following property was already configured at the topic level for a specific topic:

segment.bytes = 100

The original defaults for these properties, per the Kafka documentation, are:

segment.bytes = 1073741824
cleanup.policy = delete
retention.bytes = -1
Now, when we run the script and update cleanup.policy to compact,delete, we end up with the following properties set at the topic level:

cleanup.policy = delete,compact
segment.bytes = 100
retention.bytes = 1000

In this case, even though we never explicitly set retention.bytes, the script pinned it at the topic level with the server-level value. That is not an issue for segment.bytes, which was a topic-level property even before we ran the script.

Can we avoid setting server-level configured properties at the topic level in this case?
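One direction that might avoid this, sketched below but not verified against our script: decide what to preserve based on ConfigEntry.source rather than is_default, on the assumption that ConfigSource.DYNAMIC_TOPIC_CONFIG marks only values explicitly set on the topic itself:

from confluent_kafka.admin import ConfigSource

# Sketch only: treat a property as worth preserving across alter_configs()
# only when it was explicitly set on the topic itself, not when the value
# merely comes from the broker's server.properties.
def is_topic_override(entry):
    return entry.source == ConfigSource.DYNAMIC_TOPIC_CONFIG

# In change_config() below, the preservation branch would then read:
#     elif is_topic_override(entry):
#         resource.set_config(k, entry.value, overwrite=False)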
How to reproduce
Our method that changes configurations for a topic:

from confluent_kafka.admin import ConfigResource

def change_config(self, topic, config_name, config_value):
    """
    Changes the configuration value for the specified topic.
    """
    resources = [ConfigResource(ConfigResource.Type.TOPIC, topic)]
    fs = self.ac.describe_configs(resources)
    for resource, f in fs.items():
        remote_config = f.result()
        # Check whether the requested property actually needs to change.
        needs_update = False
        for k, entry in remote_config.items():
            if entry.name == config_name and entry.value != config_value:
                needs_update = True
        if needs_update:
            for k, entry in remote_config.items():
                if not entry.is_default:
                    if entry.name == config_name:
                        resource.set_config(k, config_value, overwrite=False)
                    else:
                        # Re-set every other non-default property so that
                        # alter_configs() does not revert it to the Kafka
                        # default. is_default is also False for broker-level
                        # values, which is how they get pinned on the topic.
                        resource.set_config(k, entry.value, overwrite=False)
                else:
                    if entry.name == config_name:
                        resource.set_config(k, config_value, overwrite=False)
            self.ac.alter_configs([resource])
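To invoke it, a minimal driver along these lines should reproduce the behavior; the wrapper class, broker address, and topic name are placeholders invented for this sketch:

from confluent_kafka.admin import AdminClient

class TopicAdmin:
    """Minimal host class: change_config() expects self.ac to be an AdminClient."""
    def __init__(self, conf):
        self.ac = AdminClient(conf)

# Hypothetical wiring for this sketch: attach the method defined above.
TopicAdmin.change_config = change_config

admin = TopicAdmin({"bootstrap.servers": "localhost:9092"})
admin.change_config("my-topic", "cleanup.policy", "compact,delete")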
Checklist
Please provide the following information:
[ ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
[ ] Apache Kafka broker version:
[ ] Client configuration: {...}
[ ] Operating system:
[ ] Provide client logs (with 'debug': '..' as necessary)