confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

Can you consume without using a consumer group? #250

Closed zhaojack closed 7 years ago

zhaojack commented 7 years ago

Is there a way to use confluent-kafka-python to consume without setting group.id?

The platform documentation says "You should always configure group.id unless you are using the simple assignment API and you don’t need to store offsets in Kafka" and doesn't hint that anything is different about the Python API (or maybe I just missed it). In the Python API I see I can call consumer.assign. I tried it, and it gave me an _UNKNOWN_GROUP error.

edenhill commented 7 years ago

While the group.id is technically not required from a Kafka standpoint until you want to commit offsets, this client implementation requires the group.id to be set. But if you are not going to commit/retrieve offsets and only use the assign() API you can set the group.id to anything.

zhaojack commented 7 years ago

So, if I set group.id to say, testgroup12345, as long as I don't touch the .subscribe or .commit functions, the server would never hear testgroup12345 so I wouldn't need to worry about littering the overall list of consumer groups with random test groups. Am I understanding this right?

Thanks for the quick answer by the way.

edenhill commented 7 years ago

Exactly, with two additions:

zhaojack commented 7 years ago

Awesome. I will give that a try. Thank you!

zhaojack commented 7 years ago

Behavior confirmed. Closing.

swenzel commented 4 years ago

I'm afraid this will not bypass all the consumer group mechanisms. If you have ACLs enabled and your Kafka user is not allowed to create consumer groups with the given id, you cannot consume:

%7|1598524567.315|CGRPQUERY|rdkafka#consumer-9| [thrd:main]: sasl_plaintext://broker03:9093/6: Group "swenzelgroup": querying for coordinator: intervaled in state query-coord
%7|1598524567.315|CGRPSTATE|rdkafka#consumer-9| [thrd:main]: Group "swenzelgroup" changed state query-coord -> wait-coord (v3, join-state assigned)
%7|1598524567.315|BROADCAST|rdkafka#consumer-9| [thrd:main]: Broadcasting state change
%7|1598524567.315|SEND|rdkafka#consumer-9| [thrd:sasl_plaintext://broker03:]: sasl_plaintext://broker03:9093/6: Sent FindCoordinatorRequest (v2, 36 bytes @ 0, CorrId 9)
%7|1598524567.377|RECV|rdkafka#consumer-9| [thrd:sasl_plaintext://broker03:]: sasl_plaintext://broker03:9093/6: Received FindCoordinatorResponse (v2, 18 bytes, CorrId 9, rtt 61.97ms)
%7|1598524567.377|CGRPCOORD|rdkafka#consumer-9| [thrd:main]: sasl_plaintext://broker03:9093/6: Group "swenzelgroup" FindCoordinator response error: GROUP_AUTHORIZATION_FAILED: Broker: Group authorization failed
%7|1598524567.377|CGRPSTATE|rdkafka#consumer-9| [thrd:main]: Group "swenzelgroup" changed state wait-coord -> query-coord (v3, join-state assigned)
%7|1598524567.377|BROADCAST|rdkafka#consumer-9| [thrd:main]: Broadcasting state change
%7|1598524568.314|CGRPQUERY|rdkafka#consumer-9| [thrd:main]: sasl_plaintext://broker03:9093/6: Group "swenzelgroup": querying for coordinator: intervaled in state query-coord
%7|1598524568.314|CGRPSTATE|rdkafka#consumer-9| [thrd:main]: Group "swenzelgroup" changed state query-coord -> wait-coord (v3, join-state assigned)
...
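
To spot this failure in a longer debug log, the lines can be scanned for the FindCoordinator authorization error. The following is a minimal sketch, not part of the client: the line layout is inferred from the excerpt above only, and the names `LOG_LINE`, `GROUP_DENIED` and `denied_groups` are hypothetical.

```python
import re

# Layout inferred from the excerpt above (an assumption, not a documented format):
# %<level>|<timestamp>|<facility>|<client>| [thrd:<thread>]: <message>
LOG_LINE = re.compile(
    r"^%(?P<level>\d)\|(?P<ts>\d+\.\d+)\|(?P<facility>[A-Z]+)\|(?P<client>[^|]+)\|"
    r" \[thrd:(?P<thread>[^\]]*)\]: (?P<message>.*)$"
)
GROUP_DENIED = re.compile(
    r'Group "(?P<group>[^"]+)" FindCoordinator response error: GROUP_AUTHORIZATION_FAILED'
)


def denied_groups(lines):
    """Return the set of group ids whose coordinator lookup was rejected."""
    groups = set()
    for line in lines:
        parsed = LOG_LINE.match(line.strip())
        if parsed is None:
            continue  # not a debug log line in the assumed layout
        hit = GROUP_DENIED.search(parsed.group("message"))
        if hit:
            groups.add(hit.group("group"))
    return groups
```

Feeding it the lines of the excerpt above would report the group "swenzelgroup".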

deeTEEcee commented 3 years ago

With the latest version (1.5.0), I followed https://github.com/confluentinc/confluent-kafka-python/issues/250#issuecomment-331377925 but I still get errors requiring the group to be set up when I run consumer.poll.

edenhill commented 3 years ago

@deeTEEcee You can, but you are still required to supply a group.id. It won't be used unless you call subscribe(), committed() or commit(), so if you're not using any of those functions you can simply generate a unique group.id (e.g. using uuid4()).
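
The suggestion above can be sketched roughly as follows. This is an illustration under assumptions rather than an official recipe: the helper names `build_assign_only_config` and `read_from_beginning`, the "unused-" prefix and the `max_empty_polls` cut-off are all hypothetical, and the sketch does not get around group ACLs on the broker.

```python
import uuid


def build_assign_only_config(bootstrap_servers):
    """Return a consumer config for assign()-only use with a throwaway group.id."""
    return {
        "bootstrap.servers": bootstrap_servers,
        # The client requires a group.id; a random one avoids reusing a real group name.
        "group.id": "unused-" + str(uuid.uuid4()),
        # Nothing is stored in Kafka, so automatic offset commits are switched off.
        "enable.auto.commit": False,
    }


def read_from_beginning(bootstrap_servers, topic, max_messages=5, max_empty_polls=10):
    """Read up to max_messages message values from every partition of topic."""
    # Imported lazily so the config helper above works without the client installed.
    from confluent_kafka import OFFSET_BEGINNING, Consumer, KafkaException, TopicPartition

    consumer = Consumer(build_assign_only_config(bootstrap_servers))
    records = []
    try:
        metadata = consumer.list_topics(topic, timeout=10)
        partitions = [
            TopicPartition(topic, partition, OFFSET_BEGINNING)
            for partition in metadata.topics[topic].partitions
        ]
        # Only assign() is used: no subscribe(), commit() or committed().
        consumer.assign(partitions)
        empty_polls = 0
        while len(records) < max_messages and empty_polls < max_empty_polls:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                empty_polls += 1
                continue
            empty_polls = 0
            if msg.error():
                raise KafkaException(msg.error())
            records.append(msg.value())
    finally:
        consumer.close()
    return records
```

A call such as `read_from_beginning("localhost:9092", "my_topic")` would be the intended usage; the broker address and topic name there are placeholders.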

deeTEEcee commented 3 years ago

Sorry, to be more specific: I still get the error even after adding a group.id and disabling auto commit.

Current source code, to make things easier to see:

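import os
import sys

from confluent_kafka import Consumer, KafkaError, KafkaException, TopicPartition

# rawq_brokers and my_topic are defined elsewhere in the script (not shown here).
config = {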
    "bootstrap.servers": rawq_brokers or os.environ.get("RAWQ_BROKERS"), # e.g: rawq-prod-0.rawq.sightmachine.io:9093
    "sasl.username": os.environ.get("RAWQ_USERNAME"),
    "sasl.password": os.environ.get("RAWQ_PASSWORD"),
    "sasl.mechanisms": "SCRAM-SHA-256",
    "security.protocol": "SASL_SSL",
    "group.id": 'random',
    "auto.offset.reset": 'earliest',
    "enable.auto.commit": False # we have to manually maintain this unless we add groups in the ACL.
    # "ssl.ca.location": "/etc/ssl/certs"
}

def basic_consume_loop(consumer):
    running = True
    records = []
    try:

        while running:
            if len(records) == 5:
                break
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                records.append(msg)
                # msg_process(msg)
    finally:
        # Close down the consumer (no offsets are committed here because enable.auto.commit is False).
        consumer.close()
    print(records)

consumer = Consumer(config)
topics = consumer.list_topics().topics
partitions = [TopicPartition(my_topic, partition=partition, offset=0) for partition in list(topics[my_topic].partitions.keys())]
consumer.assign(partitions)

basic_consume_loop(consumer)

edenhill commented 3 years ago

What errors are you seeing?

deeTEEcee commented 3 years ago

/Users/dtcsight/.pyenv/versions/ftx/bin/python /opt/sightmachine/factorytx-core/scripts/rawq/confluent_kafka-test.py
Traceback (most recent call last):
  File "/opt/sightmachine/factorytx-core/scripts/rawq/confluent_kafka-test.py", line 74, in <module>
    basic_consume_loop(consumer)
  File "/opt/sightmachine/factorytx-core/scripts/rawq/confluent_kafka-test.py", line 56, in basic_consume_loop
    raise KafkaException(msg.error())
cimpl.KafkaException: KafkaError{code=GROUP_AUTHORIZATION_FAILED,val=30,str="FindCoordinator response error: Not authorized to access group: Group authorization failed."}

edenhill commented 3 years ago

Ah, right, we still look up the coordinator to be able to commit offsets, and that lookup will fail in this case.
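
For a script that hits this, one option is to turn the error into a clearer message before raising. The sketch below is only an illustration: the numeric code 30 is taken from the traceback above (val=30), `explain_poll_error` is a hypothetical helper, and it assumes an error object exposing `code()` and `str()` as `KafkaError` does. It does not make the coordinator lookup succeed.

```python
# Numeric error code as shown in the traceback above (val=30).
GROUP_AUTHORIZATION_FAILED = 30


def explain_poll_error(err):
    """Return a short hint for an error object that exposes code() and str()."""
    if err.code() == GROUP_AUTHORIZATION_FAILED:
        return (
            "coordinator lookup was rejected by a group ACL for the configured "
            "group.id: " + err.str()
        )
    return "unhandled consumer error: " + err.str()
```

In a poll loop this could be used as `raise RuntimeError(explain_poll_error(msg.error()))` in place of the bare exception.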

Is it necessary to have this group acl in place?

deeTEEcee commented 3 years ago

Not sure if that was a question for me but yeah, I do need the group acl activated for this. Otherwise, it breaks.

deeTEEcee commented 3 years ago

@edenhill I just learned today we also have this ticket which describes a very similar situation: https://github.com/edenhill/librdkafka/issues/3261

My team also uses a Java client for Kafka, and we were pretty confused about why group.id was needed on the Python consumer side even with the same calls we use in Java WITHOUT a group.id. But I think I've made enough attempts to say "hey, we're stuck with group.id for now".

ethanttbui commented 1 week ago

@edenhill May I ask if there is any update on this topic? I tested the behavior mentioned by @deeTEEcee myself and found that the ACL error does not occur for me. I am using confluent-kafka==2.5.3.