databricks / spark-avro

Avro Data Source for Apache Spark
http://databricks.com/
Apache License 2.0

Fails to write record containing map of array of record #272

Open laymain opened 6 years ago

laymain commented 6 years ago

Hi,

Spark-avro fails to write a record that contains a map of arrays of records, with the following error:

org.apache.avro.file.DataFileWriter$AppendWriteException: org.apache.avro.UnresolvedUnionException:
  Not in union [{"type":"record","name":"properties","fields":[{"name":"string","type":["string","null"]}]},"null"]: {"string": "one"}
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:308)
    at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
    at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
    at com.databricks.spark.avro.AvroOutputWriter.write(AvroOutputWriter.scala:81)
    at com.databricks.spark.avro.AvroOutputWriter.write(AvroOutputWriter.scala:75)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:327)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.avro.UnresolvedUnionException:
  Not in union [{"type":"record","name":"properties","fields":[{"name":"string","type":["string","null"]}]},"null"]: {"string": "one"}
    at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:740)
    at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:205)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:123)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
    at org.apache.avro.reflect.ReflectDatumWriter.writeObjectArray(ReflectDatumWriter.java:120)
    at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:111)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:125)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
    at org.apache.avro.generic.GenericDatumWriter.writeMap(GenericDatumWriter.java:234)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:121)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:125)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
    at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:302)
    ... 17 more

schema.json

{
  "name": "Event",
  "namespace": "com.laymain.sandbox.avro",
  "type": "record",
  "fields": [
    {
      "name": "properties",
      "type": {
        "type": "map",
        "values": {
          "type": "array",
          "items": {
            "type": "record",
            "name": "ArrayObjectProperties",
            "fields": [
              {
                "name": "string",
                "type": "string",
                "connect.parameters": {
                  "avro.java.string": "String"
                },
                "avro.java.string": "String"
              }
            ]
          }
        }
      }
    }
  ]
}

event.json

{
  "properties": {
    "object": [
      { "string": "one" },
      { "string": "two" }
    ]
  }
}

Avro file generated using avro-tools: java -jar avro-tools-1.8.2.jar fromjson --schema-file schema.json event.json > event.avro
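For anyone reproducing this without avro-tools, here is a minimal Java sketch that writes the same event.avro through the plain Avro API (the class name WriteEvent and the relative file paths are illustrative, not part of the original repro):

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class WriteEvent {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(new File("schema.json"));
        // Drill down to the item schema: map -> array -> ArrayObjectProperties
        Schema itemSchema = schema.getField("properties").schema()
            .getValueType()
            .getElementType();

        GenericRecord one = new GenericData.Record(itemSchema);
        one.put("string", "one");
        GenericRecord two = new GenericData.Record(itemSchema);
        two.put("string", "two");

        // properties is a map whose values are arrays of records
        Map<String, Object> properties = new HashMap<>();
        properties.put("object", Arrays.asList(one, two));

        GenericRecord event = new GenericData.Record(schema);
        event.put("properties", properties);

        try (DataFileWriter<GenericRecord> writer =
                 new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
            writer.create(schema, new File("event.avro"));
            writer.append(event);
        }
    }
}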

Spark code:

public static void main(String[] args) throws Exception {
    // Locate the event.avro generated by avro-tools on the classpath
    URL avroResource = Main.class.getClassLoader().getResource("event.avro");
    if (avroResource == null) {
        throw new RuntimeException("Missing resource event.avro");
    }

    SparkSession sparkSession = SparkSession.builder()
        .appName("com.laymain.sandbox.avro")
        .master("local[*]")
        .getOrCreate();

    // Round-trip: read the Avro file and immediately write it back out;
    // the write side throws the UnresolvedUnionException above
    sparkSession
        .read()
        .format("com.databricks.spark.avro")
        .load(avroResource.getPath())
        .write()
        .mode(SaveMode.Overwrite)
        .format("com.databricks.spark.avro")
        .save("output");
}
laymain commented 6 years ago

Due to #92, the schema generated from the Spark Row has all of its fields nullable, which gives the following output schema:

{
  "type": "record",
  "name": "topLevelRecord",
  "fields": [{
      "name": "properties",
      "type": [{
          "type": "map",
          "values": [{
              "type": "array",
              "items": [{
                  "type": "record",
                  "name": "properties",
                  "fields": [{
                      "name": "string",
                      "type": ["string", "null"]
                    }
                  ]
                }, "null"]
            }, "null"]
        }, "null"]
    }
  ]
}
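For what it's worth, the all-nullable conversion is visible on the Spark side before the write even starts. A minimal sketch, reusing the sparkSession from the repro above (the printSchema output in the comments is approximate):

// Read the file and print the inferred schema: every level comes back
// nullable, which spark-avro then re-encodes as a union with "null" on write
Dataset<Row> df = sparkSession
    .read()
    .format("com.databricks.spark.avro")
    .load("event.avro");
df.printSchema();
// root
//  |-- properties: map (nullable = true)
//  |    |-- key: string
//  |    |-- value: array (valueContainsNull = true)
//  |    |    |-- element: struct (containsNull = true)
//  |    |    |    |-- string: string (nullable = true)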

When I try to use this schema with the JSON object to generate a new Avro file (java -jar avro-tools-1.8.2.jar fromjson --schema-file schema-generated.json event.json > event-generated.avro), I get the following error:

Exception in thread "main" org.apache.avro.AvroTypeException: Unknown union branch object
        at org.apache.avro.io.JsonDecoder.readIndex(JsonDecoder.java:445)
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
        at org.apache.avro.tool.DataFileWriteTool.run(DataFileWriteTool.java:99)
        at org.apache.avro.tool.Main.run(Main.java:87)
        at org.apache.avro.tool.Main.main(Main.java:76)

Is the generated schema invalid, or is this a bug in Avro?

laymain commented 6 years ago

It is linked to the nullable issue (#92): every type in the generated schema becomes a union with "null", so the expected input data for the generated schema is:

{
  "properties": {
    "map": {
      "object": {
        "array": [
          {"properties": {"string": {"string": "one"}}},
          {"properties": {"string": {"string": "two"}}}
        ]
      }
    }
  }
}

instead of the initial input:

{
  "properties": {
    "object": [
      { "string": "one" },
      { "string": "two" }
    ]
  }
}
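This follows Avro's JSON encoding rule: a non-null union value must be wrapped in an object keyed by the name of its branch ("map", "array", "properties", "string"). A minimal sketch that decodes the wrapped form against the generated schema (the file name event-wrapped.json is illustrative):

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.JsonDecoder;

public class DecodeWrappedJson {
    public static void main(String[] args) throws Exception {
        Schema generated = new Schema.Parser().parse(new File("schema-generated.json"));
        // The wrapped JSON parses because each union value names its branch;
        // the unwrapped original fails with "Unknown union branch object"
        try (InputStream in = new FileInputStream("event-wrapped.json")) {
            JsonDecoder decoder = DecoderFactory.get().jsonDecoder(generated, in);
            GenericRecord record = new GenericDatumReader<GenericRecord>(generated).read(null, decoder);
            System.out.println(record);
        }
    }
}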
laymain commented 6 years ago

I found a workaround: use the original Avro schema to generate the corresponding StructType with SchemaConverters, then create a new DataFrame with that StructType:

public static void main(String[] args) throws Exception {
  // Convert the original Avro schema into the equivalent Spark StructType,
  // which keeps the original (non-nullable) field definitions
  Schema schema = new Schema.Parser().parse(Main.class.getClassLoader().getResourceAsStream("schema.json"));
  DataType dataType = SchemaConverters.toSqlType(schema).dataType();
  StructType structType = (StructType) dataType;

  URL avroResource = Main.class.getClassLoader().getResource("event.avro");
  if (avroResource == null) {
    throw new RuntimeException("Missing resource event.avro");
  }

  SparkSession sparkSession = SparkSession.builder()
    .appName("com.laymain.sandbox.avro")
    .master("local[*]")
    .getOrCreate();

  Dataset<Row> dataset = sparkSession
    .read()
    .format("com.databricks.spark.avro")
    .load(avroResource.getPath());

  // Re-wrap the rows with the converted StructType so the writer sees the
  // original schema instead of the all-nullable one inferred on read
  dataset
    .sqlContext()
    .createDataFrame(dataset.rdd(), structType)
    .write()
    .mode(SaveMode.Overwrite)
    .format("com.databricks.spark.avro")
    .save("output");
}
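One caveat for anyone copying this workaround: createDataFrame(dataset.rdd(), structType) only swaps the schema metadata; Spark does not eagerly verify that the rows conform to the new StructType, so the StructType produced by SchemaConverters has to match the data exactly or the failure just moves to write time. The output schema will also still use spark-avro's generated record names (e.g. topLevelRecord) rather than the original Event name; the recordName and recordNamespace write options should help with that, if needed.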