awslabs / spark-sql-kinesis-connector

Spark Structured Streaming Kinesis Data Streams connector supports both GetRecords and SubscribeToShard (Enhanced Fan-Out, EFO)
Apache License 2.0

Question about early stopping for a batch #11

Open greg-roberts-bbc opened 9 months ago

greg-roberts-bbc commented 9 months ago

Hey, we really appreciate this library; it resolves a number of issues we experienced when trying the default Kinesis connector in Glue. We have a question about the library's behaviour on a high-volume stream.

We're using v1.1.0 of the library in a Glue (4.0) Streaming job to pull records from a high velocity stream (~20-25 records per second), with a batch window size of ~40 seconds. The data coming into Kinesis is continuous with a fairly steady profile. Our job uses an EFO consumer, and the stream has 16 provisioned shards.
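For context, our reader setup looks roughly like the sketch below (a PySpark sketch based on the option names in the connector's README; the stream name, region, and consumer name are placeholders, and `spark` is assumed to be an active SparkSession in the Glue job):

```python
# Sketch only: option names follow the connector README; values are placeholders.
kinesis_df = (
    spark.readStream
    .format("aws-kinesis")
    .option("kinesis.region", "eu-west-1")
    .option("kinesis.streamName", "my-high-volume-stream")      # placeholder
    .option("kinesis.consumerType", "SubscribeToShard")         # EFO consumer
    .option("kinesis.consumerName", "my-efo-consumer")          # placeholder
    .option("kinesis.endpointUrl", "https://kinesis.eu-west-1.amazonaws.com")
    .option("kinesis.startingposition", "LATEST")
    .load()
)
```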

Having looked through the code, our understanding of the KinesisV2PartitionReader is that if it reaches the 'end' of the stream (i.e. userRecord.millisBehindLatest.longValue() == 0), it will always run at least one more loop to see if it can get more data. In our tests with the connector, this meant that the connector would always keep the subscription open until maxFetchRecordsPerShard had been hit, and only then would fetchNext be set to false, ending the loop.

In a micro-batch environment, this surely isn't desirable, as you could return the batch of data as soon as you hit the end of the shard. In our experiments, this made the whole job very sensitive to maxFetchRecordsPerShard: setting it too low meant the job fell behind because it read less data than had been generated since the last batch; setting it too high meant our batches took too long to write to the sink, so the job fell behind anyway.

As an experiment, we changed this line to just be fetchNext = false. When we ran the job with this change in the connector, things ran as we expected; each batch contained all the data which had arrived on the stream since the last batch offsets had been committed, and the job stayed up to date constantly.
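To make the two behaviours concrete, here is a minimal pure-Python model of the per-shard fetch loop as we understand it. This is not the connector's actual Scala code; the function and parameter names (read_shard, stop_on_caught_up, etc.) are ours, and the record list stands in for the Kinesis subscription:

```python
def read_shard(records, max_fetch_records, stop_on_caught_up=False):
    """Simplified model of the per-shard fetch loop.

    `records` is a list of (payload, millis_behind_latest) tuples; the real
    reader consumes these from a Kinesis subscription instead. With
    stop_on_caught_up=False (the current behaviour as we understand it),
    the loop keeps fetching past the caught-up point until the record cap
    is hit; with stop_on_caught_up=True it returns at the end of the shard.
    """
    fetched = []
    fetch_next = True
    i = 0
    while fetch_next and i < len(records):
        payload, millis_behind = records[i]
        fetched.append(payload)
        i += 1
        if len(fetched) >= max_fetch_records:
            fetch_next = False  # hit maxFetchRecordsPerShard
        elif millis_behind == 0 and stop_on_caught_up:
            fetch_next = False  # proposed early stop at the end of the shard
    return fetched
```

For example, with records = [("a", 200), ("b", 100), ("c", 0), ("d", 50), ("e", 0)] and max_fetch_records=10, the default behaviour returns all five payloads, while stop_on_caught_up=True returns only ["a", "b", "c"].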

We would like to understand the intention behind this design, and whether we have understood the behaviour correctly. If so, we would like to know if you would be interested in a PR from us implementing this as a configurable behaviour.

hwanghw commented 9 months ago

Hi, thanks for your feedback on early stopping. Yes, your understanding is correct. This PR introduces a new parameter maxFetchTimePerShardSec. Does this resolve your problem?

Maximum time in seconds to fetch records per shard per microbatch. If kinesis.maxFetchTimePerShardSec is not explicitly defined, the decision to conclude the current task is based solely on the value of kinesis.maxFetchRecordsPerShard. If kinesis.maxFetchTimePerShardSec is defined, the current task terminates when either kinesis.maxFetchRecordsPerShard or kinesis.maxFetchTimePerShardSec is reached, whichever comes first. maxFetchTimePerShardSec must be no less than 10 seconds to make sure the fetch can make progress.
Note: if a shard is idle (no new data) for more than 10 seconds, the task terminates even if neither kinesis.maxFetchTimePerShardSec nor kinesis.maxFetchRecordsPerShard has been reached.
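Combining the two limits would look something like this (a hedged PySpark sketch; the earlier stream/region options are elided, and the numeric values are illustrative, not recommendations):

```python
# Sketch only: cap each shard's fetch per microbatch at 100000 records
# or 30 seconds, whichever is reached first.
reader = (
    spark.readStream
    .format("aws-kinesis")
    # ... stream name, region, and consumer options as usual ...
    .option("kinesis.maxFetchRecordsPerShard", "100000")
    .option("kinesis.maxFetchTimePerShardSec", "30")
)
```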

greg-roberts-bbc commented 9 months ago

Thanks for your response. I have looked through this and had a think, and I still believe this stands as a separate issue/behaviour.

The addition of maxFetchTimePerShardSec gives another means of constraining the maximum time spent reading from the stream, but what we're describing is a different behaviour entirely: rather than putting an upper limit on the time spent reading from the stream, we want the reader to return as soon as the end of the stream has been reached.

In our opinion, this is a more useful behaviour for reading from a stream in a micro-batch context, as it ideally minimises the time it takes for a particular record to reach the sink.

How would you feel about a PR (possibly into the v1.2.0 branch) which added this as a flaggable option?

e.g. kinesis.stopReadOnStreamEnd (bool) which implements the behaviour I describe. With this flag active, the existing maxFetch... parameters would have the same effect, but the reader would additionally return early if it reached the end of the stream.

hwanghw commented 9 months ago

As there can be more than one message with millisBehindLatest=0 in KDS, if some of those messages have not yet been fetched by the connector (so that dataQueue.size() == 0), there is no guarantee that the batch stops at the true end of the stream/shard. So among the millisBehindLatest=0 messages, some may be processed in this batch while the rest have to wait for the next batch.

If the option helps in your scenario, please open a PR for it. As described above, we may need to find a new name for the option (stopReadWhenMillisBehindLatestIsZero? I know it is long (-: ). Thanks for your detailed explanation and contribution.

greg-roberts-bbc commented 9 months ago

I think stopReadWhenMillisBehindLatestIsZero covers the behaviour best. Where shall I direct the PR: into main, or another branch?

hwanghw commented 9 months ago

Please use the main branch. I have merged the v1.2.0 changes to main. Thanks