s22s / avro2spark

Experiment in constructing a Spark `StructType` from an Avro `Schema`
Other
0 stars 1 forks source link

Object encoding to Avro is repeated for each extracted field #5

Closed metasim closed 7 years ago

metasim commented 7 years ago

The execution tree looks like this:

:- extent
:- extractfromavro(encodetoavro(input[0, geotrellis.spark.TemporalProjectedExtent, false]), StructField(extent,StructType(StructField(xmin,DoubleType,false), StructField(ymin,DoubleType,false), StructField(xmax,DoubleType,false), StructField(ymax,DoubleType,false)),false))
:  +- encodetoavro(input[0, geotrellis.spark.TemporalProjectedExtent, false])
:     +- input[0, geotrellis.spark.TemporalProjectedExtent, false]
:- epsg
:- extractfromavro(encodetoavro(input[0, geotrellis.spark.TemporalProjectedExtent, false]), StructField(epsg,IntegerType,false))
:  +- encodetoavro(input[0, geotrellis.spark.TemporalProjectedExtent, false])
:     +- input[0, geotrellis.spark.TemporalProjectedExtent, false]
:- instant
+- extractfromavro(encodetoavro(input[0, geotrellis.spark.TemporalProjectedExtent, false]), StructField(instant,LongType,false))
   +- encodetoavro(input[0, geotrellis.spark.TemporalProjectedExtent, false])
      +- input[0, geotrellis.spark.TemporalProjectedExtent, false]
metasim commented 7 years ago

Reverse Engineering Generic Encoders

Case Class

Setup

case class Foo(stuff: Seq[Int], name: String, other: Double)
val enc = Encoders.product[Foo].asInstanceOf[ExpressionEncoder[Foo]]

Serializers

enc.serializer.foreach(e ⇒ println(e.treeString(true)))

yields:

newInstance(class org.apache.spark.sql.catalyst.util.GenericArrayData) AS stuff#0
+- newInstance(class org.apache.spark.sql.catalyst.util.GenericArrayData)
   +- assertnotnull(input[0, astraea.demo.DatabricksAvro$Foo, true], top level non-flat input object).stuff
      +- assertnotnull(input[0, astraea.demo.DatabricksAvro$Foo, true], top level non-flat input object)
         +- input[0, astraea.demo.DatabricksAvro$Foo, true]

staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, astraea.demo.DatabricksAvro$Foo, true], top level non-flat input object).name, true) AS name#1
+- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, astraea.demo.DatabricksAvro$Foo, true], top level non-flat input object).name, true)
   +- assertnotnull(input[0, astraea.demo.DatabricksAvro$Foo, true], top level non-flat input object).name
      +- assertnotnull(input[0, astraea.demo.DatabricksAvro$Foo, true], top level non-flat input object)
         +- input[0, astraea.demo.DatabricksAvro$Foo, true]

assertnotnull(input[0, astraea.demo.DatabricksAvro$Foo, true], top level non-flat input object).other AS other#2
+- assertnotnull(input[0, astraea.demo.DatabricksAvro$Foo, true], top level non-flat input object).other
   +- assertnotnull(input[0, astraea.demo.DatabricksAvro$Foo, true], top level non-flat input object)
      +- input[0, astraea.demo.DatabricksAvro$Foo, true]

Deserializer

println(enc.deserializer.treeString(true))

yields:

newInstance(class astraea.demo.DatabricksAvro$Foo)
:- staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull1, IntegerType, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull1, IntegerType), - array element class: "scala.Int", - field (class: "scala.collection.Seq", name: "stuff"), - root class: "astraea.demo.DatabricksAvro.Foo"), upcast('stuff, ArrayType(IntegerType,false), - field (class: "scala.collection.Seq", name: "stuff"), - root class: "astraea.demo.DatabricksAvro.Foo")).array, true)
:  +- mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull1, IntegerType, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull1, IntegerType), - array element class: "scala.Int", - field (class: "scala.collection.Seq", name: "stuff"), - root class: "astraea.demo.DatabricksAvro.Foo"), upcast('stuff, ArrayType(IntegerType,false), - field (class: "scala.collection.Seq", name: "stuff"), - root class: "astraea.demo.DatabricksAvro.Foo")).array
:     +- mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull1, IntegerType, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull1, IntegerType), - array element class: "scala.Int", - field (class: "scala.collection.Seq", name: "stuff"), - root class: "astraea.demo.DatabricksAvro.Foo"), upcast('stuff, ArrayType(IntegerType,false), - field (class: "scala.collection.Seq", name: "stuff"), - root class: "astraea.demo.DatabricksAvro.Foo"))
:        :- assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull1, IntegerType), - array element class: "scala.Int", - field (class: "scala.collection.Seq", name: "stuff"), - root class: "astraea.demo.DatabricksAvro.Foo")
:        :  +- lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull1, IntegerType)
:        +- upcast('stuff, ArrayType(IntegerType,false), - field (class: "scala.collection.Seq", name: "stuff"), - root class: "astraea.demo.DatabricksAvro.Foo")
:           +- 'stuff
:- upcast('name, StringType, - field (class: "java.lang.String", name: "name"), - root class: "astraea.demo.DatabricksAvro.Foo").toString
:  +- upcast('name, StringType, - field (class: "java.lang.String", name: "name"), - root class: "astraea.demo.DatabricksAvro.Foo")
:     +- 'name
+- assertnotnull(upcast('other, DoubleType, - field (class: "scala.Double", name: "other"), - root class: "astraea.demo.DatabricksAvro.Foo"), - field (class: "scala.Double", name: "other"), - root class: "astraea.demo.DatabricksAvro.Foo")
   +- upcast('other, DoubleType, - field (class: "scala.Double", name: "other"), - root class: "astraea.demo.DatabricksAvro.Foo")
      +- 'other

JavaBean

Setup

class Foo2(
  @BeanProperty var stuff: java.util.List[java.lang.Integer],
  @BeanProperty var name: String,
  @BeanProperty var other: Double
)
val enc2 = Encoders.bean[Foo2](classOf[Foo2]).asInstanceOf[ExpressionEncoder[Foo2]]

Serializers

enc2.serializer.foreach(e ⇒ println(e.treeString(true)))

yields:

staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, astraea.demo.DatabricksAvro$Foo2, true].name, true) AS name#3
+- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, astraea.demo.DatabricksAvro$Foo2, true].name, true)
   +- input[0, astraea.demo.DatabricksAvro$Foo2, true].name
      +- input[0, astraea.demo.DatabricksAvro$Foo2, true]

input[0, astraea.demo.DatabricksAvro$Foo2, true].other AS other#4
+- input[0, astraea.demo.DatabricksAvro$Foo2, true].other
   +- input[0, astraea.demo.DatabricksAvro$Foo2, true]

newInstance(class org.apache.spark.sql.catalyst.util.GenericArrayData) AS stuff#5
+- newInstance(class org.apache.spark.sql.catalyst.util.GenericArrayData)
   +- input[0, astraea.demo.DatabricksAvro$Foo2, true].stuff
      +- input[0, astraea.demo.DatabricksAvro$Foo2, true]

Deserializer

println(enc2.deserializer.treeString(true))

yields:

initializejavabean(newInstance(class astraea.demo.DatabricksAvro$Foo2), (name_$eq,'name.toString), (other_$eq,assertnotnull('other, currently no type path record in java)), (stuff_$eq,staticinvoke(class java.util.Arrays, ObjectType(interface java.util.List), asList, mapobjects(MapObjects_loopValue2, MapObjects_loopIsNull3, IntegerType, newInstance(class java.lang.Integer), 'stuff).array, true)))
:- newInstance(class astraea.demo.DatabricksAvro$Foo2)
:- 'name.toString
:  +- 'name
:- assertnotnull('other, currently no type path record in java)
:  +- 'other
+- staticinvoke(class java.util.Arrays, ObjectType(interface java.util.List), asList, mapobjects(MapObjects_loopValue2, MapObjects_loopIsNull3, IntegerType, newInstance(class java.lang.Integer), 'stuff).array, true)
   +- mapobjects(MapObjects_loopValue2, MapObjects_loopIsNull3, IntegerType, newInstance(class java.lang.Integer), 'stuff).array
      +- mapobjects(MapObjects_loopValue2, MapObjects_loopIsNull3, IntegerType, newInstance(class java.lang.Integer), 'stuff)
         :- newInstance(class java.lang.Integer)
         :  +- lambdavariable(MapObjects_loopValue2, MapObjects_loopIsNull3, IntegerType)
         +- 'stuff

Tuple

Setup

val enc3 = ExpressionEncoder[(Int, String)]

Serializers

enc3.serializer.foreach(e ⇒ println(e.treeString(true)))

yields:

assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._1 AS _1#6
+- assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._1
   +- assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)
      +- input[0, scala.Tuple2, true]

staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._2, true) AS _2#7
+- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._2, true)
   +- assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._2
      +- assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)
         +- input[0, scala.Tuple2, true]

Deserializer

println(enc3.deserializer.treeString(true))

yields:

newInstance(class scala.Tuple2)
:- upcast(getcolumnbyordinal(0, IntegerType), IntegerType, - field (class: "scala.Int", name: "_1"), - root class: "scala.Tuple2")
:  +- getcolumnbyordinal(0, IntegerType)
+- upcast(getcolumnbyordinal(1, StringType), StringType, - field (class: "java.lang.String", name: "_2"), - root class: "scala.Tuple2").toString
   +- upcast(getcolumnbyordinal(1, StringType), StringType, - field (class: "java.lang.String", name: "_2"), - root class: "scala.Tuple2")
      +- getcolumnbyordinal(1, StringType)
metasim commented 7 years ago

Considerations

metasim commented 7 years ago

The resolution is not to pull out separate fields, but to have a single object come out of the conversion and then apply projections to it to obtain the desired columns.