dpkp / kafka-python

Python client for Apache Kafka
http://kafka-python.readthedocs.io/
Apache License 2.0

consumer does not work with HDP kafka 0.8.2.2.3 / HDP 2.3.0.0 #491

mathewned closed this issue 8 years ago

mathewned commented 8 years ago

Kafka version: 0.8.2.2.3 (HDP kafka_2_3_0_0_2557-0.8.2.2.3.0.0-2557.el6)
kafka-python version: 0.9.5

Simply creating the consumer object fails with the exception NotCoordinatorForConsumerCode:

consumer = KafkaConsumer('testAnishm',group_id='my_group',bootstrap_servers=['localhost:6667'],auto_commit_enable=True,auto_commit_interval_ms=1000,auto_offset_reset='smallest')

kafka.common.NotCoordinatorForConsumerCode: OffsetFetchResponse(topic='testAnishm', partition=0, offset=-1, metadata='', error=16)

This is similar to #311. I am using the latest version of kafka-python. Not sure what I am missing. Any suggestions?
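For reference, error=16 in the response above is the Kafka protocol error code for "not coordinator for consumer". The following is a minimal sketch for decoding such a response by hand. The namedtuple simply mirrors the fields printed in the exception message and is not kafka-python's own class; the `ERROR_NAMES` table and `describe` helper are hypothetical names used only for illustration.

```python
from collections import namedtuple

# Field layout copied from the exception message; an illustrative stand-in,
# not the class kafka-python defines.
OffsetFetchResponse = namedtuple(
    "OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"]
)

# Small subset of Kafka protocol error codes that show up around
# offset fetch/commit. The labels are descriptive, not library identifiers.
ERROR_NAMES = {
    0: "NoError",
    3: "UnknownTopicOrPartition",
    14: "OffsetsLoadInProgress",
    15: "ConsumerCoordinatorNotAvailable",
    16: "NotCoordinatorForConsumer",
}


def describe(resp):
    """Return a one-line, human-readable summary of an offset fetch response."""
    name = ERROR_NAMES.get(resp.error, "Unknown")
    return "%s[%d]: error %d (%s), offset %d" % (
        resp.topic, resp.partition, resp.error, name, resp.offset
    )


resp = OffsetFetchResponse(
    topic="testAnishm", partition=0, offset=-1, metadata="", error=16
)
print(describe(resp))
```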

mathewned commented 8 years ago

@dpkp Do you have any suggestions to get around this error?

Thanks, Mathew

dpkp commented 8 years ago

This error has to do with the backing store used for offset commits. Older servers used ZooKeeper to store offsets, while newer server versions are transitioning to internal Kafka topics to store consumer offsets. The current release of kafka-python does not support the newer Kafka topic offset storage. How have you configured your server offset storage?

mathewned commented 8 years ago

We are still using ZooKeeper-based offset storage and haven't yet migrated to Kafka-based offset storage.

dpkp commented 8 years ago

Can you post the exact code, stack trace, and also debug logs? The bits above show a consumer initialized with 'topic1' but an error for topic 'testAnishm'.

I tried to reproduce the crash locally by starting up a fresh 3-node 0.8.2.2 cluster and creating a consumer:

consumer = KafkaConsumer('topic1',group_id='my_group',bootstrap_servers=bootstrap,auto_commit_enable=True,auto_commit_interval_ms=1000,auto_offset_reset='smallest')

but this works w/o crashes, both with no topic created and after calling client.ensure_topic_exists('topic1')

mathewned commented 8 years ago

Sorry about the topic name typo. I corrected the initial comment. Here is the exact code and debug log:

import config
import logging
import sys
from kafka import KafkaConsumer
logger = logging.getLogger('kafka')
logger.addHandler(logging.StreamHandler(sys.stdout))
logger.setLevel(logging.DEBUG)

consumer = KafkaConsumer('testAnishm',group_id='my_group',bootstrap_servers=['r39ln00:6667'],auto_commit_enable=True,auto_commit_interval_ms=1000,auto_offset_reset='smallest')

Attached logs: kafka.txt, client.txt

dpkp commented 8 years ago

That looks like the bug in kafka 0.8.2-beta, discussed more here: https://github.com/dpkp/kafka-python/issues/311

What version of the server are you installing? I haven't heard of 0.8.2.2.3 (HDP 2.3)

mathewned commented 8 years ago

0.8.2.2.3 (HDP 2.3). This is the Hortonworks distribution; the actual package is kafka_2_3_0_0_2557-0.8.2.2.3.0.0-2557.el6.

I did check #311 but thought Hortonworks might have picked the stable version. Do you know which JIRA fixed the issue in kafka 0.8.2-beta? Hortonworks says this release contains kafka 0.8.2 plus the following Apache patches: http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.0/bk_HDP_RelNotes/content/ch01s06s09.html

dpkp commented 8 years ago

KAFKA-1841 was the fix. Among other things, it separated OffsetFetchRequest into 2 identical "versions": v0 for ZooKeeper offset storage, which doesn't use a Coordinator, and v1 for Kafka offset storage, which does use a Coordinator. The error logs you uploaded show a v0 OffsetFetchRequest throwing a Coordinator error, which I have only seen in the beta release before KAFKA-1841. kafka-python does not test against non-standard releases, but it shouldn't be too hard to run the full integration test suite against it if you have the install package locally. Or if you have a download link for a tarball, I can try to add support to the kafka-python test suite (build_integration.sh).
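To make the v0/v1 distinction concrete, here is a rough sketch of how an OffsetFetchRequest is laid out on the wire, following the published Kafka protocol (api key 9, big-endian fields, length-prefixed strings). It is not kafka-python's encoder; `encode_offset_fetch`, `_string`, and the `client_id` value are hypothetical names chosen for illustration. The point is that the two versions have the same body, and only the api_version field in the header tells the broker which offset store to consult.

```python
import struct

OFFSET_FETCH_API_KEY = 9  # OffsetFetchRequest in the Kafka wire protocol


def _string(value):
    """Encode a protocol string: int16 length followed by UTF-8 bytes."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_offset_fetch(group, topic, partitions, api_version,
                        correlation_id=1, client_id="sketch"):
    """Build a size-prefixed OffsetFetchRequest for a single topic."""
    # Header: api_key (int16), api_version (int16), correlation_id (int32),
    # client_id (string).
    header = struct.pack(
        ">hhi", OFFSET_FETCH_API_KEY, api_version, correlation_id
    ) + _string(client_id)
    # Body: consumer group, then an array of topics, each carrying an
    # array of partition ids. This sketch always sends exactly one topic.
    body = _string(group)
    body += struct.pack(">i", 1) + _string(topic)
    body += struct.pack(">i", len(partitions))
    for partition in partitions:
        body += struct.pack(">i", partition)
    payload = header + body
    return struct.pack(">i", len(payload)) + payload


# v0 asks for ZooKeeper-stored offsets, v1 for Kafka-stored offsets.
v0 = encode_offset_fetch("my_group", "testAnishm", [0], api_version=0)
v1 = encode_offset_fetch("my_group", "testAnishm", [0], api_version=1)

# Indices of the bytes where the two encoded requests differ.
diff = [i for i in range(len(v0)) if v0[i:i + 1] != v1[i:i + 1]]
print(diff)
```

A broker that answers the v0 form with a coordinator error, as in the logs above, is treating it as if it were the v1 form.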

mathewned commented 8 years ago

Inside _get_commit_offsets

def _get_commit_offsets(self):
        logger.info("Consumer fetching stored offsets" + str(self.__dict__))
        for topic_partition in self._topics:
            (resp,) = self._client.send_offset_fetch_request(

If I change the above line in kafka.py to use send_offset_fetch_request_kafka, it is able to consume from the topic but fails when it commits. I think this is due to the lack of support for committing to the internal Kafka offsets topic (__consumer_offsets).

So is this issue due to Kafka not responding correctly to a v0 OffsetFetchRequest?

I was able to successfully use the console-consumer that comes with Kafka (I tried both ZooKeeper and Kafka offset management).

Here is the link to download the Kafka rpm: http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.3.0.0/kafka/kafka_2_3_0_0_2557-0.8.2.2.3.0.0-2557.el6.noarch.rpm

You can ignore the dependencies, but if needed, here are the links for the ranger and zookeeper packages:

http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.3.0.0/ranger/ranger_2_3_0_0_2557-kafka-plugin-0.5.0.2.3.0.0-2557.el6.x86_64.rpm
http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.3.0.0/zookeeper/zookeeper_2_3_0_0_2557-3.4.6.2.3.0.0-2557.el6.noarch.rpm

I also tried their maintenance release (according to the release notes, it has no changes compared to the above version). The rpm links for that are:

http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.3.2.0/kafka/kafka_2_3_2_0_2950-0.8.2.2.3.2.0-2950.el6.noarch.rpm
http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.3.2.0/zookeeper/zookeeper_2_3_2_0_2950-3.4.6.2.3.2.0-2950.el6.noarch.rpm
http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.3.2.0/ranger/ranger_2_3_2_0_2950-kafka-plugin-0.5.0.2.3.2.0-2950.el6.x86_64.rpm

You can ignore the hdp-select dependency.

dpkp commented 8 years ago

Correct. The other part of that bug was that the commit API changed and was not backwards compatible. I don't think kafka-python can simultaneously support the beta API and not break support for other broker versions, due to the compatibility bug in that API release. The compatibility issue was fixed in KAFKA-1841.

I am also not surprised that the packaged console-consumer that you have works. The clients distributed with that release should work with it because they were presumably built with that beta API interface. But they won't work with other versions because of the compatibility issue. And similarly, third-party clients generally won't work with that version.

dpkp commented 8 years ago

Well lordy. Looking through Hortonworks' GitHub, it looks like they are tracking Apache Kafka's trunk branch. And what do you know, KAFKA-1841 never made it into trunk. Indeed, Apache doesn't release off of trunk, so HDP is probably in a very weird state relative to the rest of the world. In any case, KAFKA-1841 was merged into trunk later with KAFKA-2068. It looks like HDP has that patch in HDP-2.3.4.0. I would upgrade if you can :)
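As a rough way to check an installed package against that cutoff, the sketch below splits an HDP package name into the upstream component version and the HDP version. The layout `<component x.y.z>.<HDP a.b.c.d>-<build>` is an assumption inferred from the package names quoted in this thread, and `split_hdp_version` and `MIN_HDP_WITH_FIX` are hypothetical names; the 2.3.4.0 threshold is taken from the comment above, not from HDP release notes.

```python
import re

# Assumed layout: -<component x.y.z>.<HDP a.b.c.d>-<build>
# e.g. kafka_2_3_0_0_2557-0.8.2.2.3.0.0-2557.el6
_PATTERN = re.compile(r"-(\d+\.\d+\.\d+)\.(\d+\.\d+\.\d+\.\d+)-(\d+)")

# First HDP version believed (per this thread) to carry the KAFKA-1841 fix.
MIN_HDP_WITH_FIX = (2, 3, 4, 0)


def split_hdp_version(package):
    """Return (component_version, hdp_version_tuple, build_number)."""
    match = _PATTERN.search(package)
    if match is None:
        raise ValueError("unrecognized package name: %r" % package)
    component, hdp, build = match.groups()
    return component, tuple(int(part) for part in hdp.split(".")), int(build)


for name in (
    "kafka_2_3_0_0_2557-0.8.2.2.3.0.0-2557.el6",
    "kafka_2_3_2_0_2950-0.8.2.2.3.2.0-2950.el6",
):
    component, hdp, build = split_hdp_version(name)
    has_fix = hdp >= MIN_HDP_WITH_FIX
    print(component, hdp, build, "has fix" if has_fix else "needs upgrade")
```

Read this way, "0.8.2.2.3" is Kafka 0.8.2 packaged for HDP 2.3 rather than an upstream Kafka release number.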

mathewned commented 8 years ago

Thanks for your help. Looks like HDP-2.3.4.0 is the only option, so I will try that. But it is based on Kafka 0.9. Is kafka-python compatible with 0.9?

dpkp commented 8 years ago

Yes, kafka-python will work with 0.9.0.0 brokers just fine. It just doesn't have support for the new 0.9 features yet: Consumer Groups / Coordinators, TLS/Security, and metrics.

mathewned commented 8 years ago

Thank you. I will try the new version and post an update.

dpkp commented 8 years ago

Ok -- please reopen if that doesn't work. I sent a note to the kafka mailing lists about this. It's a real bummer that an 0.8.2 release made it into the wild w/o KAFKA-1841.