dpkp / kafka-python

Python client for Apache Kafka
http://kafka-python.readthedocs.io/
Apache License 2.0

Get size of topic #2163

Open · simonstumpf opened this issue 3 years ago

simonstumpf commented 3 years ago

Hi all,

Is there a way to get the size of a partition with this library? I know it's possible on the command line with the kafka-log-dirs tool.
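
The command I mean is something like this (flags per the kafka-log-dirs.sh tool that ships with the Kafka distribution):

kafka-log-dirs.sh --describe --bootstrap-server localhost:9092 --topic-list test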

jeffwidman commented 3 years ago

By size, I assume you mean bytes since you referred to a partition.

No, that's not currently available, and I don't know if it's even available via the Kafka Admin API... if there's now a KIP that added it, then we'd happily accept a PR adding support.

cjw0202 commented 3 years ago

waiting for the feature

Courouge commented 2 years ago

Hi, I know it is available in the Java client (here). Reading the docs in the Apache Kafka repo, this is possible in the Kafka protocol (here):

DescribeLogDirs API (Key: 35)

Requests:

DescribeLogDirs Request (Version: 0) => [topics] 
  topics => topic [partitions] 
    topic => STRING
    partitions => INT32

Responses:

DescribeLogDirs Response (Version: 0) => throttle_time_ms [results]
  throttle_time_ms => INT32
  results => error_code log_dir [topics] 
    error_code => INT16
    log_dir => STRING
    topics => name [partitions] 
      name => STRING
      partitions => partition_index partition_size offset_lag is_future_key 
        partition_index => INT32
        partition_size => INT64
        offset_lag => INT64
        is_future_key => BOOLEAN

I already created an issue in confluent-kafka-python (here).

I tried some tests with the kafka-python lib by adding the following code:

kafka/protocol/admin.py

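# NB: these classes assume the imports already at the top of
# kafka/protocol/admin.py (Request, Response, Schema, Array,
# Int16, Int32, Int64, String, Boolean)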
class DescribeLogDirsResponse_v0(Response):
    API_KEY = 35
    API_VERSION = 0
    FLEXIBLE_VERSION = True
    SCHEMA = Schema(
        ('throttle_time_ms', Int32),
        ('log_dirs', Array(
            ('error_code', Int16),
            ('log_dir', String('utf-8')),
            ('topics', Array(
                ('name', String('utf-8')),
                ('partitions', Array(
                    ('partition_index', Int32),
                    ('partition_size', Int64),
                    ('offset_lag', Int64),
                    ('is_future_key', Boolean)
                ))
            ))
        ))
    )

class DescribeLogDirsRequest_v0(Request):
    API_KEY = 35
    API_VERSION = 0
    RESPONSE_TYPE = DescribeLogDirsResponse_v0
    SCHEMA = Schema(
                     ('topics', Array(
                         ('topic', String('utf-8')),
                         ('partitions', Int32)
                         ))
                 )

DescribeLogDirsResponse = [
    DescribeLogDirsResponse_v0,
]

DescribeLogDirsRequest = [
    DescribeLogDirsRequest_v0,
]

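As an aside, the topics field in the request is nullable: per the Kafka protocol docs, a null topics array asks the broker to describe every partition in every log dir. A small sketch of the two request shapes this schema allows (class names as defined above):

# null topics array => describe all partitions in all log dirs
request_all = DescribeLogDirsRequest_v0(topics=None)

# or name specific (topic, partition) pairs, as in the client call below
request_one = DescribeLogDirsRequest_v0(topics=[("test", 0)])
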
kafka/admin/client.py

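# NB: assumes DescribeLogDirsRequest is added to the imports from
# kafka.protocol.admin at the top of kafka/admin/client.py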
def describe_log_dirs(self, topic_name, partition_id):
    version = self._matching_api_version(DescribeLogDirsRequest)
    if version <= 1:
        request = DescribeLogDirsRequest[version]([(topic_name, partition_id)])
        future = self._send_request_to_node(self._client.least_loaded_node(), request)
        self._wait_for_futures([future])
    else:
        raise NotImplementedError(
            "Support for DescribeLogDirsRequest_v{} has not yet been added to KafkaAdminClient."
            .format(version))
    return future.value

I tried a simple client:

from kafka import KafkaAdminClient
a = KafkaAdminClient(bootstrap_servers='0.0.0.0:9092')
topic_test = a.describe_log_dirs("test",0)
print(topic_test)

==> DescribeLogDirsResponse_v0(throttle_time_ms=0, log_dirs=[(error_code=0, log_dir='/var/lib/kafka/data', topics=[])])

I don't understand why the test topic is empty.

In the Kafka log dir:

du -h  /var/lib/kafka/data
12K     /var/lib/kafka/data/test-0

Courouge commented 2 years ago

It finally works! I did a PR #2278 @jeffwidman

Simple example:

from kafka import KafkaAdminClient
a = KafkaAdminClient(bootstrap_servers='0.0.0.0:9092')
DescribeLogDirsResponse = a.describe_log_dirs()

DescribeLogDirsResponse

DescribeLogDirsResponse_v0(throttle_time_ms=0, log_dirs=[(error_code=0, log_dir='/var/lib/kafka/data', topics=[(name='test2', partitions=[(partition_index=0, partition_size=0, offset_lag=0, is_future_key=False)]), (name='test3', partitions=[(partition_index=0, partition_size=0, offset_lag=0, is_future_key=False)]), (name='test', partitions=[(partition_index=0, partition_size=2050711233, offset_lag=0, is_future_key=False)]), (name='test1', partitions=[(partition_index=0, partition_size=0, offset_lag=0, is_future_key=False)]), (name='__confluent.support.metrics', partitions=[(partition_index=0, partition_size=3789, offset_lag=0, is_future_key=False)])])])
du -h  /var/lib/kafka/data
2.0G    /var/lib/kafka/data/test-0
24K     /var/lib/kafka/data/__confluent.support.metrics-0
8.0K    /var/lib/kafka/data/test2-0
8.0K    /var/lib/kafka/data/test3-0
8.0K    /var/lib/kafka/data/test1-0
2.0G    /var/lib/kafka/data

One oddity: "empty" partitions still occupy 8.0 kB on disk, while the protocol response reports a partition_size of 0 bytes; most likely du is counting the preallocated index files in each partition directory, whereas partition_size only counts the log segment bytes. For topics that are not empty it works fine.
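
For anyone who wants the numbers out of that structure, a minimal sketch (iterating the decoded tuples exactly as they appear in the response above):

# turn the response above into {(topic, partition): size_in_bytes}
sizes = {}
for error_code, log_dir, topics in DescribeLogDirsResponse.log_dirs:
    for name, partitions in topics:
        for partition_index, partition_size, offset_lag, is_future_key in partitions:
            sizes[(name, partition_index)] = partition_size

print(sizes[("test", 0)])  # 2050711233 in the run above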

tommy04062019 commented 2 years ago

> It finally works! I did a PR #2278 [...]

Hi, I tried to apply your patch. It works, but somehow it doesn't show the right number of topics. For example, the cluster has 17 topics, but calling describe_log_dirs only shows info for 14 of them.

Courouge commented 2 years ago

Hi @tommy04062019

Thanks for your feedback. Can you give me more info about your configuration and how I can reproduce your issue?

tommy04062019 commented 2 years ago

Hi @Courouge, below are the steps I did:

  1. Install kafka-python
  2. Download and copy your patch files:
    RUN cp -f patch/client.py /usr/local/lib/python3.7/site-packages/kafka/admin/client.py
    RUN cp -f patch/admin.py /usr/local/lib/python3.7/site-packages/kafka/protocol/admin.py
  3. Call the function:
    self.admin = KafkaAdminClient(bootstrap_servers=brokers,
                                          security_protocol="SASL_PLAINTEXT",
                                          sasl_mechanism="SCRAM-SHA-512",
                                          sasl_plain_username=f"{username}",
                                          sasl_plain_password=f"{password}",
                                          request_timeout_ms=20000
                                          )
    self.admin.describe_log_dirs()

    The output shows the same as yours above, but the total number of topics in the output doesn't match the number of topics the cluster has.

Courouge commented 2 years ago

Hi @tommy04062019, I tried a SCRAM user with a similar config without issues:

admin = KafkaAdminClient(
  bootstrap_servers='broker1:9093',
  security_protocol="SASL_SSL",
  sasl_plain_username="scram-user",
  sasl_plain_password="scram-password",
  sasl_mechanism="SCRAM-SHA-512")

DescribeLogDirsResponse = admin.describe_log_dirs()

vgvineet4 commented 2 years ago

Is there a released fork with the changes done by @Courouge that I can include in my requirements.txt?

lariskovski commented 2 years ago

I've tested it, it works perfectly! Thank you, Courouge!!!

hilia commented 1 year ago

Thanks a lot @Courouge!

I made a little change: if you have multiple Kafka nodes, you may not see all the partitions, because each partition is hosted only on a subset of the brokers.

So I just added the possibility to specify the broker id instead of always using self._client.least_loaded_node():

def describe_log_dirs(self, broker_id=None):
    """Send a DescribeLogDirsRequest to a broker.

    :return: The DescribeLogDirsResponse from that broker
    """
    version = self._matching_api_version(DescribeLogDirsRequest)
    if broker_id is None:
        # keep the old behaviour when no broker is specified
        broker_id = self._client.least_loaded_node()
    if version <= 1:
        request = DescribeLogDirsRequest[version]()
        future = self._send_request_to_node(broker_id, request)
        self._wait_for_futures([future])
    else:
        raise NotImplementedError(
            "Support for DescribeLogDirsRequest_v{} has not yet been added to KafkaAdminClient."
                .format(version))
    return future.value

I asked myself whether it would be better to loop over all brokers by default when no specific broker is set.

And to use this function, I call admin_client.describe_log_dirs(broker.nodeId) once per broker. Here is the full code to get a list of [topic_name-partition_id, size] entries:

    from kafka import KafkaAdminClient, KafkaClient

    # config_file_value / secret_file_value are your own settings dicts
    admin_client = KafkaAdminClient(
        bootstrap_servers   = config_file_value['bootstrap_servers'],
        security_protocol   = config_file_value['security_protocol'],
        sasl_plain_username = secret_file_value['sasl_plain_username'],
        sasl_plain_password = secret_file_value['sasl_plain_password'],
        sasl_mechanism      = config_file_value['sasl_mechanism']
        )

    kafka_client = KafkaClient(
        bootstrap_servers   = config_file_value['bootstrap_servers'],
        security_protocol   = config_file_value['security_protocol'],
        sasl_plain_username = secret_file_value['sasl_plain_username'],
        sasl_plain_password = secret_file_value['sasl_plain_password'],
        sasl_mechanism      = config_file_value['sasl_mechanism']
        )

    # topic details such as retention.ms, segment.ms, etc.
    #broker = admin_client.describe_configs(config_resources=[ConfigResource(ConfigResourceType.TOPIC, "my_topic_name_here")])
    #config_list = broker[0].resources[0][1]

    partition_list = []
    for broker in kafka_client.cluster.brokers():

        # get all data: topic name + partition + partition size
        describe_log_dirs_response = admin_client.describe_log_dirs(broker.nodeId)

        # number of topics per broker
        #print(len(describe_log_dirs_response.log_dirs[0][2]))

        # list topics from describe_log: log_dirs[0] is the first log dir,
        # and [2] is its 'topics' field
        for count_topic_log in range(0, len(describe_log_dirs_response.log_dirs[0][2])):

            # [0] = topic name, [1] = its partitions array
            topic_log_name = describe_log_dirs_response.log_dirs[0][2][count_topic_log][0]

            partition_log_count = len(describe_log_dirs_response.log_dirs[0][2][count_topic_log][1])

            total_topic_octet_size = 0
            for partition_log in range(0, partition_log_count):

                # [0] = partition_index, [1] = partition_size in bytes
                partition_log_id    = describe_log_dirs_response.log_dirs[0][2][count_topic_log][1][partition_log][0]
                partition_log_size  = describe_log_dirs_response.log_dirs[0][2][count_topic_log][1][partition_log][1]

                full_topic_partition = topic_log_name + '-' + str(partition_log_id)

                # skip if the record previously exists (each topic-partition is on multiple brokers)
                if any(full_topic_partition in sublist for sublist in partition_list):
                    continue
                else:
                    # add to a list with (topic-partition, size)
                    partition_list.append([full_topic_partition, partition_log_size])

Now you'll have a list with this pattern:

[
['topic_name-partition_id', size],
[...],
]

And in the real world:

[
['my_fun_topic_name-0', 25040],
['my_fun_topic_name-1', 24590],
['my_fun_topic_name-2', 26799],
['my_fun_topic_name-3', 22245],
[...],
]

Now you could use the list:

        for each_partition in partition_list:

            # split topic and partition from list
            from_kafka_split_topic_partition   = each_partition[0].rsplit('-', 1)
            from_kafka_topic_name              = from_kafka_split_topic_partition[0]
            from_kafka_partition_id            = from_kafka_split_topic_partition[1]

Hope it helps :)

thom-vend commented 10 months ago

@hilia yes, you're right: the proposed PR doesn't give you all partitions in a multi-broker cluster. I found that kafka_client.cluster.brokers() doesn't return all brokers, just the one I'm connecting to. admin_client.describe_cluster() will give me the full list of brokers.

I'll update this comment if I have a better solution to offer.

Edit: I ended up using it like this for now (in a monitoring script, very close to hilia's suggestion 👍): https://gist.github.com/thom-vend/1563b53bf9f9af2ebe01a55d7551cbde
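
A minimal sketch of that approach, assuming the describe_log_dirs(broker_id) variant from hilia's patch above (describe_cluster() is a stock KafkaAdminClient method; taking the max keeps the largest replica size seen per partition):

from kafka import KafkaAdminClient

admin_client = KafkaAdminClient(bootstrap_servers='0.0.0.0:9092')

# aggregate {(topic, partition): size_in_bytes} across every broker
sizes = {}
for broker in admin_client.describe_cluster()['brokers']:
    response = admin_client.describe_log_dirs(broker['node_id'])
    for error_code, log_dir, topics in response.log_dirs:
        for name, partitions in topics:
            for partition_index, partition_size, offset_lag, is_future_key in partitions:
                key = (name, partition_index)
                sizes[key] = max(sizes.get(key, 0), partition_size)

# total size of one topic, summed over its partitions
print(sum(size for (name, _), size in sizes.items() if name == 'test'))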