redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/

aws_kinesis input: shards are not processed if they are closed #2649

Open j0shthomas opened 3 months ago

j0shthomas commented 3 months ago

When a Kinesis shard is split (e.g. when scaling), the parent shard is marked as CLOSED and client writes start going to the child shards. Benthos does not appear to read from CLOSED shards (unless shards are explicitly set on the input) due to this line: https://github.com/redpanda-data/connect/blob/main/internal/impl/aws/input_kinesis.go#L630

There may still be data in the parent shards that has not yet been processed, so this data will be "lost". With Kinesis you can tell when all the data has been read by checking the shard's EndingSequenceNumber. https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-after-resharding.html

Jeffail commented 3 months ago

Hey @j0shthomas, just to confirm my understanding, would the correct behaviour here be to continue reading the shard until our current sequence number matches that of the EndingSequenceNumber?

j0shthomas commented 3 months ago

Yes, that is my understanding. That would mean that all the data has been read from the shard and no new data will be written.
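The check agreed on above could be sketched roughly as follows. This is a minimal illustration, not code from the connect repository: `shardFullyConsumed` and `parseSeq` are hypothetical helpers, and the sketch assumes (as the thread does) that a checkpoint reaching the shard's EndingSequenceNumber means the shard is drained. Kinesis sequence numbers are long decimal strings, so they are compared with `math/big` rather than as fixed-width integers.

```go
package main

import (
	"fmt"
	"math/big"
)

// parseSeq parses a Kinesis sequence number, a decimal string that can be
// far too large to fit in an int64 or uint64.
func parseSeq(s string) (*big.Int, error) {
	n, ok := new(big.Int).SetString(s, 10)
	if !ok {
		return nil, fmt.Errorf("invalid sequence number %q", s)
	}
	return n, nil
}

// shardFullyConsumed is a hypothetical helper. It reports whether the
// checkpointed sequence number has reached the shard's ending sequence
// number. An empty endingSeq is treated as an open shard, and an empty
// checkpointSeq as a shard that has not been checkpointed yet.
func shardFullyConsumed(checkpointSeq, endingSeq string) (bool, error) {
	if endingSeq == "" {
		return false, nil // open shard: more data may still arrive
	}
	if checkpointSeq == "" {
		return false, nil // closed shard, but nothing consumed so far
	}
	cp, err := parseSeq(checkpointSeq)
	if err != nil {
		return false, err
	}
	end, err := parseSeq(endingSeq)
	if err != nil {
		return false, err
	}
	// Assumption from the discussion: reaching the ending sequence number
	// means there is nothing left to read from this shard.
	return cp.Cmp(end) >= 0, nil
}
```

While actively reading a closed shard, the GetRecords API also signals the end by returning no NextShardIterator, which may be a more direct check than comparing sequence numbers.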

Jeffail commented 3 months ago

Okay, I've had a little refresher course on the code. It looks as though this is specifically an issue when bootstrapping shard allocation after one of these splits. Currently, we check for the presence of that ending sequence number as a way of detecting finished shards, as otherwise we would be constantly busy-looping allocations to those finished shards.

Unfortunately, when we're scanning for any unallocated shards we don't have the context as to what the final sequence was from our checkpointers, as that's stored in DynamoDB. A naive approach would be to call DynamoDB for every single unallocated and "ended" shard each time we rebalance in order to determine if they're fully consumed or not. Since that would be extremely wasteful, we would instead need to do some in-memory caching of that information such that subsequent shard listings would reuse it.

To reproduce this issue one would need to:

  1. Populate one or more shards with data
  2. Split the shards (without any benthos consumers running)
  3. Run the benthos consumers

If you have benthos instances running during the splitting then it's likely the data would still be fully consumed (unless there's a severe backlog), which is probably why this hasn't been noticed in the wild.

It still continues to amaze me just how much logic a consumer of kinesis needs to implement just for the most basic use cases, and how outstandingly inadequate the documentation is for doing exactly that. If anyone is willing to take on this work then please let us know in this issue, otherwise I'll try and tackle it soon.

j0shthomas commented 3 months ago

If the shard has ended but still has a claim on it and enough time has passed since the LeaseTimeout, it will be kept as an unclaimed shard.

However, if the service is shut down, the client IDs are removed from DynamoDB, so the shard won't be picked up the next time the service starts. Does the service need to remove the client IDs when shutting down? If they were kept I don't think we would run into this issue.

Once a shard is fully consumed it is deleted from DynamoDB (it would help debugging if this was a soft delete), so RunExplicitShards will re-consume it from the beginning but RunBalancedShards will ignore it.