spotify / magnolify

A collection of Magnolia add-on modules
https://spotify.github.io/magnolify
Apache License 2.0
165 stars 26 forks

Rework AvroCompat for ParquetType #766

Open clairemcginty opened 1 year ago

clairemcginty commented 1 year ago

Background: Parquet doesn’t have a single canonical in-memory representation like Avro does; it’s a file format whose read/write layer lets the user pick the specific data format they’d like to read Parquet records into. Parquet-Avro, which is part of the OSS Parquet library, is one of the most popular choices. Magnolify-Parquet provides an alternative data format: Scala case classes. In theory, data formats can be mixed and matched: you can write using parquet-avro and read into Scala case classes, or vice versa.

The exception to data format interchangeability is when a schema contains a repeated field. Parquet’s MessageType natively supports marking any primitive or complex field as repeated, for example:

message TestRecord {
  required int64 intField (INTEGER(64,true));
  repeated binary listField (STRING);
}

By default, this is how Magnolify-Parquet generates schemas for repeated types. However, this protocol for repeated fields is incompatible with the Parquet-Avro format:

scala> import magnolify.parquet._
scala> case class TestRecord(intField: Int, listField: List[String])
scala> ParquetType[TestRecord].schema
val res0: org.apache.parquet.schema.MessageType =
message TestRecord {
  required int32 intField (INTEGER(32,true));
  repeated binary listField (STRING);
}
scala> ParquetType[TestRecord].avroSchema
java.lang.UnsupportedOperationException: REPEATED not supported outside LIST or MAP. Type: repeated binary listField (STRING)
  at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:292)
  at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:279)
  at magnolify.parquet.ParquetType$$anon$1.avroSchema$lzycompute(ParquetType.scala:89)
  at magnolify.parquet.ParquetType$$anon$1.avroSchema(ParquetType.scala:88)
  ... 59 elided

As a result, Parquet records containing a repeated field that were written with Parquet-Avro could not be read using Magnolify-Parquet, until we introduced AvroCompat to Magnolify. This was a Scala object that, when imported, used an encoding trick to produce repeated schemas compatible with Parquet-Avro:

scala> import magnolify.parquet.ParquetArray.AvroCompat._
scala> ParquetType[TestRecord].schema
val res1: org.apache.parquet.schema.MessageType =
message TestRecord {
  required int32 intField (INTEGER(32,true));
  required group listField (LIST) {
    repeated binary array (STRING);
  }
}
scala> ParquetType[TestRecord].avroSchema
val res2: org.apache.avro.Schema = {"type":"record","name":"TestRecord","namespace":"$iw","fields":[{"name":"intField","type":"int"},{"name":"listField","type":{"type":"array","items":"string"}}]}
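The mechanism behind that "encoding trick" can be modeled in plain Scala. This is an illustrative sketch of the implicit-resolution pattern it relies on, not magnolify's actual internals; all names below are hypothetical:

```scala
// Toy model of the AvroCompat import trick: schema derivation picks up
// whichever ListEncoding instance is implicitly in scope.
sealed trait ListEncoding
case object NonGrouped extends ListEncoding // bare `repeated` field
case object Grouped    extends ListEncoding // LIST-annotated wrapper group

object ListEncoding {
  // default instance, found via the companion object's implicit scope
  implicit val default: ListEncoding = NonGrouped
}

object AvroCompat {
  // importing AvroCompat._ puts this in lexical scope, which takes
  // priority over the companion-object default
  implicit val grouped: ListEncoding = Grouped
}

def deriveListSchema(field: String)(implicit enc: ListEncoding): String =
  enc match {
    case NonGrouped => s"repeated binary $field (STRING);"
    case Grouped    => s"required group $field (LIST) { repeated binary array (STRING); }"
  }
```

Without the import, derivation silently falls back to the default instance, which is exactly why the mechanism is easy to forget.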

Problem Statement: This works, but has some downsides that have become more apparent with increased adoption, namely:

  1. The mechanism itself (importing a class) is not intuitive to users and easy to forget to add – and once you’ve started producing data with a certain schema, it’s non-trivial to change that schema.
  2. Since data produced with AvroCompat requires that the consumer import AvroCompat to read it, this design requires the consumer to have knowledge of how the data was produced.

IMO, the Avro compatibility design could be reworked in a way that’s (a) easy and intuitive for producers to enable, and (b) doesn’t require the data consumer to know how the upstream data was produced. Possibly we should also make Avro-compatible schemas the default way that Magnolify-Parquet writes repeated fields, so that it’s opt-out rather than opt-in, although that’s a potentially big breaking change for users.

clairemcginty commented 6 months ago

Have some new thoughts on this -

Reading the Parquet spec more closely, I see that a non-grouped repeated field defaults to a required list of required elements. We've been relying on this default behavior in Magnolify's ParquetType, but we probably should have always added an explicit wrapper group.
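Concretely, under the spec's backward-compatibility rules the two shapes below should be interpreted identically (sketched in Parquet's message syntax; comments are annotations, not part of the schema):

```
// non-grouped form, what ParquetType currently writes:
repeated binary listField (STRING);

// equivalent explicit form, with the wrapper group:
required group listField (LIST) {
  repeated binary array (STRING);
}
```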

I filed and merged a fix for the underlying incompatibility in parquet-mr's AvroSchemaConverter, PARQUET-2425, since technically non-grouped repeated fields should be supported 😅

Once the fix is released, IMO we should start moving away from AvroCompat in magnolify-parquet. Ideally we could just modify ParquetType to always produce a grouped repeated field schema, but that would potentially cause incompatibility issues downstream in the reader (for example, if you're reading a month's worth of partitions, and half of them are produced with a grouped field schema and half aren't, Scio would complain). Therefore, I think we can do this in two parts:

(1) Update all the reader code to treat non-grouped repeated field schemas as equivalent to required repeated field schemas during read time. This would mean updating Schema.checkCompatibility, as well as wrapping repeated schemas into required groups if an Avro list is detected, here (we could just use AvroSchemaConverter#convert(pt.avroSchema) to get back a wrapped MessageType once PARQUET-2425 is released). Deprecate AvroCompat for reads.
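As a sketch of that read-side normalization (a toy model using a made-up ADT; the real implementation would operate on org.apache.parquet.schema.Type inside Schema.checkCompatibility):

```scala
// Minimal model of a Parquet field shape, for illustration only.
sealed trait Field
case class Primitive(name: String, tpe: String, repeated: Boolean) extends Field
case class ListGroup(name: String, element: Primitive) extends Field

// Wrap a bare repeated field into the required LIST-group shape that
// AvroCompat would have produced on the write side.
def normalize(f: Field): Field = f match {
  case Primitive(name, tpe, true) => ListGroup(name, Primitive("array", tpe, repeated = true))
  case other                      => other
}

// Two schemas are read-compatible if they normalize to the same shape,
// so grouped and non-grouped writers can share one reader.
def compatible(writer: Field, reader: Field): Boolean =
  normalize(writer) == normalize(reader)
```

Under this scheme, a reader no longer needs to know whether an upstream partition was written with or without AvroCompat.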

(2) Eventually update the writer code to produce wrapped repeated schemas, maybe with a fallback option via Configuration. AvroCompat has a second functionality aside from wrapping lists: adding a metadata key parquet.avro.schema to the file footer. We should encapsulate that logic via a Configuration flag, or simply always write it 🤷‍♀️ . Deprecate AvroCompat for writes.
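The Configuration-gated metadata write could look roughly like this sketch. "parquet.avro.schema" is the real footer key parquet-avro looks for; the flag name "magnolify.parquet.write-avro-schema" is hypothetical, and a plain Map stands in for Hadoop's Configuration:

```scala
// Sketch of gating the avro-schema footer key behind a config flag,
// defaulting to "always write it".
def footerMetadata(avroSchemaJson: String, conf: Map[String, String]): Map[String, String] =
  if (conf.getOrElse("magnolify.parquet.write-avro-schema", "true").toBoolean)
    Map("parquet.avro.schema" -> avroSchemaJson)
  else
    Map.empty
```

Defaulting the flag to true keeps the opt-out behavior discussed above while still giving users an escape hatch.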

Thoughts?

clairemcginty commented 2 weeks ago

Additionally on the write side, once we've upgraded to Parquet 1.14.2 and the array compatibility is no longer an issue, I don't see any reason not to write the parquet.avro.schema key to the metadata footer by default, and deprecate AvroCompat on write.