GoogleCloudDataproc / spark-bigquery-connector

BigQuery data source for Apache Spark: Read data from BigQuery into DataFrames, write DataFrames into BigQuery tables.
Apache License 2.0

Issue with BigQuery counts in Spark after writing to table #1290

Open rchynoweth opened 2 months ago

rchynoweth commented 2 months ago

I am having an issue getting accurate counts when reading/writing to BigQuery from Databricks after installing the connector.

Connector Version: spark-3.5-bigquery-0.39.1.jar
Apache Spark 3.5.0
Scala 2.12
Databricks 14.3 LTS

Code to replicate:

from pyspark.sql import Row

# Create a list of Rows
data = [
    Row(col1="value1_1", col2="value1_2", col3="value1_3"),
    Row(col1="value2_1", col2="value2_2", col3="value2_3"),
    Row(col1="value3_1", col2="value3_2", col3="value3_3"),
    Row(col1="value4_1", col2="value4_2", col3="value4_3"),
    Row(col1="value5_1", col2="value5_2", col3="value5_3"),
    Row(col1="value6_1", col2="value6_2", col3="value6_3"),
    Row(col1="value7_1", col2="value7_2", col3="value7_3"),
    Row(col1="value8_1", col2="value8_2", col3="value8_3"),
    Row(col1="value9_1", col2="value9_2", col3="value9_3"),
    Row(col1="value10_1", col2="value10_2", col3="value10_3")
]

# Create DataFrame
df = spark.createDataFrame(data)

# count table
(spark.read
  .format("bigquery")
  .option("table", 'mydataset.test_read_write_table')
  .option("project", 'myproject')
  .load()
  ).count()

### prints 50

# write 10 rows to table
(df.write
  .mode('append')
  .format("bigquery")
  .option("project", 'myproject')
  .option("writeMethod", "direct")
  .save('mydataset.test_read_write_table')
)

# check count again
(spark.read
  .format("bigquery")
  .option("table", 'mydataset.test_read_write_table')
  .option("project", 'myproject')
  .load()
).count()
### prints 50 but should be 60

When I check the BQ table directly, the new rows are there, but they are not reflected in my DataFrame. If I use the query option instead of the table option and run "select count(1) from mydataset.test_read_write_table", the counts are accurate. This looks like a caching problem, so I tried setting the cacheExpirationTimeInMinutes option to 0, but that does not seem to work. However, if I set it to a positive integer, the count does become correct once that interval has elapsed.
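For reference, a minimal sketch of the query-based read that does return an accurate count. Note that this assumes the query option also requires viewsEnabled=true and a materializationDataset, and mydataset/myproject are the same placeholders as above:

# Query-based read: this returns the up-to-date count.
# Assumes query reads need viewsEnabled=true plus a materialization
# dataset; mydataset/myproject are the placeholders used above.
(spark.read
  .format("bigquery")
  .option("query", "select count(1) as cnt from mydataset.test_read_write_table")
  .option("viewsEnabled", "true")
  .option("materializationDataset", "mydataset")
  .option("project", 'myproject')
  .load()
).show()

### shows the correct, up-to-date count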

agrawal-siddharth commented 2 months ago

What happens if you omit the first count check? I'm trying to narrow down whether the issue is caused by spark.read being called twice.

rchynoweth commented 2 months ago

Interesting: it appears that the last count is still incorrect even if I omit the first one. To summarize, I just did the following:

  1. Ran a count in the BQ Console to determine the existing row count (140).
  2. Created a DataFrame with 10 rows.
  3. Wrote the 10 rows to BQ via direct write.
  4. Ran spark.read with a count. It returned the incorrect number of rows (140).
  5. Ran a count in the BQ Console and it returned the proper row count (150).

However, after a couple of minutes the DataFrame count did come back correct.

MichalBogoryja commented 1 week ago

Hi @rchynoweth, I've encountered a similar issue. Since connector version 0.34.0 the enableReadSessionCaching property defaults to "true" (and the corresponding readSessionCacheDurationMins defaults to 5 minutes). I'm not 100% sure, but for me, if you read the same table again within 5 minutes (even if it changed in the meantime), Spark serves the data from the cached read session instead of re-reading from BQ. I tried to clear the cache, but with no success. Simply set this property to "false" when starting your Spark session and you should get the correct count; see the sketch below. That said, an option to refresh the table should be added.
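A minimal sketch of what worked for me, assuming the connector picks up session-level options via the spark.datasource.bigquery. prefix (adjust if your deployment wires global options differently):

# Disable the read session cache for the whole Spark session so that
# every spark.read opens a fresh BigQuery read session.
# Assumes the spark.datasource.bigquery. prefix for global options.
spark.conf.set("spark.datasource.bigquery.enableReadSessionCaching", "false")

# Subsequent reads should now see rows written moments earlier:
(spark.read
  .format("bigquery")
  .option("table", 'mydataset.test_read_write_table')
  .option("project", 'myproject')
  .load()
).count()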

rchynoweth commented 3 days ago

@MichalBogoryja - do you have to set it in the Spark session settings rather than in the DataFrame read options? It doesn't work for me when set on the DataFrame read.

vishalkarve15 commented 2 days ago

This has been fixed and will be available in the next release. In the meantime, you can test it using the nightly build, e.g. gs://spark-lib-nightly-snapshots/spark-3.5-bigquery-nightly-snapshot.jar
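For anyone wanting to try it before the release, a minimal sketch of pointing a session at the nightly jar, assuming a cluster where gs:// paths are resolvable (e.g. Dataproc); on Databricks you would instead copy the jar and install it as a cluster library:

from pyspark.sql import SparkSession

# Start a session with the nightly connector jar on the classpath.
# Assumes gs:// paths are readable by the cluster (e.g. Dataproc).
spark = (SparkSession.builder
  .config("spark.jars", "gs://spark-lib-nightly-snapshots/spark-3.5-bigquery-nightly-snapshot.jar")
  .getOrCreate())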