GoogleCloudDataproc / spark-bigquery-connector

BigQuery data source for Apache Spark: Read data from BigQuery into DataFrames, write DataFrames into BigQuery tables.
Apache License 2.0

Unable to write on a BQ table, with the new spark connector update #658

Closed: MARCTELLY closed this issue 2 years ago

MARCTELLY commented 2 years ago

Hi all,

Since the last release of the connector some of our Spark jobs started to fail with the following error:

Exception in thread "main" java.lang.RuntimeException: Failed to write to BigQuery
    at com.google.cloud.spark.bigquery.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.scala:70)
    at com.google.cloud.spark.bigquery.BigQueryInsertableRelation.insert(BigQueryInsertableRelation.scala:43)
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:111)
  [...]
Caused by: java.lang.IllegalArgumentException: com.google.cloud.bigquery.connector.common.BigQueryConnectorException$InvalidSchemaException: Destination table's schema is not compatible with dataframe's schema
    at com.google.cloud.spark.bigquery.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:145)
    at com.google.cloud.bigquery.connector.common.BigQueryClient.loadDataIntoTable(BigQueryClient.java:466)
    at com.google.cloud.spark.bigquery.BigQueryWriteHelper.loadDataToBigQuery(BigQueryWriteHelper.scala:92)
    at com.google.cloud.spark.bigquery.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.scala:67)
    ... 40 more

After some investigation, this seems to be related to PR #613, which added a schema equality check before writing.
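As an illustration of the behavioral change (a minimal Scala sketch of my own, not the connector's actual implementation), the old behavior effectively accepted a dataframe whose fields are a subset of the destination table's fields, while the new check requires the field sets to match, so a missing nullable column is now rejected:

import org.apache.spark.sql.types.StructType

// Roughly the old behavior: every dataframe field must exist in the destination
// table, but the destination may have extra (nullable) columns.
def subsetCompatible(dfSchema: StructType, destFieldNames: Set[String]): Boolean =
  dfSchema.fieldNames.forall(destFieldNames.contains)

// Roughly the new behavior: the field sets must match exactly, so a dataframe
// that omits a nullable destination column fails the precondition.
def strictlyEqual(dfSchema: StructType, destFieldNames: Set[String]): Boolean =
  dfSchema.fieldNames.toSet == destFieldNames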

For example: Given a BigQuery table with the following schema (2 nullable string columns):

[
  {
    "mode": "NULLABLE",
    "name": "field1",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "field2",
    "type": "STRING"
  }
]

And the following Spark dataframe (1 nullable string column):

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession
  .builder()
  .master("local[2]")
  .getOrCreate()

import spark.implicits._

val ds = spark.createDataset(Seq.empty[Option[String]]).toDF("field1")
ds.printSchema()
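For reference, printSchema on this dataframe should report a single nullable string field, something like:

root
 |-- field1: string (nullable = true)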

With the previous version of the connector, the following code worked:

ds
  .write
  .format("bigquery")
  .mode(SaveMode.Append)
  .option("writeMethod", "indirect")
  .save("test_dataset.table")

With the new version this no longer works, and neither does the following variant with the field mode check disabled; both raise the exception shown above:

ds
  .write
  .format("bigquery")
  .mode(SaveMode.Append)
  .option("writeMethod", "indirect")
  .option("enableModeCheckForSchemaFields", false)
  .save("test_dataset.table")

However, the following one (adding the missing nullable column) works:

ds
  .withColumn("field2", org.apache.spark.sql.functions.lit(null).cast("string"))
  .write
  .format("bigquery")
  .mode(SaveMode.Append)
  .option("writeMethod", "indirect")
  .save("test_dataset.table")

Below I propose a test to be added to spark-bigquery-tests/src/main/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java to ensure the previous behavior is preserved:

@Test
public void testWriteToBigQueryWithMissingNullableField() {
  assumeThat(writeMethod, equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));
  // A dataframe with missing nullable column int_null
  // Uses java.util.Arrays and org.apache.spark.sql.Encoders
  Dataset<Row> df =
      spark
          .createDataset(Arrays.asList(1), Encoders.INT())
          .toDF("int_req");

  // Write to a table with all nullable columns
  df.write()
      .format("bigquery")
      .mode(SaveMode.Append)
      .option("writeMethod", writeMethod.toString())
      .save(testDataset + "." + TestConstants.DIFF_IN_SCHEMA_DEST_TABLE_NAME);
}

Kind regards.

davidrabinowitz commented 2 years ago

Can you please provide the schema of the table in BigQuery?

MARCTELLY commented 2 years ago

Hello @davidrabinowitz @suryasoma, I have updated the issue.

suryasoma commented 2 years ago

Hello @MARCTELLY, the issue is fixed and the fix will be available in the next release. Thanks

javidlc commented 2 years ago

Hello, I am having the same issue. I have a workflow that has been writing daily to an existing table since 2020, and since June 1, the date of the latest release, it can no longer write because of the same problem described above: Caused by: java.lang.IllegalArgumentException: com.google.cloud.bigquery.connector.common.BigQueryConnectorException$InvalidSchemaException: Destination table's schema is not compatible with dataframe's schema

So, when will the next release be, @suryasoma? Or how can I fix this now? (It is very important for my work.)

Thanks

MadMenHitBooker commented 2 years ago

Hi, I have the same issue, but I was able to work around it by forcing my fields in Spark to be nullable before writing, since my BigQuery schema was configured that way.
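For reference, one way to force every field to be nullable before writing (a minimal sketch, not necessarily the exact workaround used above) is to rebuild the dataframe with a schema in which each field has nullable = true:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Recreate the dataframe with the same rows but every field marked nullable.
def withNullableFields(df: DataFrame): DataFrame = {
  val nullableSchema = StructType(df.schema.map(_.copy(nullable = true)))
  df.sparkSession.createDataFrame(df.rdd, nullableSchema)
}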

kaiseu commented 2 years ago

It seems this issue affects version 0.25.0? 0.24.2 is fine.

javidlc commented 2 years ago

I changed to a previous version, and it works. Thanks!
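For anyone else pinning a pre-regression version until the fix ships, something like the following sbt dependency should work (an illustration assuming an sbt build and Scala 2.12; the same Maven coordinate can be passed to spark-submit via --packages):

// Pin the connector to the last release before the schema check change.
libraryDependencies +=
  "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.24.2"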

suryasoma commented 2 years ago

Hey everyone, please find the fix for this in the latest release, 0.25.2. Thanks

katerina-kogan commented 9 months ago

Does the issue still persist in 0.34? I tried to run a Python dbt model: the first time, the table gets created; on subsequent runs (with the same data) it gives the error "Destination table's schema is not compatible with dataframe's schema".

davidrabinowitz commented 8 months ago

It shouldn't. Can you please provide more details, preferably as a new issue?

katerina-kogan commented 8 months ago

Thank you, posted here: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/1149