spring-projects / spring-integration-aws


Checkpointing logic resulting in lost messages in a clustered environment (M2 release) #90

Closed: s-porpoise closed this issue 6 years ago

s-porpoise commented 6 years ago

I'm using batch checkpoint mode in a clustered environment. All works fine with the M1 release, but in M2 I am seeing messages occasionally get dropped.

I think the culprit is the change in commit 2b57f34801df625587438536c66e994524899b4f to return the value of "replace(key, oldVal, newVal)", whereas it previously always returned true when existingSequence < sequenceNumber. I'm not too familiar with the code, so I can't be sure this is it, but I'm thinking it could be a race condition in ShardCheckpointer where the value stored in DynamoDB changes between the call to getCheckpoint() and checkpointStore.replace(...), causing the method to return false.
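
To illustrate the race I have in mind, here is a minimal sketch using a ConcurrentMap in place of the DynamoDB-backed store (the class and names are illustrative only, not the project's actual code):

import java.math.BigInteger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Minimal sketch of the suspected race; a ConcurrentMap stands in for the DynamoDB store.
public class CheckpointRaceSketch {

    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    // Simplified version of the M2-style checkpoint attempt.
    boolean checkpoint(String key, String sequenceNumber) {
        String existingSequence = store.get(key);                 // (1) read the current checkpoint
        if (existingSequence == null) {
            return store.putIfAbsent(key, sequenceNumber) == null;
        }
        if (new BigInteger(existingSequence).compareTo(new BigInteger(sequenceNumber)) < 0) {
            // (2) another consumer may update the store between (1) and here, so this
            // conditional replace fails and the whole batch is treated as "already
            // processed" even though it was not.
            return store.replace(key, existingSequence, sequenceNumber);
        }
        return false;                                             // sequence not newer: batch skipped
    }
}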

All I can say for sure is that I can switch between the M1 and M2 versions, and in a load test of ~4500 messages I will usually have a handful of messages (~40) dropped on M2. Switching back to the M1 release resolves the issue. Unfortunately this is manual performance testing that I haven't automated, so I don't have a simple test case to share.

In my clustered use case I'm happy for my application to receive some duplicates, but obviously not for messages to be dropped. There's a comment in the code about an upcoming "shard leader election implementation", so I guess there are more changes coming to this area?

s-porpoise commented 6 years ago

Re-opening - I thought this was user error but I've now verified it multiple times.

artembilan commented 6 years ago

OK. Thanks.

Would you mind re-phrasing the problem in a slightly different way, just to be sure what is wrong and where?

The fix you mention is exactly for the race condition where the same messages arrive at different consumers, so we could process them twice. With the fix we check whether the current sequenceNumber has already been stored; in that case we assume the messages have been processed by another consumer.

Maybe your other consumer raises an exception somehow? That way the sequenceNumber is committed, but the messages are not processed...

s-porpoise commented 6 years ago

I'm still digging, but the following logs show an example where my dropped message for partition key 2A2ROG (sequence number 49584571934843771141903627621785355615208059945070100482) has a larger sequence number than the checkpointed value (49584571934843771141903627621784146689388445315895394306), but it gets dropped regardless.

Lining the numbers up in ascending order:

49584571934843771141903627621780519911929601359651799042 (a message at 14:03:31.027)
49584571934843771141903627621784146689388445315895394306 (final checkpoint value)
49584571934843771141903627621785355615208059945070100482 (the dropped message)

The first log item shows when 49584571934843771141903627621780519911929601359651799042 is received. The second is a log entry I added to record the result of replace(...) when it returns false. The third log item shows us skipping the records and the checkpoint value at that point.

2018-05-18 14:03:31.027 [-kinesis-consumer-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'input', message: GenericMessage [payload=byte[18358], headers={aws_shard=shardId-000000000000, id=32c01fd4-ea31-5784-7eb5-8a57cde4e71a, contentType=application/json, aws_receivedPartitionKey=ZJWEWM, aws_receivedStream=Stream-localdev-dy, aws_receivedSequenceNumber=49584571934843771141903627621780519911929601359651799042, timestamp=1526616211015}]

...

2018-05-18 14:03:33.231 [-kinesis-consumer-1] INFO  o.s.i.a.i.kinesis.ShardCheckpointer - ***** replace returned false (key, existingSequence, sequenceNumber) = (Group:Stream-localdev-dy:shardId-000000000000, 49584571934843771141903627621780519911929601359651799042, 49584571934843771141903627621787773466847289203419512834)

...

2018-05-18 14:03:33.693 [-kinesis-consumer-1] INFO  o.s.i.a.i.k.KinesisMessageDrivenChannelAdapter - The records '[
{SequenceNumber: 49584571934843771141903627621782937763568830686720688130,ApproximateArrivalTimestamp: Fri May 18 14:03:30 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=5597 cap=5597],PartitionKey: 2ATKAA,}, 
{SequenceNumber: 49584571934843771141903627621784146689388445315895394306,ApproximateArrivalTimestamp: Fri May 18 14:03:30 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=1980 cap=1980],PartitionKey: Z58WC8,}, 
{SequenceNumber: 49584571934843771141903627621785355615208059945070100482,ApproximateArrivalTimestamp: Fri May 18 14:03:30 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=1346 cap=1346],PartitionKey: 2A2ROG,}, 
{SequenceNumber: 49584571934843771141903627621786564541027674574244806658,ApproximateArrivalTimestamp: Fri May 18 14:03:30 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=1959 cap=1959],PartitionKey: Z58PRM,},
{SequenceNumber: 49584571934843771141903627621787773466847289203419512834,ApproximateArrivalTimestamp: Fri May 18 14:03:30 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=32868 cap=32868],PartitionKey: Y8MUZY,}]' 
are skipped from processing because their sequence numbers are less than already checkpointed: 49584571934843771141903627621784146689388445315895394306

This behaviour would be fine if my other instance subsequently picked up this message, but it also skips it with similar behaviour. I suspect each instance has pulled batches of messages that overlap in content but aren't identical. I'm still lining up the logs between instances. Maybe this issue could be mitigated by retrying until "this.checkpointer.filterRecords(recordsToProcess)" returns an empty list and we're sure we can skip the items.

I realise some changes were made to limit the number of duplicates, but application developers in this kind of environment need to develop idempotent consumers - Kinesis itself is 'at least once' delivery, so duplicates are always a possibility. Obviously we want to limit the number of dupes as much as possible, but not at the expense of dropping messages altogether.

s-porpoise commented 6 years ago

Logs from the other instance, showing the items being picked up in a different batch four seconds later and the ultimate rejection of 2A2ROG which is then dropped.

2018-05-18 14:03:31.970 [-kinesis-consumer-1] DEBUG o.s.i.a.i.k.KinesisMessageDrivenChannelAdapter - Processing records: [{
SequenceNumber: 49584571934843771141903627621782937763568830686720688130,ApproximateArrivalTimestamp: Fri May 18 14:03:30 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=5597 cap=5597],PartitionKey: 2ATKAA,}, {
SequenceNumber: 49584571934843771141903627621784146689388445315895394306,ApproximateArrivalTimestamp: Fri May 18 14:03:30 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=1980 cap=1980],PartitionKey: Z58WC8,}] 
for [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='Stream-localdev-dy', shard='shardId-000000000000', reset=false}, state=CONSUME}

...

2018-05-18 14:03:35.138 [-kinesis-consumer-1] DEBUG o.s.i.a.i.k.KinesisMessageDrivenChannelAdapter - Processing records: [{
SequenceNumber: 49584571934843771141903627621785355615208059945070100482,ApproximateArrivalTimestamp: Fri May 18 14:03:30 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=1346 cap=1346],PartitionKey: 2A2ROG,}, {
SequenceNumber: 49584571934843771141903627621786564541027674574244806658,ApproximateArrivalTimestamp: Fri May 18 14:03:30 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=1959 cap=1959],PartitionKey: Z58PRM,}, {
SequenceNumber: 49584571934843771141903627621787773466847289203419512834,ApproximateArrivalTimestamp: Fri May 18 14:03:30 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=32868 cap=32868],PartitionKey: Y8MUZY,}, {
SequenceNumber: 49584571934843771141903627621788982392666903901313695746,ApproximateArrivalTimestamp: Fri May 18 14:03:31 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=1373 cap=1373],PartitionKey: Z9LQJL,}, {
SequenceNumber: 49584571934843771141903627621791400244306133159663108098,ApproximateArrivalTimestamp: Fri May 18 14:03:31 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=3218 cap=3218],PartitionKey: 2AV7DO,}, {
SequenceNumber: 49584571934843771141903627621792609170125747788837814274,ApproximateArrivalTimestamp: Fri May 18 14:03:31 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=10218 cap=10218],PartitionKey: 8VKO3A,}, {
SequenceNumber: 49584571934843771141903627621796235947584591745081409538,ApproximateArrivalTimestamp: Fri May 18 14:03:32 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=19917 cap=19917],PartitionKey: Y3QOH6,}, 
{SequenceNumber: 49584571934843771141903627621797444873404206374256115714,ApproximateArrivalTimestamp: Fri May 18 14:03:32 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=8316 cap=8316],PartitionKey: 2A5MGY,}, {
SequenceNumber: 49584571934843771141903627621798653799223821003430821890,ApproximateArrivalTimestamp: Fri May 18 14:03:32 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=9001 cap=9001],PartitionKey: 2AW64O,}] 
for [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='Stream-localdev-dy', shard='shardId-000000000000', reset=false}, state=CONSUME}] (min_seq, max_seq) = (49584571934843771141903627621785355615208059945070100482, 49584571934843771141903627621798653799223821003430821890)
2018-05-18 14:03:35.487 [-kinesis-consumer-1] INFO  o.s.i.a.i.kinesis.ShardCheckpointer - ***** replace returned false (key, existingSequence, sequenceNumber) = (Group:Stream-localdev-dy:shardId-000000000000, 49584571934843771141903627621784146689388445315895394306, 49584571934843771141903627621798653799223821003430821890)
2018-05-18 14:03:35.734 [-kinesis-consumer-1] INFO  o.s.i.a.i.k.KinesisMessageDrivenChannelAdapter - ***** The records '[{
SequenceNumber: 49584571934843771141903627621785355615208059945070100482,ApproximateArrivalTimestamp: Fri May 18 14:03:30 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=1346 cap=1346],PartitionKey: 2A2ROG,}, {
SequenceNumber: 49584571934843771141903627621786564541027674574244806658,ApproximateArrivalTimestamp: Fri May 18 14:03:30 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=1959 cap=1959],PartitionKey: Z58PRM,}, {
SequenceNumber: 49584571934843771141903627621787773466847289203419512834,ApproximateArrivalTimestamp: Fri May 18 14:03:30 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=32868 cap=32868],PartitionKey: Y8MUZY,}, {
SequenceNumber: 49584571934843771141903627621788982392666903901313695746,ApproximateArrivalTimestamp: Fri May 18 14:03:31 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=1373 cap=1373],PartitionKey: Z9LQJL,}, {
SequenceNumber: 49584571934843771141903627621791400244306133159663108098,ApproximateArrivalTimestamp: Fri May 18 14:03:31 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=3218 cap=3218],PartitionKey: 2AV7DO,}, {
SequenceNumber: 49584571934843771141903627621792609170125747788837814274,ApproximateArrivalTimestamp: Fri May 18 14:03:31 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=10218 cap=10218],PartitionKey: 8VKO3A,}, {
SequenceNumber: 49584571934843771141903627621796235947584591745081409538,ApproximateArrivalTimestamp: Fri May 18 14:03:32 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=19917 cap=19917],PartitionKey: Y3QOH6,}, {
SequenceNumber: 49584571934843771141903627621797444873404206374256115714,ApproximateArrivalTimestamp: Fri May 18 14:03:32 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=8316 cap=8316],PartitionKey: 2A5MGY,}, {
SequenceNumber: 49584571934843771141903627621798653799223821003430821890,ApproximateArrivalTimestamp: Fri May 18 14:03:32 AEST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=9001 cap=9001],PartitionKey: 2AW64O,}]' 
are skipped from processing because their sequence numbers are less than already checkpointed: 49584571934843771141903627621802280576682664959674417154

artembilan commented 6 years ago

Good, @jazir1979 .

Thank you for the investigation!

So, do I understand you correctly that we have overlapping batches on different nodes? The batches may mix already processed messages with new ones; however, according to our logic around replace(), we just drop the whole batch without properly analyzing its content.

Is that what you want to say?

s-porpoise commented 6 years ago

Yes that's a perfect explanation of it, thanks :)

artembilan commented 6 years ago

Great!

I will go ahead and try to fix it next week.

May I then ask you to review it and test it in your environment, to be sure that we are on the same page and on the proper path?

Thank you for all your feedback!

s-porpoise commented 6 years ago

Can do! I'll stick with M1 for now and I can easily switch in an updated version to verify the fix whenever it's ready. Thanks for your help.

artembilan commented 6 years ago

@jazir1979 ,

I've just started looking into this and the situation is a dead end for me.

Am I right that we see the problem like:

The first consumer gets sequences, e.g. 1, 2, 3. They are filtered correctly and are ready to go. At the same time the second consumer gets sequences like 3, 4, 5. Since the first one hasn't committed its checkpoint yet, this one is good to go: it commits its checkpoint with sequence 5 and proceeds to processing. At this point the first one reaches its commit point and just can't do it, because 5 is already there. This way we simply lose the records for sequences 1 and 2.

Am I right? If so, then all the checkpoint logic in the KinesisMessageDrivenChannelAdapter is full trash and I don't know the solution yet, unless we go ahead with the LockRegistry and Leader Election implementations for AWS in #66 ...

We could restore the M1 behavior and let duplicates pass, but then our Consumer Group claim would be false, and we would have to add some Idempotent Receiver logic as well. On top of that, the MetadataStore doesn't have expiration functionality.

Have I missed anything?

Thanks for your time!

artembilan commented 6 years ago

Although that doesn't look so bad for DynamoDB: https://github.com/spring-projects/spring-integration-aws/issues/92

s-porpoise commented 6 years ago

Hi @artembilan,

Yes, I think what you're saying is right. It's a bit more subtle because it would take a few steps to get into that position, but the end result is the same. In your example, if the second consumer gets sequences like 3,4,5, presumably it should have already processed 1,2,3, so then nothing would have been lost. But if the consumers overlap processing in the right way, we can get into this situation, eg:

  1. Consumer A gets (1)
  2. Consumer B gets (1,2,3)
  3. Consumer A sets the checkpoint = 1
  4. Consumer B attempts to set the checkpoint to 3 and fails -- but this is okay because Consumer A will pick up these records on its next try...
  5. Consumer A gets (2,3,4,5)
  6. Consumer B gets (4,5,6)
  7. Consumer B sets the checkpoint = 6
  8. Consumer A attempts to set the checkpoint = 5 and fails -- so (2,3) have been dropped

A possible fix at steps 4 and 8 would be to try again if the checkpointing fails due to replace(...) returning false (a rough sketch of such a retry loop follows the list below).

eg:

  1. Consumer A gets (1)
  2. Consumer B gets (1,2,3)
  3. Consumer A sets the checkpoint = 1
  4. Consumer B attempts to set the checkpoint to 3 and fails. It retries filtering the record list, which results in it rereading the current checkpoint and updating its internal state: it sees the current checkpoint value is 1 and it returns (2,3). This time when it attempts to set the checkpoint to 3 it succeeds, because the expected value (1) matches the actual value. Or, if it fails, keep retrying until filterRecords() returns an empty list, then move on.
  5. Consumer A gets (2,3,4,5) -- this gets filtered to (4,5) due to the checkpoint being 3
  6. Consumer B gets (4,5,6) ...
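
A rough sketch of that retry loop, written against the filterRecords()/checkpoint() calls quoted in this thread. processRecords() and maxAttempts are placeholders of mine, and I'm assuming checkpoint() reports whether the conditional replace succeeded:

import java.util.List;

import com.amazonaws.services.kinesis.model.Record;

// Sketch only: retry the checkpoint, re-filtering between attempts so the
// checkpointer re-reads the stored sequence before the next conditional replace.
void processBatch(List<Record> records, int maxAttempts) {
    List<Record> toProcess = this.checkpointer.filterRecords(records);
    if (toProcess.isEmpty()) {
        return;                                  // everything already checkpointed by another node
    }
    processRecords(toProcess);                   // deliver to the output channel (done once)
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
        if (this.checkpointer.checkpoint()) {
            return;                              // conditional replace succeeded
        }
        // replace(...) returned false: another node moved the checkpoint forward.
        // If nothing we processed is still above the new checkpoint, we can move on safely.
        if (this.checkpointer.filterRecords(toProcess).isEmpty()) {
            return;
        }
    }
}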

The problem with doing a retry is: how many times will this happen, and how much contention will there be? In practical terms, I think client applications could reduce the prevalence of this race condition by configuring "idleBetweenPolls" and "recordsLimit" to suit their application's profile and behaviour. Instead of a retry, another option might be to modify the current 'shard iterator' so that on the next run we get records starting at the current checkpoint, instead of starting where we last left off?

In my mind, there is no way to guarantee that duplicates will never be delivered. The Kinesis documentation sets this expectation: https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-duplicates.html. Due to producer retries, it says "applications that need strict guarantees should embed a primary key within the record ..."

My application handles this using unique message IDs and some database processing. The ideal behaviour for me is that Spring Integration reduces the chance of duplicates to avoid contention, but I don't care about eliminating them entirely because I don't think such guarantees are possible without sacrificing performance, eg: using record-level checkpointing instead of batches, or distributed locks, etc.
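
For reference, a minimal sketch of the kind of idempotency check I mean, using a hypothetical table and plain Spring JDBC (this is my application-side approach, not part of this project):

import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.core.JdbcTemplate;

// Hypothetical idempotent-receiver guard: a unique constraint on message_id makes
// the INSERT fail for duplicate deliveries, so each message is processed at most once.
public class ProcessedMessageGuard {

    private final JdbcTemplate jdbcTemplate;

    public ProcessedMessageGuard(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /** Returns true if this message id has not been seen before. */
    public boolean markIfNew(String messageId) {
        try {
            this.jdbcTemplate.update("INSERT INTO processed_messages (message_id) VALUES (?)", messageId);
            return true;
        }
        catch (DuplicateKeyException ex) {
            return false;      // duplicate delivery: skip processing
        }
    }
}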

If you want to try my ideas above, I'm happy to help test them and we can see if it works. If there's a flaw in my logic and you are right that the checkpoint logic is total trash, then maybe reverting to the M1 behaviour is better. Duplicates, while annoying, are better than lost messages.

I'd be interested in knowing some details about what GH-66 is proposing. Does a leader election strategy mean that only one instance would be actively receiving messages at a time? If so, I'm not a fan of that either, since it limits scalability - but perhaps this is something that could be configured on or off at the developer's choice, depending on whether they're more interested in eliminating duplicates or having scalable performance?

Thanks for your help - I hope this makes sense :)

artembilan commented 6 years ago

@jazir1979 ,

Thank you for sharing your vision and experience!

Well, let me try to address your doubts:

A possible fix at step 4 and step 8 could be to try again if the checkpointing fails due to replace(...) returning false.

The logic there is: do not replace if the stored value is higher than the one we would like to save:

String existingSequence = getCheckpoint();
if (existingSequence == null ||
        new BigInteger(existingSequence).compareTo(new BigInteger(sequenceNumber)) < 0) {

    if (existingSequence != null) {
        return this.checkpointStore.replace(this.key, existingSequence, sequenceNumber);
    }
    else {
        return this.checkpointStore.putIfAbsent(this.key, sequenceNumber) == null;
    }
}

Therefore if Consumer B has stored its sequence 3, we won't be able to store sequences 1 and 2 in Consumer A.

Plus, there really is a race condition where we may pass the < 0 check but then can't replace, because the value has just been changed. According to your experience, in this case we lose some records; meanwhile, saving unconditionally, we would end up with duplicates...

"idleBetweenPolls" and "recordsLimit" to suit their application's profile and behaviour.

I think this is a good option for the current situation: if we configure recordsLimit to 1, any overlapping should be eliminated.
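
As a configuration sketch only (the class name is hypothetical and the setter names are as I recall them from the adapter; please verify against the release you use):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aws.inbound.kinesis.CheckpointMode;
import org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter;

import com.amazonaws.services.kinesis.AmazonKinesis;

// Sketch: tune recordsLimit and idleBetweenPolls to reduce batch overlap between nodes.
@Configuration
public class KinesisTuningConfig {

    @Bean
    public KinesisMessageDrivenChannelAdapter kinesisAdapter(AmazonKinesis amazonKinesis) {
        KinesisMessageDrivenChannelAdapter adapter =
                new KinesisMessageDrivenChannelAdapter(amazonKinesis, "Stream-localdev-dy");
        adapter.setCheckpointMode(CheckpointMode.batch);
        adapter.setRecordsLimit(1);          // smaller batches shrink the overlap window between nodes
        adapter.setIdleBetweenPolls(1000);   // back off between GetRecords calls (milliseconds)
        adapter.setOutputChannelName("input");
        return adapter;
    }
}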

I'd be interested in knowing some details about what GH-66 is proposing.

The idea behind that issue is similar to the Kinesis Client Library shard leasing, and it fully reflects the Consumer Group abstraction from Spring Cloud Stream and Apache Kafka: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#consumer-groups

The point is that we have several consumers in the same group, but only one of them is on each shard. This way we guarantee not only "only-once" processing, but also the order in which records in one shard are processed.

A leader election feature should help us come up with rebalancing, where one of the consumers has gone and its shards must be picked up by others in the group.

BTW, yesterday I started an implementation of the DynamoDbLockRegistry, which is going to serve as a foundation for the Leader Election: https://docs.spring.io/spring-integration/docs/5.0.5.RELEASE/reference/html/messaging-endpoints-chapter.html#leadership-event-handling

artembilan commented 6 years ago

FYI, @jazir1979 , I have just PR'ed a DynamoDbLockRegistry: #93

artembilan commented 6 years ago

@jazir1979 ,

I did a fix on the matter, and now the KinesisMessageDrivenChannelAdapter acquires an exclusive lock for each shard in the stream it is configured for. The checkpoint logic has been restored to not filter and not compare sequence numbers, since that no longer makes sense: only one consumer in a consumer group processes a given shard in the stream.

It is not the full rebalance and distribution logic yet, but at least we don't lose any records and don't have any overhead during checkpointing.

I would appreciate it if you could test it with your solution. The KinesisMessageDrivenChannelAdapter has to be supplied with a LockRegistry for the locking functionality. The best choice, of course, is the DynamoDbLockRegistry implementation.
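
A wiring sketch (fragment): setLockRegistry(...) is my assumption of the property name implied above, and I have left out how the DynamoDbLockRegistry bean itself is built - see PR #93 for that.

KinesisMessageDrivenChannelAdapter adapter =
        new KinesisMessageDrivenChannelAdapter(amazonKinesis, "Stream-localdev-dy");
adapter.setLockRegistry(dynamoDbLockRegistry);   // e.g. the DynamoDbLockRegistry from PR #93
adapter.setOutputChannelName("input");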

After your feedback I'll be ready to release 2.0 GA. The rest of ideas can be implemented in the next version.

Thank you!

s-porpoise commented 6 years ago

Thanks @artembilan, I'll try to give it a run this week and let you know how it goes.

This new model of only one consumer per shard will impact what we had planned to do for auto-scaling in our app, but it does seem like the right approach.

artembilan commented 6 years ago

@jazir1979 ,

We are still waiting for your feedback before proceeding with release this week.

Thanks for understanding.

Auto-scaling (or rebalancing) will be addressed in the next version. If we come up with a solution, of course...

s-porpoise commented 6 years ago

Sorry @artembilan I'm hoping to get to it today or tomorrow :)

s-porpoise commented 6 years ago

The new docs say the checkpoint timeToLive property has "No default - means no records expiration", but if I don't set this property I get a NullPointerException on startup from KinesisBinderConfiguration.java:129, due to auto-unboxing of a null Integer into the "int" required by DynamoDbMetaDataStore.
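
For context, the failure is plain auto-unboxing of a null wrapper; the names below are illustrative, not the binder's actual fields:

// Illustrative only: when the property is unset, the wrapper stays null, and passing it
// where a primitive int is expected unboxes it and throws a NullPointerException.
Integer timeToLive = null;                               // property not configured
// int ttlSeconds = timeToLive;                          // would throw NullPointerException here
int ttlSeconds = (timeToLive != null) ? timeToLive : 0;  // null-safe alternative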

s-porpoise commented 6 years ago

Further, once I configure this property by setting "spring.cloud.stream.kinesis.binder.checkpoint.timeToLive=2000" I can start up one instance of my application, but if I start a second instance I get an error on startup: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: TimeToLive is already enabled (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: VK7VHJ2LMKKTQ4N149TJA778PRVV4KQNSO5AEMVJF66Q9ASUAAJG)

artembilan commented 6 years ago

The NPE has been just fixed: https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/commit/48996618d63dff8501f8bfe7f027ea3b28bd70bd.

Thank you for the feedback, @jazir1979!

Fixing TTL on the DynamoDbMetaDataStore now...

s-porpoise commented 6 years ago

I made a local code change to get around the above errors and started my second instance. It's unable to obtain a lock on startup (expected behaviour):

2018-06-26 11:05:59.044 [-kinesis-dispatcher-1] ERROR o.s.i.a.i.k.KinesisMessageDrivenChannelAdapter - Error during locking: DynamoDbLock [lockKey=DYStream:DYStream-localdev-dy:shardId-000000000000,lockedAt=2018-06-26@11:05:49.039, lockItem=null]
java.util.concurrent.TimeoutException: null
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)
    at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:133)
    at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardLocksMonitor.tryLock(KinesisMessageDrivenChannelAdapter.java:1190)
    at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.lambda$populateShardsForStream$0(KinesisMessageDrivenChannelAdapter.java:572)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

However, if I shut down my other instance, I would expect it to release the locks (or the lease to expire) and the second instance to start listening to my shard. This didn't happen.

Now, even when I restart, no instance is able to obtain a lock, so I am stuck unless I delete the DynamoDB table. Is the lease duration (default 20 seconds) not working properly?

artembilan commented 6 years ago

and the second instance to start listening to my shard.

No, there is no such functionality yet...

The "no instance is able to obtain a lock" sounds like a bug. Although I'm not sure what am I missing during stop and unlocking... See ShardLocksMonitor and the finally in its run()

I have just fixed the exception on updateTimeToLive(): https://github.com/spring-projects/spring-integration-aws/commit/66e208f4f7dc8399984d676f2f9176b78fdc641e

artembilan commented 6 years ago

I can't have leader election for each shard because it would be very exhausting for the CPU to handle such a big number of leader threads. So, for now, election is just static during application start-up...

s-porpoise commented 6 years ago

I've been debugging in AmazonDynamoDBLockClient and I see this:

Regarding the shard leader election: without that, this implementation is not usable for me. I would much prefer the previous 1.0.0.M1 release, with duplicates getting received (and no lost messages).

artembilan commented 6 years ago

Would you mind sharing the problem with the AmazonDynamoDBLockClient in the appropriate AWS project: https://github.com/awslabs/dynamodb-lock-client ?

I'm sorry to hear about your frustration with the shard leader election situation, but my head is really spinning and I still don't see a solution on the matter... Without centralized control on AWS per se, it is pretty hard to come up with something useful...

s-porpoise commented 6 years ago

I've been looking into the behaviour in the AmazonDynamoDBLockClient. I raised an issue there: https://github.com/awslabs/dynamodb-lock-client/issues/8

However, since we know that currently the acquireLock(...) method may sleep for "lease duration + some buffer", I would recommend that the "lockedFuture.get(10, TimeUnit.SECONDS)" in tryLock(...) use an interval based on the lease duration rather than being hardcoded to 10 seconds.
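
In other words, something along these lines inside tryLock(...) (sketch only; leaseDurationMillis and bufferMillis here stand for whatever the lock client is configured with):

// Sketch: wait at least the configured lease duration (plus a small buffer)
// instead of a hardcoded 10 seconds, since acquireLock(...) may sleep for
// "lease duration + some buffer" before it returns.
long waitMillis = leaseDurationMillis + bufferMillis;
lockedFuture.get(waitMillis, TimeUnit.MILLISECONDS);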

I see it's difficult to do the shard leader election - I don't really have an answer. I think I'm just going to try plugging in a no-op lock registry and letting my nodes handle duplicate delivery for themselves. In the case of my app, we want to keep the number of shards low, because a single shard has high throughput and shard hours cost money - so we'll keep the number of shard receivers fixed and then scale up the number of background workers that actually process the received messages.
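
For reference, a minimal no-op LockRegistry sketch (my own stand-in, not something shipped by the project): every node would then consume every shard, and deduplication stays in the application.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.springframework.integration.support.locks.LockRegistry;

// Hypothetical no-op LockRegistry: obtain() always returns a lock that "succeeds"
// immediately, so shard locking never blocks and never coordinates across nodes.
public class NoOpLockRegistry implements LockRegistry {

    @Override
    public Lock obtain(Object lockKey) {
        return new NoOpLock();
    }

    private static final class NoOpLock implements Lock {

        @Override public void lock() { }
        @Override public void lockInterruptibly() { }
        @Override public boolean tryLock() { return true; }
        @Override public boolean tryLock(long time, TimeUnit unit) { return true; }
        @Override public void unlock() { }

        @Override
        public Condition newCondition() {
            throw new UnsupportedOperationException("Conditions are not supported by this no-op lock");
        }
    }
}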

artembilan commented 6 years ago

@jazir1979 ,

Thank you for your patience and experience!

Well, look: the KinesisMessageDrivenChannelAdapter can be configured without a lockRegistry. That way there are no elections and all the shards are consumed by all the consumers, even within the same group. The checkpoint logic now is like this:

this.checkpointer.setHighestSequence(records.get(records.size() - 1).getSequenceNumber());
...
if (CheckpointMode.batch.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) {
    this.checkpointer.checkpoint();
}

So, we now process all the pulled records without any filtering and store the highest sequenceNumber only if it is really higher than the one already in the store. But again: records are processed without filtering.

Maybe this variant can fit your requirement?

Regarding the leader election... Well, I now think that I really will spin up one more thread to iterate over all the shards all the time, with tryLock() on each of them. This way we should be able to pick up unlocked shards from other consumers in the group. At least...

s-porpoise commented 6 years ago

Thanks @artembilan, I'll give that a try with the updated checkpoint logic; I think it will work for me :)

For this app we'll probably stick with doing it that way, but I can still help to test out the leader election once that's done if you need someone to try it.

Thanks for your advice and help on this, and for all your work.

artembilan commented 6 years ago

Hello @jazir1979 !

Please take a look at my latest change on the matter: https://github.com/spring-projects/spring-integration-aws/commit/0e02b0d47067ade4698f5268dd0ed947b573fb5f.

Any chance that you can test it in your environment?

Thank you for all the help here!

s-porpoise commented 6 years ago

Sorry for the delay @artembilan, I've been on other things, but I hope to try out the latest next week.