dpkp / kafka-python

Python client for Apache Kafka
http://kafka-python.readthedocs.io/
Apache License 2.0

what's the problem with consumer group #199

Closed ghost closed 9 years ago

ghost commented 9 years ago

I am using the consumer group in what I believe is the right way, but it doesn't behave the way I want. Here is my code:

#!/usr/bin/env python

import sys

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer, KeyedProducer

def main():
    if len(sys.argv) != 2:
        sys.exit(0)

    kafka = KafkaClient("localhost:9092")
    if sys.argv[1] == "put":
        producer = SimpleProducer(kafka)
        resp = producer.send_messages("my-topic", "some message")
        print(resp)
    elif sys.argv[1] == "get":
        consumer = SimpleConsumer(kafka, "my-foo-group", "my-topic")
        for message in consumer:
            print(message)

if __name__ == "__main__":
    main()

What I want is: if I send "my-topic" a message, only one consumer in the group ("my-foo-group") should get it. However, what I found is that no matter how many consumer processes I start, all of them eventually get the message. Am I doing something wrong, or is this a problem with the kafka-python client?

wizzat commented 9 years ago

Consumer groups are more about offset management than about preventing double consumption between consumers. There is a concept called coordinated consumer groups, but that is not available in non-JVM clients.

The way that I handle this is to spin up a python consumer per partition instead of having every consumer read every partition.
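The consumer-per-partition approach can be sketched roughly as below. The `partitions_for_worker` helper and `run_worker` names are illustrative, not part of kafka-python; the commented-out consumer construction assumes the era's `SimpleConsumer` accepted a `partitions` argument to pin a consumer to specific partitions.

```python
# Sketch: spin up one process per group of partitions so no two
# workers read the same partition. Names here are hypothetical.
from multiprocessing import Process

def partitions_for_worker(num_partitions, num_workers, worker_index):
    """Partition ids owned by one worker under a round-robin split."""
    return [p for p in range(num_partitions) if p % num_workers == worker_index]

def run_worker(worker_index, num_workers, num_partitions=8):
    owned = partitions_for_worker(num_partitions, num_workers, worker_index)
    # With a live broker you would pin a consumer to these partitions,
    # e.g. (the `partitions` keyword is an assumption about the old API):
    # kafka = KafkaClient("localhost:9092")
    # consumer = SimpleConsumer(kafka, "my-foo-group", "my-topic",
    #                           partitions=owned)
    print("worker %d owns partitions %s" % (worker_index, owned))

if __name__ == "__main__":
    workers = [Process(target=run_worker, args=(i, 4)) for i in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
```

Because each partition is owned by exactly one worker, each message is read once; the trade-off is that you must manage the partition-to-worker mapping yourself.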

rqc commented 9 years ago

@morndust I started using kafka-python just recently as well and found out that this is a feature/limitation, as per #173. I ended up using kafka-python for my producers and https://github.com/bpot/poseidon, a Kafka Ruby client, to accomplish what you are asking for on the consumer side.

Basically, my use case is that I want to scale consumption of a topic horizontally by spinning up more consumer processes on different VMs, not just the current one, which is all that MultiProcessConsumer accomplishes. kafka-python does not support this: once you start a new process, it starts from the beginning instead of the last committed offset, and the same messages are consumed by all of the consumers of that topic.

Regardless, kafka-python really works well for everything else. Thanks to everyone involved in this project, it is really a relief not to have to deal with the JVM.

wizzat commented 9 years ago

To be clear: Kafka-Python supports offset management and resumption. It does not support having C consumers and P partitions and automatically distributing load without duplicate readers for a message. If you need help getting resuming from an offset working, we'd be glad to help you out.
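The distinction wizzat draws (offset management vs. coordinated consumption) can be illustrated without a broker. The sketch below is purely conceptual, not the kafka-python API: a consumer commits the offset of the last message it processed to a store, and a restarted consumer resumes from the committed offset instead of re-reading the log.

```python
# Conceptual model of offset management and resumption.
# FakeLog and OffsetStore are stand-ins for a Kafka partition log
# and broker/ZooKeeper-backed offset storage, respectively.

class FakeLog:
    """Stand-in for one partition's append-only message log."""
    def __init__(self, messages):
        self.messages = list(messages)

    def read_from(self, offset):
        return list(enumerate(self.messages))[offset:]

class OffsetStore:
    """Stand-in for persisted per-group offset storage."""
    def __init__(self):
        self.committed = {}

    def commit(self, group, offset):
        self.committed[group] = offset

    def fetch(self, group):
        return self.committed.get(group, 0)

def consume(log, store, group, max_messages):
    """Read up to max_messages, committing after each one."""
    start = store.fetch(group)
    seen = []
    for offset, msg in log.read_from(start)[:max_messages]:
        seen.append(msg)
        store.commit(group, offset + 1)  # next offset to read
    return seen

log = FakeLog(["m0", "m1", "m2", "m3"])
store = OffsetStore()
first = consume(log, store, "my-foo-group", 2)   # reads m0, m1
second = consume(log, store, "my-foo-group", 2)  # resumes at m2
```

This is what the `group` parameter buys you: resumption after a restart. It does not, by itself, keep two live consumers in the same group from reading the same messages.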

mumrah commented 9 years ago

@morndust the group param in SimpleConsumer (my-foo-group) is only used for offset storing and retrieval, not coordinated consumption. Coordinated consumers (aka high-level consumers or "balanced" consumers) are only available to JVM clients and a few non-JVM clients, but not kafka-python.

@wizzat maybe we should put a note in the README to make this clear?

ghost commented 9 years ago

@mumrah @wizzat

To summarize: the only way to get coordinated consumer-group behavior is to split consumers across partitions?

PS: completing the README would be appreciated by new Kafka users like me.

ghost commented 9 years ago

One more question: "There is a concept called coordinated consumer groups, but that is not available in non-JVM clients" (@wizzat). Why is that the case? Can you explain a little bit? Thanks in advance!

mumrah commented 9 years ago

Currently, the "high-level" JVM consumers use ZK to coordinate which partitions are read by which threads. Each consuming thread in the JVM consumer will be reading from at least one partition, and these consumer threads can exist across multiple JVMs. This means you can create one logical "consumer group" that consists of several threads across several JVMs, e.g. a topic with 32 partitions could be read by 4 JVMs with 8 threads each and the data would be evenly distributed among the consumers.

The reason we haven't added this feature is that there is a complex algorithm involving ZooKeeper to make sure a thread is consuming the correct partition at the correct offset. There are plans to redesign this "coordinated consumption" in Kafka so that it does not depend on ZooKeeper. This will make it easier for clients like kafka-python to do this kind of thing.
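The even distribution described above (32 partitions over 4 JVMs with 8 threads each) can be sketched with a range-style assignment. This mimics the outcome of the JVM consumer's rebalancing, not the actual ZooKeeper-based algorithm; the `assign` helper is hypothetical.

```python
# Sketch: distribute partitions over logical consumers in contiguous
# ranges, as a high-level consumer group would after a rebalance.

def assign(partitions, consumers):
    """Map each consumer id to a contiguous slice of the partitions."""
    per, extra = divmod(len(partitions), len(consumers))
    assignment, start = {}, 0
    for i, consumer in enumerate(consumers):
        n = per + (1 if i < extra else 0)  # spread any remainder
        assignment[consumer] = partitions[start:start + n]
        start += n
    return assignment

# 4 JVMs x 8 threads = 32 logical consumers for a 32-partition topic.
consumers = ["jvm%d-thread%d" % (j, t) for j in range(4) for t in range(8)]
assignment = assign(list(range(32)), consumers)
```

With 32 partitions and 32 logical consumers, each consumer ends up owning exactly one partition; if a consumer disappears, the group recomputes the mapping, which is the rebalancing step non-JVM clients lacked at the time.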

So, in other words, we'll have it eventually.

HTH

ghost commented 9 years ago

Thank you so much, @mumrah!

ddieterly commented 9 years ago

I understand that kafka-python does not do consumer rebalancing (consumer failover) when consumers come and go. That has been made clear by this discussion thread. How does kafka-python handle broker rebalancing? Does it handle broker rebalancing?

From https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design we see that the high-level Java Consumers do the following:

1) Auto/Hidden Offset Management
2) Auto (Simple) Partition Assignment
3) Broker Failover => Auto Rebalance
4) Consumer Failover => Auto Rebalance

We know that kafka-python does not do #4, consumer failover. Which of the others does kafka-python handle? Or, conversely, which of the others does it not handle? It appears to handle #1 but none of the others. Can we get confirmation of that?

Thanks.