AbsaOSS / hyperdrive

Extensible streaming ingestion pipeline on top of Apache Spark
Apache License 2.0

Preserve nullability from Avro to Catalyst Schema #137

Closed: kevinwallimann closed this issue 4 years ago

kevinwallimann commented 4 years ago

Currently, in a Kafka-to-Kafka workflow (i.e. Avro -> Catalyst -> Avro, with the columnselectortransformer), all fields end up nullable in the destination topic, even if they are non-nullable in the source. Example: the source schema

{
  "type": "record",
  "name": "pageviews",
  "namespace": "ksql",
  "fields": [
    {
      "name": "viewtime",
      "type": "long"
    }
  ]
}

is written as

{
  "type": "record",
  "name": "pageviews",
  "namespace": "ksql",
  "fields": [
    {
      "name": "viewtime",
      "type": ["long", "null"]
    }
  ]
}

Expected: Non-nullable fields in the source Avro schema should remain non-nullable in the destination. Nullable fields should, of course, stay nullable.
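
One conceivable way to restore the source nullability after deserialization is to rebuild the DataFrame with the Catalyst schema derived from the source Avro schema. This is only a hypothetical sketch: withSourceNullability is not part of hyperdrive, and createDataFrame over an RDD works for batch DataFrames, not streaming ones.

import org.apache.avro.Schema
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// Hypothetical helper: re-flag the columns with the nullability of the
// source Avro schema instead of the forced nullable = true.
def withSourceNullability(df: DataFrame, sourceAvroSchema: Schema): DataFrame = {
  val catalystSchema =
    SchemaConverters.toSqlType(sourceAvroSchema).dataType.asInstanceOf[StructType]
  // createDataFrame accepts a schema that differs from the inferred one
  // only in its nullability flags.
  df.sparkSession.createDataFrame(df.rdd, catalystSchema)
}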

Migration note: Making an existing nullable field non-nullable is a forward-compatible change (it is almost like adding a field).
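
The forward-compatibility claim can be checked with Avro's own validator. A sketch using the two schemas from this issue, assuming the avro artifact is on the classpath:

import java.util.Collections
import org.apache.avro.{Schema, SchemaValidatorBuilder}

val oldSchema = new Schema.Parser().parse(
  """{"type": "record", "name": "pageviews", "namespace": "ksql",
    |  "fields": [{"name": "viewtime", "type": ["long", "null"]}]}""".stripMargin)

val newSchema = new Schema.Parser().parse(
  """{"type": "record", "name": "pageviews", "namespace": "ksql",
    |  "fields": [{"name": "viewtime", "type": "long"}]}""".stripMargin)

// Forward compatible: data written with the new (non-nullable) schema
// can still be read by consumers holding the old (nullable) schema.
// Throws SchemaValidationException if that is not the case.
new SchemaValidatorBuilder().canBeReadStrategy().validateLatest()
  .validate(newSchema, Collections.singletonList(oldSchema))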

kevinwallimann commented 4 years ago

Analysis: za.co.absa.abris.avro.sql.AvroDataToCatalyst always reports nullable = true. https://github.com/AbsaOSS/ABRiS/blob/985fab1894826b4ea97a48d065a048b44a0ed180/src/main/scala/za/co/absa/abris/avro/sql/AvroDataToCatalyst.scala#L47 I'm not sure why it isn't child.nullable, but it wouldn't change much in our case, since the child (Kafka's value column) is nullable anyway.
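
For reference, the linked line amounts to the following override inside AvroDataToCatalyst (paraphrased from the link above; a fragment, not standalone code):

// Paraphrased: the expression unconditionally declares its result nullable.
override def nullable: Boolean = true

// The alternative raised above would delegate to the child expression.
// Since the child is Kafka's `value` column, which is itself nullable,
// the effective result would be the same in this pipeline.
// override def nullable: Boolean = child.nullable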

Since the expression wraps the actual Avro record, it causes all fields to become nullable when the wrapper expression is flattened like this

import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_confluent_avro

dataFrame
  .select(from_confluent_avro(col("value"), schemaRegistrySettings) as 'data)
  .select("data.*")

The root cause of the nullability is that the value of a Kafka record can, in general, always be null. One use case for a null value is the tombstone record: https://www.confluent.io/blog/handling-gdpr-log-forget/
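
This is visible directly in the Kafka source's schema: value is a nullable binary column, and tombstone records are the rows where it is actually null. A minimal sketch (bootstrap server and topic are placeholders):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().getOrCreate()

val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "pageviews")                    // placeholder
  .load()

// The source schema always declares `value` as nullable binary:
// a record's value may legitimately be null (a tombstone).
kafkaDf.printSchema() // value: binary (nullable = true)

// Dropping tombstones removes the null rows, but the column (and
// everything deserialized from it) is still flagged nullable.
val nonTombstones = kafkaDf.filter(col("value").isNotNull)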