Closed chadlagore closed 4 years ago
Agreed. Here is an example of a consumer that is working from a set of empty shards running this code (these are the persisted checkpoint offsets):
# batch3 (no data)
v1
{"batchWatermarkMs":0,"batchTimestampMs":1599006537913,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"4"}}
{"metadata":{"streamName":"stream_name","batchId":"3"},"shardId-000000000000":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1599005556576"}}
{"metadata":{"streamName":"stream_name","batchId":"3"},"shardId-000000000000":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1599005556576"}}
# batch 4 (data appears)
v1
{"batchWatermarkMs":1599005056342,"batchTimestampMs":1599006671256,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"4"}}
{"metadata":{"streamName":"stream_name","batchId":"4"},"shardId-000000000000":{"iteratorType":"AFTER_SEQUENCE_NUMBER","iteratorPosition":"49609690170106349127857093631690192845261414844357148674"}}
{"metadata":{"streamName":"stream_name","batchId":"4"},"shardId-000000000000":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1599005556576"}}
# batch 5
v1
{"batchWatermarkMs":1599005056342,"batchTimestampMs":1599006691777,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"4"}}
{"metadata":{"streamName":"stream_name","batchId":"5"},"shardId-000000000000":{"iteratorType":"AFTER_SEQUENCE_NUMBER","iteratorPosition":"49609690170106349127857093631690192845261414844357148674"}}
{"metadata":{"streamName":"stream_name","batchId":"5"},"shardId-000000000000":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1599005556576"}}
Notice at batch 4, data appears and we move to the sequence number of the incoming data.
@chadlagore @elainearbaugh
In the code, I am handling the following scenario - Say we have started reading from time_horizon. So we need to make multiple get-records API calls to reach to a point where kinesis has data in it. (unfortunately unlike other data sources, Kinesis streams won't give the first available record in 1 API call). And if we dont reach the point where kinesis has data on it within the specified maxFetchTimeInMs
, we will have a similar problem in the next micro-batch which will again start reading from trim_horizon. And this loop with continue and we would not process any data from that particular stream.
I agree that the current approach violates the meaning of maxFetchTimeInMs and will lead to AWS throttling when we are already on the tip of the stream and there is no new data to read.
Do you have any good ideas in handling the above-mentioned scenario?
Thanks for merging this @itsvikramagr. I believe the answer to your question is that in a call at TRIM_HORIZON
(or any timestamp prior to the retention period), and with an otherwise empty stream, Kinesis is still able to return the new record at the tip of the stream, then this library moves the sequence number to the that point. The > 1 call GetRecords API Kinesis has is a bit odd, but I think that is characteristic of Kinesis addressed by the maxFetchTimeInMs
param.
Fixes bug https://github.com/qubole/kinesis-sql/issues/87.
This condition causes an infinite loop and throttling from AWS when the shard is empty. In my example on the ticket, I show it hitting the Kinesis API iteratively for minutes before giving up on the shard. We should more gracefully handle empty shards which admit no
lastReadSequenceNumber
no matter how many times you hit them sequentially.Moreover, I believe the condition is unnecessary, because one can just increase
maxReadTimeInMs
if you want to spend a longer time reading on the shard.