awslabs / amazon-kinesis-client

Client library for Amazon Kinesis
Apache License 2.0
640 stars, 463 forks

KCL does not work safely in a scaling group #11

llamahunter closed this issue 6 months ago

llamahunter commented 9 years ago

The KCL does not seem designed to work safely in a scaling group. I have evidence that new workers create new readers for a shard that existing workers are already reading, without waiting for the existing readers to be shut down. This causes the same events to be processed simultaneously on separate EC2 instances. If there are shared resources between the EC2 instances, consistency checks are violated.

Scaling should not be done blindly. It should be done with the cooperation of all existing healthy workers, so that load is balanced across all EC2 instances without duplication.

llamahunter commented 9 years ago

I've been looking at the source code for lease stealing. It appears not to give the worker whose lease was stolen any time to notice that the lease is gone and to stop processing before the new worker starts duplicating work and wreaking havoc on shared resources.

At the very least, after any leases are stolen (as opposed to adopting expired leases), the new worker should wait at least LeaseTaker.leaseDurationNanos before attempting to process any records for that shard, so that the old owner has an opportunity to find out that it has lost the lease.

gauravgh commented 9 years ago

The KCL provides at-least-once semantics. In cases such as failover and load balancing, data can be delivered more than once, and there can be a (typically small) window of time when multiple record processors are assigned to the same shard. We recommend designing your application to be idempotent.

In your scenario, is it possible for your record processor to sleep (for up to the failover time) as part of initialize() when you detect that the old record processor may still be using the resource?
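A minimal sketch of that suggestion, under stated assumptions: `DelayedStartProcessor` is a hypothetical stand-in rather than the KCL's own record processor interface, the `previousOwnerMayBeActive` check is an application-level test the thread does not specify, and the failover time is assumed to be passed in by the application.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a record processor; this is not the KCL's own interface.
class DelayedStartProcessor {
    private final Duration failoverTime;                    // assumed to match the configured failover time
    private final BooleanSupplier previousOwnerMayBeActive; // hypothetical application-level check

    DelayedStartProcessor(Duration failoverTime, BooleanSupplier previousOwnerMayBeActive) {
        this.failoverTime = failoverTime;
        this.previousOwnerMayBeActive = previousOwnerMayBeActive;
    }

    /** Sleeps for the failover time if the old owner may still be active; returns the wait it requested. */
    Duration initialize(String shardId) {
        if (!previousOwnerMayBeActive.getAsBoolean()) {
            return Duration.ZERO; // nothing to wait for, start processing immediately
        }
        try {
            Thread.sleep(failoverTime.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return failoverTime;
    }

    public static void main(String[] args) {
        DelayedStartProcessor processor =
                new DelayedStartProcessor(Duration.ofMillis(200), () -> true);
        Duration waited = processor.initialize("shardId-000000000000");
        System.out.println("waited " + waited.toMillis() + " ms before processing");
    }
}
```

The sleep only narrows the overlap window. It does not remove the need for the idempotent design recommended above, because the old processor may take longer than the failover time to notice that it has lost the lease.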

I'd recommend starting a thread on our forum (https://forums.aws.amazon.com/forum.jspa?forumID=169) with some more details about your use case/application. We are better able to answer application design questions via the forums.

Sincerely, Gaurav

llamahunter commented 9 years ago

"At least once" delivery to a single instance of a record processor is one thing. That can easily be handled by checkpointing, etc. Having multiple active readers on the same data at the same time is entirely different. It requires having some sort of database interlock between all potential readers on any output operation that can't be undone.

Consider, for instance, a simple copier that reads from one Kinesis stream and outputs to another. There is no way to make such an operation idempotent.

If there is only one reader on each input shard, it is straightforward to use a second DynamoDB table to keep track of the last input sequence number that was written to the output stream. If the current input sequence number is less than or equal to that last sequence number, it must be a replay after a restart from checkpoint, and so the record can be discarded.
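The single-reader scheme described here might look like the sketch below. The class and method names are hypothetical, and a `HashMap` and a `List` stand in for the second DynamoDB table and the output stream. Sequence numbers are compared as `BigInteger` because Kinesis sequence numbers are long decimal strings, and a record is treated as already copied when its sequence number is not greater than the recorded one.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-reader copier; in-memory collections replace the real table and stream.
class SingleReaderCopier {
    // Stand-in for the second table: shard ID -> last input sequence number copied.
    private final Map<String, BigInteger> lastCopied = new HashMap<>();
    // Stand-in for the output stream.
    final List<String> output = new ArrayList<>();

    /** Copies the record unless it was already copied; returns true if it was written. */
    boolean copy(String shardId, String sequenceNumber, String data) {
        BigInteger seq = new BigInteger(sequenceNumber);
        BigInteger last = lastCopied.get(shardId);
        if (last != null && seq.compareTo(last) <= 0) {
            return false; // replay after a restart from checkpoint, discard
        }
        output.add(data);
        lastCopied.put(shardId, seq); // not atomic with the write above
        return true;
    }

    public static void main(String[] args) {
        SingleReaderCopier copier = new SingleReaderCopier();
        copier.copy("shardId-000000000000", "100", "a");
        copier.copy("shardId-000000000000", "101", "b");
        // Simulated restart from an older checkpoint: 100 and 101 are replayed.
        copier.copy("shardId-000000000000", "100", "a");
        copier.copy("shardId-000000000000", "101", "b");
        copier.copy("shardId-000000000000", "102", "c");
        System.out.println(copier.output); // prints [a, b, c]
    }
}
```

The write and the table update are two separate steps, so this only works as long as a single reader owns the shard at any moment.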

However, if there are multiple readers on the same shard of the input stream, this straightforward approach is not possible, as they will be competing with one another to write the same data to the output. Expensive inter-instance critical sections need to be written to ensure that checking the last sequence number, writing to the output stream, and updating the last sequence number happen atomically with respect to every other machine.
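One way to picture such an interlock is an atomic "advance if greater" operation on the last sequence number, which a shared store could provide through a conditional write. The sketch below is an illustration only: `HighWaterMark` and `tryAdvance` are hypothetical names, and a `ConcurrentHashMap` stands in for the shared table so that the example runs on its own.

```java
import java.math.BigInteger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical interlock; a ConcurrentHashMap stands in for a shared table with conditional writes.
class HighWaterMark {
    private final ConcurrentMap<String, BigInteger> marks = new ConcurrentHashMap<>();

    /** Atomically advances the mark for a shard; returns true only for the caller that moved it. */
    boolean tryAdvance(String shardId, String sequenceNumber) {
        BigInteger seq = new BigInteger(sequenceNumber);
        boolean[] advanced = {false};
        // compute() runs the check and the update as one atomic step per key.
        marks.compute(shardId, (id, current) -> {
            if (current == null || seq.compareTo(current) > 0) {
                advanced[0] = true;
                return seq;
            }
            return current;
        });
        return advanced[0];
    }

    public static void main(String[] args) {
        HighWaterMark mark = new HighWaterMark();
        // Two readers race on the same record; only the first claim succeeds.
        System.out.println(mark.tryAdvance("shardId-000000000000", "100")); // prints true
        System.out.println(mark.tryAdvance("shardId-000000000000", "100")); // prints false
    }
}
```

A reader would write to the output only after `tryAdvance` returns true. This makes the check and the update atomic, but not the output write: a reader that claims a record and dies before writing it loses that record, while writing before claiming brings the duplicates back.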

gauravgh commented 9 years ago

Even with a single reader, you have to design for scenarios where the reader may have produced an output but then died before it was able to advance the checkpoint (or the second DynamoDB table you mention). Once you've designed for that, you can typically use the same mechanism to handle the multiple readers scenario.

In the simple copier scenario you mention above, you'll want the consumer of your second stream to be idempotent as well. For example, if the copier makes a call to Kinesis to put a record and the call times out, you'll want to retry (to avoid data loss). If your first call had succeeded, then the retry will put a duplicate record. You'll want to design the consumer to handle such duplicates. Once the consumer is idempotent, you can rely on that idempotency mechanism to handle the multiple reader/copier scenario.
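A minimal sketch of an idempotent consumer for the second stream, assuming the copier attaches the source shard ID and source sequence number to every record it puts (that envelope is an assumption, not something the thread specifies). The class name is hypothetical, and the in-memory set stands in for whatever durable, bounded store a real consumer would need.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical consumer that ignores records it has already applied.
class IdempotentConsumer {
    // Keys of source records already applied; a real consumer would persist and trim this.
    private final Set<String> seen = new HashSet<>();
    final List<String> applied = new ArrayList<>();

    /** Applies the payload once per source record; returns false for a duplicate. */
    boolean handle(String sourceShardId, String sourceSequenceNumber, String payload) {
        String key = sourceShardId + ":" + sourceSequenceNumber;
        if (!seen.add(key)) {
            return false; // duplicate from a retry or from a second copier
        }
        applied.add(payload);
        return true;
    }

    public static void main(String[] args) {
        IdempotentConsumer consumer = new IdempotentConsumer();
        consumer.handle("shardId-000000000000", "100", "a");
        consumer.handle("shardId-000000000000", "100", "a"); // retried put, ignored
        consumer.handle("shardId-000000000000", "101", "b");
        System.out.println(consumer.applied); // prints [a, b]
    }
}
```

Because the key comes from the input record rather than from the copier instance, the same check covers both a retried put and two copiers briefly reading the same shard.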

Sincerely, Gaurav