Parsely / pykafka

Apache Kafka client for Python; high-level & low-level consumer/producer, with great performance.
http://pykafka.readthedocs.org/
Apache License 2.0

Support for exactly once processing guarantee #875

Closed: rubinatorz closed this issue 4 years ago

rubinatorz commented 6 years ago

Hi @emmett9001

I was wondering whether pykafka 2.8.0 supports the exactly-once processing guarantee that has been part of Kafka since the 0.11 release (I just read https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/).

The reason for asking is that I have a Kafka Streams application running with "processing.guarantee=exactly_once" enabled, which reads from one topic and branches into multiple topics based on some predicates. But when consuming from one of these topics (the branched ones, i.e. the ones filled by the streaming application), the SimpleConsumer fails with this error:

```
Traceback (most recent call last):
  File "consume.py", line 47, in <module>
    p.start()
  File "consume.py", line 38, in start
    msg = self._consumer.consume(block=True)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/simpleconsumer.py", line 483, in consume
    self._raise_worker_exceptions()
  File "/usr/local/lib/python2.7/dist-packages/pykafka/simpleconsumer.py", line 276, in _raise_worker_exceptions
    reraise(*self._worker_exception)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/simpleconsumer.py", line 440, in fetcher
    self.fetch()
  File "/usr/local/lib/python2.7/dist-packages/pykafka/simpleconsumer.py", line 804, in fetch
    min_bytes=self._fetch_min_bytes
  File "/usr/local/lib/python2.7/dist-packages/pykafka/broker.py", line 45, in wrapped
    return fn(self, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/broker.py", line 327, in fetch_messages
    return future.get(response_class, broker_version=self._broker_version)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/handlers.py", line 76, in get
    return response_cls(self.response, **response_kwargs)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/protocol/fetch.py", line 216, in __init__
    broker_version=broker_version)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/protocol/fetch.py", line 165, in __init__
    broker_version=broker_version),
  File "/usr/local/lib/python2.7/dist-packages/pykafka/protocol/fetch.py", line 172, in _unpack_message_set
    message_set = MessageSet.decode(buff, partition_id=partition_id)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/protocol/message.py", line 272, in decode
    partition_id=partition_id)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/protocol/message.py", line 109, in decode
    (key, val) = struct_helpers.unpack_from('YY', buff, offset)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/utils/struct_helpers.py", line 49, in unpack_from
    output = _unpack(fmt, buff, offset, 1)[0]
  File "/usr/local/lib/python2.7/dist-packages/pykafka/utils/struct_helpers.py", line 96, in _unpack
    items.extend(struct.unpack_from('!' + ch, buff, offset))
struct.error: unpack_from requires a buffer of at least 1088403925 bytes
```

I'm running a Kafka 1.1.0 cluster with 3 nodes and replication factor 3. I'm also providing the correct broker_version to the KafkaClient.
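For context, the consumer is a plain SimpleConsumer loop; a minimal sketch (broker addresses and topic name are placeholders, since consume.py itself isn't shown here) looks like this:

```python
from pykafka import KafkaClient

# placeholder hosts and topic name
client = KafkaClient(hosts="kafka1:9092,kafka2:9092,kafka3:9092",
                     broker_version="1.1.0")
topic = client.topics[b"branched-topic"]
consumer = topic.get_simple_consumer()

while True:
    # consume() re-raises exceptions from the background fetcher threads,
    # which is where the struct.error above surfaces
    msg = consumer.consume(block=True)
    if msg is not None:
        print(msg.offset, msg.value)
```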

I've been debugging this for a while and found that the "processing.guarantee=exactly_once" setting in the streaming app is what causes the struct.error. When I remove that setting, the SimpleConsumer consumes the branched topics like a charm. I also added some debug lines to the pykafka code, and it seems there are some extra bytes in the buffer that cannot be parsed into a message. So that brings me back to my initial question: does pykafka support this exactly-once feature?

Furthermore, a related issue is that I've seen gaps in the offsets of the branched topics, which pykafka cannot handle well: the enqueue function in SimpleConsumer handles non-compacted topics like this:

```python
(not self._is_compacted_topic and message.offset != self.next_offset):
```

This skips messages whenever there's a gap: message.offset jumps, for example, from 3 to 5, but next_offset expects 4, which never arrives, so all subsequent messages are skipped because message.offset never becomes 4. This only occurs on the topics branched by the streaming app. Just mentioning it because it may ring a bell.
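To illustrate the failure mode, here's a simplified standalone version of that check (not pykafka's verbatim code) showing how a single offset gap stalls consumption:

```python
class FakeMessage(object):
    def __init__(self, offset):
        self.offset = offset

def is_discarded(message, next_offset, is_compacted_topic=False):
    # the check quoted above: drop any fetched message whose offset
    # isn't exactly the offset expected next on a non-compacted topic
    return (not is_compacted_topic) and message.offset != next_offset

# suppose offset 4 is a gap (e.g. occupied by a transaction marker),
# so the consumer sees offsets 3, 5, 6, ... while next_offset stays at 4
next_offset = 4
for offset in (5, 6, 7):
    print(offset, is_discarded(FakeMessage(offset), next_offset))
# every message is discarded, because offset 4 never arrives
```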

emmettbutler commented 6 years ago

Thanks for the report, @rubinatorz. There are a few things interacting here. Most importantly, full exactly-once message delivery is a feature of Kafka Streams, an API that pykafka has chosen not to support at this time. Faust, a Python library that attempts to replicate the behavior of Kafka Streams, might be worth checking out if you require complete exactly-once guarantees.
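For a flavor of what Faust looks like, here's a minimal sketch (hypothetical app, broker, and topic names) of the same read-one-topic, branch-to-many pattern your Streams app implements:

```python
import faust

# hypothetical names throughout
app = faust.App("branching-app", broker="kafka://localhost:9092")
source = app.topic("source-topic", value_type=bytes)
matched = app.topic("matched", value_type=bytes)
unmatched = app.topic("unmatched", value_type=bytes)

@app.agent(source)
async def branch(stream):
    async for value in stream:
        # route each record to a branch topic based on a simple predicate
        if value.startswith(b"a"):
            await matched.send(value=value)
        else:
            await unmatched.send(value=value)
```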

Idempotent message delivery (the producer config enable.idempotence), one of the components of exactly-once semantics, is meant to be implemented in the producer. At the moment, pykafka is blocked on this by its lack of support for the RecordBatch message format introduced in 0.11. That work is in progress in https://github.com/Parsely/pykafka/pull/844, and contributions to the effort are quite welcome.
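For comparison, in a client that already speaks the 0.11+ message format, idempotent delivery is a single producer config flag. This sketch uses confluent-kafka as an assumed alternative client; it is not pykafka API:

```python
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "kafka1:9092",  # placeholder broker address
    # requires brokers on 0.11+; the broker deduplicates retried sends
    # using producer IDs and per-partition sequence numbers
    "enable.idempotence": True,
})
producer.produce("some-topic", value=b"payload")
producer.flush()
```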

My understanding from your post is that your producer is a Kafka Streams application and is thus sending RecordBatch-formatted messages. If that's the case, pykafka unfortunately does not support such messages at this time, and that is a likely cause of the error you're seeing. If you're interested in contributing to that effort, please reach out via #844.
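For the curious: both formats place the magic byte at the same fixed position (byte 16), which is how a decoder can tell them apart. This sketch (a hypothetical helper, not pykafka code) shows the idea; pykafka's current decoder assumes the old MessageSet layout, so on a v2 batch it reads length fields from the wrong positions, which is how unpack_from can end up demanding an absurd buffer size like the one in the traceback:

```python
import struct

def peek_magic(buff, offset=0):
    # v0/v1 MessageSet: offset(8) message_size(4) crc(4)                    magic(1)
    # v2 RecordBatch:   base_offset(8) batch_length(4) leader_epoch(4)      magic(1)
    _, _, _, magic = struct.unpack_from('!qiib', buff, offset)
    return magic  # 0 or 1: old MessageSet; 2: RecordBatch (Kafka 0.11+)
```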

The issue you mention of "gaps" in "branched topics" is something I'm unfamiliar with. Can you explain in more detail what you mean by a "gap"? I have a suspicion that this might also be specific to the Kafka Streams API but I'm not sure.

Thanks again for your contribution.

jeffwidman commented 6 years ago

@emmett9001 thanks for the pointer to Faust; I hadn't seen that before. Very nifty!