Parsely / pykafka

Apache Kafka client for Python; high-level & low-level consumer/producer, with great performance.
http://pykafka.readthedocs.org/
Apache License 2.0

Auto-created topics partitions incorrect / divide by zero #941

Open rjemanuele opened 5 years ago

rjemanuele commented 5 years ago

PyKafka version: 2.8.0
Kafka version: 2.2.0

When producing to an auto-created topic, a divide-by-zero exception can occur:

Traceback (most recent call last):
  File "/opt/system/project/usr/share/signal_generator.py", line 464, in emit
    producer.produce(j.encode())
  File "/usr/local/lib/python3.7/site-packages/pykafka/producer.py", line 385, in produce
    partition_id = self._partitioner(partitions, partition_key).id
  File "/usr/local/lib/python3.7/site-packages/pykafka/partitioners.py", line 47, in __call__
    self.idx = (self.idx + 1) % len(partitions)
ZeroDivisionError: integer division or modulo by zero

My guess is that the partition list is still empty when the message is produced (e.g., Kafka hasn't finished creating the partitions, or PyKafka hasn't refreshed its metadata yet), so `len(partitions)` is 0 and the modulo fails.
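To make the suspected failure mode concrete, here is a minimal sketch. `RoundRobinSketch` and `safe_pick` are hypothetical names made up for illustration; only the modulo line is taken from the traceback, and this is not PyKafka's actual partitioner class.

```python
class RoundRobinSketch:
    """Hypothetical stand-in for a round-robin partitioner (not PyKafka's code)."""

    def __init__(self):
        self.idx = 0

    def __call__(self, partitions, key=None):
        # Same arithmetic as the traceback line; raises ZeroDivisionError
        # when `partitions` is empty.
        self.idx = (self.idx + 1) % len(partitions)
        return partitions[self.idx]


def safe_pick(partitioner, partitions, key=None):
    """Hypothetical guard: fail with a descriptive error on an empty list."""
    if not partitions:
        raise RuntimeError("topic has no partitions yet; refresh metadata and retry")
    return partitioner(partitions, key)
```

Calling `RoundRobinSketch()([])` raises the same `ZeroDivisionError` as in the report, while `safe_pick` turns the empty list into an error that names the likely cause.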

rjemanuele commented 5 years ago

@emmett9001 Hi Emmett, Could you have a look at this one? I'm not sure what the best course of action is here. Thanks.

emmettbutler commented 5 years ago

Assuming your topic is actually being created with the proper number of partitions, a workaround could be to initialize a producer with a preexisting topic name instead of relying on autocreation.

rjemanuele commented 5 years ago

I found that retrying after a call to `update_cluster()` does work. These topics are low volume, with a single partition.

    # Assumes `import logging` and `import time` at module level.
    retry = 0
    topic = self.kafka.topics[mytopic]
    # Poll until the topic reports at least one partition or retries run out.
    while retry < self.retry_topic_partitions_found:
        if len(topic.partitions):
            break
        logging.warning("No partitions found on retry %d for %s",
                        retry, mytopic)
        retry += 1
        time.sleep(.25)
        # Refresh cluster metadata, then look the topic up again.
        self.kafka.update_cluster()
        topic = self.kafka.topics[mytopic]

    if len(topic.partitions) == 0:
        logging.error("No partitions found after retries for %s", mytopic)

    self.producer = topic.get_producer()

With this in place, I've never seen it need more than a single retry.
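For anyone who wants the same idea outside a class, the retry could be factored into a standalone helper. This is only a sketch: `wait_for_partitions`, `max_retries`, and `delay` are hypothetical names, and `client` is assumed to behave like the `self.kafka` object in the snippet above (it exposes `topics[name]` and `update_cluster()`). Unlike the snippet, it raises instead of logging when no partitions ever appear.

```python
import logging
import time


def wait_for_partitions(client, topic_name, max_retries=5, delay=0.25):
    """Return the topic once it reports at least one partition.

    Hypothetical helper: `client` is assumed to expose `topics[name]`
    and `update_cluster()`, as `self.kafka` does in the snippet above.
    """
    topic = client.topics[topic_name]
    for attempt in range(max_retries):
        if len(topic.partitions) > 0:
            return topic
        logging.warning("No partitions found on retry %d for %s",
                        attempt, topic_name)
        time.sleep(delay)
        # Refresh cluster metadata, then look the topic up again.
        client.update_cluster()
        topic = client.topics[topic_name]
    # Final check after the last refresh.
    if len(topic.partitions) > 0:
        return topic
    raise RuntimeError(
        "No partitions found after %d retries for %s" % (max_retries, topic_name)
    )
```

A caller would then do something like `producer = wait_for_partitions(client, mytopic).get_producer()`, so a producer is never built on a topic with an empty partition list.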