dpkp / kafka-python

Python client for Apache Kafka
http://kafka-python.readthedocs.io/
Apache License 2.0

Cleanup check_version() / API_VERSION in KafkaClient/BrokerConnection #927

Open jeffwidman opened 7 years ago

jeffwidman commented 7 years ago

Skimming the code for KafkaClient and BrokerConnection, it looks like there's some cleanup possible:

1) I don't see why KafkaClient needs a check_version()... why not just completely delegate this to BrokerConnection?

2) There's a docstring for BrokerConnection param api_version that lists potential values, but these are never actually checked. Instead, the allowed values are listed in KafkaClient.API_VERSION. Can this whitelist be moved down the stack to BrokerConnection? That way it's only in one place, no need to maintain multiple lists.

3) Do we even need the API_VERSION whitelist? Why not be a little more permissive and allow users to specify their own API version tuples? They could then write wrapper code that pins api_version to a version that hasn't yet been added to kafka-python, and still upgrade kafka-python safely underneath the wrapper: if kafka-python adds a feature that their broker supports, they get it for free, while features their broker doesn't support stay disabled. Personally, in production I'd rather pin my versions more conservatively than that, but I don't think this use case should be prevented the way it currently is.

4) I don't understand the purpose of this line of code in KafkaClient.check_version():

    try_node = node_id or self.least_loaded_node()

Why check the least_loaded_node() at all? I would think you'd want to check the version of only the broker that you're trying to talk to. Mixed-version clusters occur during rolling upgrades, so it's best not to assume that the version you get back from another broker is the same as the version of the current broker you're talking to.
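To make points 2 and 3 concrete, here is a minimal sketch of what a single, permissive validation step could look like. It is not kafka-python's actual code: `normalize_api_version` and `feature_enabled` are hypothetical names, and the sketch assumes that features are gated by plain tuple comparison against the pinned version.

```python
def normalize_api_version(api_version):
    """Hypothetical single validation point for api_version.

    Accepts None (meaning "probe the broker") or any non-empty tuple of
    non-negative ints, instead of checking membership in a fixed whitelist.
    """
    if api_version is None:
        return None
    if not isinstance(api_version, tuple) or len(api_version) == 0:
        raise TypeError("api_version must be a non-empty tuple of ints or None")
    if not all(isinstance(part, int) and part >= 0 for part in api_version):
        raise ValueError("api_version parts must be non-negative ints")
    return api_version


def feature_enabled(pinned_version, required_version):
    """Assumed gating rule: a feature is on if the pinned version is at
    least the version that introduced it (lexicographic tuple comparison)."""
    return pinned_version >= required_version


# A user pins a version that a whitelist might not know about yet.
pinned = normalize_api_version((0, 10, 3))
print(feature_enabled(pinned, (0, 10, 1)))  # feature introduced earlier
print(feature_enabled(pinned, (0, 11)))     # feature introduced later
```

With this kind of check, an unlisted tuple such as `(0, 10, 3)` is accepted, features introduced at or before it are enabled, and later ones stay off, which is the wrapper use case described in point 3.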

mpastecki commented 5 years ago

I have seen the behavior described in point 4 during a rolling upgrade. Some of the consumers were using an API version higher than what the non-upgraded brokers supported, and so they could not join the consumer group.

The API version should be taken either from the broker a given consumer talks to, or it should be set to the maximum version supported by the broker running the oldest Kafka version in the cluster.
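The two options above can be sketched as follows. This is only an illustration under stated assumptions: `broker_versions` is a hypothetical mapping from node id to the version tuple reported by that broker, and neither function is part of kafka-python.

```python
def version_for_node(broker_versions, node_id):
    """Option 1: use the version of the specific broker being talked to."""
    return broker_versions[node_id]


def cluster_safe_version(broker_versions):
    """Option 2: use the lowest version found in the cluster, i.e. the
    highest version that the oldest broker still supports."""
    if not broker_versions:
        raise ValueError("no broker versions known")
    return min(broker_versions.values())


# Hypothetical cluster in the middle of a rolling upgrade:
# nodes 1 and 2 are upgraded, node 3 is not.
broker_versions = {
    1: (0, 10, 1),
    2: (0, 10, 1),
    3: (0, 9),
}

print(version_for_node(broker_versions, 3))
print(cluster_safe_version(broker_versions))
```

In this example both options yield `(0, 9)` for a consumer whose coordinator is node 3, whereas taking the version from an arbitrary upgraded node would yield `(0, 10, 1)` and reproduce the failure described above.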