confluentinc / librdkafka

The Apache Kafka C/C++ library

Separate partition queues no longer working in v1.6.0 #3231

Open benesch opened 3 years ago

benesch commented 3 years ago

Description

We're seeing test failures in Materialize with librdkafka v1.6.0, not present in v1.5.3, that involve separate partition queues. Given a topic with n partitions, we want n different queues to poll, one per partition. So we create a consumer, assign it the partitions we want, and then call something like:

rd_kafka_assign(rdk, assignment);
rd_kafka_queue_t *q = rd_kafka_queue_get_partition(rdk, ...);
rd_kafka_queue_forward(q, NULL);  /* stop forwarding this partition's messages to the consumer queue */

The code is in Rust and goes through the rust-rdkafka wrapper, so the reality is more complicated.

The problem is that the call to rd_kafka_queue_forward(q, NULL) does not seem to take effect, at least not immediately. The next call to rd_kafka_poll(rdk) on the main consumer, which we expect to serve only callbacks and never return messages, returns a message from the partition queue we just stopped forwarding!

How to reproduce

The failing dependency bump PR is here, if it's of interest: https://github.com/MaterializeInc/materialize/pull/5478

I don't have a nice tight reproduction at the moment, but I did manage to bisect the problem down to e4c24e95e89694b6f8e67d6bcf16d9108cdc51ba. The title of that commit is "Rewrote assignment handling and move it out of the cgrp", so it definitely seems related.


edenhill commented 3 years ago

Are you checking the error return of rd_kafka_assign()? Its error enforcement has been improved.

benesch commented 3 years ago

Ah, sorry, I simplified a bit too much. The call to rd_kafka_assign goes through the Rust wrapper, which does check errors:

https://github.com/fede1024/rust-rdkafka/blob/41ccf186c0e0bcd3894b6de1beb10ef7ad8557dd/src/consumer/base_consumer.rs#L311-L319

benesch commented 3 years ago

I also assume the assignment must be taking effect, since we do get a message after the call to assign, just from the wrong queue!

benesch commented 3 years ago

Ok, I think I see the issue. rd_kafka_cgrp_assign now works like this:

rd_kafka_assignment_clear();
rd_kafka_assignment_serve();
rd_kafka_assignment_add(new_assignment);

So it seems that, after every assignment, the existing partition queues are now destroyed and new ones are created in their place? And these new queues do not keep the forwarding settings of the original queues.

benesch commented 3 years ago

Ok, so this turned out to be easy enough to work around. Now after every call to assign we are careful to destroy our old partition queue references, get new queue references, and unforward those queues. The downstream PR is here, if you're curious: https://github.com/MaterializeInc/materialize/pull/5478/files#diff-ad5d38bf37b32dbbd4546b0fcd7f9b6daf573be8d5392847e1ed39c09b6c9f5cR579-R587

I don't have enough context to say whether librdkafka's new behavior is right or wrong, but it is most definitely different! It would be great if the documentation could be updated accordingly if you plan to stick with the new behavior. Roughly speaking, I think every call to rd_kafka_assign invalidates any previous queue references and undoes any previous rd_kafka_queue_forward calls. I haven't thought through or tested whether this also applies to assignments triggered by cgrp rebalancing.

edenhill commented 2 years ago

Great analysis, @benesch ! Would you like to submit a PR to update the docstring for the relevant rd_kafka_queue.. functions and rd_kafka_assign*() for this behaviour?

benesch commented 2 years ago

Apologies, but I'm spinning down my involvement with rdkafka and I won't have the time!