Parsely / pykafka

Apache Kafka client for Python; high-level & low-level consumer/producer, with great performance.
http://pykafka.readthedocs.org/
Apache License 2.0

The ManagedBalancedConsumer is not resilient to kafka node failures #517

Open thebigw4lrus opened 8 years ago

thebigw4lrus commented 8 years ago

PyKafka version: master (commit 9ae97cbcad4198b1b1ef58c25e4f51657c135688)
Kafka version: 0.9.0

We run Kafka as a 3-node cluster, with a producer producing messages and a managed consumer consuming them. Whenever a Kafka node fails for whatever reason (including when we stop one for maintenance), the client is unable to recover.

This is the way we reproduce this problem:

from pykafka import KafkaClient
from time import sleep

HOSTS = 'server1:9092,server2:9092,server3:9092'

client = KafkaClient(hosts=HOSTS)
consumer = client.topics['test_topic'].get_balanced_consumer(consumer_group='my_consumer',
                                                             managed=True,
                                                             auto_commit_enable=False)

# Iterating over the consumer yields messages as they arrive
for message in consumer:
    print(message.value)
    consumer.commit_offsets()
    print(consumer.held_offsets)
    sleep(4)

And the error that we are actually seeing is:

ERROR:pykafka.balancedconsumer:Exception encountered in worker thread:
  File "/usr/local/lib/python2.7/dist-packages/pykafka/managedbalancedconsumer.py", line 218, in fetcher
    self._consumer_id)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/broker.py", line 458, in heartbeat
    return future.get(HeartbeatResponse)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/handlers.py", line 65, in get
    raise self.error

So it seems that the broker's heartbeat method cannot handle a broker going down. We expected the heartbeat method (and the client in general) to be resilient to a Kafka node failure, given that the other 2 nodes are still healthy.

We also tried pykafka 2.2.1 and saw a similar problem, except that this method raised SocketDisconnectedError. We managed to fix that by choosing a random broker for each retry. The latest pykafka version, however, deals with broker failures slightly differently.
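
A minimal sketch of the "random broker for each retry" idea follows. It is not the patch we applied to pykafka 2.2.1: `call_with_random_broker`, `BrokerUnavailable`, and the `request` callable are hypothetical names used only for illustration, and the broker list is any sequence of broker handles.

```python
import random


class BrokerUnavailable(Exception):
    """Hypothetical stand-in for a connection error such as SocketDisconnectedError."""


def call_with_random_broker(brokers, request, max_retries=3, rng=random):
    """Run request(broker) against a randomly chosen broker, retrying on failure.

    brokers: non-empty sequence of broker handles (assumption: any objects).
    request: callable that raises BrokerUnavailable if the broker is down.
    rng: object with a choice() method; defaults to the random module.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    candidates = list(brokers)
    last_error = None
    for _ in range(max_retries):
        # Pick a fresh broker on every attempt so one dead node
        # cannot absorb all of the retries.
        broker = rng.choice(candidates)
        try:
            return request(broker)
        except BrokerUnavailable as exc:
            last_error = exc
    # Every attempt failed: surface the most recent error.
    raise last_error
```

Choosing at random rather than always retrying the same broker means a single failed node is unlikely to consume every retry, at the cost of sometimes picking the dead node again.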

emmettbutler commented 8 years ago

Thanks for noting this @thebigw4lrus. The ManagedBalancedConsumer is currently in its first versioned release, so I'd expect there to be some issues of this nature. This is one of the top-priority issues at the moment.

miketonks commented 8 years ago

+1 this is causing problems for us.

emmettbutler commented 8 years ago

Locally, I get this error occasionally when I take down a node in my three-node cluster. I wonder if anyone else has seen this particular traceback. It seems like a race caused by the heartbeat sending before the consumer has joined the group.

INFO:pykafka.managedbalancedconsumer:Sending heartbeat from consumer 'pykafka-6ca58945-383f-495a-8216-664b83430859'
INFO:pykafka.managedbalancedconsumer:Error code 25 encountered on heartbeat.
DEBUG:pykafka.managedbalancedconsumer:Heartbeat worker exiting
ERROR:pykafka.balancedconsumer:Exception encountered in worker thread:
  File "/home/emmett/git/parsely/pykafka/pykafka/managedbalancedconsumer.py", line 226, in fetcher
    self._handle_error(res.error_code)
  File "/home/emmett/git/parsely/pykafka/pykafka/managedbalancedconsumer.py", line 330, in _handle_error
    raise ERROR_CODES[error_code]()

Traceback (most recent call last):
  File "../notes/kafka_test_managed_consumer.py", line 13, in <module>
    message = cns.consume(block=False)
  File "/home/emmett/git/parsely/pykafka/pykafka/balancedconsumer.py", line 726, in consume
    self._raise_worker_exceptions()
  File "/home/emmett/git/parsely/pykafka/pykafka/balancedconsumer.py", line 271, in _raise_worker_exceptions
    raise ex
pykafka.exceptions.UnknownMemberId

emmettbutler commented 8 years ago

More local logs with comments inline. Normal operation:

INFO:pykafka.managedbalancedconsumer:Sending heartbeat from consumer 'pykafka-489ad73d-92db-4a15-a710-ae9cef677688'
INFO:pykafka.managedbalancedconsumer:Sending heartbeat from consumer 'pykafka-489ad73d-92db-4a15-a710-ae9cef677688'
INFO:pykafka.managedbalancedconsumer:Sending heartbeat from consumer 'pykafka-489ad73d-92db-4a15-a710-ae9cef677688'
INFO:pykafka.managedbalancedconsumer:Sending heartbeat from consumer 'pykafka-489ad73d-92db-4a15-a710-ae9cef677688'
INFO:pykafka.managedbalancedconsumer:Sending heartbeat from consumer 'pykafka-489ad73d-92db-4a15-a710-ae9cef677688'

A FetchRequest gets NotLeaderForPartition, cluster update starts

INFO:pykafka.simpleconsumer:Updating cluster in response to NotLeaderForPartition
DEBUG:pykafka.cluster:Updating cluster, attempt 1/3
DEBUG:pykafka.connection:Connecting to localhost:9092
DEBUG:pykafka.connection:Successfully connected to localhost:9092
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.cluster:Discovered 3 brokers
INFO:pykafka.cluster:Broker emmett-debian:9092 metadata unchanged. Continuing.
INFO:pykafka.cluster:Broker emmett-debian:9093 metadata unchanged. Continuing.
INFO:pykafka.cluster:Reconnecting to broker id 2: emmett-debian:9094
DEBUG:pykafka.connection:Connecting to emmett-debian:9094
DEBUG:pykafka.connection:Successfully connected to emmett-debian:9094
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.cluster:Discovered 2 topics
INFO:pykafka.topic:Adding 10 partitions
INFO:pykafka.partition:Updating leader for <pykafka.partition.Partition at 0x7f0b6878ce10 (id=7)> from broker 1 to broker 2
INFO:pykafka.partition:Updating in sync replicas list for <pykafka.partition.Partition at 0x7f0b6878ce10 (id=7)>
INFO:pykafka.cluster:Attempting to discover offset manager for consumer group 'testgroup'
INFO:pykafka.cluster:Found coordinator broker with id 1

Cluster update complete, next heartbeat fails with UnknownMemberId

INFO:pykafka.managedbalancedconsumer:Sending heartbeat from consumer 'pykafka-489ad73d-92db-4a15-a710-ae9cef677688'
INFO:pykafka.managedbalancedconsumer:Error code 25 encountered on heartbeat.
DEBUG:pykafka.managedbalancedconsumer:Heartbeat worker exiting

Theory: the generation ID is incremented not only when the Join Group phase is complete, but also when partition leadership in the Kafka cluster changes.
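
Whatever the cause turns out to be, the heartbeat worker in the logs above exits as soon as it sees error code 25. A rough sketch of the alternative, mapping heartbeat error codes to a recovery action instead of exiting, could look like the following. The numeric codes come from the Kafka protocol error table; `heartbeat_action` and the action names are hypothetical and are not part of pykafka.

```python
# Error codes from the Kafka protocol error table.
NO_ERROR = 0
GROUP_COORDINATOR_NOT_AVAILABLE = 15
NOT_COORDINATOR_FOR_GROUP = 16
ILLEGAL_GENERATION = 22
UNKNOWN_MEMBER_ID = 25
REBALANCE_IN_PROGRESS = 27

# Codes meaning this member's view of the group is stale: rejoin the group.
REJOIN_CODES = frozenset([ILLEGAL_GENERATION, UNKNOWN_MEMBER_ID,
                          REBALANCE_IN_PROGRESS])
# Codes meaning the heartbeat went to the wrong or an unavailable coordinator.
COORDINATOR_CODES = frozenset([GROUP_COORDINATOR_NOT_AVAILABLE,
                               NOT_COORDINATOR_FOR_GROUP])


def heartbeat_action(error_code):
    """Return a hypothetical recovery action name for a heartbeat error code."""
    if error_code == NO_ERROR:
        return "continue"
    if error_code in REJOIN_CODES:
        return "rejoin_group"
    if error_code in COORDINATOR_CODES:
        return "rediscover_coordinator"
    # Anything else is unexpected here and should surface to the caller.
    return "raise"
```

Under this sketch, the `Error code 25 encountered on heartbeat` line would lead to a rejoin attempt rather than `Heartbeat worker exiting`.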

emmettbutler commented 8 years ago

I put together some thoughts in #568, though that branch doesn't yet solve the issue.

messense commented 8 years ago

Same issue with error code 25, but I didn't take down any Kafka node. The consumer just runs into it.

messense commented 7 years ago

How should we handle ConsumerStoppedException, UnknownMemberId, SocketDisconnectedError, etc. when consuming messages?

emmettbutler commented 7 years ago

@messense the simple answer is to stop() and start() the consumer in response to these errors. This is definitely suboptimal and isn't guaranteed to make the issue go away, but it will at least unstick your message consumption for some amount of time. The optimal fix is, of course, to get to the bottom of why the ManagedBalancedConsumer doesn't handle node failures properly.
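
A minimal sketch of that stop()/start() workaround follows. It assumes only that the consumer object exposes consume(), stop() and start(), and that consume() returns None once no message arrives within the configured timeout. `consume_with_restart` and its parameters are hypothetical names; with pykafka, `recoverable` would be a tuple such as (ConsumerStoppedException, UnknownMemberId, SocketDisconnectedError) imported from pykafka.exceptions.

```python
def consume_with_restart(consumer, handle, recoverable, max_restarts=5):
    """Consume messages, restarting the consumer on recoverable errors.

    consumer: object with consume(), stop() and start() methods.
    handle: callable invoked with each message.
    recoverable: tuple of exception classes that trigger a restart.
    Returns the number of restarts performed before consume() returned None.
    """
    restarts = 0
    while True:
        try:
            message = consumer.consume()
        except recoverable:
            if restarts >= max_restarts:
                # Give up rather than loop forever on a persistent failure.
                raise
            restarts += 1
            consumer.stop()
            consumer.start()
            continue
        if message is None:
            # Assumption: None means the consumer timed out with no new message.
            return restarts
        handle(message)
```

The restart cap is a design choice of this sketch: since restarting isn't guaranteed to make the issue go away, an unbounded loop could hide a consumer that never recovers.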

emmettbutler commented 7 years ago

Parse.ly does not currently use the ManagedBalancedConsumer in production, so fixes on that API are lower priority than ones on APIs we do use. I'll continue working toward full support for all new Kafka APIs, but progress is a bit slower on these than on the ones Parse.ly uses internally. Given this, code contributions from people who are actually working on using the ManagedBalancedConsumer in production are welcome and valued very highly.