ciena / afkak

Kafka client written in Twisted Python
Apache License 2.0

TestConsumerIntegration.test_large_messages failure #53

Open twm opened 5 years ago

twm commented 5 years ago
ERROR: afkak.test.test_consumer_integration.TestConsumerIntegration.test_large_messages
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/defer.py", line 151, in maybeDeferred
    result = f(*args, **kw)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/utils.py", line 201, in runWithWarningsSuppressed
    reraise(exc_info[1], exc_info[2])
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/python/compat.py", line 464, in reraise
    raise exception.with_traceback(traceback)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/utils.py", line 197, in runWithWarningsSuppressed
    result = f(*a, **kw)
  File "/home/travis/build/ciena/afkak/afkak/test/testutil.py", line 88, in wrapper
    return func(self)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/nose/twistedtools.py", line 169, in wrapper
    raise exc_type(exc_value).with_traceback(tb)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/nose/twistedtools.py", line 143, in errback
    failure.raiseException()
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/python/failure.py", line 467, in raiseException
    raise self.value.with_traceback(self.tb)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/defer.py", line 1416, in _inlineCallbacks
    result = result.throwExceptionIntoGenerator(g)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/python/failure.py", line 491, in throwExceptionIntoGenerator
    return g.throw(self.type, self.value, self.tb)
  File "/home/travis/build/ciena/afkak/afkak/test/test_consumer_integration.py", line 109, in test_large_messages
    0, [str(x) for x in range(10)])
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/defer.py", line 1416, in _inlineCallbacks
    result = result.throwExceptionIntoGenerator(g)
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/python/failure.py", line 491, in throwExceptionIntoGenerator
    return g.throw(self.type, self.value, self.tb)
  File "/home/travis/build/ciena/afkak/afkak/test/test_consumer_integration.py", line 48, in send_messages
    resp, = yield self.client.send_produce_request([produce])
  File "/home/travis/build/ciena/afkak/.tox/py35-int-snappy/lib/python3.5/site-packages/twisted/internet/defer.py", line 1418, in _inlineCallbacks
    result = g.send(result)
  File "/home/travis/build/ciena/afkak/afkak/client.py", line 465, in send_produce_request
    returnValue(self._handle_responses(resps, fail_on_error, callback))
  File "/home/travis/build/ciena/afkak/afkak/client.py", line 566, in _handle_responses
    _check_error(resp)
  File "/home/travis/build/ciena/afkak/afkak/common.py", line 723, in _check_error
    raise error
afkak.common.UnknownTopicOrPartitionError

twm commented 5 years ago

#56 will fix this.

twm commented 5 years ago

Now with PartitionUnavailableError instead:

ERROR: afkak.test.test_consumer_integration.TestConsumerIntegration.test_large_messages
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/travis/build/ciena/afkak/.tox/py36-int-snappy-murmur/lib/python3.6/site-packages/twisted/internet/defer.py", line 151, in maybeDeferred
    result = f(*args, **kw)
  File "/home/travis/build/ciena/afkak/.tox/py36-int-snappy-murmur/lib/python3.6/site-packages/twisted/internet/utils.py", line 201, in runWithWarningsSuppressed
    reraise(exc_info[1], exc_info[2])
  File "/home/travis/build/ciena/afkak/.tox/py36-int-snappy-murmur/lib/python3.6/site-packages/twisted/python/compat.py", line 464, in reraise
    raise exception.with_traceback(traceback)
  File "/home/travis/build/ciena/afkak/.tox/py36-int-snappy-murmur/lib/python3.6/site-packages/twisted/internet/utils.py", line 197, in runWithWarningsSuppressed
    result = f(*a, **kw)
  File "/home/travis/build/ciena/afkak/afkak/test/testutil.py", line 88, in wrapper
    return func(self)
  File "/home/travis/build/ciena/afkak/.tox/py36-int-snappy-murmur/lib/python3.6/site-packages/nose/twistedtools.py", line 169, in wrapper
    raise exc_type(exc_value).with_traceback(tb)
  File "/home/travis/build/ciena/afkak/.tox/py36-int-snappy-murmur/lib/python3.6/site-packages/nose/twistedtools.py", line 143, in errback
    failure.raiseException()
  File "/home/travis/build/ciena/afkak/.tox/py36-int-snappy-murmur/lib/python3.6/site-packages/twisted/python/failure.py", line 467, in raiseException
    raise self.value.with_traceback(self.tb)
  File "/home/travis/build/ciena/afkak/.tox/py36-int-snappy-murmur/lib/python3.6/site-packages/twisted/internet/defer.py", line 1416, in _inlineCallbacks
    result = result.throwExceptionIntoGenerator(g)
  File "/home/travis/build/ciena/afkak/.tox/py36-int-snappy-murmur/lib/python3.6/site-packages/twisted/python/failure.py", line 491, in throwExceptionIntoGenerator
    return g.throw(self.type, self.value, self.tb)
  File "/home/travis/build/ciena/afkak/afkak/test/test_consumer_integration.py", line 119, in test_large_messages
    0, [str(x) for x in range(10)])
  File "/home/travis/build/ciena/afkak/.tox/py36-int-snappy-murmur/lib/python3.6/site-packages/twisted/internet/defer.py", line 1416, in _inlineCallbacks
    result = result.throwExceptionIntoGenerator(g)
  File "/home/travis/build/ciena/afkak/.tox/py36-int-snappy-murmur/lib/python3.6/site-packages/twisted/python/failure.py", line 491, in throwExceptionIntoGenerator
    return g.throw(self.type, self.value, self.tb)
  File "/home/travis/build/ciena/afkak/afkak/test/test_consumer_integration.py", line 53, in send_messages
    [resp] = yield self.client.send_produce_request([produce])
  File "/home/travis/build/ciena/afkak/.tox/py36-int-snappy-murmur/lib/python3.6/site-packages/twisted/internet/defer.py", line 1416, in _inlineCallbacks
    result = result.throwExceptionIntoGenerator(g)
  File "/home/travis/build/ciena/afkak/.tox/py36-int-snappy-murmur/lib/python3.6/site-packages/twisted/python/failure.py", line 491, in throwExceptionIntoGenerator
    return g.throw(self.type, self.value, self.tb)
  File "/home/travis/build/ciena/afkak/afkak/client.py", line 463, in send_produce_request
    payloads, encoder, decoder)
  File "/home/travis/build/ciena/afkak/.tox/py36-int-snappy-murmur/lib/python3.6/site-packages/twisted/internet/defer.py", line 1416, in _inlineCallbacks
    result = result.throwExceptionIntoGenerator(g)
  File "/home/travis/build/ciena/afkak/.tox/py36-int-snappy-murmur/lib/python3.6/site-packages/twisted/python/failure.py", line 491, in throwExceptionIntoGenerator
    return g.throw(self.type, self.value, self.tb)
  File "/home/travis/build/ciena/afkak/afkak/client.py", line 876, in _send_broker_aware_request
    payload.topic, payload.partition)
  File "/home/travis/build/ciena/afkak/.tox/py36-int-snappy-murmur/lib/python3.6/site-packages/twisted/internet/defer.py", line 1418, in _inlineCallbacks
    result = g.send(result)
  File "/home/travis/build/ciena/afkak/afkak/client.py", line 709, in _get_leader_for_partition
    raise PartitionUnavailableError("%s not available" % str(key))
afkak.common.PartitionUnavailableError: TopicAndPartition(topic='test_large_messages-DLewfeUmbu', partition=0) not available

This is with Python 3.6 and Kafka 1.1.1.

rthille commented 5 years ago

Basically, Kafka isn't reliable around what the Metadata query reports as being true, vs what the servers themselves will respond with...

twm commented 5 years ago

Yeah, absolutely. The only way to tell if you can actually produce to or consume from a topic is to try doing so.

twm commented 5 years ago

Maybe we should change Afkak's retry strategy to compensate for this? Right now we fail eagerly with PartitionUnavailableError based on the metadata. In practice I think this just pushes API users to wrap the produce/consume calls in a retry loop that calls load_metadata_for_topics(). Afkak could handle that retry loop internally.

We might want a separate timeout for this, though, like topic_timeout_ms — time to wait for a topic to become available. You'd probably want a longer value for this than for individual requests. (Note that in #51 I remove support for timeout=None.)
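
A minimal sketch of the retry loop described above, not Afkak's actual implementation. It is synchronous and standard-library only to keep the example short; a real version inside Afkak would be Deferred-based. The names call_with_topic_retry, attempt, refresh_metadata, and retry_interval_ms are hypothetical, topic_timeout_ms is the proposed setting from this comment, and the exception class is a local stand-in for afkak.common.PartitionUnavailableError.

```python
import time


class PartitionUnavailableError(Exception):
    """Local stand-in for afkak.common.PartitionUnavailableError."""


def call_with_topic_retry(attempt, refresh_metadata,
                          topic_timeout_ms=30000, retry_interval_ms=500,
                          clock=time.monotonic, sleep=time.sleep):
    """Call attempt() until it succeeds or topic_timeout_ms has elapsed.

    attempt and refresh_metadata are hypothetical callables: attempt would
    wrap a produce or consume call, and refresh_metadata would wrap a call
    to load_metadata_for_topics(). clock and sleep are injectable so the
    loop can be exercised without real waiting.
    """
    deadline = clock() + topic_timeout_ms / 1000.0
    while True:
        try:
            return attempt()
        except PartitionUnavailableError:
            if clock() >= deadline:
                # The topic never became available within the timeout.
                raise
            sleep(retry_interval_ms / 1000.0)
            # Refresh metadata so the next attempt sees current leadership.
            refresh_metadata()
```

Keeping topic_timeout_ms separate from the per-request timeout lets a freshly created topic take several seconds to get a leader without loosening the limit on individual requests.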

rthille commented 5 years ago

I wonder how other clients deal with that. Does Afkak see that more because we have the integration tests that fire up new topics and then try to use them right away, or does everyone see it and deal with it?