I'm working on a script that bisects a Kafka stream to find a timestamp. To that end, I need to dip in and out of consume() a lot. Here are a couple of fixes related to that:
Call rd_kafka_message_destroy right before rb_yield()
If the block passed to rb_yield() ends in a "break" statement, rb_yield() never returns control to the caller, so any cleanup placed after it is skipped and we'd leak the message.
ensure we call consumer_consume_loop_stop at the end of the loop …
We need rb_ensure so that rd_kafka_consume_stop is still called if the loop terminates in a "break" statement.
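
For reviewers, here is a rough Ruby-level sketch of the two problems. It is not the extension code: FakeConsumer, consume_leaky, consume_safe and the log symbols are hypothetical names made up for illustration, and the arrays stand in for Kafka messages. The only thing it relies on is standard Ruby behavior: a "break" inside a block terminates the method that yielded, skipping everything after the yield, while an ensure clause (the Ruby counterpart of rb_ensure in the C API) still runs.

```ruby
# Hypothetical stand-in for the native consume loop; not the real extension.
class FakeConsumer
  attr_reader :log

  def initialize(messages)
    @messages = messages
    @log = []
  end

  # Cleanup placed after yield: skipped when the caller's block breaks.
  def consume_leaky
    @log << :start
    @messages.each do |m|
      yield m
      @log << :destroy # not reached for the message that triggers break
    end
    @log << :stop      # not reached after a break either
  end

  # Copy what we need, release the message, then yield; stop in ensure.
  def consume_safe
    @log << :start
    begin
      @messages.each do |m|
        payload = m.dup  # stands in for copying the payload into a Ruby object
        @log << :destroy # stands in for rd_kafka_message_destroy
        yield payload
      end
    ensure
      @log << :stop      # stands in for rd_kafka_consume_stop
    end
  end
end

leaky = FakeConsumer.new(%w[a b c])
leaky.consume_leaky { |m| break if m == "b" }
p leaky.log # => [:start, :destroy]

safe = FakeConsumer.new(%w[a b c])
safe.consume_safe { |m| break if m == "b" }
p safe.log  # => [:start, :destroy, :destroy, :stop]
```

In the leaky version the second message is never destroyed and the stop step never runs; in the safe version every yielded message has already been released and the stop step runs even though the block broke out early.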