This change adds the ability to specify the starting Kinesis position to read from by passing a JSON string containing sequence numbers for each shard directly to the "startingposition" option.
A bit of background
In our Spark Structured Streaming application, we have a listener (org.apache.spark.sql.streaming.StreamingQueryListener) that manually persists SourceProgress.endOffset (which is a JSON string) to external storage after each batch. In the case of qubole/kinesis-sql it looks like:
We need to be able to pick up this JSON on application re-deploy and continue processing from the last position (at-least-once semantics are OK for us).
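As a rough illustration of the persist-and-restore cycle described above, here is a minimal Python sketch. The file-based store and the names `save_end_offset` and `load_end_offset` are hypothetical; in a real job the listener's progress callback would call the save function with SourceProgress.endOffset, and the external storage would likely be something other than a local file.

```python
import json
import os
import tempfile
from typing import Optional


def save_end_offset(path: str, end_offset_json: str) -> None:
    """Persist the latest endOffset JSON string, replacing the previous one."""
    json.loads(end_offset_json)  # fail early if the string is not valid JSON
    directory = os.path.dirname(os.path.abspath(path))
    # Write to a temp file first, then rename it into place, so a crash
    # mid-write does not leave a truncated file at `path`.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write(end_offset_json)
    os.replace(tmp_path, path)


def load_end_offset(path: str) -> Optional[str]:
    """Return the last persisted endOffset JSON, or None on a first deploy."""
    try:
        with open(path, encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        return None
```

Because the offset is saved after each batch, a restart may re-read the most recent batch, which is consistent with at-least-once semantics.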
kafka-0-10-sql allows passing a JSON-serialized offset as the "startingOffsets" option, so that reading continues from the last position.
While moving one of our Spark jobs to Kinesis, we realized that this functionality is missing.
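For comparison, the Kafka source's "startingOffsets" option accepts a JSON string that maps each topic to its partitions and offsets. A minimal Python sketch of building that string follows; the helper name `kafka_starting_offsets` and the topic name `events` are made up for illustration.

```python
import json
from typing import Dict


def kafka_starting_offsets(offsets: Dict[str, Dict[int, int]]) -> str:
    """Serialize {topic: {partition: offset}} into the startingOffsets JSON form."""
    # JSON object keys must be strings, so partition numbers are converted.
    return json.dumps(
        {
            topic: {str(partition): offset for partition, offset in parts.items()}
            for topic, parts in offsets.items()
        },
        sort_keys=True,
    )


# Example: resume partition 0 at offset 23 and partition 1 at offset 42.
starting_offsets = kafka_starting_offsets({"events": {0: 23, 1: 42}})
```

The resulting string would be passed as `.option("startingOffsets", starting_offsets)` on the Kafka stream reader.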
With this change I'd like to add the ability to specify the Kinesis initial position by passing a JSON-serialized map shardId->ShardInfo to the "startingposition" option.
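A minimal Python sketch of how a job might choose the option value on startup, assuming this change is in place. The function name `resolve_starting_position` and the `TRIM_HORIZON` fallback are illustrative choices, not part of the change; the saved JSON is passed through unchanged, so the sketch makes no assumption about the per-shard fields.

```python
import json
from typing import Optional


def resolve_starting_position(saved_offset_json: Optional[str],
                              default: str = "TRIM_HORIZON") -> str:
    """Pick the value for the "startingposition" option."""
    if not saved_offset_json:
        return default  # first deploy: nothing has been saved yet
    parsed = json.loads(saved_offset_json)  # raises ValueError on malformed JSON
    if not isinstance(parsed, dict) or not parsed:
        raise ValueError("expected a non-empty JSON object keyed by shard id")
    return saved_offset_json  # hand the saved JSON to the source as-is
```

The returned string would then be passed as `.option("startingposition", value)` when creating the stream reader.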
Thanks for this, @snorochevskiy. This is great functionality, and I have been thinking about adding support for it for some time now. Thanks for picking it up and starting the PR.
This implementation is already tested on our environment. (I understand it may be similar to https://github.com/qubole/kinesis-sql/pull/65)