flier / tokio-kafka

Asynchronous Rust client for Apache Kafka
Apache License 2.0

Consumer: how to commit offsets? #14

Open polachok opened 6 years ago

polachok commented 6 years ago
let builder = KafkaConsumer::with_bootstrap_servers(kafka_hosts, handle)
    .with_client_id("tokio-kafka".to_string())
    .with_group_id("tokio-kafka-consume-produce".to_string())
    .without_key_deserializer()
    .with_auto_offset_reset(OffsetResetStrategy::Latest)
    .with_value_deserializer(BytesDeserializer::default());
let mut consumer = builder.build().unwrap();
let consume = consumer.subscribe(&["whatever".to_string()])
    .and_then(|topics| {
        topics.for_each(|record| {
            let v: Vec<u8> = record.value.unwrap();
            // do something with value
            // I'd like to commit here, but it's not possible: `topics` is moved into for_each
            Ok(())
        })
    })
    .map(|_| ())
    .map_err(|e| println!("error: {}", e));
flier commented 6 years ago

I've extracted the Inner from SubscribedTopics, so you can now call commit on it. It still doesn't work, though, because consumer group support isn't finished yet; you can check the documentation for more detail :(

Besides, older Kafka (0.8) uses ZooKeeper to maintain the consumer group, so we will need a special setting to support it later.

flier commented 6 years ago

I'm still working on the offset commit/fetch scenarios. It seems we could support consuming both with and without a consumer group through the same API.

polachok commented 6 years ago

Ok, fine. Btw it's still not possible to commit inside for_each().

.and_then(|topics| {
    topics.clone().for_each(|record| {
        let v: Vec<u8> = record.value.unwrap();
        println!("Consumed: {:?}", String::from_utf8_lossy(&v[0..128]));

        topics.commit().map(|_| ())
        ^^^^^^ borrowed value does not live long enough
    })
})
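
A note on the error above: the inner closure captures `topics` by reference, but the value returned by `for_each` is lazy and outlives the outer closure where `topics` lives, so the borrow cannot be valid. The usual way around this is to clone the handle first and `move` the clone into the closure. The following is a minimal, std-only sketch of that pattern. `Topics`, `consume`, `committed`, and the `commit(offset)` signature are hypothetical stand-ins invented for illustration; they are not the tokio-kafka API, and whether the real `commit()` works once it compiles depends on the consumer group support discussed earlier in the thread.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical stand-in for a cloneable subscribed-topics handle.
// All clones share the same records and the same committed offset.
#[derive(Clone)]
struct Topics {
    records: Rc<Vec<(i64, Vec<u8>)>>,
    committed: Rc<RefCell<Option<i64>>>,
}

impl Topics {
    fn new(records: Vec<(i64, Vec<u8>)>) -> Self {
        Topics {
            records: Rc::new(records),
            committed: Rc::new(RefCell::new(None)),
        }
    }

    // Record `offset` as the last committed offset (assumed signature).
    fn commit(&self, offset: i64) {
        *self.committed.borrow_mut() = Some(offset);
    }

    fn committed(&self) -> Option<i64> {
        *self.committed.borrow()
    }

    // Consumes `self` and returns deferred work, loosely mirroring how a
    // stream's `for_each` takes the stream by value and runs later.
    fn for_each<F>(self, mut f: F) -> Box<dyn FnOnce()>
    where
        F: FnMut(i64, &[u8]) + 'static,
    {
        Box::new(move || {
            for (offset, value) in self.records.iter() {
                f(*offset, value);
            }
        })
    }
}

fn consume(topics: Topics) -> Box<dyn FnOnce()> {
    // Clone BEFORE `topics` is moved into `for_each`, then `move` the clone
    // into the closure so the closure owns its handle instead of borrowing
    // a local that is dropped when this function returns.
    let committer = topics.clone();
    topics.for_each(move |offset, value| {
        // Clamp the slice so short payloads do not panic.
        let end = value.len().min(128);
        println!("Consumed: {:?}", String::from_utf8_lossy(&value[..end]));
        committer.commit(offset);
    })
}
```

Dropping the `move` keyword in `consume` should reproduce a "does not live long enough" style error, because the closure would then borrow the local `committer`. In the futures-based original, the same idea would be `let committer = topics.clone();` followed by `topics.for_each(move |record| ...)`, assuming the handle is cheap to clone.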