harlow / kinesis-consumer

Golang library for consuming Kinesis stream data
MIT License

Maintain parent/child shard ordering across shard splits/merges. #155

Closed: gram-signal closed this 3 months ago

gram-signal commented 1 year ago

Kinesis lets clients rely on an invariant: for a given partition key, records are read in the order in which they were added to the stream. For example, given the input pkey=x,val=1, pkey=x,val=2, pkey=x,val=3, clients will see the values 1, 2, 3 in that order, as long as they read each shard in order and respect shard lineage (described below). Kinesis provides this by routing all records with the same partition key to a single shard and preserving order within that shard.

However, shards can be split and merged, to distribute load better and to work around per-shard throughput limits. Kinesis currently does this by splitting a single shard into two, or by merging two adjacent shards into one, possibly many times over. Kinesis still preserves ordering across these changes by reporting shard parent/child relationships in its ListShards output. Splitting shard A creates children B and C, both with ParentShardId=A. Merging shards A and B creates a new shard C with ParentShardId=A and AdjacentParentShardId=B. As long as clients fully process all records in a shard's parents (including its adjacent parent) before processing the shard itself, ordering is maintained.
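The rule "drain every parent, including the adjacent parent, before starting a child" can be expressed as a small readiness check. This is a sketch: the `shard` struct only mirrors the ListShards field names (ShardId, ParentShardId, AdjacentParentShardId), and `parents`/`ready` are hypothetical helpers rather than code from this PR.

```go
package main

import "fmt"

// shard mirrors the lineage fields ListShards reports for each shard.
// An empty string means the field is absent.
type shard struct {
	ID             string
	Parent         string // ParentShardId
	AdjacentParent string // AdjacentParentShardId (set only on merges)
}

// parents lists the shards that must be fully drained before s may start.
func parents(s shard) []string {
	var ps []string
	if s.Parent != "" {
		ps = append(ps, s.Parent)
	}
	if s.AdjacentParent != "" {
		ps = append(ps, s.AdjacentParent)
	}
	return ps
}

// ready reports whether every parent of s has been fully processed.
func ready(s shard, finished map[string]bool) bool {
	for _, p := range parents(s) {
		if !finished[p] {
			return false
		}
	}
	return true
}

func main() {
	merged := shard{ID: "C", Parent: "A", AdjacentParent: "B"} // A + B -> C
	finished := map[string]bool{"A": true}
	fmt.Println(ready(merged, finished)) // false: adjacent parent B is unfinished
	finished["B"] = true
	fmt.Println(ready(merged, finished)) // true
}
```

One thing this sketch ignores: a parent that has aged out of the stream's retention period no longer appears in ListShards, so a real implementation likely needs to treat such a parent as finished instead of waiting on it forever.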

kinesis-consumer currently doesn't do this. Instead, after the initial ListShards call (and each subsequent one), every visible shard immediately begins processing. Consider this case, in which shards split and then merge, and each shard X contains a single record rX:

time ->
  B
 / \
A   D
 \ /
  C

Record rD should be processed only after both rB and rC have been processed, and both rB and rC should wait for rA. Because the original code starts a goroutine for every shard immediately, {rA, rB, rC, rD} might be processed in any order.

This PR uses the AllGroup as a bookkeeper of fully processed shards: the Consumer calls CloseShard once it has finished a shard, and AllGroup doesn't release a shard for processing until all of its parents have been fully processed. The consumer itself just processes the shards it receives, as it did before.

This PR adds a new CloseableGroup interface rather than adding a method to the existing Group interface, to preserve backward compatibility with code elsewhere that may already implement Group. Other Group implementations therefore don't get the ordering described above, but the default Consumer does.

harlow commented 3 months ago

Nice work, thanks for the PR!