WTa-hash opened this issue 3 years ago
It seems the issue was mentioned here as well: https://github.com/qubole/kinesis-sql/pull/90#issuecomment-685418745
Yeah, the alternative is that `maxReadTimeInMs` is ignored and the application continually calls the Kinesis read API and gets throttled. I think the appropriate fix is to migrate to `SubscribeToShard`, which uses an event-driven architecture to activate the streaming application. That is probably a large lift.
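For reference, this is roughly what a `SubscribeToShard` consumer looks like with the AWS SDK v2. This is a minimal sketch, not the connector's implementation; the consumer ARN, stream, and shard IDs are placeholders (enhanced fan-out requires registering a consumer beforehand via `RegisterStreamConsumer`):

```scala
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.awssdk.services.kinesis.model.{
  ShardIteratorType, StartingPosition, SubscribeToShardEvent,
  SubscribeToShardRequest, SubscribeToShardResponseHandler
}

object SubscribeToShardSketch extends App {
  val client = KinesisAsyncClient.create()

  // Placeholder ARN from a prior RegisterStreamConsumer call.
  val request = SubscribeToShardRequest.builder()
    .consumerARN("arn:aws:kinesis:us-east-1:123456789012:stream/my-stream/consumer/my-consumer:1")
    .shardId("shardId-000000000000")
    .startingPosition(StartingPosition.builder().`type`(ShardIteratorType.LATEST).build())
    .build()

  // Kinesis pushes events to this handler over HTTP/2, so idle periods cost
  // nothing: no polling loop, and no GetRecords calls to get throttled.
  val handler = SubscribeToShardResponseHandler.builder()
    .onError((t: Throwable) => System.err.println(s"Subscription error: ${t.getMessage}"))
    .subscriber(
      SubscribeToShardResponseHandler.Visitor.builder()
        .onSubscribeToShardEvent((e: SubscribeToShardEvent) =>
          println(s"${e.records().size()} records, continuation ${e.continuationSequenceNumber()}"))
        .build())
    .build()

  // A subscription lasts at most 5 minutes; a real reader re-subscribes in a loop.
  client.subscribeToShard(request, handler).join()
}
```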
I sent an email to the maintainer of this repo and didn't get a response. For now, I've forked the project at https://github.com/roncemer/kinesis-spark-connector and updated it to build for Spark 3.2.1. Under Spark 3.2.1, it appears to be working correctly: I let it run overnight with no new records being posted to the Kinesis data stream, and when I started posting records again, they arrived in Spark and were processed.
I issued pull request https://github.com/qubole/kinesis-sql/pull/113/files from my forked repo back to the original qubole/kinesis-sql repo. Be sure to read the full description under the Comments tab as well.
I could use the help of anyone who might be interested in this project. Apparently qubole/kinesis-sql has been abandoned for about two years, and the main guy who was maintaining it doesn't respond to emails. If anyone can get in touch with someone who has control of this repo, please ask them to make me a committer. Barring that, I will have to publish the jar from my forked repo as a Maven artifact. In any case, if I end up maintaining this project, either the original or the forked repo, I will need volunteers to help handle the builds each time a new version of Spark is released.
In the meantime, feel free to build your own jar from my forked repo and run it under Spark 3.2.1. Also, let me know if it works under 3.3.x, or if you run into any other problems.
Thanks! Ron Cemer
I have a Spark structured streaming job using the Qubole Kinesis connector 1.2.0 against a Kinesis stream with 2 shards and a 2-day retention period.
These are my Qubole Kinesis configs:
Here's the issue I'm having: imagine a few records are pushed at time 00:00:00 and the next set of records at 20:00:00, so there's about a 20-hour gap between the two sets in the Kinesis stream. After this fix https://github.com/qubole/kinesis-sql/commit/5bd378b9094438d182bbbd4b489e32fd03e690af was introduced in 1.2.0, Spark is unable to fetch the later records from time 20:00:00 due to the long gap with no activity. Removing the idle time between reads and increasing the max fetch time helps in getting the later set of records. When I switch back to 1.1.4, the Qubole Kinesis configs above work, but it doesn't honor the max fetch time, so Spark spends more time catching up to the tip of the stream and may block other Spark jobs until that finishes.
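For context, the idle-time and max-fetch-time knobs discussed here are set as reader options. A minimal sketch, assuming a SparkSession `spark` in scope and the option names from the connector's README; the stream name, endpoint, and values are placeholders, not the actual configs from this report:

```scala
val kinesisDF = spark.readStream
  .format("kinesis")
  .option("streamName", "my-stream")                               // placeholder
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com") // placeholder region
  .option("startingposition", "TRIM_HORIZON")
  // The two knobs discussed in this issue:
  .option("kinesis.executor.maxFetchTimeInMs", "10000")            // cap on time spent fetching per batch
  .option("kinesis.executor.addIdleTimeBetweenReads", "true")      // behavior added by the commit referenced above
  .option("kinesis.executor.idleTimeBetweenReadsInMs", "1000")
  .load()
```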
What's the recommended approach in this case? Would using a timestamp as the offset work better?
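For what a timestamp-based offset means at the Kinesis API level: a shard iterator can be opened at a point in time with `AT_TIMESTAMP`, landing just before the later batch instead of walking forward across the quiet stretch. A sketch with the AWS SDK v2; the stream name, shard ID, and timestamp are placeholders, and whether the connector itself exposes a timestamp starting position depends on the version:

```scala
import java.time.Instant
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.{GetShardIteratorRequest, ShardIteratorType}

val kinesis = KinesisClient.create()

// Open an iterator just before the later set of records (placeholder values),
// rather than advancing from a checkpoint taken 20 hours earlier.
val iterator = kinesis.getShardIterator(
  GetShardIteratorRequest.builder()
    .streamName("my-stream")
    .shardId("shardId-000000000000")
    .shardIteratorType(ShardIteratorType.AT_TIMESTAMP)
    .timestamp(Instant.parse("2022-01-01T19:59:00Z"))
    .build()
).shardIterator()
```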