spring-cloud / spring-cloud-stream-binder-aws-kinesis

Spring Cloud Stream binder for AWS Kinesis
Apache License 2.0

Some messages are lost when Kinesis shards scale out automatically on a stream in on-demand mode #212

Closed shentang1201 closed 11 months ago

shentang1201 commented 11 months ago

Hi team, we have encountered an issue consuming messages from an on-demand Kinesis stream associated with a DynamoDB table. Some messages were lost or never consumed when new shards were added by scale-out. We do not observe this issue with a provisioned Kinesis stream. We use Spring Cloud Stream Kinesis binder 2.2.0. Is the on-demand mode supported?

Regards

shentang1201 commented 11 months ago

By the way, multiple consumer instances were started with instanceCount and instanceIndex set.

shentang1201 commented 11 months ago

Hi team, if the Kinesis stream is switched to provisioned mode and no resharding happens, the problem appears to be fixed. Besides this, is there any other fix or workaround?

I am looking forward to your update.

Regards

lengyuewuhua commented 11 months ago

Yes, I face the same issue.

When I set up a static shard distribution across more than one instance by specifying both instanceIndex and instanceCount for a Kinesis stream in on-demand mode, and resharding happens on that stream, no rebalancing happens in KinesisMessageDrivenChannelAdapter; the shard consumers are simply stopped. No instance consumes messages from the newly created shards.

Configuration as below.

For Server1:

    spring:
      cloud:
        stream:
          instanceCount: 2
          instanceIndex: 0

For Server2:

    spring:
      cloud:
        stream:
          instanceCount: 2
          instanceIndex: 1

Logs:

2023-10-17 10:30:18,570 INFO [-kinesis-dispatcher-1] o.s.c.l.LogAccessor:Stopping the [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=LATEST, sequenceNumber='null', timestamp=null, stream='test-kinesis', shard='shardId-000000000000', reset=false}, state=STOP}] on the checkpoint [9223372036854775807] because the shard has been CLOSED and exhausted.

artembilan commented 11 months ago

If you use instanceCount and instanceIndex, then it is more like a static shard assignment; therefore there is no way to react to resharding, since the current consumer is simply not interested in other shards.

lengyuewuhua commented 11 months ago

Hi @artembilan, does that mean that setting instanceCount & instanceIndex only supports provisioned Kinesis streams and not on-demand Kinesis streams? If the answer is yes, we have not seen this documented anywhere. And it does not make sense that a consumer configured with instanceCount & instanceIndex is not aware of, and does not act on, resharding of the Kinesis stream.

artembilan commented 11 months ago

I'm not sure what does not make sense, but KinesisMessageDrivenChannelAdapter is not aware of streams when we assign specific shards. Therefore it just cannot react to resharding.

I would like to understand what provisioned and on-demand mean. Then I might look further into what we can do. The instanceCount & instanceIndex are binder options, and the shard assignment is done on startup.

lengyuewuhua commented 11 months ago

Hi @artembilan. An on-demand data stream scales its shard count depending on the write throughput. So it would be better for KinesisMessageDrivenChannelAdapter to be able to adjust to shard changes.

artembilan commented 11 months ago

It does have that ability, but only if it is configured with Kinesis streams, not specific shards as is the case with the instanceCount & instanceIndex properties. When you configure your stream applications to work with the same streams and in the same consumer group, the shards in those streams are distributed using locks on the shards. When resharding happens on a stream, the KinesisMessageDrivenChannelAdapter re-fetches fresh shards from it and everything starts from scratch to distribute them properly. Therefore I don't recommend the instanceCount & instanceIndex properties for an on-demand Kinesis configuration.
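
A minimal sketch of the stream-level setup described above: every instance uses the same configuration, names the stream and a shared consumer group, and leaves out instanceCount and instanceIndex. The binding name `input` and the group name `my-consumer-group` are hypothetical; `test-kinesis` is the stream name that appears in the log earlier in this thread.

```yaml
# Same configuration on Server1 and Server2 (sketch; binding and group names are assumptions)
spring:
  cloud:
    stream:
      # no instanceCount / instanceIndex here, so shards are not statically assigned
      bindings:
        input:                       # hypothetical binding name
          destination: test-kinesis  # stream name taken from the log above
          group: my-consumer-group   # hypothetical consumer group shared by all instances
```

With this layout, which shards each instance ends up with is decided at runtime through the shard locks rather than fixed at startup.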

lengyuewuhua commented 11 months ago

Thanks @artembilan for the updates. The reason we set the instanceCount & instanceIndex properties for an on-demand data stream is that we want the shards distributed evenly across instances. If we configure our application for a stream without the instanceCount & instanceIndex properties, an even distribution of shards is not guaranteed.

artembilan commented 11 months ago

Again: the instanceCount & instanceIndex config creates a static distribution of the existing shards. There is no way to react to resharding after that. In the latest version of Spring Integration AWS (and therefore the Kinesis Binder) we made some improvements to shard locking in the provided streams. On the other hand, you can try the KCL support. But again: you have to ditch those instanceCount & instanceIndex options.
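
For the KCL route mentioned above, a hedged sketch: the binder is assumed to expose a binder-level `kplKclEnabled` switch. Treat the property name as an assumption and check it against the documentation for your binder version. As with the stream-level setup, instanceCount and instanceIndex are left out.

```yaml
spring:
  cloud:
    stream:
      kinesis:
        binder:
          kplKclEnabled: true  # assumption: binder-level switch that turns on KCL/KPL support
```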

lengyuewuhua commented 11 months ago

Thanks for the information, @artembilan.

shentang1201 commented 11 months ago

Hi @artembilan, got it, and thank you so much for your explanation.

Regards

artembilan commented 11 months ago

Closed as Works as Designed