awslabs / amazon-kinesis-producer

Amazon Kinesis Producer Library
Apache License 2.0
recordTtl = Long.MAX_VALUE causes records to expire immediately & no standard exception message in UserRecordFailedException #54

Open endario opened 8 years ago

endario commented 8 years ago

I'm fairly new to Kinesis and any help is greatly appreciated.

I followed the code example on AWS almost to the letter, but every single callback fails with a null message. I'm stuck now, as I can't find anyone else who has had a similar problem.

I ran the program on both Rackspace Linux and Mac OS X with the same result. I've also tried changing the partition key and the record content without success.

I'm using the latest version, 0.10.2, installed via Maven.

Included the full stack trace below:

[2016-06-07 18:28:54.862884] [0x0000700000081000] [info] [kinesis_producer.cc:79] Created pipeline for stream "my-kinesis-stream"
[2016-06-07 18:28:54.863092] [0x0000700000081000] [info] [shard_map.cc:83] Updating shard map for stream "my-kinesis-stream"
[2016-06-07 18:28:54.988505] [0x00007fff7cc3f000] [info] [shard_map.cc:163] Successfully updated shard map for stream "my-kinesis-stream" found 1 shards
18:28:55.895 [kpl-callback-pool-0-thread-0] WARN c.s.kinesis.KinesisRecordProducer - Failed to send record '1465320533000' to Kinesis.
com.amazonaws.services.kinesis.producer.UserRecordFailedException: null
    at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler.onPutRecordResult(KinesisProducer.java:188) [amazon-kinesis-producer-0.10.2.jar:na]
    at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler.access$000(KinesisProducer.java:127) [amazon-kinesis-producer-0.10.2.jar:na]
    at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler$1.run(KinesisProducer.java:134) [amazon-kinesis-producer-0.10.2.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_66]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_66]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_66]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_66]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66]

endario commented 8 years ago

Turned out to be a lesser issue.

I set recordTtl to Long.MAX_VALUE because the documentation (# Maximum (inclusive): 9223372036854775807) suggests that it is a valid value, but it ended up making every record expire immediately.

Personally, I think a standard exception message in UserRecordFailedException would help debugging, but the exception did reveal the cause once I improved its handling (thanks to existing code examples):

    Futures.addCallback(future, new FutureCallback<UserRecordResult>() {
        @Override
        public void onSuccess(UserRecordResult result) {
            // Nothing to do on success (stub added so the snippet is complete).
        }

        @Override
        public void onFailure(@Nonnull Throwable throwable) {
            // The exception message is null; the useful details are on the last attempt.
            if (throwable instanceof UserRecordFailedException) {
                Attempt last = Iterables.getLast(
                        ((UserRecordFailedException) throwable).getResult().getAttempts());
                LOGGER.warn("Failed to put record. Error code '{}' : '{}'.",
                        last.getErrorCode(), last.getErrorMessage());
            }
            LOGGER.warn(String.format("Exception occurred when sending record '%s' to Kinesis", partitionKey), throwable);
        }
    });

samuelgmartinez commented 8 years ago

This is advice, not a comment on the issue itself :)

Be careful setting the TTL that long, because the TTL is meant to discard records when your producer is throttled for making too many requests or putting too much data.

These throttled records are stored in the daemon in an unbounded buffer. So if you are being throttled and have TTLs this high, the daemon will eventually run out of memory and crash. You should assume that when you are throttled you may lose some records :(

If you expect to hit that, you can use multiple shards or, if your records are small enough, use producer aggregation to pack multiple records into a single Kinesis record.

pfifer commented 8 years ago

Thanks for reporting this. We'll look into updating the documentation and the validation to prevent setting the record TTL to Long.MAX_VALUE.

I agree with @samuelgmartinez on being very careful with large TTLs.

asnare commented 7 years ago

So, um, what is the largest valid value here?

I was just trying to figure this out…

The protobuf definition used to transfer this setting says "unsigned 64-bit". That seems to be a lie everyone is willing to live with:

The point at which things seem to go wrong is the arithmetic for expiry. When a message is queued, the time at which it expires is calculated by adding the record_ttl value to 'now'. This promptly overflows to a time which has already lapsed.
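The wraparound described above can be reproduced in a few lines. This is only an illustrative sketch in Java using signed 64-bit milliseconds; it is not the KPL's actual C++ code, and the class and method names (TtlOverflowDemo, naiveDeadlineMillis, naiveIsExpired) are hypothetical.

```java
class TtlOverflowDemo {
    // Hypothetical helper: computes an absolute deadline the naive way.
    // Java long addition wraps silently on overflow.
    static long naiveDeadlineMillis(long nowMillis, long recordTtlMillis) {
        return nowMillis + recordTtlMillis;
    }

    // Hypothetical helper: a record is expired once 'now' reaches the deadline.
    static boolean naiveIsExpired(long nowMillis, long deadlineMillis) {
        return nowMillis >= deadlineMillis;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();

        // With a huge TTL the sum wraps to a negative value, i.e. a deadline in the past.
        long deadline = naiveDeadlineMillis(now, Long.MAX_VALUE);
        System.out.println("deadline = " + deadline);
        System.out.println("expired immediately: " + naiveIsExpired(now, deadline));
    }
}
```

Any positive 'now' added to Long.MAX_VALUE wraps to a negative deadline, so the record looks expired the moment it is queued.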

This bug is a frequent issue with timeout logic. The correct way to handle expiry is:

  1. Record the start time. (Do not calculate the expected expiry time.)
  2. When checking for expiry, calculate the elapsed time: now() - start_time
  3. Expiry has occurred if elapsed_time >= record_ttl.

Steps 2 and 3 are safe from an overflow perspective, assuming time always increases. This is why it's important for expiry logic to also use a monotonically-increasing clock. (I think that means using std::chrono::steady_clock instead of std::chrono::high_resolution_clock since the latter can go backwards.)
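The three steps above could look roughly like this. It is a minimal sketch in Java rather than the daemon's C++, with System.nanoTime() standing in for a monotonic clock such as std::chrono::steady_clock; the ElapsedExpiry class is hypothetical and not part of the KPL.

```java
class ElapsedExpiry {
    private final long startNanos; // step 1: remember when the record was queued
    private final long ttlNanos;   // the TTL is stored as-is, never added to a timestamp

    ElapsedExpiry(long startNanos, long ttlNanos) {
        this.startNanos = startNanos;
        this.ttlNanos = ttlNanos;
    }

    boolean isExpired(long nowNanos) {
        // Step 2: elapsed time is a difference of two monotonic readings,
        // so it stays small and non-negative.
        long elapsedNanos = nowNanos - startNanos;
        // Step 3: compare against the TTL; no addition, so a huge TTL cannot overflow.
        return elapsedNanos >= ttlNanos;
    }

    public static void main(String[] args) {
        ElapsedExpiry record = new ElapsedExpiry(System.nanoTime(), Long.MAX_VALUE);
        System.out.println("expired: " + record.isExpired(System.nanoTime()));
    }
}
```

With this shape, a TTL of Long.MAX_VALUE simply means the record effectively never expires, which is presumably what a caller setting that value intends.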

Coming full circle…

I apologise if I've misunderstood the C++ side of things, but I suspect this is what's going on.