AbsaOSS / ABRiS

Avro SerDe for Apache Spark structured APIs.
Apache License 2.0

Detect different schema versions in batch #336

Closed talperetz1 closed 10 months ago

talperetz1 commented 1 year ago

Hi, I have a question regarding using Abris. I have a Spark stream that reads from a Kafka topic, parses the Avro messages using Abris, and then writes them out. I wanted to know whether, in each micro-batch read from Kafka, I can detect that two messages share the same schema but with different versions. For example, let's say I added an optional field to the schema and released a newer version, and now the topic contains some "old" messages with the old schema and new messages with the new schema, consumed in the same micro-batch.

cerveada commented 1 year ago

Abris is able to detect the writer schema version of each message and convert the values to the specified newer schema version (the reader schema), e.g. by using the default value for a field that exists only in the newer schema. You define the reader schema when you configure Abris.
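For context, a typical setup looks roughly like this (a sketch only; `schemaName`, `schemaRegistryUrl`, and a Kafka source DataFrame `kafkaDf` are assumed to exist in your code). The reader schema is fixed when the config is created, while the writer schema is resolved per record from the id embedded in each Confluent-framed message:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

// Reader schema: the latest version registered at the time the config is built.
// Assumes `schemaName` and `schemaRegistryUrl` are defined elsewhere.
val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andRecordNameStrategy(schemaName, "")
  .usingSchemaRegistry(schemaRegistryUrl)

// Each record's writer schema id is read from the message itself, and the
// value is resolved against the reader schema above.
val parsed: DataFrame =
  kafkaDf.select(from_avro(col("value"), abrisConfig).as("data"))
```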

talperetz1 commented 1 year ago

Hi @cerveada, thanks for the response. I didn't quite understand how it works. I set my Abris config, and as I understood it, this happens only once, when the query first starts:

```scala
val abrisConfigBase = AbrisConfig.fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andRecordNameStrategy(schemaName, "")
  .usingSchemaRegistry(schemaRegistryUrl)
```

Now let's say that when I started the query and set the Abris config, the latest version was 1 (with two columns x, y), and then I read from Kafka with spark.readStream.......

After a while my schema evolved and now has a new optional column (x, y, z). So when I read the next batch, I have some messages with the new version (x, y, z) and some messages that were still in Kafka, not yet consumed, with the older version (x, y). Now when I use Abris to create a DataFrame, I use my Abris config, which was created when the latest version was 1 (x, y), so the new messages that contain column z will be missing this column. That's okay, I understand that Spark must be restarted for the newer schema to be downloaded. But my question is: when I read from Kafka and receive the value column in Avro binary format, how can I know that I received some messages with schema version A and some messages with schema version B?

cerveada commented 1 year ago

Well, Abris detects the writer schema id here: https://github.com/AbsaOSS/ABRiS/blob/7dea3faba2e1397cbed6b6e66fb3138b7a623274/src/main/scala/za/co/absa/abris/avro/sql/AvroDataToCatalyst.scala#L144

There is no mechanism to expose this information to the user. Abris just tries to convert the data to the reader schema, and this succeeds if the two schemas are compatible.

Some ideas:

talperetz1 commented 1 year ago

Sorry, but what exactly is this buffer?

cerveada commented 1 year ago

Look into the linked source code for more details.

The schema id is part of the Confluent Avro format: https://github.com/AbsaOSS/ABRiS#confluent-avro-format
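Since the schema id is at the very start of each message, you can also read it yourself from the raw Kafka value bytes if you want to know, per record, which schema wrote it. A minimal sketch (`confluentSchemaId` is a hypothetical helper, not part of Abris), assuming the Confluent wire format: one magic byte `0x00`, then a 4-byte big-endian schema id, then the Avro payload:

```scala
import java.nio.ByteBuffer

// Hypothetical helper: extract the writer schema id from a Confluent-framed
// Avro message, or None if the bytes don't match the expected framing.
def confluentSchemaId(bytes: Array[Byte]): Option[Int] =
  if (bytes != null && bytes.length >= 5 && bytes(0) == 0x00)
    Some(ByteBuffer.wrap(bytes, 1, 4).getInt) // 4-byte big-endian schema id
  else
    None
```

Wrapped in a Spark UDF applied to the Kafka `value` column, this would tag each row with its writer schema id, which you could then map to a subject version via the Schema Registry API.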