fede1024 / rust-rdkafka

A fully asynchronous, futures-based Kafka client library for Rust based on librdkafka
MIT License

Subscribing after unsubscribing no longer working in v0.24.0 #294

Closed: athre0z closed this issue 3 years ago

athre0z commented 4 years ago
use futures_util::StreamExt;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::{Offset, TopicPartitionList};

static TOPIC: &str = "mytopic";

#[tokio::main]
async fn main() {
    let consumer: StreamConsumer<_> = ClientConfig::new()
        .set("group.id", "blahblahgroup")
        .set("bootstrap.servers", "localhost:9092")
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .create()
        .unwrap();

    let mut stream = consumer.start();

    let mut part_list = TopicPartitionList::new();
    part_list.add_partition_offset(TOPIC, 0, Offset::Beginning);
    consumer.assign(&part_list).unwrap();
    dbg!(stream.next().await);

    consumer.unsubscribe();

    let mut part_list = TopicPartitionList::new();
    part_list.add_partition_offset(TOPIC, 0, Offset::Beginning);
    consumer.assign(&part_list).unwrap();
    dbg!(stream.next().await);
}

Assuming mytopic exists, the code above produces:

[src/main.rs:24] stream.next().await = Some(
    Ok(
        Message { ptr: 0x7fb2e4804560 },
    ),
)
[src/main.rs:31] stream.next().await = Some(
    Ok(
        Message { ptr: 0x7fb2e4804ad0 },
    ),
)

when run with the latest rdkafka v0.23, but gets stuck after printing

[src/main.rs:24] stream.next().await = Some(
    Ok(
        Message { ptr: 0x7feb6fe040f0 },
    ),
)
^C

when using v0.24.0 or the latest commit from master as of writing.

Sample crate for repro -- just switch the version in Cargo.toml: rdkafka-debug.tar.gz

scooter-dangle commented 3 years ago

I'm debugging a similar issue where the consumer's Drop implementation hangs. I was able to show that skipping the drop lets the program proceed by mem::forget-ing the consumer. The nearest related issue I found was #48.
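For anyone unfamiliar with the workaround: std::mem::forget leaks a value without ever running its Drop impl, which is how the blocking destructor above was bypassed. A minimal, self-contained sketch (the Noisy type is just a stand-in for a consumer whose Drop blocks):

use std::mem;

// A type whose Drop has a visible side effect, standing in for a
// consumer whose Drop implementation hangs.
struct Noisy(&'static str);

impl Drop for Noisy {
    fn drop(&mut self) {
        println!("dropped {}", self.0);
    }
}

fn main() {
    let a = Noisy("a");
    drop(a); // Drop runs normally: prints "dropped a"

    let b = Noisy("b");
    mem::forget(b); // b is leaked: its Drop never runs, so nothing is printed
}

This is of course a diagnostic tool, not a fix -- the leaked consumer never cleans up its resources or leaves its group.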

Apologies if this ends up being a separate issue.

benesch commented 3 years ago

@athre0z, mixing calls to assign/subscribe like that is odd. What are you trying to do, exactly?

I can reproduce this behavior on the BaseConsumer too, which suggests it's something in librdkafka. I wouldn't be surprised if edenhill/librdkafka#2455 is part of the problem. There is probably a way to do what you're after that doesn't involve following a call to unsubscribe with a call to assign.

athre0z commented 3 years ago

I pretty much have my own (ghetto) Flink-esque stream processing lib, and it has three topics for each stream: an input topic, a state topic to which the stream processor's full state is dumped every 10mil events processed, and an output topic. On startup, the code gathers the last message from the state topic, which contains the offset to which the input topic must be rewound, then obtains the last output message written (which contains the offset of the corresponding input message). Combining this information, it can begin replaying messages to update its state until it finally reaches the point where it is back in sync with the output stream and can start producing messages again.

In short: I use assign/unsubscribe to get the last message from the output and state topics, and then finally assign to subscribe to the input topic. It's very much possible that this counts as "doing it wrong" and I should create separate consumers for each of these.

benesch commented 3 years ago

Ohh, I see. You're intending to use unsubscribe to undo the effects of the first call to assign, yeah? I think that's probably your issue, and you should just use assign twice.

// Load state.
let mut part_list = TopicPartitionList::new();
part_list.add_partition_offset(state_topic, 0, Offset::Beginning);
consumer.assign(&part_list).unwrap();
let state: SomeStateThing = consumer.recv().await.unwrap().parse(); // pseudocode

// Process input.
let mut part_list = TopicPartitionList::new();
part_list.add_partition_offset(input_topic, 0, Offset::Offset(state.offset));
consumer.assign(&part_list).unwrap();

loop {
    match consumer.recv().await { /* ... */ }
}

Every call to assign overwrites the previous assignment—they're not additive. So if the second call to assign does not include a toppar that was present in the first call, then that toppar will be "unassigned." (Note: not unsubscribed, as that's something slightly different.)
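To make the replace-not-merge semantics concrete: a consumer that should cover two toppars at once must list both in a single TopicPartitionList before the one call to assign. A minimal sketch, assuming rdkafka circa v0.24 (topic names are placeholders; in newer releases add_partition_offset returns a Result that should be checked):

use rdkafka::{Offset, TopicPartitionList};

fn main() {
    // assign() takes the whole desired set each time, so to keep an
    // earlier toppar it must be repeated in the new list:
    let mut both = TopicPartitionList::new();
    both.add_partition_offset("state_topic", 0, Offset::Beginning);
    both.add_partition_offset("input_topic", 0, Offset::Beginning);
    assert_eq!(both.count(), 2);

    // consumer.assign(&both) would now cover both topics at once;
    // a later assign() with only one of them drops the other.
}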

subscribe/unsubscribe are a sort of separate API from assign. When you use subscribe, you ask Kafka to automatically assign you a fair share of the partitions from the subscribed topics (the "consumer group" protocol). When you use assign, you're telling Kafka that you're picking partitions manually yourself, deactivating the consumer group protocol.
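For contrast with the assign-based snippets above, the group-protocol side looks like this. A minimal sketch (topic and group names are placeholders; subscription() just reports the locally recorded subscription, so no broker round-trip is needed for the assertion):

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};

fn main() {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("group.id", "example-group") // required for subscribe()
        .set("bootstrap.servers", "localhost:9092")
        .create()
        .unwrap();

    // Group mode: ask the broker to deal out a fair share of these
    // topics' partitions among all consumers in the group.
    consumer.subscribe(&["input_topic", "output_topic"]).unwrap();

    // The requested subscription is recorded locally right away:
    assert_eq!(consumer.subscription().unwrap().count(), 2);
}

With subscribe, which partitions you receive is decided by the group coordinator and can change on rebalance; with assign, it is fixed to exactly what you listed.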

This is all very confusing. It's not well explained in the librdkafka documentation because librdkafka assumes familiarity with the Java API, and it's doubly not well-explained in the rust-rdkafka documentation because we assume both familiarity with librdkafka and the Java API. But, the good news is the Java API docs are pretty good! I highly recommend reading the sections in the Java KafkaConsumer docs that pertain to topic subscriptions and manual partition assignment.

athre0z commented 3 years ago

Wow, thanks for the excellent explanation -- this whole API makes a lot more sense to me now. I had indeed assumed that assigning a second time would be additive. Closing this since it appears to have been a usage error!

benesch commented 3 years ago

Awesome, glad I could help! 🥂