spark-redshift-community / spark-redshift

Performant Redshift data source for Apache Spark
Apache License 2.0

Error parsing super type #146

Closed ryan6310 closed 1 month ago

ryan6310 commented 10 months ago

I am working on a data pipeline that involves multiple steps of reading from and writing to Redshift. I am having trouble with arrays of strings that are written to the SUPER type.

In Spark, the schema is:

 |-- race: array (nullable = true)
 |    |-- element: string (containsNull = true)

and in Spark I see what I expect:

+-----------+
| ['white'] |
+-----------+

This is then written to Redshift. The column metadata contains redshift_type = Super.
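
For context, the write looks roughly like this (connection options, bucket, and table name are placeholders; the SUPER mapping is requested through the column metadata):

```python
from pyspark.sql.functions import col

# Request the SUPER type for the array column via column metadata.
df = df.withColumn("race", col("race").alias("race", metadata={"redshift_type": "SUPER"}))

(df.write
    .format("io.github.spark_redshift_community.spark.redshift")
    .option("url", "jdbc:redshift://<host>:5439/<db>?user=<user>&password=<pass>")
    .option("dbtable", "my_table")
    .option("tempdir", "s3://<bucket>/tmp/")
    .option("aws_iam_role", "<iam-role-arn>")
    .mode("append")
    .save())
```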

In Redshift, I can see the resulting items in the form

[{"string":"white"}]

When I load the table back from Redshift and set the schema, I get a parsing error:

java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData

Is this the expected behavior? Do I need to manually parse the loaded string into arrays?
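
For now, my workaround is to read the column back as a plain string and parse it myself. A sketch, assuming the value comes back as the JSON text shown above (`df` is the loaded frame with `race` as a string):

```python
from pyspark.sql.functions import col, from_json, transform
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Shape of the loaded value: [{"string":"white"}]
wrapped = ArrayType(StructType([StructField("string", StringType())]))

parsed = (
    df.withColumn("race", from_json(col("race"), wrapped))                # string -> array<struct<string:string>>
      .withColumn("race", transform(col("race"), lambda x: x["string"]))  # unwrap to array<string>
)
```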

Full trace below:

File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 913, in show
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 169, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o216.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 22.0 failed 4 times, most recent failure: Lost task 1.3 in stage 22.0 (TID 32) ([2600:1f14:1e32:7b00:4e67:11ad:a41b:bf52] executor 1): java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray(rows.scala:48)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray$(rows.scala:48)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getArray(rows.scala:195)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_2$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:184)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2974)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2910)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2909)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2909)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1263)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1263)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1263)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3173)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3112)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3101)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.checkNoFailures(AdaptiveExecutor.scala:154)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.doRun(AdaptiveExecutor.scala:88)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.tryRunningAndGetFuture(AdaptiveExecutor.scala:66)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.execute(AdaptiveExecutor.scala:57)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:265)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:264)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:541)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:503)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4244)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3205)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4234)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:570)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4232)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:123)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:160)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:271)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:159)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:69)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4232)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:3205)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3426)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:286)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:325)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray(rows.scala:48)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray$(rows.scala:48)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getArray(rows.scala:195)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_2$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:184)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
bsharifi commented 9 months ago

@ryan6310 Which version of the connector are you using? Complex type support was added in version 6.0.0.

ryan6310 commented 9 months ago

I am using EMR Serverless 6.12.0.

bsharifi commented 1 month ago

@ryan6310, EMR 6.12 uses an older version of the connector (5.1.0). Please upgrade to EMR 7.0 or newer to get a 6.0.0+ version of the connector.
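
After upgrading, supplying the Spark schema on the read should let the connector parse the SUPER column back into an array, roughly like this (connection options, bucket, and table name are placeholders):

```python
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Explicit schema tells the connector to parse the SUPER column as array<string>.
schema = StructType([StructField("race", ArrayType(StringType()))])

df = (spark.read
    .format("io.github.spark_redshift_community.spark.redshift")
    .schema(schema)
    .option("url", "jdbc:redshift://<host>:5439/<db>?user=<user>&password=<pass>")
    .option("dbtable", "my_table")
    .option("tempdir", "s3://<bucket>/tmp/")
    .option("aws_iam_role", "<iam-role-arn>")
    .load())
```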