51zero / eel-sdk

Big Data Toolkit for the JVM
Apache License 2.0
145 stars 35 forks source link

Error writing parquet file from JSON with object array #389

Open haggy opened 5 years ago

haggy commented 5 years ago

Im trying to use JsonSource and write it as Parquet using ParquetSink. The issue I'm running into is that I get a ClassCastException whenever the Parquet writer encounters a Row created from json with an array of objects. From what I see, it's because the writer is trying to cast a scala Map to a Seq[Any] in the StructWriter.

Here is where the exception is thrown: https://github.com/51zero/eel-sdk/blob/v1.2.4/eel-core/src/main/scala/io/eels/component/parquet/RecordConsumerWriter.scala#L105

Here is the stack trace:

Exception in thread "main" java.lang.ClassCastException: scala.collection.immutable.Map$Map1 cannot be cast to scala.collection.Seq
    at io.eels.component.parquet.StructWriter.write(RecordConsumerWriter.scala:105)
    at io.eels.component.parquet.ArrayParquetWriter.$anonfun$write$2(RecordConsumerWriter.scala:84)
    at io.eels.component.parquet.ArrayParquetWriter.$anonfun$write$2$adapted(RecordConsumerWriter.scala:81)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at io.eels.component.parquet.ArrayParquetWriter.write(RecordConsumerWriter.scala:81)
    at io.eels.component.parquet.StructWriter.$anonfun$write$3(RecordConsumerWriter.scala:113)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
    at io.eels.component.parquet.StructWriter.write(RecordConsumerWriter.scala:106)
    at io.eels.component.parquet.RowWriter.write(RowWriteSupport.scala:42)
    at io.eels.component.parquet.RowWriteSupport.write(RowWriteSupport.scala:33)
    at io.eels.component.parquet.RowWriteSupport.write(RowWriteSupport.scala:15)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
    at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
    at io.eels.component.parquet.ParquetSink$$anon$1.write(ParquetSink.scala:35)
    at io.eels.datastream.SinkAction$$anon$2.$anonfun$run$3(SinkAction.scala:54)
    at io.eels.datastream.SinkAction$$anon$2.$anonfun$run$3$adapted(SinkAction.scala:53)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at io.eels.datastream.SinkAction$$anon$2.$anonfun$run$2(SinkAction.scala:53)
    at io.eels.datastream.SinkAction$$anon$2.$anonfun$run$2$adapted(SinkAction.scala:52)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at io.eels.datastream.SinkAction$$anon$2.run(SinkAction.scala:52)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Here is a fully runnable example (will need hadoop deps if you don't already have them):

import java.io.{ByteArrayInputStream, File, FileInputStream}
import java.nio.charset.StandardCharsets

import io.eels.component.json.JsonSource
import io.eels.component.parquet.ParquetSink
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ArrayOfObjectsTest extends App {

  // Setup Filesystem
  val hadoopConf = new Configuration()
  hadoopConf.addResource(
    new FileInputStream(new File(getClass.getResource("/hadoop/core-site.xml").toURI))
  )
  hadoopConf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
  implicit val fs: FileSystem = FileSystem.get(hadoopConf)

  // Setup Parquet Sink
  // Intentionally just overwrite the same file
  private val hadoopPath = s"/tmp/parquet-test"
  private val sink = ParquetSink(fs.makeQualified(new Path(hadoopPath)))
    .withOverwrite(true)

  // Setup JSON source with constant JSON value
  // Notice the `objectArray` field is an array of objects
  val source = JsonSource(() => {
    val jsonString = """
          {"objectArray":[{"key1":"value1"}]}
        """.stripMargin
    new ByteArrayInputStream(jsonString.getBytes(StandardCharsets.UTF_8))
  })

  // This throws: java.lang.ClassCastException: scala.collection.immutable.Map$Map1 cannot be cast to scala.collection.Seq
  source.toDataStream().to(sink)
}

core-site.xml file for local hadoop dev (I put this in my resources folder in a hadoop sub directory):

<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/tmp</value>
    </property>

    <property>
        <name>fs.default.name</name>
        <value>file:///</value>
    </property>

</configuration>

I do realize that this is on version 1.2.4 of eels but there is no later version for scala 2.12 on Maven. Can you see any reason why Im getting this writer error? If it's an actual bug I'd be happy to take a stab at fixing it.

garyfrost commented 5 years ago

Hi haggy, thanks for raising this issue. This usecase is exactly the type of functionality eel looks to provide. We would be very welcome to you have a look at a fix, thanks.

Also, we'll look to get a new version out on 2.12 soon.

Thanks

haggy commented 5 years ago

@garyfrost I've started looking into this bug. Just an FYI, the build is currently failing due to a single test in the CsvSink: ""support overwrite"

"[998e7733-fbba-4361-96fa-f92596031e5f].csv" was not equal to "[d7238986-3d82-463b-a716-0de0e115bbce].csv"
ScalaTestFailureLocation: io.eels.component.csv.CsvSinkTest$$anonfun$1$$anonfun$apply$mcV$sp$5 at (CsvSinkTest.scala:114)
Expected :"[d7238986-3d82-463b-a716-0de0e115bbce].csv"
Actual   :"[998e7733-fbba-4361-96fa-f92596031e5f].csv"
<Click to see difference>

org.scalatest.exceptions.TestFailedException: "[998e7733-fbba-4361-96fa-f92596031e5f].csv" was not equal to "[d7238986-3d82-463b-a716-0de0e115bbce].csv"
    at org.scalatest.MatchersHelper$.indicateFailure(MatchersHelper.scala:340)
    at org.scalatest.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:6864)
    at io.eels.component.csv.CsvSinkTest$$anonfun$1$$anonfun$apply$mcV$sp$5.apply(CsvSinkTest.scala:114)
    at io.eels.component.csv.CsvSinkTest$$anonfun$1$$anonfun$apply$mcV$sp$5.apply(CsvSinkTest.scala:79)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:1078)
    at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
    at org.scalatest.WordSpec.withFixture(WordSpec.scala:1881)
    at org.scalatest.WordSpecLike$class.invokeWithFixture$1(WordSpecLike.scala:1075)
    at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:1088)
    at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:1088)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
    at org.scalatest.WordSpecLike$class.runTest(WordSpecLike.scala:1088)
    at io.eels.component.csv.CsvSinkTest.org$scalatest$BeforeAndAfter$$super$runTest(CsvSinkTest.scala:15)
    at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:203)
    at io.eels.component.csv.CsvSinkTest.runTest(CsvSinkTest.scala:15)

it's more of a heads up. Im not going to dig into that as it's a pre-existing issue and Im not familiar with that piece of it