GoogleCloudDataproc / spark-bigquery-connector

BigQuery data source for Apache Spark: Read data from BigQuery into DataFrames, write DataFrames into BigQuery tables.
Apache License 2.0

Disable Schema Changes #1244

Closed: gmiano closed this issue 4 months ago

gmiano commented 5 months ago

Hi,

I am using the BigQuery connector on a Dataproc cluster to create a DataFrame and write it to a BigQuery table. The table is created via Terraform with a hardcoded schema in which all fields are set to REQUIRED.

This table is refreshed every day, and the results are overwritten. The code used to write to this table is as follows:

data.write.format("bigquery") \
    .mode("overwrite") \
    .option("table", table_fullname) \
    .option("createDisposition", "CREATE_NEVER") \
    .option("allowFieldAddition", "false") \
    .option("allowFieldRelaxation", "false") \
    .option("temporaryGcsBucket", self._config.get_config("temp_gcs_bucket")) \
    .save()

Unfortunately, this configuration does not work as expected: on each write the table schema is updated and many fields are switched from REQUIRED to NULLABLE. Since I did not explicitly specify a connector version, it should be the cluster default, spark-3.5-bigquery-0.39.0.jar.
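For reference, the drift can be confirmed after a write with the BigQuery client library. This is just a quick sketch; it assumes google-cloud-bigquery is installed and that table_fullname is a fully qualified "project.dataset.table" id:

from google.cloud import bigquery

client = bigquery.Client()
table = client.get_table(table_fullname)
for field in table.schema:
    # Every mode should still read REQUIRED, not NULLABLE
    print(field.name, field.field_type, field.mode)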

Any guidance on resolving this issue would be greatly appreciated. Thank you!

isha97 commented 4 months ago

The fix will be available in the next release.

gmiano commented 3 months ago

Hey @isha97, good afternoon.

I was finally able to update our connector and test the new feature: schema field relaxation works as expected. Thank you for your commitment and effective fix; it was very helpful for my team.

However, schema field addition does not seem to work. It is not a big deal for us, as we won't use it, but I want to flag this other unexpected behaviour. I replaced the old jar with gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.40.0.jar, yet if I add a new column that is not defined in the table schema, the schema is updated and the new column is added.
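In case it matters, here is a sketch of how the jar can be pinned, assuming the job builds its own SparkSession (on Dataproc the jar can also be passed at job submission instead; the app name is hypothetical):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("daily-refresh")  # hypothetical app name
    .config("spark.jars", "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.40.0.jar")
    .getOrCreate()
)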

Here is the code I use to write:

# Same options as before; the write itself is triggered by save()
df_writer = (
    data.write.format("bigquery")
    .mode("overwrite")
    .option("table", table_fullname)
    .option("createDisposition", "CREATE_NEVER")
    .option("allowFieldAddition", "false")
    .option("allowFieldRelaxation", "false")
    .option("temporaryGcsBucket", self._config.get_config("temp_gcs_bucket"))
)
df_writer.save()
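Until field addition is enforced by the connector, one hedged workaround sketch is to project the DataFrame onto the Terraform-managed columns before writing, so a stray column can never reach the table. Here expected_columns is a hypothetical list mirroring that schema:

expected_columns = ["id", "name", "created_at"]  # hypothetical, mirrors the Terraform schema

(
    data.select(*expected_columns)
    .write.format("bigquery")
    .mode("overwrite")
    .option("table", table_fullname)
    .option("createDisposition", "CREATE_NEVER")
    .option("temporaryGcsBucket", self._config.get_config("temp_gcs_bucket"))
    .save()
)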