confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python
Other
140 stars 898 forks source link

auto.offset.reset ignored when specified at top level #350

Closed mhowlett closed 6 years ago

mhowlett commented 6 years ago

Description

Specifying 'auto.offset.reset' as a top level property does not result in an error, but seems to be ignored.

How to reproduce

c = Consumer({
    'bootstrap.servers': 'SASL_SSL://<ccloud bootstrap servers>',
    'api.version.request': True,
    'broker.version.fallback': '0.10.0.0',
    'api.version.fallback.ms': 0,
    'sasl.mechanisms': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'ssl.ca.location': '/usr/local/etc/openssl/cert.pem',
    'sasl.username': '<omitted>',
    'sasl.password': '<omitted>',
    'group.id': str(uuid.uuid1()),
    'auto.offset.reset': 'smallest'
    # 'default.topic.config': {'auto.offset.reset': 'smallest'} - this works
})

c.subscribe(['python-test-topic'])

try:
    while True:
        msg = c.poll(0.1)
        if msg is None:
            continue
        elif not msg.error():
            print('consumed: {0}'.format(msg.value()))
        elif msg.error().code() == KafkaError._PARTITION_EOF:
            print('end of partition: {0}/{1}'.format(msg.topic(), msg.partition()))
        else:
            print('error: {0}'.format(msg.error().str()))

except KeyboardInterrupt:
    pass

finally:
    c.close()

Checklist

Please provide the following information:

edenhill commented 6 years ago

We should merge default.topic.config with the global dict and then rely on global-topic fallthru. And deprecate default.topic.config.

phanatics commented 6 years ago

i spent 2 days on this!!!! thanks @mhowlett

@edenhill this must be somehow noted in the docs.

if in Consumer side {'auto.offset.reset': 'earliest'} doesn't work for you even tough you're on latest stable versions, use this: 'default.topic.config': {'auto.offset.reset': 'smallest'} instead.

cheers. feel like a block off my chest.

knil-sama commented 6 years ago

Both didn't work on the AvroConsumer :sob:

rnpridgeon commented 6 years ago

This should be fixed with https://github.com/confluentinc/confluent-kafka-python/pull/446 which will be in the next release. The AvroConsumer passes the value through to librdkafka so that's a bit odd you are finding this does not work.

Can you verify your consumer group does not currently have any valid offsets from which to recover from. Remember that auto.offset.reset only applies when a valid offset can't be found. If your consumer group is able to recover committed offsets within a valid offset range the reset policy is not enacted.

rnpridgeon commented 6 years ago

fixed with #446