spotify / spark-bigquery

Google BigQuery support for Spark, SQL, and DataFrames
Apache License 2.0

The Apache Avro library failed to parse the header #57

Open matthew-fishkin opened 6 years ago

matthew-fishkin commented 6 years ago

Spark version: 2.2.0
Spotify/spark-bigquery version: 0.2.2

Hi,

I am trying to use the saveAsBigQueryTable function to write a DataFrame whose schema has an array of structs as a field. However, I am getting the following error:

The Apache Avro library failed to parse the header with the follwing error: Invalid namespace: .topic_scores

The offending field is:


{
    "type": [
        {
            "items": [
                {
                    "namespace": ".topic_scores",
                    "type": "record",
                    "name": "topic_scores",
                    "fields": [
                        {
                            "type": "int",
                            "name": "index"
                        },
                        {
                            "type": "float",
                            "name": "score"
                        }
                    ]
                },
                "null"
            ],
            "type": "array"
        },
        "null"
    ],
    "name": "topic_scores"
}

You can see that the namespace field begins with a dot. My guess is that the issue stems from https://github.com/spotify/spark-bigquery/blob/master/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala#L342-L346

I can't find a way to configure the recordNamespace value. According to the spark-avro documentation:

You can specify the record name and namespace like this:

import com.databricks.spark.avro._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.read.avro("src/test/resources/episodes.avro")

val name = "AvroTest"
val namespace = "com.databricks.spark.avro"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).avro("/tmp/output")

I think this is the line that reads that option, and sets the value to an empty string if not provided: https://github.com/databricks/spark-avro/blob/branch-4.0/src/main/scala/com/databricks/spark/avro/DefaultSource.scala#L114
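
To illustrate the suspected cause: if the namespace of a nested record is built by joining recordNamespace and the field name with a dot, then an empty recordNamespace produces a leading dot. This is only a minimal sketch of that assumption. childNamespace is a hypothetical helper written for this example, not the actual spark-avro code.

```scala
// Hypothetical helper: assumes the nested namespace is built as
// "<recordNamespace>.<fieldName>". This matches the schema shown above,
// but it is an illustration and is not copied from spark-avro.
object NamespaceSketch {
  def childNamespace(recordNamespace: String, fieldName: String): String =
    s"$recordNamespace.$fieldName"

  def main(args: Array[String]): Unit = {
    // Default case: recordNamespace falls back to the empty string.
    println(childNamespace("", "topic_scores")) // prints ".topic_scores"
    // With an explicit recordNamespace option supplied by the caller.
    println(childNamespace("com.foo.bar", "topic_scores")) // prints "com.foo.bar.topic_scores"
  }
}
```

Under this assumption, any non-empty recordNamespace would avoid the leading dot.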

These options are not exposed anywhere in the Spotify library. Has anyone seen this issue, or does anyone have a workaround? Thanks!

matthew-fishkin commented 6 years ago

Update on this:

My suspicion was correct. The problem is solved when you supply a recordNamespace option to the Avro writer. The modified code looks like this:

  /**
    * Save a [[DataFrame]] to a BigQuery table.
    *
    * @param avroOptions options passed through to the Avro writer,
    *                    e.g. "recordNamespace"
    */
  def saveAsBigQueryTable(tableRef: TableReference,
                          writeDisposition: WriteDisposition.Value,
                          createDisposition: CreateDisposition.Value,
                          avroOptions: Map[String, String]): Unit = {
    val bucket = conf.get(BigQueryConfiguration.GCS_BUCKET_KEY)
    val temp = s"spark-bigquery-${System.currentTimeMillis()}=${Random.nextInt(Int.MaxValue)}"
    val gcsPath = s"gs://$bucket/hadoop/tmp/spark-bigquery/$temp"
    df.write.options(avroOptions).avro(gcsPath)

    val fdf = bq.load(gcsPath, tableRef, writeDisposition, createDisposition)
    delete(new Path(gcsPath))
    fdf
  }

  /**
    * Save a [[DataFrame]] to a BigQuery table.
    */
  def saveAsBigQueryTable(tableSpec: String,
                          writeDisposition: WriteDisposition.Value = null,
                          createDisposition: CreateDisposition.Value = null,
                          avroOptions: Map[String, String] = Map.empty): Unit =
    saveAsBigQueryTable(
      BigQueryStrings.parseTableReference(tableSpec),
      writeDisposition,
      createDisposition,
      avroOptions)

And the usage is something like:

bqDataframe
      .saveAsBigQueryTable(
        tableSpec = tableName,
        avroOptions = Map("recordNamespace" -> "com.foo.bar")
      )

I can create a PR to address the issue if you guys think this is a valid solution.

DaimonPl commented 6 years ago

@matthew-fishkin we are experiencing similar problems, especially with nested (struct/map/array) schemas

AFAIK the C++ Avro library does not allow '.' in the namespace, and that's the root cause of the problem.

matthew-fishkin commented 6 years ago

@DaimonPl I can't speak specifically to the C++ version of the library. We are working with the Scala version and the problem does not lie in the avro library. It is perfectly valid to have a namespace that begins with a dot. See: https://avro.apache.org/docs/1.8.2/spec.html

The problem we encountered was that BigQuery can't load an Avro schema whose namespace begins with a dot. The solution I posted above worked for us. Avro provides a set of options you can define on its writer. Supplying Map("recordNamespace" -> "com.foo.bar") as an option will fix the issue. Unfortunately, Spotify does not provide a way to pass this through to the writer, so you will need to copy and adjust their code. I am waiting to hear back from an owner of the code to see if this is a viable solution, and then I will gladly create a PR to introduce the change here.

DaimonPl commented 6 years ago

Google BigQuery uses the C++ Avro library, which does not allow the dot for some reason :)

https://github.com/apache/avro/blob/17f2d75132021fafeca29edbdcade40df960fdc9/lang/c%2B%2B/impl/Node.cc#L71

pcejrowski commented 6 years ago

It's solved by https://github.com/spotify/spark-bigquery/commit/7f65455a53a7654c04368cec83100d8f780252d3.