Parsely / pykafka

Apache Kafka client for Python; high-level & low-level consumer/producer, with great performance.
http://pykafka.readthedocs.org/
Apache License 2.0

Zookeeper topic and consumer names with b literal #888

Open Atheuz opened 5 years ago

Atheuz commented 5 years ago

I think the problem I'm facing is related to these two issues:

https://github.com/Parsely/pykafka/issues/567 https://github.com/Parsely/pykafka/pull/569

Specifically, my problem is that Zookeeper is storing the bytes literal's b'' wrapper inside the topic and consumer group name strings; see the log lines below:

2018-11-09 18:18:31     INFO [pykafka.balancedconsumer] Rebalancing consumer "b'DESKTOP-K7VPQI3:f1ea4868-71c1-44f2-a55c-00e7a908b1dc'" for topic "b'this-is-a-topic'".

and:

2018-11-09 18:22:59     INFO [pykafka.cluster] Attempting to discover offset manager for consumer group 'b'this-is-a-consumer-group''
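
If it helps, this looks like a plain Python 3 bytes-vs-str issue: interpolating a bytes value into a str keeps the b'' wrapper. A minimal illustration outside of pykafka (plain Python only, so the names here are illustrative):

    # Formatting a bytes object into a str embeds its repr, including the
    # b'' wrapper - which matches what shows up in the ZK paths and log lines above.
    topic = b'this-is-a-topic'
    print('Rebalancing consumer for topic "{}".'.format(topic))
    # -> Rebalancing consumer for topic "b'this-is-a-topic'".
    print("/consumers/this-is-a-consumer-group/owners/{}".format(topic))
    # -> /consumers/this-is-a-consumer-group/owners/b'this-is-a-topic'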

My initialization of pykafka looks like this:

    # (Snippet from a larger class; imports assumed at module level:
    #  from kazoo.client import KazooClient, import pykafka,
    #  and from pykafka.common import OffsetType.)
    self.kazoo = KazooClient(hosts=self.zookeeper_hosts)
    self.kazoo.start()
    while not self.kazoo.connected:
        continue

    # Set up the client with the hosts
    self.client = pykafka.KafkaClient(
        zookeeper_hosts=self.zookeeper_hosts,
        socket_timeout_ms=self.socket_timeout_ms,
        broker_version=self.broker_version,
    )

    return self.client.topics[self.topic].get_balanced_consumer(
        consumer_group=self.group_id,
        zookeeper=self.kazoo,
        consumer_timeout_ms=self._consumer_timeout_ms,
        auto_offset_reset=OffsetType.LATEST,
        auto_commit_enable=self._auto_commit_enable,
        auto_commit_interval_ms=self._auto_commit_interval_ms,
    )

If I query Zookeeper with Kazoo with this bit of code:

 print(kazoo.get_children("/consumers/this-is-a-consumer-group/owners"))

I get this:

["b'this-is-a-topic'"]

I don't know whether this is an issue with my setup, but it's not obvious to me why it should behave like this. Because of it, I can't get the consumer to claim the correct partition and start consuming from the latest offset there.

PyKafka version: 2.8.0
Kafka version: 0.8.2
Python version: 3.6.6

emmettbutler commented 5 years ago

Thanks for reporting this issue, @Atheuz. What are the types of self.topic and self.group_id? Since https://github.com/Parsely/pykafka/pull/760, they should be str instances.
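
For example, something like this sketch (reusing the names from your snippet, and assuming a pykafka version that includes the #760 change):

    self.topic = "this-is-a-topic"              # str, not b"this-is-a-topic"
    self.group_id = "this-is-a-consumer-group"  # str, not bytes

    consumer = self.client.topics[self.topic].get_balanced_consumer(
        consumer_group=self.group_id,
        zookeeper=self.kazoo,
    )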

Atheuz commented 5 years ago

I've tried with both str and bytes literals, and the behaviour is the same. When I run a consumer with either:

1) It does not appear to commit the offsets of its messages: the lag for that consumer group just grows unbounded, and no offsets appear to be committed.
2) It doesn't matter whether I use bytes literals or str instances; the behaviour is the same.
3) It's giving me these partitions:

`My partitions: ["b'my-unused-topic'-4-112", "b'my-unused-topic'-3-63", "b'my-unused-topic'-5-125", "b'my-unused-topic'-4-40", "b'my-unused-topic'-2-110", ...]`

Maybe I'm just not using it correctly. Can you tell me where ZK/pykafka saves the partitions and offsets? Is it in /consumers/my-unused-group/offsets/my-unused-topic? That's where my lag metric is being read from. I saw a post where you mentioned that offsets are not stored in ZK but in Kafka, but I'd assume they'd have to be replicated in ZK, given that I don't think the partitions can be stored in Kafka in 0.8.2 and I know those offsets don't match.
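
For context, this is roughly how my lag metric reads offsets today (the path and names are the ones from my setup above, so treat the snippet as illustrative):

    # Illustrative only: list the per-partition znodes under the offsets path my
    # lag metric uses, and print each node's data.
    base = "/consumers/my-unused-group/offsets/my-unused-topic"
    for partition in kazoo.get_children(base):
        data, _stat = kazoo.get(base + "/" + partition)
        print(partition, data)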

emmettbutler commented 5 years ago

Regarding the b literal in Zookeeper: I think that's a bug that is definitely annoying, but it doesn't strictly preclude use of the consumer.

Pykafka stores offset information in Kafka itself via the OffsetCommitRequest and OffsetFetchRequest interface - it does not use Zookeeper for offset storage in any capacity. Pykafka's BalancedConsumer, which you're using, does use Zookeeper to store partition ownership information, which is used to balance consumer groups and is distinct from offset information. So I believe your assertion that offset information is "replicated in ZK" is not correct.

The code you've provided appears to be correct with respect to automatic offset committing, assuming you're setting auto_commit_enable to True and auto_commit_interval_ms to a reasonable number of milliseconds. If that's the case, it's hard to debug further without additional code. You mention that "the lag for that consumer group just grows unbounded" - what do you mean by that? More detail about exactly how you're checking for committed offsets would help in fixing your issue.
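
If you want to sanity-check what's actually been committed to Kafka, something along these lines should show it - this is roughly the approach cli/kafka_tools.py takes, with placeholder broker address and names, so verify it against your setup and pykafka version:

    from pykafka import KafkaClient

    client = KafkaClient(hosts="localhost:9092")  # placeholder broker address
    topic = client.topics["this-is-a-topic"]
    consumer = topic.get_simple_consumer(
        consumer_group="this-is-a-consumer-group",
        auto_start=False,  # we only want offset metadata, not messages
    )

    committed = dict(consumer.fetch_offsets())  # committed offsets, read from Kafka
    latest = topic.latest_available_offsets()   # head offset per partition

    for partition_id, res in sorted(committed.items()):
        head = latest[partition_id].offset[0]
        # res.offset is -1 when nothing has been committed for the partition yet
        lag = head - res.offset if res.offset >= 0 else head
        print(partition_id, "committed:", res.offset, "latest:", head, "lag:", lag)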

Atheuz commented 5 years ago

@emmett9001 my mistake. The problem I was having was that I couldn't get it to report offset lag the way I was expecting, so I switched over to the method used in kafka_tools.py, and that gives me the correct offset lag. More precisely, the problem was that the offsets were not being stored in Zookeeper, where I was expecting them to be.

I still get the weird b literal in the pykafka logs, and I'm not sure what to make of it, but it seems to be consuming/producing messages properly.

This is no longer a 'bug' for me; I just had to fully grok what pykafka was actually doing.