wbarnha / kafka-python-ng

Fork for Python client for Apache Kafka
https://wbarnha.github.io/kafka-python-ng/
Apache License 2.0
78 stars 11 forks source link

KAFKA-3177: consumer hangs when assigned to partition that doesn't exist #124

Open wbarnha opened 8 months ago

wbarnha commented 8 months ago

Hi guys, the code below doesn't work. I'm probably doing something wrong, but I couldn't make it to work. Kafka-python version is 1.1.1 and Kafka is 0.8.2.2.

# -*- coding: utf-8 -*-
from kafka import KafkaConsumer, KafkaProducer, TopicPartition

def test_it():
    consumer = KafkaConsumer(bootstrap_servers="localhost:9092", consumer_timeout_ms=100)
    consumer.assign([TopicPartition("frontera-todo", 1)])  # make sure frontera-todo has only one partition

    m = next(consumer)  # hang

    consumer.close()

test_it()

Kafka broker output [2016-05-11 14:46:14,669] ERROR Closing socket for /0:0:0:0:0:0:0:1 because of error (kafka.network.Processor) kafka.common.KafkaException: Wrong request type 16 at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:64) at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50) at kafka.network.Processor.read(SocketServer.scala:450) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745)

Traceback on KeyboardInterrupt

Traceback (most recent call last):
  File "/Users/sibiryakov/src/sh/crawl-frontier/frontera/tests/test_kafka2.py", line 25, in <module>
    test_it()
  File "/Users/sibiryakov/src/sh/crawl-frontier/frontera/tests/test_kafka2.py", line 11, in test_it
    m = next(consumer)
  File "/usr/local/lib/python2.7/site-packages/six.py", line 558, in next
    return type(self).__next__(self)
  File "/usr/local/lib/python2.7/site-packages/kafka/consumer/group.py", line 844, in __next__
    return next(self._iterator)
  File "/usr/local/lib/python2.7/site-packages/kafka/consumer/group.py", line 779, in _message_generator
    self._update_fetch_positions(partitions)
  File "/usr/local/lib/python2.7/site-packages/kafka/consumer/group.py", line 762, in _update_fetch_positions
    self._fetcher.update_fetch_positions(partitions)
  File "/usr/local/lib/python2.7/site-packages/kafka/consumer/fetcher.py", line 162, in update_fetch_positions
    self._reset_offset(tp)
  File "/usr/local/lib/python2.7/site-packages/kafka/consumer/fetcher.py", line 188, in _reset_offset
    offset = self._offset(partition, timestamp)
  File "/usr/local/lib/python2.7/site-packages/kafka/consumer/fetcher.py", line 221, in _offset
    self._client.poll(future=refresh_future, sleep=True)
  File "/usr/local/lib/python2.7/site-packages/kafka/client_async.py", line 430, in poll
    responses.extend(self._poll(timeout, sleep=sleep))
  File "/usr/local/lib/python2.7/site-packages/kafka/client_async.py", line 445, in _poll
    for key, events in self._selector.select(timeout):
  File "/usr/local/lib/python2.7/site-packages/kafka/selectors34.py", line 598, in select
    kev_list = self._kqueue.control(None, max_ev, timeout)
KeyboardInterrupt`

It certainly shouldn't hang, instead raise an exception.