fede1024 / rust-rdkafka

A fully asynchronous, futures-based Kafka client library for Rust based on librdkafka
MIT License
1.62k stars 274 forks source link

Discovering the partition count in the subscripted topic #511

Open metesynnada opened 1 year ago

metesynnada commented 1 year ago

I try to discovery the partitions inside the topics I subscribed. Even if it looks simple, I could not manage it with this code

#[test]
fn test_partition() -> KafkaResult<()>{
    let mut client_config = ClientConfig::new();
        client_config.set("group.id", "try_me")
        .set("bootstrap.servers", "localhost:9092")
        .set("enable.partition.eof", "false")
        .set("enable.auto.commit", "true")
        .set("auto.offset.reset", "smallest");
    client_config.set_log_level(RDKafkaLogLevel::Debug);
    let consumer = BaseConsumer::from_config(&client_config)?;
    consumer.subscribe(&["quickstart-events"])?;
    let topic_partition_list = consumer.subscription().unwrap();
    println!("{:?}", topic_partition_list);
    Ok(())
}

which prints

>> TPL {(quickstart-events, -1): Invalid, }

However,

kafka-topics --describe --bootstrap-server localhost:9092 --topic quickstart-events

gives the

Topic: quickstart-events    TopicId: 3IXtzC7TRfOjxclDWklNFA PartitionCount: 6   ReplicationFactor: 1    Configs: 
    Topic: quickstart-events    Partition: 0    Leader: 1   Replicas: 1 Isr: 1
    Topic: quickstart-events    Partition: 1    Leader: 1   Replicas: 1 Isr: 1
    Topic: quickstart-events    Partition: 2    Leader: 1   Replicas: 1 Isr: 1
    Topic: quickstart-events    Partition: 3    Leader: 1   Replicas: 1 Isr: 1
    Topic: quickstart-events    Partition: 4    Leader: 1   Replicas: 1 Isr: 1
    Topic: quickstart-events    Partition: 5    Leader: 1   Replicas: 1 Isr: 1

I am not sure what I am doing wrong, any guess? I could not find anything on the documentation as well.

mfelsche commented 1 year ago

You need to use another API, as a subscription is only created asynchronously. When you call consumer.subscribe() it does not block until the subscription is successful. The signal you need to wait for here is the first rebalance call with a Rebalance::Assign(...) argument in the ConsumerContext: https://docs.rs/rdkafka/0.29.0/rdkafka/consumer/trait.ConsumerContext.html#method.post_rebalance If you received this, you usually know you are successfully subscribed. AFAIK, there is no other way to find this out. Also AFAIK the consumer.subscription() will not list all partitions, but only the topics. consumer.assignment() will only list the partitions assigned to this member in the consumer group, so you need to use the following instead:

You can ask the cluster about your topics via: consumer.client().fetch_metadata(Some("quickstart-events"), None) even without subscribing to the topic: https://docs.rs/rdkafka/0.29.0/rdkafka/client/struct.Client.html#method.fetch_metadata