Apicurio / apicurio-registry

An API/Schema registry - stores APIs and Schemas.
https://www.apicur.io/registry/
Apache License 2.0

Avro Producer considers "avro.java.string" as part of the schema comparison #1592

Closed: rmarting closed this issue 1 year ago

rmarting commented 3 years ago

Found an ArtifactNotFoundException testing an Avro producer integrated with Apicurio Registry, with the following versions:

Given the following Avro schema definition registered into the Apicurio Registry:

{
  "type": "record",
  "name": "AggregateMetric",
  "namespace": "com.redhat.banking.eda.model.events",
  "doc": "Aggregated Metric with important information.",
  "fields": [
    {
      "name": "name",
      "type": {
        "type": "string",
        "avro.java.string": "String"
      },
      "doc": "Metric Name."
    }
  ]
}

A producer application downloads it using the apicurio-registry-maven-plugin and generates the Java classes with the avro-maven-plugin, configured as follows:

            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/schemas</sourceDirectory>
                            <includes>
                                <include>**/*.avsc</include>
                            </includes>
                            <outputDirectory>${project.build.directory}/generated-sources/schemas</outputDirectory>
                            <stringType>String</stringType>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
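For reference, this is roughly what that configuration embeds in each generated class. The sketch below (illustrative only, assuming Apache Avro on the classpath; the real generated file contains much more) shows the schema constant with the extra avro.java.string property that <stringType>String</stringType> adds:

import org.apache.avro.Schema;

// Illustrative sketch of the schema string that avro-maven-plugin embeds in the
// generated class (normally as the SCHEMA$ constant). Note the extra
// "avro.java.string" property, which is NOT in the .avsc registered in Apicurio.
public class GeneratedSchemaSketch {

    public static final Schema EMBEDDED_SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"AggregateMetric\","
          + "\"namespace\":\"com.redhat.banking.eda.model.events\","
          + "\"doc\":\"Aggregated Metric with important information.\","
          + "\"fields\":[{\"name\":\"name\","
          + "\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},"
          + "\"doc\":\"Metric Name.\"}]}");

    public static void main(String[] args) {
        // Pretty-print the embedded schema to compare it with the registered one.
        System.out.println(EMBEDDED_SCHEMA.toString(true));
    }
}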

When the producer tries to publish a new record to Kafka using the following configuration in the application.properties file:

# Aggregate metrics Generator
%dev.mp.messaging.outgoing.generated-aggregate-metrics.connector=smallrye-kafka
%dev.mp.messaging.outgoing.generated-aggregate-metrics.topic=eda.events.aggregate-metrics
%dev.mp.messaging.outgoing.generated-aggregate-metrics.acks=all
%dev.mp.messaging.outgoing.generated-aggregate-metrics.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
%dev.mp.messaging.outgoing.generated-aggregate-metrics.value.serializer=io.apicurio.registry.serde.avro.AvroKafkaSerializer
%dev.mp.messaging.outgoing.generated-aggregate-metrics.apicurio.registry.headers.enabled=true
%dev.mp.messaging.outgoing.generated-aggregate-metrics.apicurio.registry.auto-register=false
%dev.mp.messaging.outgoing.generated-aggregate-metrics.apicurio.registry.avro.encoding=JSON

I got the following exception:

2021-06-17 15:19:47,213 ERROR [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-22) SRMSG18206: Unable to write to Kafka from channel generated-aggregate-metrics (topic: eda.events.aggregate-metrics): io.apicurio.registry.rest.client.exception.ArtifactNotFoundException: No artifact with ID 'AggregateMetric' in group 'com.redhat.banking.eda.model.events' was found.

However if the producer is configured with:

%dev.mp.messaging.outgoing.generated-aggregate-metrics.apicurio.registry.auto-register=true

Then the application works successfully, but in the Service Registry I have two versions of the same artifact. In the second version, registered automatically by the producer, the schema has the following structure:

{
  "type": "record",
  "name": "AggregateMetric",
  "namespace": "com.redhat.banking.eda.model.events",
  "doc": "Aggregated Metric with important information.",
  "fields": [
    {
      "name": "name",
      "type": {
        "type": "string",
        "avro.java.string": "String"
      },
      "doc": "Metric Name."
    }
  ]
}

So, because the generated Java classes include the extra avro.java.string property in the schema definition, Apicurio identifies it as a different schema.
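To make the mismatch concrete, here is a minimal sketch (assuming Apache Avro on the classpath; the doc attributes are omitted for brevity) showing that the two definitions do not compare as equal, which is also why any content-based lookup of the schema fails:

import org.apache.avro.Schema;

// Minimal sketch: parse the registered schema and the generated one and compare.
// Avro's Schema.equals() takes custom properties such as "avro.java.string"
// into account, so the two schemas are not equal, and a lookup keyed on the
// schema content will not find the pre-registered version either.
public class SchemaDiffSketch {

    public static void main(String[] args) {
        Schema registered = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"AggregateMetric\","
              + "\"namespace\":\"com.redhat.banking.eda.model.events\","
              + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
        Schema generated = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"AggregateMetric\","
              + "\"namespace\":\"com.redhat.banking.eda.model.events\","
              + "\"fields\":[{\"name\":\"name\","
              + "\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}");

        System.out.println(registered.equals(generated)); // prints: false
    }
}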

It seems that there are some approaches to resolve it, basically aligning the registered Avro schema with the final schema created by the avro-maven-plugin, but I don't know if that is the best approach.

Should Apicurio add some extra logic to skip this kind of "extra" property during schema comparison?

It seems that there are some issues related to this:

EricWittmann commented 3 years ago

First of all, terrific bug report. Very detailed. :)

From what I can tell from this explanation, I think the issue is simply that your producer is not configured to find the schema that is pre-registered in the registry. Clearly the serializer is looking for the schema at com.redhat.banking.eda.model.events/AggregateMetric (of the form groupId/artifactId). Is the artifact actually registered at that location? When you enable auto-register=true do you actually get two versions of the same artifact, or do you get two artifacts?

I'm actually not sure why this is resolving to that groupId and artifactId - I don't see where that is configured in your properties, which means that there is some default behavior in the SerDe layer that is figuring that out. If that is not matching with what's actually in the registry, then you can either register the artifact at different coordinates, or you can configure your producer with the artifactId and groupId you did use when registering the schema.
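For example, something like this (a sketch; it assumes the EXPLICIT_ARTIFACT_GROUP_ID and EXPLICIT_ARTIFACT_ID constants in the 2.x SerdeConfig):

import io.apicurio.registry.serde.SerdeConfig;
import java.util.Properties;

public class ExplicitCoordinatesSketch {

    public static Properties producerSerdeProps() {
        Properties props = new Properties();
        // Pin the serializer to the coordinates used when the schema was
        // registered, instead of letting a resolver strategy derive them.
        props.putIfAbsent(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "com.redhat.banking.eda.model.events");
        props.putIfAbsent(SerdeConfig.EXPLICIT_ARTIFACT_ID, "AggregateMetric");
        return props;
    }
}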

@famartinrh What do you think about this?

famarting commented 3 years ago

First of all, same concern as Eric: I don't see how you can get the error No artifact with ID 'AggregateMetric' in group 'com.redhat.banking.eda.model.events' was found. I think you may have configured apicurio.registry.artifact-resolver-strategy: io.apicurio.registry.serde.avro.strategy.RecordIdStrategy but didn't show us? Otherwise I'd be very confused.
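That is, something like this (a sketch using the apicurio.registry.artifact-resolver-strategy key named above):

import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.avro.strategy.RecordIdStrategy;
import java.util.Properties;

public class ResolverStrategySketch {

    public static Properties producerSerdeProps() {
        Properties props = new Properties();
        // RecordIdStrategy derives the coordinates from the Avro record itself:
        // groupId = record namespace, artifactId = record name. That would
        // produce exactly the lookup seen in the error message (group
        // 'com.redhat.banking.eda.model.events', ID 'AggregateMetric').
        props.putIfAbsent(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, RecordIdStrategy.class.getName());
        return props;
    }
}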

Also, when you configure auto-register=true, the result you are getting could make sense; try setting this property https://github.com/Apicurio/apicurio-registry/blob/master/serdes/serde-common/src/main/java/io/apicurio/registry/serde/SerdeConfig.java#L67 to RETURN.
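A sketch of that suggestion, assuming the property famarting links to is SerdeConfig.AUTO_REGISTER_ARTIFACT_IF_EXISTS:

import io.apicurio.registry.serde.SerdeConfig;
import java.util.Properties;

public class AutoRegisterReturnSketch {

    public static Properties producerSerdeProps() {
        Properties props = new Properties();
        // With auto-register enabled, "RETURN" asks the registry to hand back
        // the existing artifact instead of always creating a new version.
        // (The constant name here is an assumption based on the linked line.)
        props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, true);
        props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT_IF_EXISTS, "RETURN");
        return props;
    }
}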

rmarting commented 3 years ago

To demonstrate it I updated one of my repositories with the latest versions of Strimzi (0.23.0) and Apicurio Registry (2.0.1).

This branch has an updated version of a Quarkus application to produce and consume Avro-schema messages from a Kafka cluster. You can easily deploy it into minikube or CodeReady Containers.

If you follow the README.md you can deploy the different components (Kafka, Apicurio) and the Quarkus application. Before producing the first message into Kafka, the Avro schema is already registered in Apicurio Registry as version "1"; however, when you execute the command to produce the first message, the Quarkus application fails with the following exception:

ERROR [io.qua.ver.htt.run.QuarkusErrorHandler] (executor-thread-1) HTTP Request to /producer/kafka/messages failed, error id: 437f34ce-ad68-4367-b92a-ab0d1c737a1a-1: org.jboss.resteasy.spi.UnhandledException: io.apicurio.registry.rest.client.exception.ArtifactNotFoundException: No artifact with ID 'Message' in group 'io.jromanmartin.kafka.schema.avro' was found.

This is because the KafkaConfig class disables the auto-register artifact feature (Line 116).

However if you change that value to true, redeploy the application and send a new message, then it works!

However, there is a side effect in Apicurio Registry: the artifact has two versions.

Version 1 available at http://service-registry.amq-streams-demo.apps-crc.testing/ui/artifacts/io.jromanmartin.kafka.schema.avro/Message/versions/1 with the following content:

{
  "name": "Message",
  "namespace": "io.jromanmartin.kafka.schema.avro",
  "type": "record",
  "doc": "Schema for a Message.",
  "fields": [
    {
      "name": "timestamp",
      "type": "long",
      "doc": "Message timestamp."
    },
    {
      "name": "content",
      "type": "string",
      "doc": "Message content."
    }
  ]
}

Version 2 available at http://service-registry.amq-streams-demo.apps-crc.testing/ui/artifacts/io.jromanmartin.kafka.schema.avro/Message/versions/2 with the following content:

{
  "type": "record",
  "name": "Message",
  "namespace": "io.jromanmartin.kafka.schema.avro",
  "doc": "Schema for a Message.",
  "fields": [
    {
      "name": "timestamp",
      "type": "long",
      "doc": "Message timestamp."
    },
    {
      "name": "content",
      "type": {
        "type": "string",
        "avro.java.string": "String"
      },
      "doc": "Message content."
    }
  ]
}

As you can see, there is a new version, auto-registered by the Apicurio client because the artifact was not found. It is basically the same schema, but the second one includes the attribute avro.java.string.

IMHO it is a wrong implementation by the avro-maven-plugin, because the generated classes don't include EXACTLY the schema definition from the avsc file, as described here. However, I don't know if Apicurio Registry could manage it in some way, maybe with more exception details (e.g. noting that the artifact already exists in Apicurio but no version matches the schema), or maybe by adding an extra comparison step that skips the avro.java.string property.

At the moment, I would suggest registering the schema generated by the Avro Maven Plugin. I don't like it much, because it requires using a language-specific schema instead of the original one, but at least the application does not register new versions in Apicurio.

WDYT?

EricWittmann commented 3 years ago

This is an excellent update with information we need to diagnose the issue, thanks @rmarting - we may need a little bit of time to dig into this. I see that you are using RecordIdStrategy which explains part of what we're seeing here. But I would still need to dig into why it's not using the version you pre-register. Maybe @famartinrh knows offhand, but I would need to play with it.

EricWittmann commented 3 years ago

OK @rmarting I had a chance to poke at this this morning. This is failing because the default schema resolver couldn't find the pre-registered schema in the registry. There are two ways for the serializer/producer to find the schema:

1) By coordinates (groupId + artifactId + version)
2) By content (groupId + artifactId + a SHA hash of the content)

Based on your configuration, the resolver isn't even trying (1) because you haven't specified what version of the artifact you want to use. And of course (2) is failing for the reasons you have already identified (the content of the schema is different in the registry vs. what's in the code).

There are two ways to get this working:

1) Tell the resolver to use the latest version
2) Tell the resolver exactly which version you want

I got your example working locally using (1) by adding this to your KafkaConfig.java class:

props.putIfAbsent(SerdeConfig.FIND_LATEST_ARTIFACT, true);

But you could alternatively get it working using (2) by adding this instead:

props.putIfAbsent(SerdeConfig.EXPLICIT_ARTIFACT_VERSION, "1");

You'll need to decide whether either of these approaches makes sense. Using the latest means that any time you register a new version of the schema, your producer will use that one. But setting an explicit version means that every time you want a new version of your schema you may need to redeploy your producer. Of course, you could set the explicit version dynamically using some custom code.

Also note that it is possible to set a custom version number when registering your artifact. So e.g. you could set the version to "1.0" if you wanted, so that you don't get an automatic "1" for the first version, "2" for the second version, etc. In case that's useful.

rmarting commented 3 years ago

Thanks @EricWittmann for your thorough summary of the root cause of this issue.

After that, I am thinking that the best approach is to set the version when the artifact is registered, and have the producer and consumer apps use the right version value. With this approach the applications should keep working in isolation when new versions of the artifact are registered (automatically or not).

However, I was testing this approach and it seems that the KafkaProducer is not using the declared versions in my setup. As IMHO this is a different bug, I created this one to track it.

At the same time, I tested all the different combinations of both properties, summarized as follows:

Thoughts?

EricWittmann commented 3 years ago

Yes I think disabling auto-register is definitely the way to go. And you may be right about there being a bug with EXPLICIT_ARTIFACT_VERSION. I've set aside some time tomorrow to look into it and get back to you.

Note that you only need to worry about the Serializer configuration. The Deserializer always uses the GlobalId of the specific schema in the registry, which is included in the Kafka message by the Serializer.

carlesarnal commented 1 year ago

Closing as this has been resolved.

ivanpedersen commented 11 months ago

The Confluent serializer has the option to remove Java-specific properties:

The avro-maven-plugin generated code adds Java-specific properties such as "avro.java.string":"String", which may prevent schema evolution. You can override this by setting avro.remove.java.properties=true in the Avro serializer configurations.

https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-avro.html#avro-serializer
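For reference, a sketch of that Confluent-side configuration (the key string is taken verbatim from the docs above; everything else is illustrative):

import java.util.Properties;

public class ConfluentRemoveJavaPropsSketch {

    public static Properties producerSerdeProps() {
        Properties props = new Properties();
        // Tells the Confluent Avro serializer to strip Java-specific properties
        // such as "avro.java.string" before registering/looking up the schema.
        props.put("avro.remove.java.properties", true);
        return props;
    }
}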

Is there any particular reason why something similar can't be added to the Apicurio serializer? Currently I'm specifying EXPLICIT_ARTIFACT_VERSION, but it would be nice to not have to, and to simply rely on RecordIdStrategy instead.