Parsely / pykafka

Apache Kafka client for Python; high-level & low-level consumer/producer, with great performance.
http://pykafka.readthedocs.org/
Apache License 2.0

Continuous `continue` (messages skipped) when the message offset > next_offset #989

Open yulezheng opened 4 years ago


pykafka version: 2.8.0

import logging

from pykafka import KafkaClient

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%a, %d %b %Y %H:%M:%S',
                    )


def connect_kafka():
    client = KafkaClient(hosts="127.0.0.1:9092", broker_version="2.1.0")
    topic_name = 'test_topic'
    topic = client.topics[topic_name]
    consumer = topic.get_simple_consumer(
        consumer_group=b'pytest',
        auto_commit_enable=False,
        consumer_id=b'test1',
    )
    return consumer


consumer = connect_kafka()
# Commit, then reset, partition 0 to the same offset, printing the committed offset after each step
offset = 228704876
consumer.commit_offsets(partition_offsets=[(consumer.partitions[0], offset)])
print(consumer.fetch_offsets()[0][1].offset)
consumer.reset_offsets(partition_offsets=[(consumer.partitions[0], offset)])
print(consumer.fetch_offsets()[0][1].offset)

Running the code above produces many log lines like this:

Fri, 03 Jan 2020 16:22:43 simpleconsumer.py[line:996] DEBUG Skipping enqueue for offset (228704877) not equal to next_offset (228704876)
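To gauge how often the skip occurs, a counting `logging.Handler` could be attached while the reproduction runs. This is a minimal sketch using only the standard `logging` module; `SubstringCounter` is a hypothetical helper, not part of pykafka, and the demo records below only imitate the format of the reported debug line rather than being real consumer output.

```python
import logging


class SubstringCounter(logging.Handler):
    """Count log records whose formatted message contains `needle`.

    Hypothetical helper (not part of pykafka) for measuring how many
    skip messages are emitted during a run.
    """

    def __init__(self, needle, level=logging.DEBUG):
        super().__init__(level)
        self.needle = needle
        self.count = 0

    def emit(self, record):
        # getMessage() merges the %-style arguments into the final text
        if self.needle in record.getMessage():
            self.count += 1


# Demo on a stand-in logger; these records imitate the reported line locally.
demo_log = logging.getLogger("skip_counter_demo")
demo_log.setLevel(logging.DEBUG)
demo_log.propagate = False  # keep the demo off the console
counter = SubstringCounter("Skipping enqueue for offset")
demo_log.addHandler(counter)

demo_log.debug("Skipping enqueue for offset (%s) not equal to next_offset (%s)",
               228704877, 228704876)
demo_log.debug("unrelated message")
print(counter.count)  # prints 1
```

In the reproduction script, the handler could instead be added to the root logger with `logging.getLogger().addHandler(counter)`, assuming pykafka's module loggers propagate to the root as standard `logging.getLogger(__name__)` loggers do.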

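This does not happen if any one of the last four statements (the `commit_offsets()` call, the `reset_offsets()` call, or either `print` of `fetch_offsets()`) is removed.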

The debug message is logged here: https://github.com/Parsely/pykafka/blob/ebbc5c70901237e60bd6654336675886793fb8d9/pykafka/simpleconsumer.py#L994
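The wording of the debug message suggests an equality check against `next_offset`. The sketch below is a simplified, hypothetical model inferred from that message alone, not pykafka's actual `SimpleConsumer` code; the `enqueue` function and its return tuple are invented for illustration. It shows why a batch that starts one offset past `next_offset` would be skipped entirely.

```python
import logging

log = logging.getLogger("enqueue_model")


def enqueue(message_offsets, next_offset):
    """Hypothetical model: accept only the offset equal to next_offset.

    Any other offset is logged and skipped with `continue`, so next_offset
    never advances past a gap.
    Returns (accepted_offsets, skipped_offsets, final_next_offset).
    """
    accepted, skipped = [], []
    for offset in message_offsets:
        if offset != next_offset:
            log.debug("Skipping enqueue for offset (%s) not equal to next_offset (%s)",
                      offset, next_offset)
            skipped.append(offset)
            continue
        accepted.append(offset)
        next_offset = offset + 1  # expect the following offset next


    return accepted, skipped, next_offset


# Batch starts at next_offset: everything is accepted.
print(enqueue([228704876, 228704877, 228704878], 228704876))
# → ([228704876, 228704877, 228704878], [], 228704879)

# Batch starts one offset too far: nothing is accepted, next_offset is stuck.
print(enqueue([228704877, 228704878, 228704879], 228704876))
# → ([], [228704877, 228704878, 228704879], 228704876)
```

Under this model, `next_offset` stays at 228704876; if later fetches also begin at 228704877, the same skip would repeat, which would be consistent with the stream of identical debug lines reported above.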