bluesky / bluesky-kafka

Kafka integration for bluesky

How to handle duplicate documents. #18

Closed gwbischof closed 4 years ago

gwbischof commented 4 years ago

Definition: A stream processing application is a program that reads from Kafka, processes the data, and writes the result back to Kafka.

There are a bunch of ways that duplicate documents can be introduced.

The producer.send() could result in duplicate writes of message B due to internal retries.

We may reprocess the input message A, resulting in duplicate B messages being written to the output, violating the exactly once processing semantics. Reprocessing may happen if the stream processing application crashes after writing B but before marking A as consumed. Thus when it resumes, it will consume A again and write B again, causing a duplicate.

Finally, in distributed environments, applications will crash or—worse!—temporarily lose connectivity to the rest of the system. Typically, new instances are automatically started to replace the ones which were deemed lost. Through this process, we may have multiple instances processing the same input topics and writing to the same output topics, causing duplicate outputs and violating the exactly once processing semantics. We call this the problem of “zombie instances.” https://www.confluent.io/blog/transactions-apache-kafka/

The idempotent producer solves the problem of duplicates when writing to Kafka: the broker deduplicates messages that the producer resends.

This is achieved by persisting the message only once. With idempotency turned on, each Kafka message gets two things: a producer id (PID) and sequence number (seq). The PID assignment is completely transparent to users and is never exposed by clients. In the case of broker failure or client failure, during retry of message send, the topic will only accept messages that have a new unique sequence number and producer id. The broker automatically deduplicates any message(s) sent by this producer, ensuring idempotency. No additional code changes are required. https://dzone.com/articles/interpreting-kafkas-exactly-once-semantics https://www.confluent.io/blog/transactions-apache-kafka/
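To make the PID and sequence number idea concrete, here is a toy sketch of the deduplication for a single partition. It is only an illustration, not Kafka's broker code: `ToyPartitionLog` and its methods are hypothetical names, and a real broker tracks more state than this. In a real client the feature is switched on through the producer setting `enable.idempotence`, not implemented by hand.

```python
from collections import defaultdict


class ToyPartitionLog:
    """Toy model of one partition's log with sequence-number deduplication."""

    def __init__(self):
        self.records = []
        # Highest sequence number persisted so far, per producer id (PID).
        self.last_seq = defaultdict(lambda: -1)

    def append(self, pid, seq, value):
        """Append value unless (pid, seq) was already persisted.

        Returns True if the record was appended, False if it was
        recognized as a retry of an already-persisted record.
        """
        if seq <= self.last_seq[pid]:
            # Retry of something already in the log: acknowledge, do not append.
            return False
        if seq != self.last_seq[pid] + 1:
            raise ValueError(f"out-of-order sequence {seq} from producer {pid}")
        self.records.append(value)
        self.last_seq[pid] = seq
        return True


log = ToyPartitionLog()
first_try = log.append(pid=7, seq=0, value="B")  # persisted
retry = log.append(pid=7, seq=0, value="B")      # ack was lost, producer resends
next_msg = log.append(pid=7, seq=1, value="C")   # new sequence number, persisted
```

After these three calls the toy log holds `"B"` once, even though it was sent twice.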

Kafka transaction semantics were added to enable exactly-once processing. They can make reading from a topic, processing, and writing the result to a topic a single atomic operation. This works for processing applications that don't require any state. Atomicity is achieved through a Kafka transaction coordinator process.

It is worth noting that the guarantees above fall short of atomic reads. In particular, when using a Kafka consumer to consume messages from a topic, an application will not know whether these messages were written as part of a transaction, and so they do not know when transactions start or end. Further, a given consumer is not guaranteed to be subscribed to all partitions which are part of a transaction, and it has no way to discover this, making it tough to guarantee that all the messages which were part of a single transaction will eventually be consumed by a single consumer.

In short: Kafka guarantees that a consumer will eventually deliver only non-transactional messages or committed transactional messages. It will withhold messages from open transactions and filter out messages from aborted transactions. https://www.confluent.io/blog/transactions-apache-kafka/

Kafka Streams has a general exactly-once solution built in, so you don't have to worry about it. https://kafka.apache.org/documentation/streams/ Kafka Streams wraps both a Producer and a Consumer; it is a library for writing stream processing applications. Kafka Streams can also handle stream processing with state, because it has an additional database and also writes some information to Kafka, so it can recover the state of where it left off when the process restarts. Kafka Streams only has an unofficial Python library. The official library is in Java.

My conclusion, so far, is that Streams looks really useful. We may want to separate processing from writing to disk/database, and do our processing as stream processing (where the output is written back to Kafka).

Exactly once with Mongo: I don't think committing to Kafka after each document solves the exactly-once problem because, for example, the network could fail between writing to Mongo and the commit. I think that the same is true if we use Mongo transactions. Mongo transactions could be good for reducing the commit overhead, but I don't think they solve the exactly-once problem. So far I think the best solution is to use Mongo to dedupe the documents. This is the same strategy that Kafka uses to avoid duplicates from the Producer.
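A rough sketch of why deduplicating in the database covers the failure described above. Everything here is a stand-in: `ToyCollection` is a hypothetical in-memory substitute for a Mongo collection with a unique index on `uid`, and `consume` is a hypothetical replacement for a Kafka consumer loop with manual offset commits. With a real collection, a unique index plus catching pymongo's `DuplicateKeyError` would presumably play the role of `insert_if_absent`.

```python
class ToyCollection:
    """In-memory stand-in for a collection with a unique index on uid."""

    def __init__(self):
        self.docs = {}

    def insert_if_absent(self, doc):
        """Store doc unless a document with the same uid is already stored."""
        if doc["uid"] in self.docs:
            return False
        self.docs[doc["uid"]] = doc
        return True


def consume(messages, committed_offset, collection, crash_before_commit_at=None):
    """Write each message, then commit its offset; return the committed offset.

    If crash_before_commit_at is set, the consumer "dies" after writing the
    message at that offset but before committing it.
    """
    for offset in range(committed_offset, len(messages)):
        collection.insert_if_absent(messages[offset])
        if offset == crash_before_commit_at:
            return committed_offset  # the write happened, the commit did not
        committed_offset = offset + 1
    return committed_offset


messages = [{"uid": "a", "data": 1}, {"uid": "b", "data": 2}, {"uid": "c", "data": 3}]
collection = ToyCollection()

# First run crashes after writing "b" but before committing it.
offset_after_crash = consume(messages, 0, collection, crash_before_commit_at=1)
# The restarted consumer resumes from the last committed offset and sees "b" again.
final_offset = consume(messages, offset_after_crash, collection)
```

The second run redelivers `"b"`, but the uid check turns the repeated write into a no-op, so each document is stored once.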

When we are writing to a file, we could commit to kafka only after the file is completed.

I think a little more investigation is needed. I think we should decide whether the unofficial Python Streams library is usable, whether it would be better to do stream processing in Java, or whether we should not use Streams at all.

jklynch commented 4 years ago

I would like to start by testing how Kafka performs on its own, without Streams, so that if and when we try Streams we can easily identify whether or not it is an improvement.

danielballan commented 4 years ago

Thanks for this useful summary.

To revive a point I made in a long-lost thread somewhere in Microsoft Teams, committing to Kafka and using Mongo transactions protects against a class of likely errors. It's true that if we lose network between committing a Mongo transaction and then committing to Kafka that the message has been processed, we'll have a problem. But any errors that happen inside that transaction will be correctly reported back to Kafka as a failure in need of a retry, and that is valuable. Examples of errors this would protect us from include index uniqueness violations, internal Mongo server problems, and document size overruns. In those scenarios, my understanding/expectation is that Kafka can give us some slack, absorbing a backlog for a time while we resolve the issue.

gwbischof commented 4 years ago

Streams is not for performance improvement. It is slower than using Producer and Consumer directly because of the overhead of exactly-once. I'm not suggesting that we use this now, but we should keep it in consideration if we want to make a stream processor, which has input and output to Kafka. Also, I think we should consider separating the processing step and the writing to disk/database step.

danielballan commented 4 years ago

Notes from call:

Streams are interesting for future use cases where we consume from one topic and publish to another. One example for this is database migrations. But it is not on the critical path for now.

Transactions are not actually relevant to our concerns. We are publishing and consuming documents one at a time. We never have the situation "Do this complete sequence of tasks or none at all" in the present use case.

Currently, if something goes wrong and a document cannot be saved (e.g. uid collision, over-sized document) pymongo raises an exception which kills the scan, so the user knows immediately that they are not taking data. When we insert a message bus, as long as the Kafka Producer is able to reach the Kafka Broker, the scan will proceed even if the consumer connected to MongoDB is raising exceptions. In a worst-case scenario, the user might work through the night taking data which is not actually being saved!

In the short term (i.e. in the initial roll-out) we will issue error messages in the logs that clearly distinguish between two cases. True duplicates (tried to insert this but a document with the exact same contents already exists in the database) indicate a message bus hiccup. uid collisions (tried to insert this but a document with different contents already exists, and thus this data will not be saved) indicate a critical failure that might be due to document manipulation upstream of the Producer. These might be ERROR and CRITICAL level log messages, respectively. As a policy, we will only subscribe Producers that write to the raw documents topic directly to the RunEngine.
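One possible shape for the duplicate versus collision check, as a sketch only. The function name, logger name, and return values are made up for illustration and are not taken from suitcase-mongo. It also assumes the stored and incoming documents can be compared directly; with a real Mongo collection the stored copy carries an extra `_id` field that would have to be ignored.

```python
import logging

logger = logging.getLogger("toy_mongo_consumer")  # hypothetical logger name


def classify_insert(existing, incoming):
    """Compare an incoming document with the stored one that shares its uid.

    existing is the stored document, or None if the uid is new.
    Returns "inserted", "duplicate", or "collision".
    """
    if existing is None:
        return "inserted"
    if existing == incoming:
        # Same contents already stored: a message bus hiccup.
        logger.error("Duplicate document %s: identical copy already stored.",
                     incoming["uid"])
        return "duplicate"
    # Same uid, different contents: this data will not be saved.
    logger.critical("uid collision on %s: stored contents differ, document NOT saved.",
                    incoming["uid"])
    return "collision"


stored = {"uid": "abc", "data": 1}
new_uid = classify_insert(None, {"uid": "xyz", "data": 5})
true_duplicate = classify_insert(stored, {"uid": "abc", "data": 1})
uid_collision = classify_insert(stored, {"uid": "abc", "data": 2})
```

Only the `"inserted"` case would go on to write to the database; the other two leave the stored document untouched.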

In the longer term, we will add some channel for feedback so that a suspender can interrupt a scan in the event that data may not be being saved.

gwbischof commented 4 years ago

This was fixed in suitcase-mongo normalized.