This PR adds shard coordination through locking and state. Currently we only support DynamoDB as a backend, but the interface is simple enough that other backends could be added if we ever need them in the future.
The locking logic is:

- When creating a `Consumer` you pass in the new `kinesis.state.DynamoDB` object.
- When the `Consumer` is setting up shard readers it will first try to lock the shard in the state backend via a condition-based atomic write to Dynamo.
- If we do not acquire the lock, this consumer does not start a reader for that shard.
- If we previously held the lock and lost it, we stop the consumer.
- Otherwise, we start & monitor the shard reader.
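The lock-acquisition step could be sketched as follows. This is a simplified illustration of the compare-and-set semantics, not the actual implementation: the function and field names are hypothetical, and an in-memory dict stands in for the DynamoDB table (the real backend gets the same atomicity from a conditional `PutItem`).

```python
import time

# In-memory stand-in for the DynamoDB table. In the real backend this
# state lives in Dynamo and the check-then-write below is a single
# conditional PutItem, so it is atomic across consumers.
_table = {}

def try_acquire_lock(shard_id, consumer_id, lease_seconds=30):
    """Attempt to lock a shard; return True iff we now hold the lock.

    The write succeeds only if no lock row exists, the existing lease
    has expired, or we already own the lock (a refresh).
    """
    now = time.time()
    row = _table.get(shard_id)
    if row is None or row["expires"] < now or row["owner"] == consumer_id:
        _table[shard_id] = {"owner": consumer_id, "expires": now + lease_seconds}
        return True
    return False

# consumer-a wins the shard; consumer-b sees the lock and does not
# start a reader for it; consumer-a can refresh its own lease.
print(try_acquire_lock("shard-0001", "consumer-a"))  # True
print(try_acquire_lock("shard-0001", "consumer-b"))  # False
print(try_acquire_lock("shard-0001", "consumer-a"))  # True (refresh)
```

The lease expiry is what lets another consumer take over a shard whose owner has died, which is also why a consumer that loses its lock must stop rather than keep reading.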
Along with locking, we now checkpoint our position in the stream into the same DynamoDB table. The logic for checkpointing is:

- For each record the `Consumer` yields, we write the sequence number (`seq`) from that record into the DynamoDB document.
- This means that once you pull a record off the consumer, it is your responsibility to ensure the durability of that record and decide what you do with it.
- If we fail to write the updated `seq` back to Dynamo, we fail the whole shard reader and let the consumer try to acquire a new lock & start again.
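The checkpoint step could be sketched like this. Again the names (`checkpoint`, `LostLockError`, the dict-based table) are hypothetical stand-ins; in the real backend the write would be a conditional `UpdateItem` that only succeeds while we still own the lock row.

```python
class LostLockError(Exception):
    """Raised when the checkpoint write fails because we no longer own
    the shard lock; the shard reader is failed so the consumer can try
    to re-acquire the lock and start again."""

def checkpoint(table, shard_id, consumer_id, seq):
    """Persist the last-yielded sequence number for a shard.

    The caller has already taken responsibility for the record's
    durability before checkpointing past it.
    """
    row = table.get(shard_id)
    if row is None or row.get("owner") != consumer_id:
        raise LostLockError(shard_id)
    row["seq"] = seq

# Simulated table: consumer-a owns shard-0001.
table = {"shard-0001": {"owner": "consumer-a"}}

# Checkpoint after processing a record we pulled off the consumer.
checkpoint(table, "shard-0001", "consumer-a", "12345")
print(table["shard-0001"]["seq"])  # 12345
```

Because the checkpoint only advances after you have handled the record, a restart resumes from the last `seq` written, giving at-least-once delivery: records after the last successful checkpoint may be yielded again.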