AbsaOSS / hyperdrive

Extensible streaming ingestion pipeline on top of Apache Spark
Apache License 2.0

Preserve avro schema from Avro to Catalyst and from Catalyst to Avro #138

Closed kevinwallimann closed 2 years ago

kevinwallimann commented 4 years ago

Background

Currently, the org.apache.spark.sql.avro.SchemaConverters class is used to derive an Avro schema from a Catalyst type. However, in an Avro -> Catalyst -> Avro query, this conversion is lossy: information such as default values, documentation or any custom JSON field in the Avro schema is lost when converting to Catalyst, and the Avro types BYTES and FIXED are both converted to the same Catalyst type, DecimalType (or BinaryType if no Avro logical type is present), so the original type cannot be recovered.

For example, a BYTES type with logical type decimal

    "type" : {
      "type" : "bytes",
      "logicalType" : "decimal",
      "precision" : 8,
      "scale" : 3
    }

is converted to the Spark type DecimalType, which is in turn converted to the Avro type

    "type" : {
      "type" : "fixed",
      "name" : "fixed",
      "namespace" : "topLevelRecord.fieldName",
      "size" : 10,
      "logicalType" : "decimal",
      "precision" : 8,
      "scale" : 3
    }
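
This round trip can be reproduced directly with SchemaConverters. A minimal sketch, assuming the spark-avro module bundled with Spark 2.4+, where the class is public:

    import org.apache.avro.Schema
    import org.apache.spark.sql.avro.SchemaConverters

    // Source Avro schema: a record wrapping the BYTES decimal field
    val sourceSchema = new Schema.Parser().parse(
      """{"type": "record", "name": "topLevelRecord", "fields": [{
        |  "name": "fieldName",
        |  "type": {"type": "bytes", "logicalType": "decimal", "precision": 8, "scale": 3}
        |}]}""".stripMargin)

    // Avro -> Catalyst: the BYTES vs FIXED distinction is lost,
    // both map to DecimalType(8, 3)
    val catalystType = SchemaConverters.toSqlType(sourceSchema).dataType

    // Catalyst -> Avro: DecimalType is always rendered as a FIXED type
    val targetSchema = SchemaConverters.toAvroType(catalystType, nullable = false)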

Furthermore, default values in the source Avro schema are discarded in the Avro -> Catalyst conversion. This has two consequences: 1) there is obviously no way to generate a target Avro schema with that default value, and 2) a nullable type in Avro is expressed as a union with null, where null is the first type in the union if and only if the default value is null. If the default value is unknown, there is no way to determine whether null should be the first or the second type in the union.

For example, when a record schema with a default value

    {
      "type" : "record",
      "name" : "topLevelRecord",
      "fields" : [ {
        "name" : "stringCol",
        "type" : [ "string", "null" ],
        "default" : "abcd"
      } ]
    }

is converted to a StructType, the default value is lost:

    StructType(Seq(StructField("stringCol", StringType, nullable = true)))

and when the type is converted back to an Avro type, the null type ends up in front of the string type in the union:

    {
      "type" : "record",
      "name" : "topLevelRecord",
      "fields" : [ {
        "name" : "stringCol",
        "type" : [ "null", "string" ]
      } ]
    }
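
The same round trip shows the discarded default and the flipped union order; a sketch under the same assumptions as above:

    import org.apache.avro.Schema
    import org.apache.spark.sql.avro.SchemaConverters

    val source = new Schema.Parser().parse(
      """{"type": "record", "name": "topLevelRecord", "fields": [{
        |  "name": "stringCol", "type": ["string", "null"], "default": "abcd"
        |}]}""".stripMargin)

    // Avro -> Catalyst: only name, type and nullability survive, "abcd" is gone
    val structType = SchemaConverters.toSqlType(source).dataType

    // Catalyst -> Avro: nullable fields always get the union ["null", "string"]
    val target = SchemaConverters.toAvroType(structType, nullable = false)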

Feature

There are two approaches.

1) The source schema could be used as an input to generate the target schema. However, since the Spark schema can change through any number of Spark transformations (renamings, column additions, etc.), there is no generic, straightforward way to map a field in the source schema to a field in the target schema. A heuristic would be needed to decide which fields of the input Avro schema correspond to which fields of the output Avro schema.

2) The metadata object of Spark's StructField can be used to transport the information from the source Avro schema into the Spark schema, and from there to the target Avro schema (see the sketch below). This only works for Avro schemas whose root type is a record: if the Avro schema is just a map or an array, for example, it is converted directly to a MapType or ArrayType, which, unlike StructField, have no metadata field.
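
A minimal sketch of the second approach, using hypothetical metadata keys (the naming scheme is an open design choice, not an existing API):

    import org.apache.spark.sql.types._

    // Hypothetical key; a custom Avro -> Catalyst converter would attach the
    // source field's default value (and similar extras) as StructField metadata
    val avroMetadata = new MetadataBuilder()
      .putString("avro.default", "abcd")
      .build()

    val field = StructField("stringCol", StringType, nullable = true, avroMetadata)

    // A custom Catalyst -> Avro converter reads the metadata back to restore
    // the default value and to decide the union order: null goes first only
    // if the stored default is null
    val default =
      if (field.metadata.contains("avro.default")) Some(field.metadata.getString("avro.default"))
      else None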

Other

This issue only deals with default values and logical types. Support for custom JSON fields in the Avro schema may be added in a separate issue. See also #137 and https://issues.apache.org/jira/browse/SPARK-28008.