dpkp / kafka-python

Python client for Apache Kafka
http://kafka-python.readthedocs.io/
Apache License 2.0

KafkaConsumer using group_id with manually assigned partitions can raise unexpected IllegalStateError #1112

Closed (dcrosta closed 4 years ago)

dcrosta commented 7 years ago

Kafka 0.10.0.1, kafka-python 1.3.3

I create a consumer like:

from kafka import KafkaConsumer, TopicPartition

topic = "the-topic"

consumer = KafkaConsumer(
    group_id="the-group",
    bootstrap_servers=[...],
    consumer_timeout_ms=30000,
)

# Manually assign only the even-numbered partitions of the topic.
partitions = [
    TopicPartition(topic, partition_num)
    for partition_num in consumer.partitions_for_topic(topic)
    if partition_num % 2 == 0
]
consumer.assign(partitions)

And then consume from it by iterating over the consumer (for message in consumer:) in the usual way.

Some time later (a few seconds after beginning consuming in the for loop), I get:

Traceback (most recent call last):
  File "/usr/local/opt/pypy/site-packages/kafka/future.py", line 79, in _call_backs
    f(value)
  File "/usr/local/opt/pypy/lib_pypy/_functools.py", line 45, in __call__
    return self._func(*(self._args + fargs), **fkeywords)
  File "/usr/local/opt/pypy/site-packages/kafka/coordinator/consumer.py", line 547, in _handle_offset_commit_response
    self._subscription.mark_for_reassignment()
  File "/usr/local/opt/pypy/site-packages/kafka/consumer/subscription_state.py", line 172, in mark_for_reassignment
    raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
IllegalStateError: IllegalStateError: You must choose only one way to configure your consumer: (1) subscribe to specific topics by name, (2) subscribe to topics matching a regex pattern, (3) assign itself specific topic-partitions.

I don't call subscribe() anywhere, nor assign() anywhere else or again. This looks like it's coming from _handle_offset_commit_response asynchronously, but I don't know why it would be.

Mostly I want to understand whether I am supposed to do anything with this error and, given that it's happening asynchronously, whether I even can do anything with it. (Will it bubble up in a try: block anywhere? I think not if it's on another thread, for instance.)

dcrosta commented 7 years ago

I should be clear, I'm seeing that in the output of a logging.error call:

2017-05-30 19:17:00,136 [kafka.future:ERROR] Error processing callback
Traceback (most recent call last):
...

I've just restarted the process again, and did not observe this error logged again. From a glance at the code in https://github.com/dpkp/kafka-python/blob/master/kafka/coordinator/consumer.py#L540-L559, it looks like this error is being 'handled' by saying, "it happened, not much we can do about it", and moving on with our lives. Presumably future offset commits will happen at their regularly scheduled intervals.
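A minimal sketch of why the error appears only in the log and never reaches a try: block around the consuming loop. It assumes, based on the "Error processing callback" log line and the _call_backs frame in the traceback, that callbacks run inside a try/except that logs the exception. ToyFuture, failing_handler, and complete_future are hypothetical names for illustration, not kafka-python's classes.

```python
import logging

log = logging.getLogger("toy.future")


class ToyFuture:
    """Hypothetical stand-in for a client-side future; not kafka-python's class."""

    def __init__(self):
        self._callbacks = []

    def add_callback(self, f):
        self._callbacks.append(f)
        return self

    def success(self, value):
        # Assumed behavior: run each callback and log failures
        # instead of re-raising them to whoever completed the future.
        for f in self._callbacks:
            try:
                f(value)
            except Exception:
                log.exception("Error processing callback")


def failing_handler(response):
    # Plays the role of a response handler that hits an illegal state.
    raise RuntimeError("raised inside a callback")


def complete_future():
    """Return True if the callback's exception reached the caller."""
    future = ToyFuture().add_callback(failing_handler)
    try:
        future.success("commit response")
    except RuntimeError:
        return True
    return False


escaped = complete_future()
```

Under that assumption the traceback is written to the log and `escaped` stays False, which matches the observation that the process carried on consuming.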

dpkp commented 7 years ago

This looks like a bug caused by using a group_id while assigning partitions manually. Our test suite doesn't cover that use case particularly well yet. I suspect there are a few edge cases here and there that need to get handled in this scenario. Nonetheless, it should be fine to move on with your lives in this case :)
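One plausible reading of the traceback, sketched as a toy model: a commit response that reports a group-level error asks the subscription state to rejoin the group, and that request is illegal when partitions were assigned manually. Everything here (ToySubscriptionState, handle_commit_response, the group_error flag) is a hypothetical simplification for illustration, not the library's implementation.

```python
class IllegalStateError(Exception):
    """Local stand-in for the exception named in the traceback."""


class ToySubscriptionState:
    """Hypothetical, heavily simplified model of the consumer's subscription state."""

    def __init__(self):
        self.subscribed_topics = set()
        self.user_assigned = set()
        self.needs_reassignment = False

    def subscribe(self, topics):
        if self.user_assigned:
            raise IllegalStateError("subscribe() and assign() are mutually exclusive")
        self.subscribed_topics = set(topics)

    def assign_from_user(self, partitions):
        if self.subscribed_topics:
            raise IllegalStateError("subscribe() and assign() are mutually exclusive")
        self.user_assigned = set(partitions)

    def mark_for_reassignment(self):
        # Rejoining the group only makes sense when the group hands out partitions.
        if self.user_assigned:
            raise IllegalStateError("reassignment requested under manual assignment")
        self.needs_reassignment = True


def handle_commit_response(state, group_error):
    """Assumed shape of the failing path: a group-level error in the
    commit response triggers a request to rejoin the group."""
    if group_error:
        state.mark_for_reassignment()


# Manual assignment plus a group-level commit error: the illegal combination.
manual = ToySubscriptionState()
manual.assign_from_user([("the-topic", 0), ("the-topic", 2)])
try:
    handle_commit_response(manual, group_error=True)
    raised = False
except IllegalStateError:
    raised = True

# Subscribe mode handles the same response without complaint.
subscribed = ToySubscriptionState()
subscribed.subscribe(["the-topic"])
handle_commit_response(subscribed, group_error=True)
```

In this model the manually assigned consumer ends with `raised` set to True, while the subscribed consumer is simply flagged for reassignment.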

dcrosta commented 7 years ago

OK thanks -- let me know if I can help here at all.

dpkp commented 7 years ago

I believe this is fixed by #1266

ryanjmccall commented 6 years ago

I have a similar issue where I have some processes consuming a topic in subscribe mode. Then I stop those processes (invoking close()). Shortly afterwards, if I try to advance the offsets on the same topic from a separate script, I get the IllegalStateError asynchronously and the process hangs. I am basically doing the following:

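    # Manually assign the partitions, then move each position to the log end.
    consumer.assign(topicPartitions)
    consumer.seek_to_end(*topicPartitions)
    # Commit the end position for each partition (commit() blocks until done).
    for tp in topicPartitions:
        consumer.commit({tp: OffsetAndMetadata(consumer.position(tp), None)})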

The issue disappears after several minutes.

dpkp commented 4 years ago

Fixed in #1364