Azure / azure-schema-registry-for-kafka

Kafka support for Azure Schema Registry.
https://aka.ms/schemaregistry
MIT License

how to pass schema version as kafka property #15

Open KrupalVanukuri opened 3 years ago

KrupalVanukuri commented 3 years ago

If a schema has multiple versions, how does a producer/consumer pass or refer to a particular version via Kafka properties or Kafka configuration? Can I use something like the below?

consumer: properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "https://myeventhubNamespace:443/$schemagroups/mygroupName/schemas/mySchemaName/versions/2");

producer:

 properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "https://myeventhubNamespace:443/$schemagroups/mygroupName/schemas/mySchemaName/versions/2");

full method:

public KafkaTemplate<Object, Object> getKafkaTemplate() {
    final Map<String, Object> properties = new HashMap<String, Object>();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
    properties.put("security.protocol", securityProtocol);
    properties.put("sasl.mechanism", saslMechanism);
    properties.put("sasl.jaas.config", saslJaasConfig);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializer.class);
    properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "https://myeventhubNamespace:443/$schemagroups/mygroupName/schemas/mySchemaName/versions/2");
    properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_CREDENTIAL_CONFIG, credential);
    properties.put(KafkaAvroSerializerConfig.SCHEMA_GROUP_CONFIG, "flightEventAvroSchemaGroup");
    final KafkaTemplate<Object, Object> template = new KafkaTemplate<Object, Object>(new DefaultKafkaProducerFactory<>(properties));
    return template;
}

Is this recommended? And will port 443 change after deleting and re-creating the EH namespace with the same name?

KrupalVanukuri commented 3 years ago

Any suggestions, please?

KrupalVanukuri commented 3 years ago

Any suggestions on how schema evolution works?

I created a schema group with Forward compatibility and uploaded a schema with initial version 1 in the Azure portal. Both producer and consumer applications were working fine. I then wanted to add a new field to the schema, so I edited the schema in the portal and added a field with a default value. The version was incremented to 2 with a new GUID. I updated the producer application with the version 2 schema and sent a message to EH; the message was sent with the version 2 GUID attached. When the consumer application received this new message, it failed, because the consumer uses version 1 while the received message carries the version 2 GUID. But per Forward compatibility, the consumer should process this new message successfully by ignoring the defaulted field. This feature is very important for our applications to be able to use Schema Registry. Can you please provide any help on this? And any documentation and samples to refer to?

Version 2:

{
    "type": "record",
    "namespace": "com.test",
    "name": "Employee",
    "fields": [
        {
            "name": "firstName",
            "type": "string"
        },
        {
            "name": "middleName",
            "type": [ "null", "string" ],
            "default": null
        },
        {
            "name": "age",
            "type": "int"
        }
    ]
}

Version 1:

{ "type": "record", "namespace": "com.test", "name": "Employee", "fields": [ { "name": "firstName", "type": "string" }, { "name": "age", "type": "int" } ] }

hmlam commented 3 years ago

The TL;DR answer to your question is that it is typically not recommended to pass the schema version as a Kafka property. Rather, you should follow the usual Avro model: generate a versioned Java class from your Avro schema and let the internal mechanics of the schema registry serializer register the schema for you.

========================= Since you are in the Kafka space, the integration with schemas is typically done through code generation rather than through the portal experience (the portal gives you a good UI tool to look at a schema after it is written). I would suggest you run through our sample at https://github.com/Azure/azure-schema-registry-for-kafka/tree/master/java/avro/samples to get a better understanding of how the integration is typically done.

In particular, in Kafka, producing/fetching with a schema is more or less about creating Avro classes for the given schema, rather than pointing a config at a versioned schema endpoint. Your typical flow should be something like the following (a rough sketch follows the list):

  1. Define your schema (let's say AvroUser.avsc).
  2. Generate the Java classes from the schema (most people use avro-maven-plugin to generate the Java classes in their project).
  3. After step 2 you should have a Java class that encapsulates your schema (e.g. AvroUser.java), which you can then use in both your producer and consumer code to write your data using the schema - again, refer to our sample for details.
  4. Note: specific to Event Hubs Schema Registry, you should always pre-create, in the portal, the schema group that you specify in the schema.group config before running your code/sample.
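For illustration, here is a rough producer sketch along those lines (not the official sample verbatim). It assumes an Employee class generated by avro-maven-plugin from the schema in this thread, uses placeholder namespace/topic names, and omits the SASL settings shown earlier in this issue.

    import java.util.HashMap;
    import java.util.Map;

    import com.aa.opshub.test.Employee;                        // assumed: class generated from the .avsc
    import com.azure.core.credential.TokenCredential;
    import com.azure.identity.DefaultAzureCredentialBuilder;
    import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializer;
    import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializerConfig;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class EmployeeProducerSketch {
        public static void main(String[] args) {
            TokenCredential credential = new DefaultAzureCredentialBuilder().build();

            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "mynamespace.servicebus.windows.net:9093"); // placeholder
            // SASL settings omitted for brevity - same as earlier in this thread.
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
            // Registry endpoint only - no schema name or version in the URL; the serializer
            // registers/resolves the schema of the generated class for you.
            props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "https://mynamespace.servicebus.windows.net"); // placeholder
            props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_CREDENTIAL_CONFIG, credential);
            props.put(KafkaAvroSerializerConfig.SCHEMA_GROUP_CONFIG, "flightEventAvroSchemaGroup");

            try (KafkaProducer<String, Employee> producer = new KafkaProducer<>(props)) {
                Employee employee = Employee.newBuilder()       // builder comes from the generated class
                        .setFirstName("Jane")
                        .setAge(30)
                        .build();
                producer.send(new ProducerRecord<>("employees", employee)); // placeholder topic
                producer.flush();
            }
        }
    }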

As for schema evolution, most people typically go through this kind of life cycle.

  1. Let's say you start with AvroUser.avsc. The producer and consumer use that schema (the generated class) to produce records - since you configured our schema registry, the producer and consumer will register that schema in the registry under the namespace and name you specified in your avsc.
  2. At some point you want to create version 2 of AvroUser.avsc - you modify the schema but keep the same namespace and name; let's say you now have a file called AvroUserV2.avsc.
  3. When you compile your code, avro-maven-plugin will generate AvroUserV2.java, which you now use in your producer/consumer code. When you run your code, just like in step 1, the library will automatically register the new schema with the registry - because the namespace and name are the same, it knows this is the new version.

Note that since every record you produce and fetch is embedded with its schema ID, data produced/fetched in step (1) will have a different schema ID than data produced/fetched in step (3) - this is how the data can still be de/serialized even though the schema has evolved. Also note that your generated Java files (AvroUser.java and AvroUserV2.java) are basically the code representation of your schema evolution.

KrupalVanukuri commented 3 years ago

Hi Eric,

  I tried the process below and am getting an error. Please let us know if you are available for 30 minutes to walk through our applications.

Steps:

  1. Created schema group with Forward compatibility in portal

  2. Defined the schema v1 (with only 2 fields) at both producer and consumer applications and generated classes using avro-maven-plugin.

  3. Produced the message, schema is auto registered in schema registry.

  4. Consumer received the message successfully.

  5. Added a new field called middleName to the schema (v2) and regenerated the classes only in the producer application.

  6. No changes made to the consumer application, so the consumer still has schema v1 and the corresponding generated classes.

  7. Produced the message with schema v2 and schema v2 got auto registered in schema registry.

  8. Now the consumer application throws an error, because the newly added field is not found in its corresponding Avro class. The Avro classes are SpecificRecord types, and SpecificRecord maps fields by order/position.

2021.06.09 04:12:39,600 org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 ERROR com.azure.core.util.logging.ClientLogger.performLogging(ClientLogger.java:350) - Error deserializing Avro message.
java.lang.IllegalStateException: Error deserializing Avro message.
    at com.azure.data.schemaregistry.avro.AvroSchemaRegistryUtils.decode(AvroSchemaRegistryUtils.java:134) ~[azure-data-schemaregistry-avro-1.0.0-beta.4.jar!/:?]
    at com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializer.lambda$deserializeAsync$1(SchemaRegistryAvroSerializer.java:101) ~[azure-data-schemaregistry-avro-1.0.0-beta.4.jar!/:?]
    at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:169) [reactor-core-3.4.0.jar!/:3.4.0]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784) [reactor-core-3.4.0.jar!/:3.4.0]
    at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:61) [reactor-core-3.4.0.jar!/:3.4.0]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3987) [reactor-core-3.4.0.jar!/:3.4.0]
    at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:199) [reactor-core-3.4.0.jar!/:3.4.0]
    at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53) [reactor-core-3.4.0.jar!/:3.4.0]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3972) [reactor-core-3.4.0.jar!/:3.4.0]
    at reactor.core.publisher.Mono.block(Mono.java:1678) [reactor-core-3.4.0.jar!/:3.4.0]
    at com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializer.deserialize(SchemaRegistryAvroSerializer.java:50) [azure-data-schemaregistry-avro-1.0.0-beta.4.jar!/:?]
    at com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:66) [azure-schemaregistry-kafka-avro-1.0.0-beta.4.jar!/:?]
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) [kafka-clients-2.6.0.jar!/:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365) [kafka-clients-2.6.0.jar!/:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130) [kafka-clients-2.6.0.jar!/:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596) [kafka-clients-2.6.0.jar!/:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) [kafka-clients-2.6.0.jar!/:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) [kafka-clients-2.6.0.jar!/:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) [kafka-clients-2.6.0.jar!/:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1283) [kafka-clients-2.6.0.jar!/:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) [kafka-clients-2.6.0.jar!/:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) [kafka-clients-2.6.0.jar!/:?]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1238) [spring-kafka-2.6.3.jar!/:2.6.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1133) [spring-kafka-2.6.3.jar!/:2.6.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1054) [spring-kafka-2.6.3.jar!/:2.6.3]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.IndexOutOfBoundsException: Invalid index: 2
    at com.aa.opshub.test.Employee.put(Employee.java:111) ~[classes!/:0.0.1-SNAPSHOT]
    at org.apache.avro.generic.GenericData.setField(GenericData.java:816) ~[avro-1.9.2.jar!/:1.9.2]
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139) ~[avro-1.9.2.jar!/:1.9.2]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247) ~[avro-1.9.2.jar!/:1.9.2]
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.9.2.jar!/:1.9.2]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) ~[avro-1.9.2.jar!/:1.9.2]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) ~[avro-1.9.2.jar!/:1.9.2]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) ~[avro-1.9.2.jar!/:1.9.2]
    at com.azure.data.schemaregistry.avro.AvroSchemaRegistryUtils.decode(AvroSchemaRegistryUtils.java:131) ~[azure-data-schemaregistry-avro-1.0.0-beta.4.jar!/:?]
    ... 27 more

Consumer Config:

   public Map<String, Object> configs() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
    properties.put("security.protocol", securityProtocol);
    properties.put("sasl.mechanism", saslMechanism);
    properties.put("sasl.jaas.config", saslJaasConfig);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
         com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.class);
    properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "registryurl");
    properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_CREDENTIAL_CONFIG, credential);
    properties.put(KafkaAvroDeserializerConfig.AVRO_SPECIFIC_READER_CONFIG, true);
    return properties;
}

 Producer: Schema v2:
{
   "type": "record",
   "namespace": "com.aa.opshub.test",
   "name": "Employee",
   "fields": [
    {
        "name": "firstName",
        "type": "string"
    },
    {
        "name": "age",
        "type": "int"
    },
    {
        "name": "middleName",
        "type": "string"
    }
]

}

Consumer: Schema v1

{ "type" : "record", "namespace" : "com.aa.opshub.test", "name" : "Employee", "fields" : [ { "name" : "firstName" , "type" : "string" }, { "name" : "age", "type" : "int" } ] }

Method throwing error in Employee.java (Avro class)

  @SuppressWarnings(value="unchecked")
  public void put(int field$, java.lang.Object value$) {
      switch (field$) {
          case 0: firstName = (java.lang.CharSequence)value$; break;
          case 1: age = (java.lang.Integer)value$; break;
          default: throw new IndexOutOfBoundsException("Invalid index: " + field$);
      }
  }

Thanks Krupal


KrupalVanukuri commented 3 years ago

When I look at the link below, I see the DatumReader is created based only on the writer schema. I believe that to deserialize the payload/message, the DatumReader should also take the reader schema into account.
https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/schemaregistry/azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/AvroSchemaRegistryUtils.java

Method Name: getDatumReader(Schema writerSchema)
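For reference, here is a minimal sketch (plain Avro, not the SDK's code) of what schema resolution looks like when a reader schema is supplied as well: with both schemas, Avro skips writer-schema fields that the reader schema does not know about, instead of pushing them into Employee.put by position. The Employee class and method name here are assumptions for illustration.

    import java.io.IOException;

    import com.aa.opshub.test.Employee;                   // assumed: v1 generated class on the consumer side
    import org.apache.avro.Schema;
    import org.apache.avro.io.Decoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;

    public class ReaderSchemaSketch {
        // writerSchema is the v2 schema looked up from the registry by the GUID in the message;
        // the reader schema is the v1 schema baked into the generated Employee class.
        public static Employee decode(byte[] avroBody, Schema writerSchema) throws IOException {
            Schema readerSchema = Employee.getClassSchema();
            SpecificDatumReader<Employee> reader = new SpecificDatumReader<>(writerSchema, readerSchema);
            Decoder decoder = DecoderFactory.get().binaryDecoder(avroBody, null);
            return reader.read(null, decoder);
        }
    }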

hmlam commented 3 years ago

Your repro looks correct to me as a forward-compatibility sample. I would suggest filing a GitHub issue over at the azure-sdk-for-java repo with your forward-compatibility repro steps, so that the SDK team can track this and fix it as needed.

KrupalVanukuri commented 2 years ago

@hmlam any updates on this Azure Schema Registry for Kafka issue with Forward compatibility?