zendesk / ruby-kafka

A Ruby client library for Apache Kafka
http://www.rubydoc.info/gems/ruby-kafka
Apache License 2.0

Consumer does not consume messages before all leader brokers receive a message #365

Closed: headshota closed this issue 7 years ago

headshota commented 7 years ago

I'm seeing strange behavior from the consumer (group API) with the following setup:

1 consumer subscribed to a topic. 3 brokers, 10 partitions, replication factor of 2.

When a message is appended, the consumer does not consume it immediately. Only when all 3 brokers get at least one message does the consumer start consuming. (min_bytes is set to 1)

Further testing with only 2 partitions revealed the following: if there is 1 leader for both partitions, messages are consumed immediately. If the partitions have different leaders, messages are not consumed until both leaders get messages.

Consumer example:

...
consumer = kafka.consumer(group_id: group_id)

trap('INT') { consumer.stop } # ctrl-c
trap('TERM') { consumer.stop } # kill PID

consumer.subscribe(
  topic,
  start_from_beginning: false
)

consumer.each_batch(min_bytes: 1, max_wait_time: 5) do |batch|
  batch.messages.each do |message|
    puts message.value
  end
end

Expected behavior: Consumer should start consuming immediately after message is appended.

Note: Seems like an issue in the gem, because when using a cli consumer (kafka-console-consumer.sh), messages are consumed immediately after being appended.

headshota commented 7 years ago

Setting min_bytes to 0 seems to solve the issue: the messages are consumed immediately. But the server gets flooded with offset commit requests.

michaelsauter commented 7 years ago

One issue with setting min_bytes to 0 is that the response from the server happens immediately, and then we have a lot more requests than we would want as we never wait for max_wait_time.

@dasch Is the behaviour we see to be expected or does it look more like a bug?

dasch commented 7 years ago

Do you mean that the messages are never consumed or that you have to wait for max_wait_time to pass before any of the messages are consumed? If it's the latter case, that's a well-known limitation of the current, rather simplistic, approach taken by ruby-kafka when consuming messages.

I plan on rewriting the consumer to address this and other limitations, but I'm not sure when I'll have the time to do it.

michaelsauter commented 7 years ago

I meant that the messages are never returned, which is why I find it surprising. My understanding of max_wait_time: X was that, whatever happens, after time X all messages (or as many as fit into max_bytes_per_partition) are returned to the consumer, even if min_bytes is not satisfied. What we are seeing, however, is that with min_bytes: 1, all leaders of the topic need to have at least one message before any message is returned. If only one leader has a message and the other leaders do not, then you wait forever and no message is consumed.

It only works as expected if there is a topic with just one partition, and one message is appended, then that message is read immediately (as it exceeds min_bytes). If there are more partitions, then there are more leaders (we have 3 brokers), and it looks like nothing happens until all leaders have at least one message.

dasch commented 7 years ago

@michaelsauter can you post more details on your setup, including the Kafka broker version you're running with? My understanding of the API is what you're describing: after max_wait_time, all brokers should return possibly empty message batches.

michaelsauter commented 7 years ago

We have 3 brokers, version 0.10.1.0. The topic in question has 10 partitions and 2 replicas (Kafka writes to an EBS volume, so we don't need 3).

The gem version is 0.3.17.

The consumer is a single process in a consumer group.

dasch commented 7 years ago

@michaelsauter this could be a 0.10.1.0 issue.

Can you try with the most recent beta release of ruby-kafka?

tcrayford commented 7 years ago

@michaelsauter you should know that kafka 0.10.1.0 had quite a few deadlocks and other issues that might result in problems like this. I'd recommend upgrading off it.

michaelsauter commented 7 years ago

@dasch I'll give 0.3.18.beta2 a try tomorrow and report back.

@tcrayford Do you mean upgrading the brokers? I re-checked and noticed we're actually on 0.10.1.1. From there I only see the option to go to 0.10.2.1 or 0.11.0.0, but I thought there might be even more issues with this gem as it advertises 0.9 compatibility?

tcrayford commented 7 years ago

@michaelsauter ah, yes. But you're already off 0.10.1.0, so it's not that.

headshota commented 7 years ago

As mentioned above, we tested it through kafka cli tools and there it seems to work as expected.

dasch commented 7 years ago

I'd be interested in seeing if this is also a problem on 0.9 – it could be that the broker's backwards compatibility layer is the source of the bug.

The client doesn't really do anything besides declaring the behavior it wants of the broker, so unless the issue is on the client IO side (which seems unlikely) I think we should look at the brokers.

michaelsauter commented 7 years ago

@dasch We tried with 0.3.18.beta2, but it does not solve the problem.

dasch commented 7 years ago

Is this something you can reproduce locally? If so, could you try with Kafka 0.9 just to be sure?

michaelsauter commented 7 years ago

@dasch Hmm. We could try, but it is a bit of work to get everything set up, so I'd like to know what the implications would be. Assuming this issue does not exist with 0.9 brokers, what could we do to fix the problem? Downgrading our brokers to 0.9 is not an option, so is it possible for the gem not to use this compatibility layer?

I guess that implies cutting 0.9 support - how far are we from that / is there anything we could help to get closer to that?

dasch commented 7 years ago

My plan is to move the main line of development to only support 0.10 and have a stable branch with bug and security fixes for the 0.9 compatible version. The timing of that is mainly dependent on when Zendesk is done upgrading our clusters – I need to have access to a good testing ground.

Once the client uses the 0.10 versions of the APIs, no compatibility mapping will be done by the brokers.

I would like to pinpoint if this issue is caused by using 0.10 or by a bug in ruby-kafka, though.

michaelsauter commented 7 years ago

@dasch Sounds good :+1:

I'll try to reproduce locally with 0.9 brokers.

Is it worth testing also against 0.10 with one of the "0.10 support pull requests" or are they out of date / not working?

dasch commented 7 years ago

@michaelsauter I think they're very out of date, unfortunately, but you could give it a try.

michaelsauter commented 7 years ago

Okay, unfortunately we have been able to reproduce this locally under 0.9.0.1 as well.

Setup: 3 brokers (0.9.0.1, no SSL), 1 topic with 10 partitions and replication factor 2
Gem versions: 0.3.17 and 0.3.18.beta2

dasch commented 7 years ago

Thanks! Okay, then I'll have to dig in and see what ruby-kafka does wrong.

Do you have log snippets I could look at?

headshota commented 7 years ago

Here's the log output from the consumer. Notice that initially no messages are processed, and only at the end, when all partition leaders have at least one message, does it start consuming them.

15:08:37 [   27] Fetching batch from test/8 starting at offset 1
15:08:37 [   27] Fetching batch from test/2 starting at offset 1
15:08:37 [   27] Fetching batch from test/5 starting at offset 1
15:08:37 [   27] Fetching batch from test/4 starting at offset 2
15:08:37 [   27] Fetching batch from test/7 starting at offset 2
15:08:37 [   27] Fetching batch from test/1 starting at offset 2
15:08:37 [   27] Fetching batch from test/9 starting at offset 2
15:08:37 [   27] Fetching batch from test/3 starting at offset 1
15:08:37 [   27] Fetching batch from test/6 starting at offset 2
15:08:37 [   27] Fetching batch from test/0 starting at offset 1
15:08:37 [   27] Fetching cluster metadata from kafka://kafka1.dev:9092
15:08:37 [   27] Opening connection to kafka1.dev:9092 with client id pjpp...
15:08:37 [   27] Sending request 1 to kafka1.dev:9092
15:08:37 [   27] Waiting for response 1 from kafka1.dev:9092
15:08:37 [   27] Received response 1 from kafka1.dev:9092
15:08:37 [   27] Discovered cluster metadata; nodes: kafka1.dev:9092 (node_id=1), kafka2.dev:9092 (node_id=2), kafka3.dev:9092 (node_id=3)
15:08:37 [   27] Closing socket to kafka1.dev:9092
15:08:37 [   27] Opening connection to kafka2.dev:9092 with client id pjpp...
15:08:37 [   27] Sending request 1 to kafka2.dev:9092
15:08:37 [   27] Waiting for response 1 from kafka2.dev:9092
15:08:38 [   27] Received response 1 from kafka2.dev:9092
15:08:38 [   27] Sending request 16 to kafka1.dev:9092
15:08:38 [   27] Waiting for response 16 from kafka1.dev:9092
15:08:39 [   27] Timed out while waiting for response 16
15:08:39 [   27] Closing socket to kafka1.dev:9092
15:08:39 [   27] Connection error while fetching messages: Connection error: Connection timed out
15:08:39 [   27] Fetching batch from test/8 starting at offset 1
15:08:39 [   27] Fetching batch from test/2 starting at offset 1
15:08:39 [   27] Fetching batch from test/5 starting at offset 1
15:08:39 [   27] Fetching batch from test/4 starting at offset 2
15:08:39 [   27] Fetching batch from test/7 starting at offset 2
15:08:39 [   27] Fetching batch from test/1 starting at offset 2
15:08:39 [   27] Fetching batch from test/9 starting at offset 2
15:08:39 [   27] Fetching batch from test/3 starting at offset 1
15:08:39 [   27] Fetching batch from test/6 starting at offset 2
15:08:39 [   27] Fetching batch from test/0 starting at offset 1
15:08:39 [   27] Fetching cluster metadata from kafka://kafka1.dev:9092
15:08:39 [   27] Opening connection to kafka1.dev:9092 with client id pjpp...
15:08:39 [   27] Sending request 1 to kafka1.dev:9092
15:08:39 [   27] Waiting for response 1 from kafka1.dev:9092
15:08:39 [   27] Received response 1 from kafka1.dev:9092
15:08:39 [   27] Discovered cluster metadata; nodes: kafka1.dev:9092 (node_id=1), kafka2.dev:9092 (node_id=2), kafka3.dev:9092 (node_id=3)
15:08:39 [   27] Closing socket to kafka1.dev:9092
15:08:39 [   27] Sending request 2 to kafka2.dev:9092
15:08:39 [   27] Waiting for response 2 from kafka2.dev:9092
15:08:39 [   27] Received response 2 from kafka2.dev:9092
15:08:39 [   27] Opening connection to kafka1.dev:9092 with client id pjpp...
15:08:39 [   27] Sending request 1 to kafka1.dev:9092
15:08:39 [   27] Waiting for response 1 from kafka1.dev:9092
15:08:40 [   27] Timed out while waiting for response 1
15:08:40 [   27] Closing socket to kafka1.dev:9092
15:08:40 [   27] Connection error while fetching messages: Connection error: Connection timed out
15:08:40 [   27] Fetching batch from test/8 starting at offset 1
15:08:40 [   27] Fetching batch from test/2 starting at offset 1
15:08:40 [   27] Fetching batch from test/5 starting at offset 1
15:08:40 [   27] Fetching batch from test/4 starting at offset 2
15:08:40 [   27] Fetching batch from test/7 starting at offset 2
15:08:40 [   27] Fetching batch from test/1 starting at offset 2
15:08:40 [   27] Fetching batch from test/9 starting at offset 2
15:08:40 [   27] Fetching batch from test/3 starting at offset 1
15:08:40 [   27] Fetching batch from test/6 starting at offset 2
15:08:40 [   27] Fetching batch from test/0 starting at offset 1
15:08:40 [   27] Fetching cluster metadata from kafka://kafka1.dev:9092
15:08:40 [   27] Opening connection to kafka1.dev:9092 with client id pjpp...
15:08:40 [   27] Sending request 1 to kafka1.dev:9092
15:08:40 [   27] Waiting for response 1 from kafka1.dev:9092
15:08:40 [   27] Received response 1 from kafka1.dev:9092
15:08:40 [   27] Discovered cluster metadata; nodes: kafka1.dev:9092 (node_id=1), kafka2.dev:9092 (node_id=2), kafka3.dev:9092 (node_id=3)
15:08:40 [   27] Closing socket to kafka1.dev:9092
15:08:40 [   27] Sending request 3 to kafka2.dev:9092
15:08:40 [   27] Waiting for response 3 from kafka2.dev:9092
15:08:40 [   27] Received response 3 from kafka2.dev:9092
15:08:40 [   27] Opening connection to kafka1.dev:9092 with client id pjpp...
15:08:40 [   27] Sending request 1 to kafka1.dev:9092
15:08:40 [   27] Waiting for response 1 from kafka1.dev:9092
15:08:40 [   27] Received response 1 from kafka1.dev:9092
15:08:40 [   27] Sending request 2 to kafka3.dev:9092
15:08:40 [   27] Waiting for response 2 from kafka3.dev:9092
15:08:40 [   27] Received response 2 from kafka3.dev:9092
15:08:40 [   27] Received batch: test/8
15:08:40 [   27] Batch size: 1
15:08:40 [   27] Batch offset lag: 0
15:08:40 [   27] Processing message: test
15:08:40 [   27] Marking test/8:1 as committed
15:08:40 [   27] Committing offsets: test/2:1, test/1:2, test/0:1, test/8:2, test/4:2, test/7:2, test/9:2, test/3:1, test/6:2, test/5:1
15:08:40 [   27] Sending request 2 to kafka1.dev:9092
15:08:40 [   27] Waiting for response 2 from kafka1.dev:9092
15:08:40 [   27] Received response 2 from kafka1.dev:9092
15:08:40 [   27] Received batch: test/2
15:08:40 [   27] Batch size: 1
15:08:40 [   27] Batch offset lag: 0
15:08:40 [   27] Processing message: test
15:08:40 [   27] Marking test/2:1 as committed
15:08:40 [   27] Received batch: test/5
15:08:40 [   27] Batch size: 1
15:08:40 [   27] Batch offset lag: 0
15:08:40 [   27] Processing message: test
15:08:40 [   27] Marking test/5:1 as committed
15:08:40 [   27] Received batch: test/4
15:08:40 [   27] Batch size: 1
15:08:40 [   27] Batch offset lag: 0
15:08:40 [   27] Processing message: test
15:08:40 [   27] Marking test/4:2 as committed
15:08:40 [   27] Received batch: test/3
15:08:40 [   27] Batch size: 1
15:08:40 [   27] Batch offset lag: 0
15:08:40 [   27] Processing message:
15:08:40 [   27] Marking test/3:1 as committed
15:08:40 [   27] Received batch: test/0
15:08:40 [   27] Batch size: 1
15:08:40 [   27] Batch offset lag: 0
15:08:40 [   27] Processing message: test
15:08:40 [   27] Marking test/0:1 as committed

dasch commented 7 years ago

What timeouts have you set? (connect_timeout, socket_timeout) and what's the max_wait_time? This could be caused by the socket/connect timeout being shorter than max_wait_time.
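
A simplified model of that hypothesis, offered as a sketch rather than ruby-kafka's actual code: a broker holding fewer than min_bytes keeps the fetch request open for up to max_wait_time, and the client abandons the whole fetch as soon as one broker takes longer than socket_timeout to answer (which is what the "Timed out while waiting for response" lines in the log above suggest). The method names, the pending byte counts, and the timeout values in seconds are illustrative assumptions.

```ruby
# Simplified model of a fetch cycle; not ruby-kafka's implementation.

# Seconds a broker holds a fetch request before answering: it replies at once
# when at least min_bytes are pending, otherwise it waits out max_wait_time.
def broker_response_delay(pending_bytes:, min_bytes:, max_wait_time:)
  pending_bytes >= min_bytes ? 0 : max_wait_time
end

# Ask each broker in turn. The cycle fails as soon as one broker would answer
# later than the client is willing to wait on the socket.
def fetch_cycle(pending_bytes_by_broker, min_bytes:, max_wait_time:, socket_timeout:)
  pending_bytes_by_broker.each do |broker, pending_bytes|
    delay = broker_response_delay(
      pending_bytes: pending_bytes, min_bytes: min_bytes, max_wait_time: max_wait_time
    )
    return [:timeout, broker] if delay >= socket_timeout
  end
  [:ok, nil]
end

# Hypothetical pending bytes per broker.
one_idle = { "kafka1" => 0, "kafka2" => 4, "kafka3" => 4 }
all_busy = { "kafka1" => 4, "kafka2" => 4, "kafka3" => 4 }

short_timeout = fetch_cycle(one_idle, min_bytes: 1, max_wait_time: 5, socket_timeout: 1)
long_timeout  = fetch_cycle(one_idle, min_bytes: 1, max_wait_time: 5, socket_timeout: 6)
busy_cluster  = fetch_cycle(all_busy, min_bytes: 1, max_wait_time: 5, socket_timeout: 1)

p short_timeout # => [:timeout, "kafka1"]
p long_timeout  # => [:ok, nil]
p busy_cluster  # => [:ok, nil]
```

In this model a short socket timeout only goes unnoticed while every broker has data pending, which matches the symptom reported at the top of the thread.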

michaelsauter commented 7 years ago

Oh stupid us. connect_timeout and socket_timeout were 1, max_wait_time was 5. Setting socket_timeout to e.g. 6 solves the issue.

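However, I'm still a bit confused:

- Why does this issue appear only if we have multiple brokers?
- We had min_bytes: 1 set, so the brokers should not have waited for max_wait_time anyway?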

But that aside, we have a solution now :)

Thanks for looking into this so quickly and sorry for setting socket_timeout to a stupid value :/ Should we contribute a check to emit a warning if someone sets socket_timeout lower than max_wait_time?
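
For reference, a minimal sketch of the working configuration described in this comment. The connect_timeout value, client id, and group id are illustrative assumptions; the broker address and topic name are taken from the log above. The client code is guarded so that it only runs when ruby-kafka has already been loaded with `require "kafka"`.

```ruby
MAX_WAIT_TIME = 5 # seconds, the value passed to each_batch in this thread

CLIENT_OPTIONS = {
  connect_timeout: 10,               # illustrative value, not from the thread
  socket_timeout: MAX_WAIT_TIME + 1, # 6 seconds, longer than max_wait_time
}.freeze

# Only runs when ruby-kafka is loaded and a broker is reachable.
if defined?(Kafka)
  kafka = Kafka.new(
    seed_brokers: ["kafka1.dev:9092"],
    client_id: "example-consumer", # hypothetical client id
    **CLIENT_OPTIONS
  )

  consumer = kafka.consumer(group_id: "example-group") # hypothetical group id
  consumer.subscribe("test", start_from_beginning: false)

  consumer.each_batch(min_bytes: 1, max_wait_time: MAX_WAIT_TIME) do |batch|
    batch.messages.each { |message| puts message.value }
  end
end
```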

dasch commented 7 years ago

Happens to the best of us – I've made far worse mistakes in the past :-)

Also, we really should have better checks in place for that.

> Why does this issue appear only if we have multiple brokers?

Probably because a broker with messages ready to be consumed will return before the timeout – with just a single broker, you're almost guaranteed to have messages waiting when you send a fetch request. Note that ruby-kafka sends a single request per broker, containing sub-requests for every partition owned by that broker. So a single byte in a single partition may be enough.
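
The per-broker grouping described above can be sketched as follows. This is an illustration with a made-up partition-to-leader assignment and hypothetical helper names, not ruby-kafka's internal code: partitions are grouped by their leader, and a broker can answer immediately as soon as the partitions in its request hold at least min_bytes in total.

```ruby
# Hypothetical assignment: partition number => id of the leading broker.
PARTITION_LEADERS = { 0 => 1, 1 => 2, 2 => 3, 3 => 1, 4 => 2, 5 => 3 }.freeze

# One fetch request per broker, listing every partition that broker leads.
def fetch_requests_by_broker(partition_leaders)
  partition_leaders
    .group_by { |_partition, broker| broker }
    .transform_values { |pairs| pairs.map(&:first) }
end

# Brokers whose request already covers at least min_bytes of pending data.
def brokers_answering_immediately(requests, pending_bytes, min_bytes: 1)
  requests.select do |_broker, partitions|
    partitions.sum { |partition| pending_bytes.fetch(partition, 0) } >= min_bytes
  end.keys
end

requests = fetch_requests_by_broker(PARTITION_LEADERS)
p requests # => {1=>[0, 3], 2=>[1, 4], 3=>[2, 5]}

# A single byte in partition 3 is enough for broker 1 to answer right away;
# brokers 2 and 3 hold their requests open until max_wait_time.
ready = brokers_answering_immediately(requests, { 3 => 1 })
p ready # => [1]
```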

> We had min_bytes: 1 set, so the brokers should not have waited for max_wait_time anyway?

max_wait_time is just that – a max. If you want to make the brokers return larger batches, you'll have to increase min_bytes.

It would be great to have a warning (or even an exception) in case max_wait_time is set larger than the socket timeouts. It may be a bit tricky to ferry those values to the right locations, though.
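
A minimal sketch of such a check, assuming both values are available in one place, which is exactly the tricky part in ruby-kafka. The method name is hypothetical and not part of any library API. Treating equal values as invalid is also an assumption, made because a response sent exactly at max_wait_time would still need time to travel back to the client.

```ruby
# Hypothetical guard; not part of ruby-kafka, Racecar, or Karafka.
def check_fetch_timeouts!(socket_timeout:, max_wait_time:)
  return if max_wait_time < socket_timeout

  raise ArgumentError,
        "max_wait_time (#{max_wait_time}s) must be shorter than " \
        "socket_timeout (#{socket_timeout}s)"
end

# The configuration from this thread is rejected.
begin
  check_fetch_timeouts!(socket_timeout: 1, max_wait_time: 5)
rescue ArgumentError => e
  warn e.message
end

# The corrected configuration passes without raising.
check_fetch_timeouts!(socket_timeout: 6, max_wait_time: 5)
```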

Try taking a look at Racecar, our soon-to-be-announced consumer framework – I think that may be a better place to do these kinds of checks. Also, please let me know if you think Racecar has any kinks.

mensfeld commented 7 years ago

@dasch @michaelsauter we do that in the Karafka framework (only the basics for now, but it will be extended): https://github.com/karafka/karafka/blob/0.6.0wip/lib/karafka/schemas/config.rb

I will make sure to apply checking for this case.

michaelsauter commented 7 years ago

@dasch Thanks. I'll see how tricky it is to add a warning/exception.

Racecar looks really nice! I was thinking about writing some small abstraction on top of ruby-kafka when we started out, then decided to do only the bare minimum in the beginning. But we will consider Racecar once it is announced. I had a brief look already through the code and the runner looks almost identical to our executables which have a lot of copy-paste between them ;)

One thing I noticed though is that we ended up using batch processing in most cases ... maybe that is something that Racecar could consider as well. Anyway, that is a topic for another thread :) I think this one can be closed.

dasch commented 7 years ago

@michaelsauter :+1:

I plan to add batch processing support once I've gathered more use-cases. Batch processing is a bit more tricky since there's less interleaving of ruby-kafka and client code, and so fewer opportunities to mark messages as processed, commit offsets, and send heartbeats. It's also more difficult to pinpoint the message that caused a processing error.

Even the per-message API is batched behind the scenes, though, so the batch API is really only useful if you need to do whole-batch processing.
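
To illustrate the interleaving point, here is a toy consumer, not ruby-kafka's implementation, in which both APIs walk the same fetched batches. The class, the Batch struct, and the checkpoint counter are hypothetical; the counter stands in for the moments where the library regains control and could mark a message as processed, commit offsets, or send a heartbeat.

```ruby
# Toy model; the names below are illustrative, not ruby-kafka classes.
Batch = Struct.new(:partition, :messages)

class SketchConsumer
  attr_reader :checkpoints

  def initialize(batches)
    @batches = batches
    @checkpoints = 0
  end

  # Whole-batch API: the library only regains control once per batch.
  def each_batch
    @batches.each do |batch|
      yield batch
      checkpoint
    end
  end

  # Per-message API: iterates the same batches, but regains control after
  # every single message.
  def each_message
    @batches.each do |batch|
      batch.messages.each do |message|
        yield message
        checkpoint
      end
    end
  end

  private

  def checkpoint
    @checkpoints += 1
  end
end

batches = [Batch.new(0, %w[a b c]), Batch.new(1, %w[d e])]

per_message = SketchConsumer.new(batches)
per_message.each_message { |message| message.upcase }

per_batch = SketchConsumer.new(batches)
per_batch.each_batch { |batch| batch.messages.map(&:upcase) }

p per_message.checkpoints # => 5
p per_batch.checkpoints   # => 2
```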

Racecar has already been pushed to Rubygems, I just haven't had time to write a blog post – feel free to try it out.

dasch commented 7 years ago

@michaelsauter fixed in Racecar: https://github.com/zendesk/racecar/commit/127c1d0ede45ede98a7a1950f763a5794e791ded

michaelsauter commented 7 years ago

@dasch Nice. In ruby-kafka this looks a lot trickier indeed. It would be kind of odd to pass it through all the way until it can be checked. Maybe it's best to just add a note in the comments (https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/consumer.rb#L168 and https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/consumer.rb#L235)?

dasch commented 7 years ago

Yup, that's probably the best thing to do. ruby-kafka is a sharp knife – most people will want to use something like Racecar or Karafka when writing consumers.