spotify / scio

A Scala API for Apache Beam and Google Cloud Dataflow.
https://spotify.github.io/scio
Apache License 2.0

ParquetTypeSortedBucketIO writes and reads don't work with CaseMapper #5017

Open clairemcginty opened 11 months ago

clairemcginty commented 11 months ago

The key getter does not work when a CaseMapper is required to translate field names:

[info]   Cause: java.lang.IllegalStateException: Leaf key field user_name does not exist in record class class org.apache.beam.sdk.extensions.smb.ParquetEndToEndTest$Event
[info]   at org.apache.beam.sdk.extensions.smb.ParquetBucketMetadata.toKeyGetters(ParquetBucketMetadata.java:374)
[info]   at org.apache.beam.sdk.extensions.smb.ParquetBucketMetadata.extractKeyPrimary(ParquetBucketMetadata.java:240)
[info]   at org.apache.beam.sdk.extensions.smb.BucketMetadata.getKeyBytesPrimary(BucketMetadata.java:269)
[info]   at org.apache.beam.sdk.extensions.smb.BucketMetadata.primaryComparableKeyBytes(BucketMetadata.java:302)
[info]   at org.apache.beam.sdk.extensions.smb.SortedBucketSource$BucketedInput.lambda$createIterator$1(SortedBucketSource.java:551)
[info]   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$6.transform(Iterators.java:785)
[info]   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
[info]   at org.apache.beam.sdk.extensions.smb.BufferedIterator.refill(BufferedIterator.java:42)
[info]   at org.apache.beam.sdk.extensions.smb.BufferedIterator.<init>(BufferedIterator.java:35)
[info]   at org.apache.beam.sdk.extensions.smb.SortedBucketSource$BucketedInput.lambda$createIterator$2(SortedBucketSource.java:553)

repro: https://github.com/spotify/scio/compare/parquet-type-caseformat?expand=1
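
For context, the failing setup looks roughly like this (a minimal sketch assuming a snake_case CaseMapper built from magnolify's CaseMapper and Guava's CaseFormat; the linked repro branch may define things differently):

import com.google.common.base.CaseFormat
import magnolify.parquet.ParquetType
import magnolify.shared.CaseMapper

case class Event(userName: String, event: String, timestamp: Int)

// Map camelCase Scala field names to snake_case Parquet column names
val SnakeCaseMapper: CaseMapper =
  CaseMapper(s => CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, s))

implicit val parquetType: ParquetType[Event] = ParquetType[Event](SnakeCaseMapper)

// Any ParquetTypeSortedBucketIO write/read keyed on the storage name
// "user_name" then fails at key extraction with the stack trace above.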

RustedBones commented 11 months ago

The cause is in ParquetBucketMetadata: when the class is a Product, we expect to extract the key using the keyField[Secondary].

If magnolify's CaseMapper changes the key field name, the metadata won't be able to access the field via the reflected Method name.
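
Concretely, the reflective lookup has nothing to match (a sketch of the failure mode, not scio's exact code):

// The metadata resolves the storage-cased key field against the class's
// reflected methods:
val keyGetter = classOf[Event].getMethods.find(_.getName == "user_name")
// keyGetter == None: the compiled getter for the case class field is
// userName(), so the snake_case storage name from the Parquet schema has no
// matching java.lang.reflect.Method, and ParquetBucketMetadata.toKeyGetters
// throws the IllegalStateException shown above.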

As a 'dirty' workaround, users can define an alias method for the key(s) that respects the storage casing, e.g.:

case class Event(userName: String, event: String, timestamp: Int) {
  // Alias exposing the snake_case storage name, so the reflective key
  // lookup in ParquetBucketMetadata finds a matching Method
  def user_name: String = userName
}
implicit val parquetType: ParquetType[Event] = ParquetType[Event](SnakeCaseMapper)
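
The alias satisfies the reflective lookup (there is now a public user_name method on the class), while the CaseMapper still controls the column names written to Parquet.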

A clean fix would imply giving the bucket metadata a way to transform the key field into the proper chained Method[] calls. Since, on the read side, the metadata is constructed from the JSON file only, injecting this custom mapping isn't straightforward.
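
Purely as a hypothetical illustration of that direction (none of these names exist in scio today):

// The metadata JSON could carry a storage-to-logical alias map, e.g.
//   {"keyField": "user_name", "keyFieldAliases": {"user_name": "userName"}}
// so that toKeyGetters can fall back to the alias whenever the storage-cased
// name has no matching Method on the record class, without needing the
// CaseMapper instance itself on the read side.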