dpkp / kafka-python

Python client for Apache Kafka
http://kafka-python.readthedocs.io/
Apache License 2.0

How to use ClusterMetaData Class to get some info about the kafka cluster #2363

Open rujutashinde opened 1 year ago

rujutashinde commented 1 year ago

Hello,

I was trying to use ClusterMetadata as below, to see if any of its methods return anything useful, but most of the return values are None or empty sets. The Kafka server is working and healthy: I can create producers/consumers and get correct topics and other info.

# imports assumed from the surrounding project:
from kafka import cluster  # kafka-python
import settings            # local module supplying settings.kafka_server

def kafka_with_cluster_metadata_method():
    print("in  kafka_with_cluster_metadata_method")
    try:
        ko = cluster.ClusterMetadata(bootstrap_servers=settings.kafka_server,
                             metadata_max_age_ms=100000,
                             retry_backoff_ms=60)

        print(f"ko obj is {ko}")
        print(f"ko.config {ko.config}")
        print("broker info")
        print(f"ko.brokers : {ko.brokers}")
        print(f"ko.broker_metadata(0) {ko.broker_metadata(0)}")
        print("topic info")
        print(f"ko.topics: {ko.topics}")
        print(f"ko.need_all_topic_metadata {ko.need_all_topic_metadata}")
        print("partition info")

        print(f"ko.partitions_for_topic('some_existing_topic') {ko.partitions_for_topic('some_existing_topic')}")
    except Exception as e:
        raise e

Output of the above print messages are:

in  kafka_with_cluster_metadata_method
ko obj is ClusterMetadata(brokers: 0, topics: 0, groups: 0)
ko.config {'retry_backoff_ms': 60, 'metadata_max_age_ms': 100000, 'bootstrap_servers': '********'}
broker info
ko.brokers : <bound method ClusterMetadata.brokers of <kafka.cluster.ClusterMetadata object at 0x7f54362c7e50>>
ko.broker_metadata(0) None
topic info
ko.topics: <bound method ClusterMetadata.topics of <kafka.cluster.ClusterMetadata object at 0x7f54362c7e50>>
ko.need_all_topic_metadata False
partition info
ko.partitions_for_topic('some_existing_topic') None
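Note that two of the lines above print bound methods rather than data: `brokers` and `topics` are methods on ClusterMetadata, not attributes, so they have to be called with parentheses. A minimal illustration with a toy class (not kafka-python's implementation):

```python
# Toy class standing in for ClusterMetadata: accessing a method
# without calling it returns the bound-method object, which is what
# the "<bound method ...>" lines in the output above are showing.
class Cluster:
    def __init__(self):
        self._brokers = {"b0", "b1"}

    def brokers(self):
        # return a copy of the broker set
        return set(self._brokers)

c = Cluster()
print(c.brokers)    # <bound method Cluster.brokers of ...>
print(c.brokers())  # the actual set of brokers
```

Calling `ko.brokers()` and `ko.topics()` would at least return real (if empty) collections; the empty results themselves are a separate issue, since ClusterMetadata does not fetch anything from the cluster on its own.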

I am following this documentation: https://kafka-python.readthedocs.io/en/master/apidoc/ClusterMetadata.html

I tried initializing KafkaClient and then calling KafkaClient.cluster to see if this somehow initializes the ClusterMetadata class, but I am not seeing any difference there either.

Can someone help out here?

Jijun-TANG commented 1 year ago

Hello, there is a private method on the "KafkaAdminClient" API called "_get_cluster_metadata": see the code of KafkaAdminClient, around line 491.

So what you can do is connect to the Kafka broker using two clients, and feed the admin client's metadata response into the plain client's ClusterMetadata:

import kafka

kafka_client = kafka.KafkaClient(**yourConf)
admin_client = kafka.KafkaAdminClient(**yourConf)

metadataResponse = admin_client._get_cluster_metadata()
clusterMetadata = kafka_client.cluster
clusterMetadata.update_metadata(metadataResponse)

And then your clusterMetadata is up to date.

Hope this can help

0x91 commented 2 months ago

For me this throws an exception on 2.0.2:

>>> cluster_metadata.update_metadata(metadata_response)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "./venv/lib/python3.11/site-packages/kafka/cluster.py", line 280, in update_metadata
    for p_error, partition, leader, replicas, isr in partitions:
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: too many values to unpack (expected 5)

Jijun-TANG commented 2 weeks ago

Hello, this is most likely caused by the metadata response format rather than by Python itself: newer brokers answer with a MetadataResponse version whose per-partition tuples carry more than the five fields that update_metadata unpacks. Since there have been no new releases on this project for almost 4 years, you need to change "./venv/lib/python3.11/site-packages/kafka/cluster.py" manually and unpack with the right number of variables.
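The manual patch can use Python's extended unpacking so the loop tolerates extra per-partition fields. A sketch with a hypothetical sample tuple (the exact field layout is an assumption; check the MetadataResponse version your broker returns):

```python
# Hypothetical sample of a newer partition tuple: the first five fields
# match what cluster.py expects, plus a trailing offline_replicas list.
partitions = [
    # (error_code, partition, leader, replicas, isr, offline_replicas)
    (0, 0, 1, [1, 2, 3], [1, 2], []),
]

# Extended unpacking (*rest) absorbs any trailing fields, so the same
# loop works for both the old five-field and the newer wider tuples.
parsed = []
for p_error, partition, leader, replicas, isr, *rest in partitions:
    parsed.append((p_error, partition, leader, replicas, isr))

print(parsed)
```

Replacing the fixed five-name unpack in update_metadata with this form avoids the ValueError without changing what the rest of the loop sees.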
