qubole / kinesis-sql

Kinesis Connector for Structured Streaming
http://www.qubole.com
Apache License 2.0

Not honoring maxFetchTimeInMs on empty streams #87

Closed: chadlagore closed this issue 4 years ago

chadlagore commented 4 years ago

The library gets stuck pulling records from Kinesis for very long periods when the stream is empty, so maxFetchTimeInMs is effectively not honored. From the debug output, I have confirmed that the cause is this condition, which is always true when the stream is empty: we lose the lastSequenceNumber and, because no records arrive, never regain it.
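Here is a minimal Python model of this failure mode. It assumes the fetch loop stops either when millisBehindLatest reaches 0 or when the time budget is spent, and that the budget is only honored once a sequence number has been seen. This is not the kinesis-sql code (which is Scala). `fetch_shard`, `empty_shard`, `fake_clock`, the 1 s budget, and the 200 ms clock tick are all hypothetical. The ~290 s of shard time skipped per empty call is read off the log lines in this issue.

```python
from typing import Callable, List, Optional, Tuple

# Hypothetical GetRecords stand-in: returns (sequence_numbers, millis_behind_latest).
GetRecords = Callable[[], Tuple[List[str], int]]


def fetch_shard(get_records: GetRecords, now_ms: Callable[[], int],
                max_fetch_time_ms: int, require_sequence_number: bool) -> Tuple[List[str], int]:
    """Simplified model of one task's fetch loop for a shard (not the kinesis-sql code).

    The loop ends when the shard is caught up (millis_behind_latest == 0) or when
    the time budget is spent. With require_sequence_number=True the budget only
    counts once a record (and therefore a sequence number) has been seen, which is
    the kind of guard described in this issue. Returns (records, get_records_calls).
    """
    start = now_ms()
    last_sequence_number: Optional[str] = None
    fetched: List[str] = []
    calls = 0
    while True:
        records, millis_behind_latest = get_records()
        calls += 1
        fetched.extend(records)
        if records:
            last_sequence_number = records[-1]
        if millis_behind_latest == 0:
            break  # caught up with the tip of the shard
        over_budget = now_ms() - start > max_fetch_time_ms
        if over_budget and (not require_sequence_number or last_sequence_number is not None):
            break  # time budget spent (and, if required, a sequence number was seen)
    return fetched, calls


def empty_shard(initial_behind_ms: int, skipped_per_call_ms: int) -> GetRecords:
    """An empty shard: each call returns no records and skips a fixed slice of shard time."""
    behind = initial_behind_ms

    def get_records() -> Tuple[List[str], int]:
        nonlocal behind
        behind = max(0, behind - skipped_per_call_ms)
        return [], behind

    return get_records


def fake_clock(tick_ms: int) -> Callable[[], int]:
    """Deterministic clock that advances tick_ms every time it is read."""
    t = 0

    def now() -> int:
        nonlocal t
        t += tick_ms
        return t

    return now


# Numbers loosely based on this issue: ~22.5 h behind, ~290 s skipped per empty call.
for guarded in (True, False):
    _, calls = fetch_shard(empty_shard(80_956_000, 290_000), fake_clock(200),
                           max_fetch_time_ms=1_000, require_sequence_number=guarded)
    print(f"guard={guarded}: {calls} GetRecords calls")
```

Under these assumptions the guarded loop makes 280 calls on an empty shard before millisBehindLatest reaches 0. Without the guard, the loop stops after 6 calls, once the 1 s budget is spent. This mirrors the behaviour reported here, but the model leaves out the real loop's timing, retries, and throttling.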

Because of the prolonged pulling, we run into Kinesis read limits, which throttle us further. In the loop below, for example, the library calls the Kinesis API ~250 times to complete a single micro-batch on a 30 s trigger, which can take a few minutes. Eventually getMillisBehindLatest reaches 0 and the loop moves on. For context, we union a normally empty fallback stream in one region with a stream in another region that normally has data. The region with data finishes in about 5-10 s, while the empty region sometimes runs for minutes.

Selected log lines from the loop for a single micro-batch (the library makes 257 pull attempts):

20/08/23 20:34:54 DEBUG KinesisSourceRDD: Milli secs behind is 80956000
20/08/23 20:34:54 DEBUG KinesisSourceRDD: Milli secs behind is 80670000
20/08/23 20:34:55 DEBUG KinesisSourceRDD: Milli secs behind is 80374000
20/08/23 20:34:55 DEBUG KinesisSourceRDD: Milli secs behind is 80089000
20/08/23 20:34:55 DEBUG KinesisSourceRDD: Milli secs behind is 79790000
20/08/23 20:34:55 DEBUG KinesisSourceRDD: Milli secs behind is 79496000
20/08/23 20:34:55 DEBUG KinesisSourceRDD: Milli secs behind is 79200000
...
20/08/23 20:35:41 DEBUG KinesisSourceRDD: Milli secs behind is 1324000
20/08/23 20:35:41 DEBUG KinesisSourceRDD: Milli secs behind is 1048000
20/08/23 20:35:41 DEBUG KinesisSourceRDD: Milli secs behind is 765000
20/08/23 20:35:41 DEBUG KinesisSourceRDD: Milli secs behind is 480000
20/08/23 20:35:42 DEBUG KinesisSourceRDD: Milli secs behind is 203000
20/08/23 20:35:42 DEBUG KinesisSourceRDD: Milli secs behind is 0
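The log excerpt above is enough for a rough sanity check. This Python sketch parses the first seven lines and the last one. It assumes that consecutive lines correspond to consecutive GetRecords calls and that the first and last lines bracket the micro-batch. From those lines it estimates how much shard time each empty call skips. It then compares the reported 257 attempts over that window with the Kinesis limit of 5 GetRecords calls per second per shard.

```python
from datetime import datetime
from statistics import mean

# First seven and last DEBUG lines of the micro-batch, copied from the excerpt above.
HEAD = [
    "20/08/23 20:34:54 DEBUG KinesisSourceRDD: Milli secs behind is 80956000",
    "20/08/23 20:34:54 DEBUG KinesisSourceRDD: Milli secs behind is 80670000",
    "20/08/23 20:34:55 DEBUG KinesisSourceRDD: Milli secs behind is 80374000",
    "20/08/23 20:34:55 DEBUG KinesisSourceRDD: Milli secs behind is 80089000",
    "20/08/23 20:34:55 DEBUG KinesisSourceRDD: Milli secs behind is 79790000",
    "20/08/23 20:34:55 DEBUG KinesisSourceRDD: Milli secs behind is 79496000",
    "20/08/23 20:34:55 DEBUG KinesisSourceRDD: Milli secs behind is 79200000",
]
LAST = "20/08/23 20:35:42 DEBUG KinesisSourceRDD: Milli secs behind is 0"
REPORTED_ATTEMPTS = 257  # pull attempts reported for this micro-batch
READ_LIMIT_PER_SEC = 5   # Kinesis GetRecords calls per second per shard


def parse(line: str) -> tuple[datetime, int]:
    """Return (timestamp, millisBehindLatest) for one log line (yy/MM/dd HH:mm:ss)."""
    day, clock = line.split()[:2]
    ts = datetime.strptime(f"{day} {clock}", "%y/%m/%d %H:%M:%S")
    return ts, int(line.rsplit(" ", 1)[1])


head = [parse(line) for line in HEAD]
behind = [ms for _, ms in head]
# Shard time skipped between consecutive lines (assumes one line per call).
deltas = [prev - cur for prev, cur in zip(behind, behind[1:])]
per_call_ms = mean(deltas)

start_ts, start_behind_ms = head[0]
end_ts, _ = parse(LAST)
elapsed_s = (end_ts - start_ts).total_seconds()

est_calls = start_behind_ms / per_call_ms          # calls needed to reach 0
calls_per_sec = REPORTED_ATTEMPTS / elapsed_s      # observed call rate

print(f"backlog at start: {start_behind_ms / 3_600_000:.1f} h of shard time")
print(f"skipped per call: ~{per_call_ms / 1000:.0f} s")
print(f"calls to reach 0: ~{est_calls:.0f}")
print(f"call rate: {calls_per_sec:.2f}/s vs limit {READ_LIMIT_PER_SEC}/s per shard")
```

On these lines the sketch reports a backlog of about 22.5 h and about 293 s of shard time skipped per call. That works out to roughly 277 calls to reach 0, the same order as the 257 attempts reported. The call rate comes to about 5.35 calls/s, slightly above the per-shard read limit, which is consistent with the ProvisionedThroughputExceededException errors below.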

Exceptions raised (86 within the micro-batch):

20/08/23 20:35:38 DEBUG request: Received error response: com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException: Rate exceeded for shard shardId-000000000001 in stream stream_name under account 123456789. (Service: AmazonKinesis; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: )

Current version: spark-sql-kinesis_2.11-1.1.2-spark_2.4

chadlagore commented 4 years ago

I removed the condition and tested the change here. Once the stream sees records again, it catches up immediately: while the stream is empty the iterator age grows as expected, but as soon as records show up it moves back to the tip. Do you see any danger in removing this condition, @itsvikramagr? I don't think we should assume we'll always see at least one record in a given micro-batch.

chadlagore commented 4 years ago

This also appears to have a significant impact on checkpoint recovery. In production, tasks undergoing checkpoint recovery take up to 15 minutes to fetch their first batch. In staging this is only a few minutes, but the trigger is 30 s, so it is still bad. I suspect this happens because there is more millisBehindLatest to catch up on in those situations. After removing the condition, checkpoint recovery in staging completed within the trigger window. I have not tried the fix in production yet.
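If this suspicion is right, the time before the first batch should grow linearly with the backlog. Here is a rough lower bound. It assumes every empty GetRecords call skips a fixed slice of shard time (~290 s, as in the logs above) and that calls are paced only by the 5 calls/s per-shard read limit. `min_catch_up_seconds` and the backlog values are hypothetical, and throttled retries would only add time.

```python
import math


def min_catch_up_seconds(backlog_ms: int, skipped_per_call_ms: int,
                         max_calls_per_sec: float = 5.0) -> float:
    """Lower bound on wall-clock seconds to drain an empty backlog (hypothetical model).

    Assumes each GetRecords call on an empty shard skips a fixed slice of
    shard time and that the per-shard read limit is the only pacing.
    """
    calls = math.ceil(backlog_ms / skipped_per_call_ms)
    return calls / max_calls_per_sec


HOUR_MS = 3_600_000
for label, backlog_ms in [("1 h", HOUR_MS), ("~22.5 h", 80_956_000), ("7 days", 7 * 24 * HOUR_MS)]:
    print(f"{label:>8} behind: >= {min_catch_up_seconds(backlog_ms, 290_000):.1f} s")
```

For the ~22.5 h backlog in the staging logs, this gives about 56 s, which is already above a 30 s trigger. A backlog of several days would take several minutes.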

itsvikramagr commented 4 years ago

@chadlagore, the condition was added as part of this PR: https://github.com/qubole/kinesis-sql/pull/49/files.

I will take another look at the code and go through your use case to see what we can do here.

chadlagore commented 4 years ago

Thank you @itsvikramagr