confluentinc / schema-registry

Confluent Schema Registry for Kafka
https://docs.confluent.io/current/schema-registry/docs/index.html

Can't deserialize Specific Avro records and reserialize as JSON #1092

Open MichaelDrogalis opened 5 years ago

MichaelDrogalis commented 5 years ago

I'm writing a basic Kafka Streams application that takes a topic serialized with Avro and Schema Registry, and produces a new topic with the same data serialized as JSON. It uses all of the serdes from this repository.

When I run the app, it throws an exception from Jackson, the underlying JSON library. The problem can be demonstrated by looking at what the serializers are doing outside of Kafka Streams, in a smaller context:

I have an Avro-generated class named User with three fields: name, favoriteColor, and favoriteNumber. This is its schema:

{
  "namespace": "io.confluent.developer.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number",  "type": ["int", "null"]},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

When I try to serialize an object of that type to JSON using the mechanism that KafkaJsonSerializer uses under the hood:

ObjectMapper o = new ObjectMapper();
User u = User.newBuilder().setName("michael").setFavoriteNumber(42).setFavoriteColor("green").build();
String json  = o.writeValueAsString(u);

It fails with:

org.codehaus.jackson.map.JsonMappingException: Not a map: {"type":"record","name":"User","namespace":"io.confluent.developer.avro","fields":[{"name":"name","type":{"type":"string","avro.java.string":"String"}},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":[{"type":"string","avro.java.string":"String"},"null"]}]} (through reference chain: io.confluent.developer.avro.User["schema"]->org.apache.avro.RecordSchema["valueType"])

After tracking down what's going on, it looks like Jackson tries to serialize anything with a getter method. That includes the getSchema and getClassSchema methods, which is what produces the exception above. It's not entirely clear to me why those methods don't serialize cleanly, but even if they did, I wouldn't want them in the corresponding JSON output.

I worked around this, at least in my small reproducible example, by adding a serializer feature to ignore anything that doesn't have a setter:

ObjectMapper o = new ObjectMapper();
o.configure(SerializationConfig.Feature.REQUIRE_SETTERS_FOR_GETTERS, true);

User u = User.newBuilder().setName("michael").setFavoriteNumber(42).setFavoriteColor("green").build();
String json = o.writeValueAsString(u);

But now json contains the wrong output. Note the duplicated fields, once in camelCase and once with underscores:

{"name":"michael","favorite_number":42,"favorite_color":"green","favoriteNumber":42,"favoriteColor":"green"}

It's again not entirely clear to me why that happens, but there is one more attempt at a workaround: ignoring everything except the declared fields of the class:

ObjectMapper o = new ObjectMapper();
o.configure(SerializationConfig.Feature.REQUIRE_SETTERS_FOR_GETTERS, true);

o.setVisibilityChecker(o.getSerializationConfig().getDefaultVisibilityChecker()
    .withFieldVisibility(JsonAutoDetect.Visibility.ANY)
    .withGetterVisibility(JsonAutoDetect.Visibility.NONE)
    .withSetterVisibility(JsonAutoDetect.Visibility.NONE)
    .withCreatorVisibility(JsonAutoDetect.Visibility.NONE));

User u = User.newBuilder().setName("michael").setFavoriteNumber(42).setFavoriteColor("green").build();
String json = o.writeValueAsString(u);

This piece of code does the right thing:

{"name":"michael","favorite_number":42,"favorite_color":"green"}

But here's the catch: the underlying ObjectMapper in KafkaJsonSerializer can't be arbitrarily configured. So despite managing to wrangle it into the correct state in this example, it doesn't do the trick for anyone using both sets of serdes out of the box.
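
For what it's worth, that field-only configuration can be packaged into a standalone Kafka Serializer and handed to the Streams topology in place of the stock KafkaJsonSerializer. This is only a sketch of a workaround, not something this repository ships; the class name is mine and it assumes Jackson 2.x (com.fasterxml):

import java.util.Map;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical stand-in for KafkaJsonSerializer: serializes declared fields only,
// so getSchema()/getClassSchema() and the camelCase getters are never touched.
public class AvroFieldsJsonSerializer<T> implements Serializer<T> {

    private final ObjectMapper mapper = new ObjectMapper()
            .setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.NONE)
            .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public byte[] serialize(String topic, T data) {
        try {
            return data == null ? null : mapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing record to JSON", e);
        }
    }

    @Override
    public void close() { }
}

It could then back the output serde via Serdes.serdeFrom together with a matching JSON deserializer, but that only papers over the underlying issue.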

OneCricketeer commented 5 years ago

I wonder if invoking Jackson directly is really necessary vs using Avro's JsonEncoder, which wraps some Jackson classes. That should work, as that is what avro-console-consumer does.

https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/formatter/AvroMessageFormatter.java#L184-L199

MichaelDrogalis commented 5 years ago

It seems like there are two things here:

1. Either Jackson or Avro has a bug that prevents Avro-generated POJOs from being serialized to JSON cleanly.
2. Even if (1) were solved, it would still serialize the schema alongside the relevant fields, which is generally not desirable. It seems like you would want a configuration option for the JSON serializer to ignore getters and setters and only pick up fields.

OneCricketeer commented 5 years ago

I mean, this "works", although it does show the field type along with the value, primarily because the fields are nullable:

Schema schema = SchemaBuilder.builder("io.confluent.developer.avro")
        .record("User")
        .fields()
        .requiredString("name")
        .optionalInt("favorite_number")
        .optionalString("favorite_color")
        .endRecord();

GenericData.Record record = new GenericRecordBuilder(schema)
        .set("name", "cricket007")
        .set("favorite_number", 42)
        .set("favorite_color", "green")
        .build();

try {
    JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, System.out);
    DatumWriter<Object> writer = new GenericDatumWriter<>(schema);

    writer.write(record, encoder);

    encoder.flush();
} catch (IOException e) {
    e.printStackTrace();
}

Outputs {"name":"cricket007","favorite_number":{"int":42},"favorite_color":{"string":"green"}}
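
(If the union type wrappers are unwanted, calling toString() on that GenericData.Record should print the plain form, something like {"name": "cricket007", "favorite_number": 42, "favorite_color": "green"}, although Avro doesn't document toString() as a stable JSON serialization, so treat it as a convenience rather than a wire format.)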


For consistency, the schema that was generated looks like this:

{
  "type" : "record",
  "name" : "User",
  "namespace" : "io.confluent.developer.avro",
  "fields" : [ {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "favorite_number",
    "type" : [ "null", "int" ],
    "default" : null
  }, {
    "name" : "favorite_color",
    "type" : [ "null", "string" ],
    "default" : null
  } ]
}

MichaelDrogalis commented 5 years ago

I think we need to get to the bottom of why Jackson's ObjectMapper isn't able to serialize specific records. It's tempting to just swap out the serializer implementation, but it's a bit of a blind change.

OneCricketeer commented 5 years ago

SpecificDatumWriter works similarly. And since the goal is to both serialize and deserialize, I've included that as well:

Schema schema = User.SCHEMA$;
User user = User.newBuilder()
        .setName("cricket007")
        .setFavoriteNumber(42)
        .setFavoriteColor("green")
        .build();

// serialize
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
    JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, baos);
    DatumWriter<User> writer = new SpecificDatumWriter<>(User.class);

    writer.write(user, encoder);

    encoder.flush();
    baos.close();
} catch (IOException e) {
    e.printStackTrace();
}

// deserialize
InputStream is = new ByteArrayInputStream(baos.toByteArray()); 
try {
    JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, is);
    DatumReader<User> reader = new SpecificDatumReader<>(User.class);

    User read = reader.read(null, decoder);

    org.junit.jupiter.api.Assertions.assertEquals(user, read);  // true
} catch (IOException e) {
    e.printStackTrace();
}

My main point here was that instantiating your own ObjectMapper likely isn't necessary, especially because it inspects the constructor, getters, and setters, whereas Avro's encoders stream the writes through a JsonGenerator, iterating get(int field) in the order the fields are declared in the schema.
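
If someone did want to go this route inside a Streams app, the JsonEncoder path could itself be wrapped in a Kafka Serializer. A rough sketch only; the class name is mine and this isn't something the repo ships (note that union fields still come out wrapped, e.g. {"int": 42}, as shown above):

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

// Streams a SpecificRecord through Avro's JsonEncoder instead of reflecting
// over getters with Jackson. configure()/close() rely on the default no-op
// implementations in recent kafka-clients; add empty overrides on older versions.
public class AvroJsonEncoderSerializer<T extends SpecificRecord> implements Serializer<T> {

    @Override
    public byte[] serialize(String topic, T record) {
        if (record == null) {
            return null;
        }
        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            JsonEncoder encoder = EncoderFactory.get().jsonEncoder(record.getSchema(), out);
            new SpecificDatumWriter<T>(record.getSchema()).write(record, encoder);
            encoder.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new SerializationException("Error encoding Avro record as JSON", e);
        }
    }
}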

MichaelDrogalis commented 5 years ago

I think we're misunderstanding each other a bit. The reason I'm focusing on ObjectMapper is that it's what KafkaJsonSerializer uses, which is the class this repository exposes to support JSON de/serialization. It's immaterial to me, as a user, how it's serialized. But since there is a bug, I'm cracking open the internals to examine it more closely.

MichaelDrogalis commented 5 years ago

I dug around a bit more. This is failing because reflection is calling get* methods as deep as it can on the object, which eventually bottoms out here.

cgraving commented 5 years ago

Take a look here. If you add a mixin, you can tell Jackson to ignore getSchema. It works pretty slick. I wish I didn't have to, but it is what it is sometimes.

https://stackoverflow.com/questions/7421474/how-can-i-tell-jackson-to-ignore-a-property-for-which-i-dont-have-control-over

I am not using the KafkaJsonSerializer, though; I'm using my own Jackson ObjectMapper.
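
For reference, a minimal sketch of the mixin idea, assuming Jackson 2.x (the mixin class name is mine):

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;

// Mixin that hides the Avro bookkeeping getter Jackson trips over.
abstract class IgnoreAvroSchemaMixin {
    @JsonIgnore
    abstract Schema getSchema();
}

ObjectMapper mapper = new ObjectMapper();
mapper.addMixIn(User.class, IgnoreAvroSchemaMixin.class);
String json = mapper.writeValueAsString(user);

Depending on the Avro version and compiler settings, the generated class may also expose public fields next to the camelCase getters, so the duplicated-keys output shown earlier can still appear; in that case the field-only visibility settings from above are still needed on top of the mixin.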

MichaelDrogalis commented 5 years ago

Yeah, I think we could either have a new serializer that is aware of these pitfalls, or adjust the current one to be more mindful of Avro. I'd lean to the latter.

OneCricketeer commented 4 years ago

@matej2 it would be great if you provided an SSCCE of your own code and the relevant stack trace.

Regarding the rest of what you said, there's no HATEOAS compatibility mode for the Registry API, so that in itself you'd want to make a separate feature request for.

Similarly, it's not clear what you mean by "links in avro schema"

As per the Avro spec, "reader" and "writer" schemas are allowed to differ, and both are required when performing (de)serialization with Avro. The only thing the Registry provides is a place to look up an ID embedded in the binary format of the message, in order to get the schema the message was produced with (which can be ignored in favor of fetching the latest schema for the relevant subject and strategy).
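
For context, the "embedded ID" is the 4-byte schema ID that the Confluent wire format places after a single magic byte at the front of every serialized message. Reading it looks roughly like this (variable names are mine):

// Confluent wire format: [magic byte 0][4-byte schema ID][Avro binary payload]
java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(messageBytes);  // raw Kafka record value
byte magic = buf.get();       // currently always 0
int schemaId = buf.getInt();  // look this ID up in the Registry to get the writer schema
// the remaining bytes are the Avro-encoded datum written with that schema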

matej2 commented 4 years ago

Oh, never mind, I fixed it.

saurabh256 commented 3 years ago

I am not really sure, but it seems a similar issue appears with DeadLetterPublishingRecoverer, when the consumer is using Avro but, on failure, tries to send to the dead letter topic as JSON (using JSON serialization).

The consumer config uses io.confluent.kafka.serializers.KafkaAvroDeserializer, and the producer config for dead letter publishing uses org.springframework.kafka.support.serializer.JsonSerializer.

My versions are spring-boot 2.3.5.RELEASE, kafka-avro-serializer 5.3.2, and org.apache.avro:avro 1.10.0.

Let me know if you believe this is correct and need more information from me.