The scope is to transition consumers from at-least-once to exactly-once processing by default. This builds on the approach and work undertaken in liveramp/factable.
Basic implementation sketch:
The Message interface is extended to support settable ProducerID and SeqNo fields, which are marshaled with the user-defined message type. Messages can opt out of exactly-once semantics by implementing these as no-ops. The low bits of SeqNo distinguish "uncommitted", "acknowledged", and "apply outside transaction" states.
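A minimal sketch of the extended interface, assuming a two-bit flag field in the low bits of SeqNo. The method names (SetProducerID, SetSeqNo, GetProducerID, GetSeqNo), the 6-byte ProducerID, and the flag values are all hypothetical; the issue only says the fields are settable, marshaled with the message, and that low bits carry the state. LegacyMessage shows the opt-out: its setters are no-ops, so it always reads as "apply outside transaction".

```go
package main

import "fmt"

// ProducerID identifies a single publisher (hypothetical 6-byte layout).
type ProducerID [6]byte

// SeqNo is an ascending sequence number whose low two bits carry the
// message's transaction state (hypothetical encoding).
type SeqNo uint64

const (
	FlagOutsideTxn  SeqNo = 0 // Apply immediately, outside any transaction.
	FlagUncommitted SeqNo = 1 // Part of a transaction not yet acknowledged.
	FlagAck         SeqNo = 2 // Acknowledges the producer's prior messages.

	flagMask SeqNo = 3
	flagBits       = 2
)

// NewSeqNo packs a counter and a state flag into a SeqNo.
func NewSeqNo(counter uint64, flag SeqNo) SeqNo {
	return SeqNo(counter)<<flagBits | flag&flagMask
}

// Counter returns the sequence number without its flag bits.
func (s SeqNo) Counter() uint64 { return uint64(s >> flagBits) }

// Flag returns the transaction state stored in the low bits.
func (s SeqNo) Flag() SeqNo { return s & flagMask }

// Message is the user-defined message type, extended with settable
// exactly-once fields (hypothetical method names).
type Message interface {
	SetProducerID(ProducerID)
	SetSeqNo(SeqNo)
	GetProducerID() ProducerID
	GetSeqNo() SeqNo
}

// MyMessage opts in by storing the fields alongside its own payload,
// so they are marshaled together with it.
type MyMessage struct {
	Producer ProducerID
	Seq      SeqNo
	Body     string
}

func (m *MyMessage) SetProducerID(p ProducerID) { m.Producer = p }
func (m *MyMessage) SetSeqNo(s SeqNo)           { m.Seq = s }
func (m *MyMessage) GetProducerID() ProducerID  { return m.Producer }
func (m *MyMessage) GetSeqNo() SeqNo            { return m.Seq }

// LegacyMessage opts out: its setters are no-ops.
type LegacyMessage struct{ Body string }

func (LegacyMessage) SetProducerID(ProducerID)  {}
func (LegacyMessage) SetSeqNo(SeqNo)            {}
func (LegacyMessage) GetProducerID() ProducerID { return ProducerID{} }
func (LegacyMessage) GetSeqNo() SeqNo           { return NewSeqNo(0, FlagOutsideTxn) }

func main() {
	var m Message = &MyMessage{Body: "hello"}
	m.SetProducerID(ProducerID{1, 2, 3, 4, 5, 6})
	m.SetSeqNo(NewSeqNo(42, FlagUncommitted))
	s := m.GetSeqNo()
	fmt.Println(s.Counter(), s.Flag() == FlagUncommitted) // 42 true
}
```

Packing the state into SeqNo keeps the per-message overhead to two fields, at the cost of two bits of sequence range.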
During a consumer transaction, published messages are tagged with ascending, uncommitted SeqNos. When the transaction prepares to commit, acknowledgement "intents" (empty messages which ACK the messages that came before them) are checkpointed with the commit and, once the commit succeeds, are written to their respective journals. Alternatively, if a new processor recovers the checkpoint, that processor writes the ACKs.
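A minimal sketch of the publishing side of one consumer transaction. TxnPublisher, Envelope, and PrepareCommit are hypothetical names, and Appended stands in for appends actually issued to brokers. As an assumption, each ACK intent carries a SeqNo one greater than the last message of the transaction, so a reader can treat "ACK n" as committing every buffered message with SeqNo at most n.

```go
package main

import (
	"fmt"
	"sort"
)

type SeqNo uint64

const (
	FlagUncommitted SeqNo = 1
	FlagAck         SeqNo = 2
	flagBits              = 2
)

func NewSeqNo(counter uint64, flag SeqNo) SeqNo { return SeqNo(counter)<<flagBits | flag }
func (s SeqNo) Counter() uint64                  { return uint64(s >> flagBits) }
func (s SeqNo) Flag() SeqNo                      { return s & 3 }

// Envelope is a published message plus exactly-once metadata.
// ACK intents have an empty Body.
type Envelope struct {
	Journal  string
	Producer string
	Seq      SeqNo
	Body     string
}

// TxnPublisher tags messages published during a consumer transaction
// (hypothetical type).
type TxnPublisher struct {
	producer string
	lastSeq  uint64          // last counter assigned; checkpointed with the shard
	written  map[string]bool // journals written by the current transaction
	Appended []Envelope      // stand-in for appends issued to brokers
}

// NewTxnPublisher resumes from the last SeqNo recorded in a checkpoint.
func NewTxnPublisher(producer string, lastSeq uint64) *TxnPublisher {
	return &TxnPublisher{producer: producer, lastSeq: lastSeq, written: map[string]bool{}}
}

// Publish tags body with the next ascending, uncommitted SeqNo.
func (p *TxnPublisher) Publish(journal, body string) {
	p.lastSeq++
	p.written[journal] = true
	p.Appended = append(p.Appended, Envelope{journal, p.producer, NewSeqNo(p.lastSeq, FlagUncommitted), body})
}

// PrepareCommit returns one ACK intent per journal written in this
// transaction. The intents are checkpointed with the commit and written
// to their journals only after it succeeds (or by a recovering processor).
func (p *TxnPublisher) PrepareCommit() []Envelope {
	p.lastSeq++
	journals := make([]string, 0, len(p.written))
	for j := range p.written {
		journals = append(journals, j)
	}
	sort.Strings(journals) // deterministic order
	intents := make([]Envelope, 0, len(journals))
	for _, j := range journals {
		intents = append(intents, Envelope{Journal: j, Producer: p.producer, Seq: NewSeqNo(p.lastSeq, FlagAck)})
	}
	p.written = map[string]bool{}
	return intents
}

func main() {
	p := NewTxnPublisher("shard-A", 0)
	p.Publish("journal/x", "a")
	p.Publish("journal/y", "b")
	p.Publish("journal/x", "c")
	for _, ack := range p.PrepareCommit() {
		fmt.Println(ack.Journal, ack.Seq.Counter(), ack.Seq.Flag() == FlagAck)
	}
	// Output:
	// journal/x 4 true
	// journal/y 4 true
}
```

Only one ACK per journal is needed, regardless of how many messages the transaction wrote there.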
Readers must sequence read-uncommitted messages into read-committed ones. For efficiency, readers will buffer some number of recent uncommitted messages in anticipation of a forthcoming ACK; failing that, they'll re-read portions of the journal to process the now-committed messages. This lets readers process acknowledged messages without head-of-line blocking.
The broker append API is to be extended with a "registers" concept: very small key/value pairs which participate in the append transaction machinery. Append RPCs may update register values or expect particular values (failing if an expectation isn't met). Journal registers make it possible to implement write fences: recoverylog will be updated to install a fence with InjectHandoff and to require that fence value within Recorder.
Whereas today only read-through offsets are checkpointed with consumer transactions, for exactly-once semantics a shard checkpoint will include:
Read-through offsets of source journals (unchanged).
Producer states used to sequence read-uncommitted messages into read-committed ones.
The last SeqNo published by the shard.
ACK intents which acknowledge messages published by this transaction.
I'm also targeting improvements which allow shards to opt out of recovery logs altogether, using an external transactional store instead. I think it's possible for transactions to remain fully pipelined in this case as well, which would be awesome for performance.