confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

Offsets saved on broker are ignored when Consumer starts #176

Closed jbfuzier closed 7 years ago

jbfuzier commented 7 years ago

Hi,

I think I found an issue with offset commits for consumers. I am on current master of confluent-kafka-python and librdkafka, with Kafka v0.10. There are messages in the topic.

To reproduce the issue, run the sample script and wait until the auto commit gets called (it does not raise any error). Stop by pressing CTRL+C (wait for closing). Launch the script again: the offsets are not set in the assignment callback. Also, if we set 'auto.offset.reset': 'error', it returns KafkaError{code=_NO_OFFSET,val=-168,str="no previously committed offset available"}.

Kafka ACLs :

# Current ACLs for resource `Topic:kafka`:
        # User:kafka has Allow permission for operations: Write from hosts: *
        # User:kafka has Allow permission for operations: Read from hosts: *
        # User:kafka has Allow permission for operations: Describe from hosts: *

# Current ACLs for resource `Group:kafka`:
        # User:kafka has Allow permission for operations: Read from hosts: *
        # User:kafka has Allow permission for operations: Describe from hosts: *

# Current ACLs for resource `Cluster:kafka-cluster`:
        # User:kafka has Allow permission for operations: Create from hosts: *

Sample script :

from confluent_kafka import Consumer, KafkaException, KafkaError
import sys
import logging
import os

librdkafka_conf = {
    'bootstrap.servers': "10.252.18.190:9092",
    # 'client.id': 'kafkaclient1',
    'group.id': "kafka",
    # 'compression.codec': "none",
    'session.timeout.ms': 6000,
    'security.protocol': "sasl_plaintext",
    "default.topic.config": {"topic.metadata.refresh.interval.ms": 20000},
    'sasl.username': "kafka",
    'sasl.password': "password",
    'sasl.mechanisms': "PLAIN",
    'statistics.interval.ms': 10000,
    'enable.partition.eof': False,
    'api.version.request': False,
    'offset.store.method': None, # Get all messages back from beginning
    'broker.version.fallback': '0.10.0.1',
    'enable.auto.commit': True,
    'auto.commit.interval.ms': 10000,
    'default.topic.config': {
        # 'auto.offset.reset': 'earliest',
        # 'auto.offset.reset': 'error', 
        # 'auto.commit.enable': False, # Not required with high level consumer
        # 'offset.store.method': "file", # By default broker is used
    }
}

class KafakConsumer():
    def __init__(self):
        librdkafka_conf['on_commit'] = self._offset_commit_cb
        librdkafka_conf['error_cb'] = self._error_cb
        self.c = Consumer(**librdkafka_conf)
        self.c.subscribe(["kafka"], on_assign=self._print_assignment, on_revoke=self._print_revoke)
        self.partitions = []

    def run(self):
        while True:
            msg = self.c.poll(timeout=30.0)
            if msg is None or msg.error():
                # Error or event
                if msg is None:
                    logging.debug("Timeout while waiting for message")
                elif  msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                elif msg.error():
                    # Error
                    logging.exception(KafkaException(msg.error()))
            else:
                # Proper message
                payload = msg.value()

    def _error_cb(self, kafka_error):
        logging.critical("Got error from kafka : %s"%kafka_error.str())

    def _offset_commit_cb(self, err, partitions):
        logging.info("Autocommit cb Position : %s"%self.c.position(partitions))
        if err and err.code() != KafkaError.NO_ERROR:
            logging.critical("Offset committed for partitions %s, result %s"%(partitions, err.str()))
        else:
            logging.info("Offset committed for partitions %s"%(partitions))

    def _print_assignment (self, consumer, partitions):
        logging.warning('Assignment: %s'%partitions)
        self.partitions = partitions
        logging.info("Assignment position : %s"%self.c.position(self.partitions))

    def _print_revoke (self, consumer, partitions):
        logging.warning('Revoke: %s'%partitions)

    def exit(self):
        logging.info("Exit position : %s"%self.c.position(self.partitions))
        try:
            self.c.close()
        except:
            pass
        logging.warning("Process exiting...")
        exit()

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    logging.debug("Main process started with pid %s"%os.getpid())
    k = KafakConsumer()
    try:
        k.run()
    except KeyboardInterrupt:
        logging.critical('Receive SIGINT')
        k.exit()

Output :

root@kafkaqrdtest:~# python kafka_offset_commit.py
DEBUG:root:Main process started with pid 15820
WARNING:root:Assignment: [TopicPartition{topic=kafka,partition=3,offset=-1001,error=None}, TopicPartition{topic=kafka,partition=4,offset=-1001,error=None}, TopicPartition{topic=kafka,partition=5,offset=-1001,error=None}]
INFO:root:Assignment position : [TopicPartition{topic=kafka,partition=3,offset=-1001,error=None}, TopicPartition{topic=kafka,partition=4,offset=-1001,error=None}, TopicPartition{topic=kafka,partition=5,offset=-1001,error=None}]
INFO:root:Autocommit cb Position : [TopicPartition{topic=kafka,partition=3,offset=3380962,error=None}, TopicPartition{topic=kafka,partition=4,offset=3297340,error=None}, TopicPartition{topic=kafka,partition=5,offset=3382540,error=None}]
INFO:root:Offset committed for partitions [TopicPartition{topic=kafka,partition=3,offset=3380962,error=None}, TopicPartition{topic=kafka,partition=4,offset=3297340,error=None}, TopicPartition{topic=kafka,partition=5,offset=3382539,error=None}]
^CCRITICAL:root:Receive SIGINT
INFO:root:Exit position : [TopicPartition{topic=kafka,partition=3,offset=3381912,error=None}, TopicPartition{topic=kafka,partition=4,offset=3298290,error=None}, TopicPartition{topic=kafka,partition=5,offset=3383390,error=None}]
WARNING:root:Process exiting...
root@kafkaqrdtest:~# python kafka_offset_commit.py
DEBUG:root:Main process started with pid 15828
WARNING:root:Assignment: [TopicPartition{topic=kafka,partition=3,offset=-1001,error=None}, TopicPartition{topic=kafka,partition=4,offset=-1001,error=None}, TopicPartition{topic=kafka,partition=5,offset=-1001,error=None}]
INFO:root:Assignment position : [TopicPartition{topic=kafka,partition=3,offset=-1001,error=None}, TopicPartition{topic=kafka,partition=4,offset=-1001,error=None}, TopicPartition{topic=kafka,partition=5,offset=-1001,error=None}]
^CCRITICAL:root:Receive SIGINT
INFO:root:Exit position : [TopicPartition{topic=kafka,partition=3,offset=3384478,error=None}, TopicPartition{topic=kafka,partition=4,offset=3300873,error=None}, TopicPartition{topic=kafka,partition=5,offset=3386057,error=None}]
WARNING:root:Process exiting...

edenhill commented 7 years ago

What is the intention with this: 'offset.store.method': None, # Get all messages back from beginning ?

Are you trying to disable the offset store? Or reverting it to default?

Also, you have duplicate keys in librdkafka_conf: default.topic.config
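
For illustration, a duplicate key in a Python dict literal is not an error: the last value silently wins. A minimal sketch using the two default.topic.config entries from the script above (trimmed to two keys, nothing is sent to a broker):

```python
# Duplicate keys in a dict literal: Python keeps only the last value.
conf = {
    "group.id": "kafka",
    "default.topic.config": {"topic.metadata.refresh.interval.ms": 20000},
    "default.topic.config": {},  # second entry replaces the first one
}

print(conf["default.topic.config"])  # {}
print(len(conf))                     # 2
```

So the topic.metadata.refresh.interval.ms setting from the first entry never reaches the client.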

jbfuzier commented 7 years ago

Sorry, that was a bad copy-paste from other tests.

The proper config (without the 2 errors you noticed) has the exact same behavior.

librdkafka_conf = {
    'bootstrap.servers': "10.252.18.190:9092",
    'group.id': "kafka",
    'session.timeout.ms': 6000,
    'security.protocol': "sasl_plaintext",
    'sasl.username': "kafka",
    'sasl.password': "password",
    'sasl.mechanisms': "PLAIN",
    'statistics.interval.ms': 10000,
    'enable.partition.eof': False,
    'api.version.request': False,
    'broker.version.fallback': '0.10.0.1',
    'enable.auto.commit': True,
    'auto.commit.interval.ms': 10000,
    'topic.metadata.refresh.interval.ms': 20000,
    'default.topic.config': {
        'auto.offset.reset': 'earliest',
    }
}

StrashkoSergey commented 7 years ago

Experiencing the same issue. on_assign always gets offset == -1001L, even if commit was called manually. Also, on_commit is not called after consumer.commit(msg, async=False), but it does get called on consumer.close(). Restarting after calling consumer.close() causes the same behaviour: on_assign gets offset = -1001L.

edenhill commented 7 years ago

There is nothing in your logs that looks strange to me.

on_assign is called before committed offsets are fetched from the broker (to allow the application to use its own offsets instead), that's why all partitions have offset -1001 (which is the INVALID/unassigned/default offset constant). That is to say: the offsets are not relevant in on_assign, unless you update the offsets yourself and call assign() explicitly from on_assign.

position() reports the current fetch position, but the fetcher has not yet been started in on_assign since assign() hasn't been called yet: assign() will be called automatically when on_assign/on_revoke returns unless the application calls assign() from those callbacks.
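
As a rough sketch of the "update the offsets yourself and call assign() explicitly" case: the my_offsets store and its contents are hypothetical, and the stand-in objects at the bottom only exist so the snippet runs without a broker. With the real client the callback would be passed as subscribe(["kafka"], on_assign=on_assign).

```python
from types import SimpleNamespace

OFFSET_INVALID = -1001  # same value as confluent_kafka.OFFSET_INVALID

# Hypothetical application-side store: (topic, partition) -> offset to resume from.
my_offsets = {("kafka", 3): 3380962}


def on_assign(consumer, partitions):
    """Fill in our own offsets, then call assign() explicitly."""
    for tp in partitions:
        # tp.offset is still OFFSET_INVALID here: nothing has been fetched yet.
        own = my_offsets.get((tp.topic, tp.partition))
        if own is not None:
            tp.offset = own
    consumer.assign(partitions)


# Dry run with stand-in objects, so the sketch runs without a broker.
fake_consumer = SimpleNamespace(assigned=None)
fake_consumer.assign = lambda parts: setattr(fake_consumer, "assigned", parts)
parts = [
    SimpleNamespace(topic="kafka", partition=3, offset=OFFSET_INVALID),
    SimpleNamespace(topic="kafka", partition=4, offset=OFFSET_INVALID),
]
on_assign(fake_consumer, parts)
print([(tp.partition, tp.offset) for tp in fake_consumer.assigned])
# [(3, 3380962), (4, -1001)]
```

Partition 4 keeps -1001 in this dry run; with the real client that should mean the committed offset (or auto.offset.reset) is used once fetching starts.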

sync commits with commit(async=False) will not trigger the on_commit callback, only automatic and async commits will do that (since that's the only way to find out the commit results).
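
A minimal sketch of reading the result of a synchronous commit from the return value instead of on_commit. It assumes a client version where the keyword is spelled asynchronous (newer releases renamed it because async became a reserved word in Python 3.7) and where a synchronous commit returns the committed partitions. commit_and_report and FakeConsumer are made-up names; the fake only lets the snippet run without a broker.

```python
from types import SimpleNamespace


def commit_and_report(consumer, msg, log=print):
    """Synchronous commit: the outcome is the return value, not a callback."""
    # Blocks until the broker replies; on_commit is not invoked for this call.
    result = consumer.commit(message=msg, asynchronous=False)
    for tp in result or []:
        if tp.error is not None:
            log("commit failed for %s [%d]: %s" % (tp.topic, tp.partition, tp.error))
        else:
            log("committed %s [%d] @ %d" % (tp.topic, tp.partition, tp.offset))
    return result


# Dry run with a stand-in consumer, so the sketch runs without a broker.
class FakeConsumer:
    def commit(self, message=None, asynchronous=True):
        return [SimpleNamespace(topic="kafka", partition=3, offset=3380962, error=None)]


lines = []
commit_and_report(FakeConsumer(), msg=None, log=lines.append)
print(lines)  # ['committed kafka [3] @ 3380962']
```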

StrashkoSergey commented 7 years ago

@edenhill thanks for your response. I got it working.

jbfuzier commented 7 years ago

thanks for the explanation

ParkerHao commented 7 years ago

@edenhill I have a related question: will auto.commit handle the case when consumer.close() is called? In other words, do we need manual on_revoke handlers if we are using auto commit?

Thanks in advance!

edenhill commented 7 years ago

@ParkerHao final offsets will be committed automatically on close() regardless of on_revoke.
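
In practice that means the main thing to get right is that close() is really reached. A minimal sketch of a poll loop that guarantees it, assuming enable.auto.commit is left on; consume_until_interrupted and the fake consumer are illustrative names, not part of the client:

```python
from types import SimpleNamespace


def consume_until_interrupted(consumer, handle, poll_timeout=1.0):
    """Poll until CTRL+C, then always close the consumer."""
    try:
        while True:
            msg = consumer.poll(poll_timeout)
            if msg is None:
                continue  # timeout, nothing to do
            if msg.error():
                continue  # error or event: skipped in this sketch
            handle(msg)
    except KeyboardInterrupt:
        pass
    finally:
        # With auto commit enabled, close() commits the final offsets,
        # so no on_revoke handler is needed for that.
        consumer.close()


# Dry run with a stand-in consumer, so the sketch runs without a broker.
class FakeConsumer:
    def __init__(self, msgs):
        self._msgs = list(msgs)
        self.closed = False

    def poll(self, timeout):
        if not self._msgs:
            raise KeyboardInterrupt  # simulate CTRL+C once messages run out
        return self._msgs.pop(0)

    def close(self):
        self.closed = True


def fake_msg(value):
    return SimpleNamespace(error=lambda: None, value=lambda: value)


handled = []
fake = FakeConsumer([fake_msg(b"a"), fake_msg(b"b")])
consume_until_interrupted(fake, lambda m: handled.append(m.value()))
print(handled, fake.closed)  # [b'a', b'b'] True
```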

madtimber commented 4 years ago

> on_assign is called before committed offsets are fetched from the broker (to allow the application to use its own offsets instead), that's why all partitions have offset -1001 (which is the INVALID/unassigned/default offset constant). That is to say: the offsets are not relevant in on_assign, unless you update the offsets yourself and call assign() explicitly from on_assign.
>
> position() reports the current fetch position, but the fetcher has not yet been started in on_assign since assign() hasn't been called yet: assign() will be called automatically when on_assign/on_revoke returns unless the application calls assign() from those callbacks.

This would be AMAZING to have in the documentation for on_assign. I've been banging my head against a wall wondering why a callback named on_assign wasn't giving me all the details about the partitions the consumer was assigned to.

Is there a "lifecycle" diagram for a Consumer (and Producer for that matter) we could reference?

When in the lifecycle of a consumer is the earliest possible chance to know which offset of a partition the consumer will start at?
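
One thing that might be worth trying, offered as an unverified sketch rather than an answer: ask the broker for the group's committed offsets with committed() inside on_assign, before calling assign(). Whether a blocking call is appropriate inside the callback is an assumption on my part. The function name, the 10 second timeout, and the fake consumer are all made up for illustration.

```python
from types import SimpleNamespace

OFFSET_INVALID = -1001  # same value as confluent_kafka.OFFSET_INVALID


def log_committed_on_assign(consumer, partitions, log=print):
    """on_assign callback that looks up committed offsets before assigning."""
    # committed() asks the broker for this group's committed offsets.
    committed = consumer.committed(partitions, timeout=10.0)
    for tp in committed:
        if tp.offset == OFFSET_INVALID:
            log("%s [%d]: no committed offset, auto.offset.reset applies"
                % (tp.topic, tp.partition))
        else:
            log("%s [%d]: committed offset %d" % (tp.topic, tp.partition, tp.offset))
    consumer.assign(partitions)


# Dry run with a stand-in consumer, so the sketch runs without a broker.
class FakeConsumer:
    def committed(self, partitions, timeout=None):
        return [
            SimpleNamespace(topic="kafka", partition=3, offset=3380962),
            SimpleNamespace(topic="kafka", partition=4, offset=OFFSET_INVALID),
        ]

    def assign(self, partitions):
        self.assigned = partitions


lines = []
fake = FakeConsumer()
log_committed_on_assign(fake, ["placeholder"], log=lines.append)
print(lines)
```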