Open scattym opened 4 years ago
I can work around the problem by putting the following at line 306 in kinesis_consumer.py:
    if iterator_args.get("ShardIteratorType", "") == 'AT_SEQUENCE_NUMBER' and iterator_args["StartingSequenceNumber"] == '':
        continue
But I suspect I am missing the point about how to properly handle this scenario :)
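For anyone trying the same workaround, here is a minimal standalone sketch of that guard. The helper name `should_skip_reopen` and the sample `candidates` list are hypothetical and not part of async_kinesis_client; the sketch only assumes that `iterator_args` is a plain dict like the one in the condition above. It uses `.get()` for both keys so that a missing `StartingSequenceNumber` does not raise a `KeyError`.

```python
def should_skip_reopen(iterator_args):
    """Return True when a reader would be reopened AT_SEQUENCE_NUMBER
    without a usable sequence number (hypothetical helper)."""
    iterator_type = iterator_args.get("ShardIteratorType", "")
    sequence_number = iterator_args.get("StartingSequenceNumber", "")
    return iterator_type == "AT_SEQUENCE_NUMBER" and not sequence_number


# Hypothetical stand-in for the iterator arguments seen while looping
# over dead readers; the values are made up for illustration.
candidates = [
    {"ShardIteratorType": "AT_SEQUENCE_NUMBER", "StartingSequenceNumber": ""},
    {"ShardIteratorType": "AT_SEQUENCE_NUMBER", "StartingSequenceNumber": "12345"},
    {"ShardIteratorType": "AT_TIMESTAMP"},
]

# Keep only the entries that are safe to reopen.
reopenable = [args for args in candidates if not should_skip_reopen(args)]
```

This only avoids the failing call. It does not stop the reader from being picked up again on the next rescan.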
Putting the above in results in the following set of messages:
2019-11-26 15:46:19,005 - 30652 - root - INFO - Got shard reader for shard id: shardId-000000000000
2019-11-26 15:46:19,134 - 30652 - root - ERROR - Shard closed with error:
2019-11-26 15:46:19,248 - 30652 - async_kinesis_client - WARNING - shardId-000000000000: Can not get last saved checkpointed seq!
And this loops at every rescan, from what I can tell.
Hi @scattym, I'm afraid this case is not well tested and might be buggy. I'll look into it. The initial idea was that when resharding happens, some readers die and new readers are returned, but I didn't think through what happens to the checkpointing, which is really a shame.
Resharding the Kinesis stream results in a ShardClosedException. Catching this exception is fine, but I think it results in the reader being added to the "dead_readers" list, and then we try to reopen it. Then, on line 310 of kinesis_consumer.py, we get the following exception:
Is there a way to mark the reader as closed and not try to reopen it? Apologies if there is an obvious way to do this, I just can't see it at the moment.
I did try stopping the reader with .stop(), but I don't think it behaves the way I was hoping.
I am invoking the reader with AT_TIMESTAMP and no DynamoDB instance.
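One possible way to avoid reopening readers for closed shards is sketched below. This is not how async_kinesis_client works today, and `is_shard_closed` and `readers_to_reopen` are hypothetical names. The sketch assumes shard descriptions shaped like the `Shards` entries returned by the Kinesis ListShards call, where a closed shard carries an `EndingSequenceNumber` in its `SequenceNumberRange` and an open shard does not.

```python
def is_shard_closed(shard):
    """A shard is treated as closed once its sequence number range has an end."""
    sequence_range = shard.get("SequenceNumberRange", {})
    return sequence_range.get("EndingSequenceNumber") is not None


def readers_to_reopen(dead_shard_ids, shards):
    """Filter out dead readers whose shard is closed (hypothetical helper)."""
    closed_ids = {s["ShardId"] for s in shards if is_shard_closed(s)}
    return [shard_id for shard_id in dead_shard_ids if shard_id not in closed_ids]


# Made-up shard descriptions after a reshard: the parent shard is closed,
# the two child shards are still open.
shards = [
    {"ShardId": "shardId-000000000000",
     "SequenceNumberRange": {"StartingSequenceNumber": "1",
                             "EndingSequenceNumber": "9"}},
    {"ShardId": "shardId-000000000001",
     "SequenceNumberRange": {"StartingSequenceNumber": "10"}},
    {"ShardId": "shardId-000000000002",
     "SequenceNumberRange": {"StartingSequenceNumber": "10"}},
]

dead = ["shardId-000000000000", "shardId-000000000001"]
to_reopen = readers_to_reopen(dead, shards)
```

With this filter, the reader for the closed parent shard would be dropped rather than retried, while a reader that died on a shard that is still open would be reopened.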