joekiller / logstash-kafka

Kafka plugin for Logstash
Apache License 2.0

Producer loses a message when broker goes away #4

Closed bodgit closed 10 years ago

bodgit commented 10 years ago

With a simple producer, if I bounce the Kafka broker, the first message sent afterwards is lost before the producer code works out that the broker has disappeared and attempts a reconnect.

Here are my test steps:

  1. Broker running, start producer. lsof reports no TCP connection
  2. Produce a message. lsof now shows an ESTABLISHED connection to Kafka. Consumer emits message
  3. Restart broker. Connection is now in CLOSE_WAIT state
  4. Produce another message, doesn't show up in consumer. Connection is marked CLOSED
  5. Produce another message, this shows up in consumer. There's now a new ESTABLISHED connection to the broker
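The pattern in steps 3 to 5 matches ordinary TCP behaviour: the first write to a connection the peer has already closed is usually accepted by the local kernel, and the failure only surfaces on a later write. Below is a minimal sketch using plain loopback sockets, not Kafka or the plugin; the function name `demo_half_closed_send` and the sleep intervals are assumptions made for illustration.

```python
import socket
import time


def demo_half_closed_send():
    """Send twice on a connection whose peer has already closed."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    server.listen(1)

    client = socket.create_connection(server.getsockname())
    conn, _ = server.accept()

    # Stand-in for "restart broker": the peer closes its end, which
    # leaves the client side of the connection in CLOSE_WAIT.
    conn.close()
    server.close()
    time.sleep(0.2)

    results = []
    for payload in (b"message 1", b"message 2"):
        try:
            client.send(payload)
            results.append("accepted locally")
        except OSError as exc:
            results.append("failed: " + type(exc).__name__)
        time.sleep(0.2)  # give the peer's reset time to arrive
    client.close()
    return results


if __name__ == "__main__":
    print(demo_half_closed_send())
```

On a typical Linux host the first send is accepted and the second raises an error. A producer that does not wait for an acknowledgement has no way of telling that the first message never reached a broker.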

I'm also testing with a partitioned topic (currently split into three partitions across three brokers, with an extra replica). The consumer seems to stay connected to the partition leaders if I restart a single broker, but the producer shows the above behaviour before it picks another broker to connect to.

Incidentally, should there be some way of configuring how the producer works with a partitioned topic?

joekiller commented 10 years ago

I think I need to expose request.required.acks in jruby-kafka and let you select its value. Right now the value defaults to 0. I'll try to get that option exposed and configurable by the plugin.

Here are Kafka's docs about this (http://kafka.apache.org/documentation.html#producerconfigs):

This value controls when a produce request is considered completed. Specifically, how many other brokers must have committed the data to their log and acknowledged this to the leader? Typical values are

0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
-1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains.
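To make the difference between the settings concrete, here is a toy model rather than real Kafka client code. `ToyBroker`, `send`, and the `on_retry` hook are hypothetical names invented for this sketch, and the retry loop only approximates what a producer does when it is waiting for an acknowledgement.

```python
class ToyBroker:
    """Hypothetical in-memory stand-in for a broker."""

    def __init__(self):
        self.up = True
        self.log = []

    def append(self, msg):
        """Store msg and return True (the 'ack'), or return False if down."""
        if not self.up:
            return False
        self.log.append(msg)
        return True


def send(broker, msg, acks, retries=3, on_retry=None):
    """Return True if the producer believes the send completed."""
    if acks == 0:
        broker.append(msg)  # result ignored: fire and forget
        return True
    for _ in range(retries + 1):
        if broker.append(msg):  # wait for the ack before returning
            return True
        if on_retry is not None:
            on_retry(broker)  # e.g. reconnect or refresh metadata
    return False


def revive(broker):
    """Hypothetical hook that simulates the broker coming back."""
    broker.up = True


if __name__ == "__main__":
    broker = ToyBroker()
    broker.up = False
    print(send(broker, "m1", acks=0), broker.log)
    print(send(broker, "m2", acks=1, on_retry=revive), broker.log)
```

In this model the `acks=0` send reports success even though nothing was stored, while the `acks=1` send notices the missing acknowledgement and retries until the broker is back.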

As for your question about partitioning, I'm not sure what you are asking, so please let me know: Are you saying that the messages are not being evenly distributed across the partitions? Are you asking why the producer stays connected to other brokers when only the first is specified? Or are you asking how to configure the producer to select which partitions to hit?

Sorry for the counter questions, just trying to make sure I understand what you are asking.

bodgit commented 10 years ago

I guess at the moment I don't have any visibility on how messages are being distributed across partitions. I'm not familiar with all of the Kafka tools so I don't know if there's a tool there that can show the distribution, or maybe it's something that could be added to the kafka field that is added to events and then it would be easy to visualise through Elasticsearch.

I (perhaps mistakenly) assumed that a producer must be connected to each partition leader in order to publish to each partition, much like the consumer appears to be.

Producers also seem to be able to specify the partition key themselves according to the docs and this example. Obviously creating custom partitioners within Logstash would get ugly, but maybe allowing a simple partition => [0-n] config option would be useful?

joekiller commented 10 years ago

By default Kafka hashes the key of your message to choose a partition. The serializer settings from the docs are:

serializer.class (default: kafka.serializer.DefaultEncoder): The serializer class for messages. The default encoder takes a byte[] and returns the same byte[].
key.serializer.class (default: none listed): The serializer class for keys (defaults to the same as for messages if nothing is given).

I think, though I'm not positive, that if you don't provide a key it uses a hash of the message. This default setting should distribute the messages evenly amongst the primary partitions on the primary broker. I'm not sure if you are aware of this, but when the producer connects to a broker it receives metadata about all the brokers and where all the partitions are, so you only have to provide the details of one broker for it to know about the entire Kafka cluster. I'm not sure you can tell the producer to produce only to a particular partition of a particular topic; you may do better to make a topic with just one partition if you want all the data to sit in that one spot.
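As a rough illustration of hash-based partition selection, and of the simple fixed-partition option suggested earlier in the thread, here is a sketch. It is not Kafka's partitioner: `zlib.crc32` is swapped in as a deterministic hash, and `pick_partition`, `distribution`, and the `fixed` parameter are hypothetical names.

```python
import zlib
from collections import Counter


def pick_partition(key, num_partitions, fixed=None):
    """Choose a partition for key, or honour an explicit fixed partition."""
    if fixed is not None:
        if not 0 <= fixed < num_partitions:
            raise ValueError("partition out of range")
        return fixed
    # Assumption: any stable hash works here; crc32 is only a stand-in.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def distribution(keys, num_partitions):
    """Count how many keys land on each partition."""
    return Counter(pick_partition(k, num_partitions) for k in keys)


if __name__ == "__main__":
    keys = ["host-%d" % i for i in range(3000)]
    print(sorted(distribution(keys, 3).items()))
```

A count like this is also one way to get the visibility asked about above: the same key always maps to the same partition, so a skewed count points to skewed keys.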