elastic / logstash

Logstash - transport and process your logs, events, or other data
https://www.elastic.co/products/logstash

Kafka read balance between logstash instances #4397

Open kkpapa opened 8 years ago

kkpapa commented 8 years ago

I'm using Logstash 2.1 (logstash-2.1.1-1.noarch.rpm). I hit an issue with the balancing of consumer threads across multiple Logstash instances. I have 1 topic with 24 partitions and 3 Logstash instances consuming it. The kafka input config on each Logstash instance is:

input {
    kafka {
        zk_connect => "x.x.x.x:2181, y.y.y.y:2181, z.z.z.z:2181"
        topic_id => "test"
        consumer_threads => 8
        fetch_message_max_bytes => 20971520
    }
}

However, I found that the topic was not consumed evenly from Kafka:

$./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group logstash --topic test
Group           Topic                          Pid Offset          logSize         Lag             Owner
logstash        test                           0   15915           16401           486             host1
logstash        test                           1   17535           20657           3122            host1
logstash        test                           2   9833            9833            0               host1
logstash        test                           3   12822           12871           49              host1
logstash        test                           4   7599            7705            106             host1
logstash        test                           5   18055           18057           2               host1
logstash        test                           6   14273           14395           122             host1
logstash        test                           7   15870           16292           422             host1
logstash        test                           8   2211            2211            0               host2
logstash        test                           9   4442            4442            0               host2
logstash        test                           10  8854            8854            0               host2
logstash        test                           11  5618            5618            0               host2
logstash        test                           12  4460            4460            0               host2
logstash        test                           13  6607            6607            0               host2
logstash        test                           14  4432            4432            0               host2
logstash        test                           15  8882            8882            0               host2
logstash        test                           16  4536            6347            1811            host2
logstash        test                           17  8867            41720           32853           host2
logstash        test                           18  5420            10002           4582            host2
logstash        test                           19  5106            8267            3161            host2
logstash        test                           20  18694           18694           0               host3
logstash        test                           21  9054            9054            0               host3
logstash        test                           22  4444            4444            0               host3
logstash        test                           23  4400            4402            2               host3

host1 consumed 8 partitions, but host2 consumed 12 instead of the expected 8, which left only 4 for host3. The 4 extra partitions consumed by host2 (16 to 19) also lag far behind.
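The per-host counts above can be tallied mechanically. Below is a minimal Python sketch, assuming the checker output has the seven whitespace-separated columns shown above. The function name `summarize_owners` and the shortened sample are illustrative only, not part of Kafka or Logstash.

```python
from collections import defaultdict


def summarize_owners(checker_output):
    """Return {owner: (partition_count, total_lag)} from ConsumerOffsetChecker text."""
    counts = defaultdict(int)
    lags = defaultdict(int)
    for line in checker_output.splitlines():
        fields = line.split()
        # Data rows have 7 columns: Group Topic Pid Offset logSize Lag Owner.
        # The header row is skipped because its Pid column is not numeric.
        if len(fields) != 7 or not fields[2].isdigit():
            continue
        owner = fields[6]
        counts[owner] += 1
        lags[owner] += int(fields[5])
    return {owner: (counts[owner], lags[owner]) for owner in counts}


# Shortened sample taken from the output above (4 of the 24 rows).
sample = """\
Group Topic Pid Offset logSize Lag Owner
logstash test 0 15915 16401 486 host1
logstash test 8 2211 2211 0 host2
logstash test 17 8867 41720 32853 host2
logstash test 20 18694 18694 0 host3
"""

for owner, (partitions, lag) in sorted(summarize_owners(sample).items()):
    print(owner, partitions, lag)
```

Feeding the full checker output through the same function would give the partition count and summed lag per host, which makes an uneven assignment easy to spot.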

Any idea how to balance consumers across multiple hosts?

Thanks, Kenny

Xylakant commented 8 years ago

We set consumer_threads for the kafka input so that (num_processes * num_consumer_threads) == num_partitions. In your case that means 8 threads per instance (3 * 8 == 24). Each thread will then read from a single partition.
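As a rough illustration of that rule, here is a small Python sketch that derives the per-instance thread count from the partition and instance counts. The helper name `threads_per_instance` is hypothetical, and raising an error when the division is not exact is an assumption made here, not something the plugin does.

```python
def threads_per_instance(num_partitions, num_instances):
    """Return consumer_threads so that num_instances * threads == num_partitions."""
    if num_instances <= 0:
        raise ValueError("num_instances must be positive")
    threads, remainder = divmod(num_partitions, num_instances)
    if remainder:
        # The equality cannot hold exactly; the caller must pick a different layout.
        raise ValueError(
            f"{num_partitions} partitions do not divide evenly across "
            f"{num_instances} instances"
        )
    return threads


# The setup from this thread: 24 partitions, 3 Logstash instances.
print(threads_per_instance(24, 3))
```

For the setup in this thread the result matches the `consumer_threads => 8` already in the config.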

talevy commented 8 years ago

This should be fixed in the latest release. Have you tried upgrading to the latest version of logstash-input-kafka?