Parsely / pykafka

Apache Kafka client for Python; high-level & low-level consumer/producer, with great performance.
http://pykafka.readthedocs.org/
Apache License 2.0
1.12k stars 232 forks source link

synchronous producer hangs when kafka is unreachable #449

Closed emmettbutler closed 6 years ago

emmettbutler commented 8 years ago

_This issue was brought to my attention by Michael Kowalchik in the mailing list_

Steps to reproduce

The expected result is that the producer's main thread should raise a SocketDisconnectedError from produce().

The following sequence of events causes this bug. When a broker is unavailable, SocketDisconnectedError is raised and caught. If there are no brokers available, this call to _update() raises an exception. This in turn kills the worker thread, meaning that code after the _update call, including pushing the original SocketDisconnectedError onto the delivery reports queue, does not execute. On the main thread, produce() is still waiting for the delivery report, which will never come because the worker has died.

Possible solutions include ignoring certain exception types from the _update() call, or catching any exception that happens between to_retry being populated and the delivery report being enqueued. I'd lean toward simply catching and warning about this specific exception, maybe giving it a more specific name in the process.

emmettbutler commented 8 years ago

I've opened #450, which depends on #451. Once #451 is in master, I can rebase this branch, making it testable.

emmettbutler commented 8 years ago

Duplicated by #456

emmettbutler commented 7 years ago

I wanted to remove the try...except introduced in #450 as a fix for #629, and I realized that I'm actually not able to reproduce this issue anymore. It seems that this commit actually fixed this issue after the fact by ensuring that produce() didn't block the main thread in cases where there were exceptions pending. The try...except introduced in #450 had the unfortunate side effect of making NoBrokersAvailable result in all worker threads being permanently killed, thus I think it makes sense to remove it.

beniji commented 7 years ago

Using pykafka 2.6.0.dev2 (Python3 on MacOS X, Kafka server 0.10.1.1) I see the sync producer still hangs when trying to send a message (i.e. in the producer.produce() function) after the Kafka server has been terminated (using docker stop kafka) and re-started (using docker start kafka). This behaviour is fully reproducible, using just topic.get_sync_producer(max_retries=5) and producer.produce(message).

When killed, the stacktrace is:

File "/usr/local/lib/python3.6/site-packages/pykafka/producer.py", line 336, in produce
    reported_msg, exc = self.get_delivery_report(timeout=1)
  File "/usr/local/lib/python3.6/site-packages/pykafka/producer.py", line 359, in get_delivery_report
    return self._delivery_reports.queue.get(block, timeout)
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/queue.py", line 173, in get
    self.not_empty.wait(remaining)
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 299, in wait
    gotit = waiter.acquire(True, timeout)
KeyboardInterrupt

NB The client DOES recover however if produce() is not called while the Kafka server is down: in this case, when Kafka comes back up, the next call to produce() does succeed.

zhanghan987 commented 7 years ago
  while True: 
    try: 
        report = self.get_delivery_report(timeout=1)
        break
   except Empty:
        continue

Hi, we met the same issue that sync producer hangs when kafka dead and recovery soon. I think the code above will cause this issue: When kafka broker dead and recovery, the dilivery report response message from kafka broker to pykafka may be lost. And if lost response message, the code above would get into dead loop. How could we handle this situation? Should we have 'timeout' parameter in 'produce' function?

emmettbutler commented 7 years ago

Sounds like this needs another look.

emmettbutler commented 7 years ago

@beniji @zhanghan987 Can you be more specific about the exact producer and broker settings you're using to reproduce this issue, as well as the specific sequence of steps you're performing? I'm unable to replicate it. In my local tests, I'm using a single broker with an unreplicated topic. I produce repeatedly to it with a synchronous consumer in a while True loop. As that loop is running, producing one message each iteration, I kill the broker. As soon as the broker dies, the consumer retries the connection a few times in quick succession before throwing an exception and dying. That is, it doesn't hang, it just dies immediately, which is the expected behavior.

How can I adjust my local test to see the behavior you're seeing?

zhanghan987 commented 6 years ago

@emmett9001 Hi, thanks for your reply. We have one broker and one producer in our environment as same as your tests, but our producer and broker are in the different network segment from different IDC room. And when the network between the two IDC not stabilization like lost connection for a while (The broker and producer are both alived), this issue happens. It happens for several times, sorry for that I don't know how to reproduce it in your local test.

emmettbutler commented 6 years ago

I think I finally got my head around the situation that can cause this hang - essentially it's when mark_as_delivered is never called for a given message in sync mode. https://github.com/Parsely/pykafka/pull/769 makes the producer resilient to that case.