GoogleCloudDataproc / spark-bigquery-connector

BigQuery data source for Apache Spark: Read data from BigQuery into DataFrames, write DataFrames into BigQuery tables.

Spark BQ connector unable to handle timestamps - 0001-01-01 00:00:00 #1242

Closed: Murli16 closed this issue 3 months ago

Murli16 commented 3 months ago

The customer is using Spark jobs to read data from SQL Server and write to BigQuery. The jobs are unable to process a timestamp column containing the value 0001-01-01 00:00:00.

Spark connector version: 0.36.1
Dataproc image: 2.2.5-ubuntu22
Spark version: 3.5

Error Message

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: task 0.0 in stage 4.0 (TID 3) had a not serializable result: com.google.cloud.spark.bigquery.repackaged.io.grpc.Status
Serialization stack:
    - object not serializable (class: com.google.cloud.spark.bigquery.repackaged.io.grpc.Status, value: Status{code=INVALID_ARGUMENT, description=Errors found while processing rows. Please refer to the row_errors field for details. The list may not be complete because of the size limitations. Entity: projects/jbh-dev-dataengineering-bqods/datasets/NonCDC_Staging/tables/RAT_ADJ_DerivativeParameter/streams/Cic2NzA4MDY4NS0wMDAwLTI4ZDktODNiYS0xNDIyM2JiNDE1YjI6czM, cause=null})
    - writeObject data (class: java.lang.Throwable)
    - object (class com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.Exceptions$AppendSerializationError, com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.Exceptions$AppendSerializationError: INVALID_ARGUMENT: Errors found while processing rows. Please refer to the row_errors field for details. The list may not be complete because of the size limitations. Entity: projects/jbh-dev-dataengineering-bqods/datasets/NonCDC_Staging/tables/RAT_ADJ_DerivativeParameter/streams/Cic2NzA4MDY4NS0wMDAwLTI4ZDktODNiYS0xNDIyM2JiNDE1YjI6czM)
    - writeObject data (class: java.lang.Throwable)
    - object (class java.util.concurrent.ExecutionException, java.util.concurrent.ExecutionException: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.Exceptions$AppendSerializationError: INVALID_ARGUMENT: Errors found while processing rows. Please refer to the row_errors field for details. The list may not be complete because of the size limitations. Entity: projects/jbh-dev-dataengineering-bqods/datasets/NonCDC_Staging/tables/RAT_ADJ_DerivativeParameter/streams/Cic2NzA4MDY4NS0wMDAwLTI4ZDktODNiYS0xNDIyM2JiNDE1YjI6czM)
    - writeObject data (class: java.lang.Throwable)
    - object (class com.google.cloud.bigquery.connector.common.BigQueryConnectorException, com.google.cloud.bigquery.connector.common.BigQueryConnectorException: Execution Exception while retrieving AppendRowsResponse)
    - field (class: com.google.cloud.spark.bigquery.write.DataSourceWriterContextPartitionHandler$1, name: val$e, type: class java.lang.Exception)
    - object (class com.google.cloud.spark.bigquery.write.DataSourceWriterContextPartitionHandler$1, com.google.cloud.spark.bigquery.write.DataSourceWriterContextPartitionHandler$1@1be653ee)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)

Issue Reproduction Steps

Cluster Creation Specs
gcloud dataproc clusters create cluster \
--enable-component-gateway --region us-central1 \
--subnet default \
--no-address \
--master-machine-type n2-standard-4 \
--master-boot-disk-type pd-balanced \
--master-boot-disk-size 500 \
--num-workers 2 \
--worker-machine-type n2-standard-4 \
--worker-boot-disk-type pd-balanced \
--worker-boot-disk-size 500 \
--image-version 2.2.5-ubuntu22 \
--properties dataproc:dataproc.conscrypt.provider.enable=false \
--metadata SPARK_BQ_CONNECTOR_VERSION=0.36.1 \
--project gcp-sandbox-1-359004

Reproduction Script (main.py)
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import logging
from datetime import datetime

"""
gcloud dataproc jobs submit pyspark main.py --cluster cluster --region us-central1
"""
logging.getLogger().setLevel(logging.INFO)

spark = SparkSession.builder.appName("spark_app").getOrCreate()

schema = StructType([
    StructField("datetime_col", StringType(), True),
    StructField("timestamp_col", TimestampType(), True)
])
df = spark.createDataFrame([
    ("0001-01-01 00:00:00", datetime.strptime("0001-01-01 00:00:00", '%Y-%m-%d %H:%M:%S'))
    ],
    schema)

df.show()
df.printSchema()

df.write.format("bigquery") \
    .option("createDisposition", "CREATE_NEVER") \
    .option("writeMethod", "DIRECT") \
    .option("enableModeCheckForSchemaFields", "true") \
    .mode("append") \
    .save("<obfuscated>.dataset_us_central1.datetime_reproduce")
davidrabinowitz commented 3 months ago

Notice that for DATETIME types you need to create the Spark equivalent, TimestampNTZ. Also, please verify that Spark supports those dates. Many computer systems do not support dates prior to 1900 well, not to mention prior to 1582 (the creation of the Gregorian calendar system). Can you please share the use case that requires those dates?
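
A minimal sketch of that suggestion, assuming Spark 3.4+ (where pyspark.sql.types.TimestampNTZType is available) and that the connector maps TimestampNTZ to a BigQuery DATETIME column as suggested above; the table name is a placeholder, and whether the year-1 value itself is accepted is exactly what this issue is about:

from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, TimestampNTZType

spark = SparkSession.builder.appName("datetime_ntz_sketch").getOrCreate()

# TimestampNTZType (Spark 3.4+) carries no time zone, i.e. the Spark-side
# equivalent of a BigQuery DATETIME; TimestampType is zone-aware and
# corresponds to a BigQuery TIMESTAMP.
schema = StructType([
    StructField("datetime_col", TimestampNTZType(), True)
])

df = spark.createDataFrame(
    [(datetime.strptime("0001-01-01 00:00:00", "%Y-%m-%d %H:%M:%S"),)],
    schema)

# Placeholder project/dataset/table; the write options mirror the reproduction script above.
df.write.format("bigquery") \
    .option("writeMethod", "DIRECT") \
    .mode("append") \
    .save("<project>.<dataset>.datetime_ntz_sketch")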

Murli16 commented 3 months ago

Hi @davidrabinowitz - The issue does not seem to be with the DATETIME attribute type; it is with the TIMESTAMP attribute type.

We see that in Dataproc the earliest date that can be used without any data issue or corruption is 1900-01-01. We do have a use case where the source system stores its lower-bound timestamps as 0001-01-01 00:00:00. Is there any fix possible for this?

Just to add, we have a similar Spark job running in Databricks that is able to process this date without issue. I understand the Spark version/environment between Databricks and Dataproc might be different.

isha97 commented 3 months ago

@Murli16 we do support timestamps like 0001-01-01 00:00:00; however, the earliest supported timestamp is 0001-01-01 00:00:00 UTC. Please make sure you are not using any timestamp before that. It would be a good idea to explicitly specify the time zone instead of relying on the local time zone.
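
A minimal sketch of that suggestion, assuming that pinning the session time zone to UTC and attaching an explicit UTC offset to the Python value keeps the instant at or after 0001-01-01 00:00:00 UTC; the app and table names are placeholders, and whether this resolves the original write error would still need to be verified:

from datetime import datetime, timezone

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, TimestampType

spark = SparkSession.builder.appName("utc_timestamp_sketch").getOrCreate()

# Interpret and display timestamps in UTC rather than the cluster's local time
# zone, so a local 0001-01-01 00:00:00 is not shifted to an instant earlier than
# the minimum of 0001-01-01 00:00:00 UTC mentioned above.
spark.conf.set("spark.sql.session.timeZone", "UTC")

schema = StructType([StructField("timestamp_col", TimestampType(), True)])

# Build the value with an explicit UTC offset instead of a naive (local) datetime.
ts = datetime(1, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
df = spark.createDataFrame([(ts,)], schema)

df.show(truncate=False)

# Placeholder table name; the write options mirror the reproduction script above.
df.write.format("bigquery") \
    .option("writeMethod", "DIRECT") \
    .mode("append") \
    .save("<project>.<dataset>.utc_timestamp_sketch")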