qubole / kinesis-sql

Kinesis Connector for Structured Streaming
http://www.qubole.com
Apache License 2.0

Can't get newer records #96

Open Tomaszuniek opened 4 years ago

Tomaszuniek commented 4 years ago

Hi, I am not sure what this issue is related to, so I am trying my luck here. I am trying to get records from my stream with the qubole kinesis Spark library:

val kinesis = sparkContextService.SQLC.sparkSession.readStream
    .format("kinesis")
    .option("streamName", "streamName")
    .option("region", "region")
    .option("endpointUrl", "endpointUrl")
    .option("startingposition", "TRIM_HORIZON")
    .option("awsAccessKeyId", "awsAccessKeyId")
    .option("awsSecretKey", "awsSecretKey")
    .option("avoidEmptyBatches", "true")
    .load()

and then:

kinesis
    .selectExpr("CAST(data AS STRING)").as[(String)]
    .groupBy("data").count()
    .writeStream
    .format("console")
    .outputMode("complete")
    .start()
    .awaitTermination()

To be sure that everything within Kinesis is okay, I test putting and getting data with a separate non-Spark project. I put the data into the Kinesis stream with a PutRecordRequest (a sketch of the put side is shown after the loop below), and then, within the same non-Spark project, I check whether the data was written correctly by trying to read it back with a GetRecordsRequest:

while (thereAreShards) {
    GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
    getRecordsRequest.setShardIterator(shardIterator);
    try {
        GetRecordsResult result = kinesisClient.getRecords(getRecordsRequest);
        records = result.getRecords();
        if (records.size() != 0) {
            // Print how many empty GetRecords calls were needed before data showed up.
            System.out.println(count);
            count = 0;
        } else {
            count++;
        }
        // Always advance with the iterator returned by the previous call.
        shardIterator = result.getNextShardIterator();
    } catch (Throwable t) {
        System.out.println(t);
    }
    // A null next iterator means the shard has been closed and fully read.
    if (shardIterator == null) {
        thereAreShards = false;
    }
}
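
For completeness, here is a minimal sketch of the put side mentioned above, assuming the AWS SDK v1 for Java (called here from Scala); the region, stream name, partition key, and payload are placeholders:

import java.nio.ByteBuffer
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.PutRecordRequest

object PutExample {
  def main(args: Array[String]): Unit = {
    // "region", "streamName" and the partition key are placeholders.
    val kinesisClient = AmazonKinesisClientBuilder.standard().withRegion("region").build()

    val request = new PutRecordRequest()
      .withStreamName("streamName")
      .withPartitionKey("partition-1")
      .withData(ByteBuffer.wrap("abc".getBytes("UTF-8")))

    val result = kinesisClient.putRecord(request)
    // The sequence number tells you where in the shard the record landed.
    println(s"shard=${result.getShardId} seq=${result.getSequenceNumber}")
  }
}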
I noticed that if the count my non-Spark app needs to get the first result is something like 100, 112, or 130 (so the app needs, for example, 100 calls to getNextShardIterator() before it sees the data), I can't get the data in my Spark application. However, if my non-Spark project returns a count like 5 or 10, I get the data in the Spark app without problems:

Batch: 2

+-----+-----+
| data|count|
+-----+-----+
|  abc|  400|
|   ef|  900|
|GHKIJ|  500|
|   Hi| 1000|
+-----+-----+

So if the data is placed far into the shard I can't access it: even when I left the Spark query running for 24 hours and 6300 batches passed, I didn't get any data. But if the data is reachable on the 5th shard iterator, I already get it in the 2nd batch.

The newer the data I put, the further into the shard it is placed, and the count is always bigger than 100. For example, I put "abc" 500 times into an empty shard, and the "count" value I get is something like 115. The count for the older data, however, keeps getting smaller: it also starts around 100, but after some time its count is 5, so I can see the data moving along the shard.

Does anyone know what the issue could be, i.e. why I can't get the newer data that sits further in the shard and needs more getNextShardIterator() calls? I thought it just needed more time, but more than 20 hours should be more than enough, so that shouldn't be the problem. What is interesting is that when the job has been running from the beginning, I can't get the new data even after 20 hours, but when I start the query later, once the data is not so new and sits closer to the start of the shard, I get the data immediately. Of course I check that the data has actually been put into the Kinesis stream, both in the Kinesis console and with a GetRecordsRequest.

Is it possible that, starting from one place on the shard, with the iterator moving forward and the data moving down the shard, the iterator and the data somehow "pass" each other? Thanks in advance, Tomasz
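
For reference, here is a rough sketch of one way to confirm that a record is actually reachable from the trim horizon: follow the NextShardIterator returned by each GetRecords call and stop once MillisBehindLatest drops to zero. This assumes the AWS SDK v1 for Java (called here from Scala); the region, stream name, and shard id are placeholders.

import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.{GetRecordsRequest, GetShardIteratorRequest}

object WalkShard {
  def main(args: Array[String]): Unit = {
    // "region", "streamName" and "shardId-000000000000" are placeholders.
    val kinesis = AmazonKinesisClientBuilder.standard().withRegion("region").build()

    // Start from the oldest record still in the shard (the trim horizon).
    val iterRequest = new GetShardIteratorRequest()
      .withStreamName("streamName")
      .withShardId("shardId-000000000000")
      .withShardIteratorType("TRIM_HORIZON")
    var shardIterator = kinesis.getShardIterator(iterRequest).getShardIterator

    var caughtUp = false
    while (!caughtUp && shardIterator != null) {
      val result = kinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator))
      println(s"got ${result.getRecords.size()} records")
      // MillisBehindLatest == 0 means this reader has reached the tip of the shard,
      // so every record currently in the shard has been seen.
      caughtUp = result.getMillisBehindLatest.longValue() == 0L
      shardIterator = result.getNextShardIterator
    }
  }
}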

rae89 commented 3 years ago

@Tomaszuniek were you able to fix this issue? I am facing the same problem

queueburt commented 3 years ago

I'm having a similar issue that I think might be related, and I'm trying to get down to the root cause. For me, the problem seems to occur when the connector gets stuck querying for shard iterators with "AFTER_SEQUENCE_NUMBER" and doesn't get any results. Put another way, if there's a gap between records (or a sequence number passes the trim horizon with no records in the next iterator), the connector seems to query "AFTER_SEQUENCE_NUMBER" ad infinitum instead of tracking the next shard iterator. Best case, the streaming app falls so far behind that it's effectively gathering records at the end of the duration. Worst case, it loses data entirely. Neither is okay.

TL;DR: "AFTER_SEQUENCE_NUMBER" isn't a globalized "all records after this data" mechanism. It (and, for the record, TRIM_HORIZON as well) can return no records even when there is data in the shard. How Kinesis pages through empty stretches of a shard seems to vary with the total duration, but the short version is that either this connector doesn't use the next shard iterator to sequence its motion through the stream, or the increment logic isn't triggering when it should.
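
Not the connector's actual code, but a rough sketch (in Scala, against the AWS SDK v1 for Java, with placeholder names) contrasting the two consumption patterns described above: re-requesting an AFTER_SEQUENCE_NUMBER iterator for the last seen record on every poll, versus carrying the NextShardIterator forward from the previous GetRecords call so the reader keeps advancing through empty stretches of the shard:

import com.amazonaws.services.kinesis.AmazonKinesis
import com.amazonaws.services.kinesis.model.{GetRecordsRequest, GetRecordsResult, GetShardIteratorRequest}

object IteratorPatterns {
  // Pattern the connector appears to be stuck in: every poll asks Kinesis for a
  // fresh iterator AFTER_SEQUENCE_NUMBER of the last record we saw. If the next
  // stretch of the shard is empty, each such iterator can return zero records,
  // and lastSeqNum never moves forward.
  def pollAfterSequenceNumber(kinesis: AmazonKinesis, stream: String, shard: String,
                              lastSeqNum: String): GetRecordsResult = {
    val iter = kinesis.getShardIterator(
      new GetShardIteratorRequest()
        .withStreamName(stream)
        .withShardId(shard)
        .withShardIteratorType("AFTER_SEQUENCE_NUMBER")
        .withStartingSequenceNumber(lastSeqNum)
    ).getShardIterator
    kinesis.getRecords(new GetRecordsRequest().withShardIterator(iter))
  }

  // Pattern argued for above: keep calling GetRecords with the NextShardIterator
  // from the previous result, so the reader keeps moving through empty stretches
  // of the shard even when no records come back.
  def pollFollowingNextIterator(kinesis: AmazonKinesis,
                                shardIterator: String): (GetRecordsResult, String) = {
    val result = kinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator))
    (result, result.getNextShardIterator)
  }
}

With the second pattern, long runs of empty GetRecords results (like the counts of 100+ reported above) are just a normal walk through an empty stretch of the shard rather than a stall.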

roncemer commented 2 years ago

I sent an email to the maintainer of this repo and didn't get a response. For now, I've forked the project here https://github.com/roncemer/kinesis-spark-connector and updated it to build for Spark 3.2.1. Under Spark 3.2.1, it appears to be working correctly. I let it run overnight with no new records being posted to the Kinesis data stream. When I started posting records again, the records arrived in Spark and were processed.

I issued pull request https://github.com/qubole/kinesis-sql/pull/113/files from my forked repo back to the original qubole/kinesis-sql repo. Be sure to read the full description under the Comments tab as well.

I could use the help of anyone who might be interested in this project. Apparently, qubole/kinesis-sql has been abandoned for about two years, and the main guy who was maintaining it doesn't respond to emails. If anyone can get in touch with someone who has control of this repo, please ask them to make me a committer. Barring that, I will have to publish the jar file from my forked repo as a Maven artifact. In any case, if I end up maintaining this project, either the original or the forked repo, I will need volunteers to help handle the builds each time a new version of Spark is released.

In the meantime, feel free to build your own jar from my forked repo and run it under Spark 3.2.1. Also, let me know if it works under 3.3.x, or if you run into any other problems.

Thanks! Ron Cemer