ciena / afkak

Kafka client written in Twisted Python
Apache License 2.0

Non recursive consumer message processing #96

Closed jcabmora closed 5 years ago

jcabmora commented 5 years ago

This addresses the issue identified in #93, which is caused by the way _process_messages uses a callback chain to itself to process a large batch of messages. That logic has been replaced with a simple while loop. Since we do not want to start processing the next batch of messages until the current batch has been committed, the loop yields until the Deferred returned by the current processor has fired.
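To illustrate the difference, here is a minimal sketch in plain Python. It is not the afkak code: FiredDeferred is a hypothetical stand-in for a Deferred that has already fired (so addCallback runs the callback synchronously), processor, process_recursive and process_loop are illustrative names, and drive is a much simplified imitation of what inlineCallbacks does for results that are already available.

```python
class FiredDeferred:
    """Hypothetical stand-in for a Deferred that has already fired."""

    def __init__(self, result):
        self.result = result

    def addCallback(self, fn, *args):
        # An already-fired Deferred runs new callbacks synchronously.
        self.result = fn(self.result, *args)
        return self


def processor(message):
    """Illustrative processor that completes immediately."""
    return FiredDeferred(message * 2)


def process_recursive(messages, index, results):
    """Callback chain to itself: the stack grows with every message."""
    if index >= len(messages):
        return
    d = processor(messages[index])

    def on_processed(value):
        results.append(value)
        process_recursive(messages, index + 1, results)

    d.addCallback(on_processed)


def process_loop(messages, results):
    """While loop that yields each result; stack depth stays constant."""
    index = 0
    while index < len(messages):
        value = yield processor(messages[index])
        results.append(value)
        index += 1


def drive(generator):
    """Simplified driver: resume the generator with each fired result."""
    try:
        d = next(generator)
        while True:
            d = generator.send(d.result)
    except StopIteration:
        pass


if __name__ == "__main__":
    done = []
    drive(process_loop(list(range(5000)), done))
    print(len(done), "messages processed without recursion")
```

With a batch of a few thousand messages, process_recursive runs into Python's recursion limit, while the generator-based loop handles the same batch at a constant stack depth.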

The unit tests required one change, because _process_messages now returns a Deferred since it is decorated with inlineCallbacks.

The debug statement that logs the result of the processor was moved into a callback function, since otherwise it prints a Deferred object rather than the actual return value of the processor function. This change is not part of the fix for the issue, just something additional I noticed that seemed wrong to me.
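A rough sketch of the logging point, again in plain Python rather than the real afkak or Twisted code: StandInDeferred is a hypothetical, heavily simplified stand-in for a Deferred, and the logger name, run_processor and log_processor_result are made up for illustration.

```python
import logging

log = logging.getLogger("consumer_sketch")  # hypothetical logger name


class StandInDeferred:
    """Hypothetical, heavily simplified stand-in for a Twisted Deferred."""

    def __init__(self):
        self.called = False
        self.result = None
        self._callbacks = []

    def addCallback(self, fn):
        if self.called:
            # Already fired: run the callback right away.
            self.result = fn(self.result)
        else:
            self._callbacks.append(fn)
        return self

    def callback(self, result):
        # Fire: pass the result through every queued callback in order.
        self.called = True
        self.result = result
        while self._callbacks:
            self.result = self._callbacks.pop(0)(self.result)


def log_processor_result(result):
    """Log the processor's actual return value once it is available."""
    log.debug("Processor returned: %r", result)
    return result  # pass the value through to any later callbacks


def run_processor(processor, messages):
    d = processor(messages)
    # Logging `d` at this point would show the Deferred object itself,
    # not the value it eventually fires with.
    d.addCallback(log_processor_result)
    return d
```

Returning the result from the logging callback keeps the value intact for anything chained after it.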

jcabmora commented 5 years ago

I just force-pushed the last commit because an integration test failed, although the previous run succeeded (and the only change between them was a change to a unit test?). Since I know the integration tests are flaky, I am trying to get a clean build. We should probably open an issue about the reliability of the integration tests.

jcabmora commented 5 years ago

@twm and @rthille I have addressed the review comments. Thanks a lot to both of you for the review!

jcabmora commented 5 years ago

Thank you @twm !