harlow / kinesis-consumer

Golang library for consuming Kinesis stream data
MIT License
263 stars 88 forks source link

Improve throughput with optional immediate rescan #122

Open jtackaberry opened 4 years ago

jtackaberry commented 4 years ago

This PR adds an option WithImmediateRescan() which, when enabled (it's disabled by default), causes the scan loop to skip the scan tick and immediately go back to Kinesis for another batch.

Another change is that the first scan is always performed immediately, without waiting for the next scan tick. This new behavior exists regardless of whether immediate rescanning is enabled.

These two control flow changes combined allow for higher scan intervals, which results in less chatty clients while also preserving throughput. In fact, this slightly increases effective throughput per shard even with the default 250ms scan interval by eliminating any residual delay between batches, which improves the bandwidth-delay product.

A second commit in this PR exposes MillisBehindLatest in the Record struct passed to ScanFunc(). My use case is to allow applications to provide metrics for per-shard lag times, but there could be other uses as well.

This second change is unrelated, but since it's in the same area of code as the immediate rescan option they became co-dependent. If this is a problem, I can remove it for now.

jtackaberry commented 4 years ago

A variation on this idea I'd like to consider is to also use MillisBehindLatest to aid in the decision of whether an immediate rescan should be done. If we fetched > 0 records and we're > 0 milliseconds behind, then rescan immediately. This could further reduce redundant calls to Kinesis while not impeding the scenario of trying to catch up on older events as quickly as possible.

I'll experiment with this idea.

harlow commented 4 years ago

Another change is that the first scan is always performed immediately, without waiting for the next scan tick. This new behavior exists regardless of whether immediate rescanning is enabled.

Love this! can we split this out into it's own PR and get this merged first. then we can work on the interval delay

jtackaberry commented 4 years ago

Love this! can we split this out into it's own PR and get this merged first. then we can work on the interval delay

Yep no problem.

jtackaberry commented 4 years ago

Another change is that the first scan is always performed immediately, without waiting for the next scan tick. This new behavior exists regardless of whether immediate rescanning is enabled.

Love this! can we split this out into it's own PR and get this merged first. then we can work on the interval delay

123 is now created just with this logic.

I also removed the MillisBehindLatest addition to the Record from this PR so as not to muddy the waters. I'll submit a separate PR for that once #123 shakes out.

jtackaberry commented 4 years ago

A variation on this idea I'd like to consider is to also use MillisBehindLatest to aid in the decision of whether an immediate rescan should be done. If we fetched > 0 records and we're > 0 milliseconds behind, then rescan immediately. This could further reduce redundant calls to Kinesis while not impeding the scenario of trying to catch up on older events as quickly as possible.

Ran this overnight and I'm happy with the result. I've included that logic in this PR.

jtackaberry commented 4 years ago

Rebased and squashed.

jtackaberry commented 4 years ago

Thinking on this a little more, I wonder if for certain types of streams this could actually reduce effective thoughput.

For low volume streams, this should be innocuous. When there's activity on the stream, we'll quickly catch up and then simply wait for the next scan tick as with the existing behavior.

For high volume streams, each call to Kinesis will give us plenty of work to do, so it makes sense to read that data as quickly as possible. However, there is a 5 TPS rate limit per shard for GetRecords, which we should try to respect lest we get throttled. With the immediate rescan logic, the probability of getting throttled is high for a high volume stream. So this is a gap in this PR.

A more interesting unintended consequence exists for middle-of-the-road data rates. In this scenario, we are more likely to have a higher number of GetRecords calls with each returning a fewer number of records than if gave a bit of time to queue up a hunk of records to pull at once.

Based on that, I'm thinking it makes sense to

Thoughts?

harlow commented 4 years ago

@jtackaberry I'd say let's keep it simple for now. If the current PR is working well for you then I'm cool with merging in it's current form.

One thing I guess could be interesting in this middle case; is whether *resp.MillisBehindLatest > c.scanInterval and if it's greater than we grab rows; else let it wait in the ticker until it gets to the desired interval.

jtackaberry commented 4 years ago

One thing I guess could be interesting in this middle case; is whether *resp.MillisBehindLatest > c.scanInterval and if it's greater than we grab rows; else let it wait in the ticker until it gets to the desired interval.

That sounds like a sensible tweak. So you're not overly concerned about hitting the API frequently enough to trigger throttling?

I think we may want another ticker that fires according to the AWS-specified rate limit, which we fall back to in "immediate" mode. There's another use case driving this, which I'm going to open an issue about so we can talk about it further.