quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0

KinesisEvent object cannot be parsed in AWS Lambda Function #16322

Open debae opened 3 years ago

debae commented 3 years ago

Describe the bug

With the quarkus-lambda extension you can consume any AWS event by implementing the RequestHandler interface with the matching event type. For Kinesis:

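import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

public class MyKinesisLambda implements RequestHandler<KinesisEvent, Void> {

    @Override
    public Void handleRequest(KinesisEvent input, Context context) {
        // logic here
        return null; // Void has no instances, so null is the only valid return value
    }
}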

The configured Jackson ObjectMapper, however, cannot deserialize the input event correctly.

Expected behavior

The KinesisEvent JSON is deserialized correctly and the RequestHandler is invoked.

Actual behavior

An exception is thrown inside the LambdaProcessor:

Cannot deserialize instance of `java.util.Date` out of VALUE_NUMBER_FLOAT token
 at [Source: (ByteArrayInputStream); line: 1, column: 3772] (through reference chain: com.amazonaws.services.lambda.runtime.events.KinesisEvent["Records"]->java.util.ArrayList[0]->com.amazonaws.services.lambda.runtime.events.KinesisEvent$KinesisEventRecord["kinesis"]->com.amazonaws.services.lambda.runtime.events.KinesisEvent$Record["approximateArrivalTimestamp"]): com.fasterxml.jackson.databind.exc.MismatchedInputException
com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.util.Date` out of VALUE_NUMBER_FLOAT token
 at [Source: (ByteArrayInputStream); line: 1, column: 3772] (through reference chain: com.amazonaws.services.lambda.runtime.events.KinesisEvent["Records"]->java.util.ArrayList[0]->com.amazonaws.services.lambda.runtime.events.KinesisEvent$KinesisEventRecord["kinesis"]->com.amazonaws.services.lambda.runtime.events.KinesisEvent$Record["approximateArrivalTimestamp"])
    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
    at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1468)
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1242)
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1148)
    at com.fasterxml.jackson.databind.deser.std.StdDeserializer._parseDate(StdDeserializer.java:517)
    at com.fasterxml.jackson.databind.deser.std.DateDeserializers$DateBasedDeserializer._parseDate(DateDeserializers.java:200)
    at com.fasterxml.jackson.databind.deser.std.DateDeserializers$DateDeserializer.deserialize(DateDeserializers.java:290)
    at com.fasterxml.jackson.databind.deser.std.DateDeserializers$DateDeserializer.deserialize(DateDeserializers.java:273)
    at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:129)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:293)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:156)
    at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:129)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:293)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:156)
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:290)
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:249)
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:26)
    at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:129)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:293)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:156)
    at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2079)
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1453)
    at io.quarkus.amazon.lambda.runtime.JacksonInputReader.readValue(JacksonInputReader.java:17)
    at io.quarkus.amazon.lambda.runtime.AmazonLambdaRecorder.handle(AmazonLambdaRecorder.java:69)
    at io.quarkus.amazon.lambda.runtime.QuarkusStreamHandler.handleRequest(QuarkusStreamHandler.java:58)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)

To Reproduce

Take an example input generated by sam local:

sam local generate-event kinesis get-records

This gives you the following event:

{
  "Records": [
    {
      "kinesis": {
        "partitionKey": "partitionKey-03",
        "kinesisSchemaVersion": "1.0",
        "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=",
        "sequenceNumber": "49545115243490985018280067714973144582180062593244200961",
        "approximateArrivalTimestamp": 1428537600.0
      },
      "eventSource": "aws:kinesis",
      "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961",
      "invokeIdentityArn": "arn:aws:iam::EXAMPLE",
      "eventVersion": "1.0",
      "eventName": "aws:kinesis:record",
      "eventSourceARN": "arn:aws:kinesis:EXAMPLE",
      "awsRegion": "us-east-1"
    }
  ]
}
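The approximateArrivalTimestamp above is a JSON float, and its magnitude suggests epoch seconds rather than the epoch milliseconds that java.util.Date works with. As a minimal sketch of that conversion, using only the JDK (the class and method names here are hypothetical and not part of Quarkus or the AWS libraries):

```java
import java.util.Date;

// Hypothetical helper, for illustration only.
final class EpochSecondsToDate {

    // Converts fractional epoch seconds (as found in the sample event)
    // to a java.util.Date, which is based on epoch milliseconds.
    static Date toDate(double epochSeconds) {
        return new Date(Math.round(epochSeconds * 1000.0));
    }

    public static void main(String[] args) {
        // Value taken from the sample event above.
        System.out.println(toDate(1428537600.0).toInstant()); // prints 2015-04-09T00:00:00Z
    }
}
```

Reading the same number as milliseconds would instead give a date in January 1970, so any custom deserializer for this field probably needs the multiplication by 1000.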

Environment (please complete the following information):

Output of uname -a or ver

Darwin APRXJGH6D5D5F9 18.7.0 Darwin Kernel Version 18.7.0: Mon Aug 31 20:53:32 PDT 2020; root:xnu-4903.278.44~1/RELEASE_X86_64 x86_64

Output of java -version

openjdk version "11.0.6" 2020-01-14

Quarkus version or git rev

1.11.3.Final


quarkus-bot[bot] commented 3 years ago

/cc @matejvasek, @patriot1burke

debae commented 3 years ago

To get it to work, we need to register an ObjectMapperCustomizer (which is used by quarkus-jackson) that allows a float to be converted to a Date:

@Singleton
public ObjectMapperCustomizer kinesisObjectMapperCustomiser() {
    return new ObjectMapperCustomizer() {
        @Override
        public void customize(ObjectMapper objectMapper) {
            SimpleModule kinesisModule = new SimpleModule();
            // Note: this deserializer applies to every java.util.Date handled by this mapper.
            kinesisModule.addDeserializer(Date.class, new JsonDeserializer<>() {
                @Override
                public Date deserialize(JsonParser jsonParser, DeserializationContext ctxt) throws IOException {
                    // The event carries epoch seconds as a float (e.g. 1428537600.0),
                    // while java.util.Date expects epoch milliseconds.
                    return new Date((long) (jsonParser.getValueAsDouble() * 1000));
                }
            });
            objectMapper.registerModule(kinesisModule);
        }
    };
}

debae commented 3 years ago

The same applies when you are using the events from DynamoDB Streams.

geoand commented 3 weeks ago

Is this still an issue?

debae commented 3 weeks ago

Yes, it is. We still need to add this configuration. The version of Quarkus we are using now is 3.13.2.

geoand commented 3 weeks ago

Thanks for the quick feedback!