GoogleCloudDataproc / spark-bigquery-connector

BigQuery data source for Apache Spark: Read data from BigQuery into DataFrames, write DataFrames into BigQuery tables.
Apache License 2.0

com.google.cloud.spark.bigquery.ArrowSchemaConverter Unsupported Operation Exception #443

Closed · appden1 closed this issue 1 year ago

appden1 commented 3 years ago

I'm using spark-bigquery-with-dependencies_2.11-0.21.1.jar and I'm having trouble reading BigQuery data from Spark on a YARN cluster.

Pipeline: BigQuery -> Spark 2.3.2 on HDP 3.1.5, Python 3.6

Code:

spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationProject",project-id)
spark.conf.set("materializationDataset",dataset-id)

sql="""select col1, col2 from project.dataset.table"""

df=spark.read.schema(schema).format("bigquery")\
...                     .option("credentialsFile","credential-file.json")\
...                     .option("parentProject",project-id).load(sql) 

df.show()

Log:

 executor 1): java.lang.UnsupportedOperationException
    at com.google.cloud.spark.bigquery.ArrowSchemaConverter$ArrowVectorAccessor.getUTF8String(ArrowSchemaConverter.java:311)
    at com.google.cloud.spark.bigquery.ArrowSchemaConverter.getUTF8String(ArrowSchemaConverter.java:120)
    at org.apache.spark.sql.execution.vectorized.MutableColumnarRow.getUTF8String(MutableColumnarRow.java:135)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
davidrabinowitz commented 3 years ago

What happens when you run the following:

df = spark.read.format("bigquery") \
    .option("credentialsFile", "credential-file.json") \
    .option("parentProject", project_id) \
    .load(sql)

Can you please share the schema of the result vs the schema you've provided?
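For reference, a minimal sketch of that comparison, assuming the spark, sql, schema, and project_id objects from the snippets above:

# Schema the connector infers from BigQuery when no user schema is supplied
inferred_df = spark.read.format("bigquery") \
    .option("credentialsFile", "credential-file.json") \
    .option("parentProject", project_id) \
    .load(sql)
inferred_df.printSchema()

# Schema passed explicitly via .schema(...)
print(schema.simpleString())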

appden1 commented 3 years ago

Schema I am using:

schema = StructType([
    StructField('col1', StringType(), True),
    StructField('col2', StringType(), True),
])

df.printSchema()
root
 |-- col1: string (nullable = true)
 |-- col2: string (nullable = true)

If I don't specify a schema, df.show() displays the first 20 records, but if I do df.show(25) or df.write.format('csv').save('hdfs://path/') I get a different error.

df.printSchema()
root
 |-- col1: long (nullable = true)
 |-- col2: long (nullable = true)

Caused by: java.lang.IllegalStateException: Value at index is null
    at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.BigIntVector.get(BigIntVector.java:112)
    at com.google.cloud.spark.bigquery.ArrowSchemaConverter$LongAccessor.getLong(ArrowSchemaConverter.java:364)
    at com.google.cloud.spark.bigquery.ArrowSchemaConverter.getLong(ArrowSchemaConverter.java:98)
    at org.apache.spark.sql.execution.vectorized.MutableColumnarRow.getLong(MutableColumnarRow.java:120)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

To add, my source BigQuery fields are of INTEGER data type. I have tried a schema with IntegerType() as well, and still hit the same issue.
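BigQuery's INTEGER type is INT64 (64-bit), so the connector surfaces it as Spark's LongType; an explicit schema needs LongType rather than StringType or IntegerType to match. A minimal sketch, using the same placeholder column names as above:

from pyspark.sql.types import StructType, StructField, LongType

# BigQuery INTEGER columns are 64-bit, so they map to LongType in Spark.
schema = StructType([
    StructField('col1', LongType(), True),
    StructField('col2', LongType(), True),
])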

appden1 commented 3 years ago

For some reason the DataFrame shows null values for all the columns even though the BigQuery table has data, something like below. The nulls start from the last column and follow a pattern (roughly every 16 rows) until all rows are null.

+----+----+
|col1|col2|
+----+----+
|  a1|  b1|
|  a2|  b2|
|  a3|  b3|
|  a4|null|
|  a5|null|
|  a6|null|
|  a7|null|
|  a8|null|
|null|null|
|null|null|
|null|null|
|null|null|
|null|null|
|null|null|
|null|null|
+----+----+
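One way to check whether the nulls are a read-path artifact rather than real data is to compare a null count computed by Spark with one computed by BigQuery itself. A sketch, assuming the df, spark, and project_id objects and the materialization settings from the first snippet:

# Nulls as seen through the connector's read path
spark_nulls = df.filter(df.col2.isNull()).count()

# Nulls as counted by BigQuery directly (runs as a query, so viewsEnabled
# and the materialization settings above must be in place)
bq_nulls = spark.read.format("bigquery") \
    .option("credentialsFile", "credential-file.json") \
    .option("parentProject", project_id) \
    .load("select count(*) as c from project.dataset.table where col2 is null") \
    .collect()[0]['c']

print(spark_nulls, bq_nulls)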

emkornfield commented 3 years ago

Could you try downgrading the version of the spark connector to see if there is a difference? Also do you get errors if you use Avro as the output format instead of Arrow?
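The output format can be switched with the connector's readDataFormat option; a minimal sketch of the Avro variant, with the same placeholders as the earlier snippets:

# Read through the Avro wire format instead of Arrow, to see whether the
# error changes or disappears.
df_avro = spark.read.format("bigquery") \
    .option("credentialsFile", "credential-file.json") \
    .option("parentProject", project_id) \
    .option("readDataFormat", "AVRO") \
    .load(sql)

df_avro.show(25)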

b0lle commented 2 years ago

I am facing the same issue with spark-bigquery-with-dependencies_2.12-0.23.2.jar, Spark 3.1.2, Scala 2.12.

Changing to the Avro format didn't help. Querying tables directly was successful.
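For comparison, a direct table load skips the SQL-query/materialization path entirely; a sketch with placeholder names:

# Load the table directly; viewsEnabled/materialization settings are not
# needed for a plain table read.
df_table = spark.read.format("bigquery") \
    .option("parentProject", "project-id") \
    .load("project.dataset.table")

df_table.select("col1", "col2").show(25)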

emkornfield commented 2 years ago

Avro should at least throw a different exception though? Did that have more details?

b0lle commented 2 years ago

Hm, when I tried to show you the logs, the application suddenly worked :/