AbsaOSS / ABRiS

Avro SerDe for Apache Spark structured APIs.
Apache License 2.0
230 stars 75 forks source link

Make schema converter configurable #268

Closed kevinwallimann closed 2 years ago

kevinwallimann commented 2 years ago

Background

Currently, SchemaConverters is used in AvroDataToCatalyst to convert an Avro schema to a Spark data type.

I have a workload that reads from a Kafka topic, adds / transforms some columns, and then writes that data back to another Kafka topic. The schema for the target Kafka topic is auto-generated (based on the Spark schema) and is registered outside ABRiS. The source schema contains default values that should be kept for the target schema, but with the SchemaConverters implementation, these default values are lost. Furthermore, some type conversions from Avro to Spark are not bijective, e.g. Avro types BYTES and FIXED are both converted to DecimalType (if their Avro logical type is Decimal) or both to BinaryType (if they have no Avro logical type). Without the original logical type, it is impossible to decide whether DecimalType should be converted back to BYTES or FIXED. The SchemaConverters implementation chooses FIXED. This is undesirable if the source schema needs to be preserved for whatever reason. The same problem exists for Spark type TimestampType which is always converted to Avro type Long / TimestampMicros, but it could as well be Long / TimestampMillis.

One solution to these problems is to store the extra information from the Avro schema to column metadata in Spark (StructField metadata, to be precise). However, to do this, a custom schema conversion is required.

Feature

The SchemaConverters implementation may work for most use-cases, but there are scenarios in which a user wishes to convert an Avro schema in a custom way, e.g. writing default values into Spark metadata. Thus, I suggest making the schema conversion configurable, where a user can optionally pass a function Schema => DataType to the From Avro Config. The default is (schema: Schema) => SchemaConverters.toSqlType(schema).dataType

Other

This functionality should be backported to ABRiS v5 as well.

cerveada commented 2 years ago

This may potentially cause issue with serializability of the converter function. The config map used until now uses only serializable types, and that guarantees everything will be serializable at the end. With the function, the responsibility will be shifted to the user.

The testing of this is harder because spark running locally doesn't try to serialize the objects, but the one on the cluster does.

It seems functions can be serializable in Scala, but only when certain conditions are met https://stackoverflow.com/questions/47806670/how-to-serialize-functions-in-scala

kevinwallimann commented 2 years ago

Ok, I understand. In that case, I can create a trait around the function and only pass the fully qualified name of the custom implementation to the config map. Abris would load it then using the ServiceLoader. That's the same principle how Spark loads different data sources: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L640

cerveada commented 2 years ago

That could work. Or use an object, but in that case the serializability requirement should be well documented. It's up to you.

kevinwallimann commented 2 years ago

Hi @cerveada I refactored the PR completely. Now I'm using the Service Provider Interface (SPI), as commented earlier. I tested it on our use-case on the cluster and it worked as expected. I added a section in the readme as well.