awslabs / amazon-kinesis-producer

Amazon Kinesis Producer Library

Running KPL on AWS Lambda #243

Open deirdrekong opened 5 years ago

deirdrekong commented 5 years ago

Hi,

I am using the KPL on AWS Lambda. When I tried to invoke it, all I got in the log was:

2019-01-24 23:39:59 <fdb361ef-0aa2-423c-ba21-29c39b52987e> INFO KinesisProducer:881 - Extracting binaries to /tmp/amazon-kinesis-producer-native-binaries

However, the record was not written to Kinesis data stream and Lambda just returned.

Any insights on what is happening?

Thanks,

deirdrekong commented 5 years ago

When I call flushSync() after addUserRecord(), I get further. Any ideas why I need to call flushSync() in Lambda but not in a long-running service when using the KPL?
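
Roughly what I mean, as a minimal sketch only (the handler class, stream name, partition key and region below are placeholders, not my real code):

// Minimal sketch; class name, stream, partition key and region are placeholders.
// The point is simply that flushSync() has to run before the handler returns,
// because Lambda freezes the container afterwards while the record may still
// be sitting in the KPL's buffer.
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

public class KplHandler implements RequestHandler<String, String> {

    // Reuse the producer (and its native daemon) across warm invocations.
    private static final KinesisProducer PRODUCER = new KinesisProducer(
            new KinesisProducerConfiguration().setRegion("us-east-1"));

    @Override
    public String handleRequest(String payload, Context context) {
        PRODUCER.addUserRecord("my-stream", "my-partition-key",
                ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)));

        // Without this the handler returns while the record is still buffered,
        // and the frozen container never gets to send it.
        PRODUCER.flushSync();
        return "ok";
    }
}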

bisoldi commented 5 years ago

@deirdrekong Did you get anywhere with this?

jkuehl2 commented 5 years ago

I have the same problem; I implement my Lambda handlers to call flushSync() at the end. But how do you handle the case where the Lambda times out? Of course retrying the whole batch is an option, but Kinesis behaves differently here than other AWS services.

bisoldi commented 5 years ago

@jkuehl2 I decided in the end that the KPL was just too heavyweight for Lambda, and the Java cold-start time seemed to be just too much. I ended up doing it in Python using the asyncio kinesis-producer and got much better performance.

If you're stuck on using Java / the KPL, I don't see a perfect way to do it. The KPL doesn't guarantee order, so you can't even simply keep track of where you are in the file (i.e. where you are in the file != where the KPL is in terms of successful PutRecord(s) calls).

jkuehl2 commented 5 years ago

For me, switching to Python is not an option; we rely heavily on Java, and our usage pattern only hits a cold start every few hours, which takes just 1-2 seconds (we use only lightweight DI frameworks).

The main problem is that the KPL breaks the idempotency of Lambda executions this way: there is no way to produce the same output for the same input if a different set of records gets written to Kinesis on each attempt.

Usually this leads to a failing Lambda where you have to retry the input event / batch, which is fine, but that is not what happens here.

To reply to my original post: when the Lambda times out, it should produce an error metric for that Lambda, and then you just need to retry the whole batch. This is the standard way to handle it for Lambdas. The consumer side (the consumer of the Kinesis stream) should always be able to handle duplicates.

bisoldi commented 5 years ago

Not sure I follow... In general, one would only be able to expect idempotency for the exact same call (in your case, the same records, not different records). Also (and I'm not trying to be argumentative, just trying to understand your use case), I don't follow how the KPL breaks idempotency any more than any other library that communicates with an AWS service.

But, to get more to the heart of your comment: when the Lambda times out, it does produce an error metric, even when using the KPL within the Lambda. If you go to CloudWatch, there is an Errors metric under the Lambda function name, which will report the failure, and (assuming you're using SQS to trigger the Lambda) you can then retry it. And if your consumer can handle duplicates, then I don't see any reason why you can't retry the job, though if it timed out the first time while using the KPL (and not due to a cold start), there's a good chance it will time out every time.

jkuehl2 commented 5 years ago

I meant the same input (which could be any event, e.g. from DynamoDB Streams) but different output (what gets written to Kinesis with the KPL). The internal addUserRecord() implementation uses a ConcurrentHashMap of futures. I have not looked any further into how exactly the data is produced to Kinesis, but I suspect that is also unordered, and without explicitly calling flushSync() the set of records written to Kinesis will differ for the same input across multiple tries.
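
A rough sketch of what I mean (stream name and partition key are made up): addUserRecord() only buffers the record and hands back a future, so which records actually reach Kinesis depends on timing unless you block on flushSync() before the handler returns.

// Rough sketch; stream name and partition key are placeholders.
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.ListenableFuture;

public class BatchWriter {

    public static void writeBatch(KinesisProducer producer, List<String> payloads) {
        // addUserRecord() only enqueues the record and returns a future.
        List<ListenableFuture<UserRecordResult>> futures = new ArrayList<>();
        for (String payload : payloads) {
            futures.add(producer.addUserRecord("my-stream", "my-partition-key",
                    ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8))));
        }
        // Blocks until every buffered record has been sent (or has failed), so
        // the set of records written no longer depends on when Lambda freezes
        // the container.
        producer.flushSync();
    }
}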

We agree on how timeouts should be handled; that is the clean, standard pattern for handling Lambda errors, and I have already implemented it. Specifically, if we run into a timeout, I post the input to SQS (extended with S3), run a splitter on it to reduce the batch size, and re-run the split jobs.
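
A sketch of that retry path only (queue URL, bucket name and the simple 50/50 split below are placeholders, assuming the amazon-sqs-java-extended-client-lib so large payloads overflow to S3):

// Sketch only; queue URL, bucket name and split policy are placeholders.
import java.util.List;

import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient;
import com.amazon.sqs.javamessaging.ExtendedClientConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;

public class TimeoutRequeue {

    private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient();

    // SQS client that stores oversized payloads in S3 ("extended" client).
    private static final AmazonSQS SQS = new AmazonSQSExtendedClient(
            AmazonSQSClientBuilder.defaultClient(),
            new ExtendedClientConfiguration()
                    .withLargePayloadSupportEnabled(S3, "my-overflow-bucket"));

    // On timeout, split the failed batch in half and re-queue both halves so
    // the smaller jobs fit within the Lambda time limit.
    public static void requeueSplit(List<String> failedBatch, String queueUrl) {
        if (failedBatch.size() <= 1) {
            SQS.sendMessage(queueUrl, String.join("\n", failedBatch));
            return;
        }
        int mid = failedBatch.size() / 2;
        SQS.sendMessage(queueUrl, String.join("\n", failedBatch.subList(0, mid)));
        SQS.sendMessage(queueUrl, String.join("\n", failedBatch.subList(mid, failedBatch.size())));
    }
}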