RedisLabs / spark-redis

A connector for Spark that allows reading from and writing to a Redis cluster
BSD 3-Clause "New" or "Revised" License

NullPointerException when writing dataframe #346

Closed andrijaperovic closed 1 year ago

andrijaperovic commented 2 years ago

We are running into an NPE when trying to write keys to Redis. We recently upgraded to spark-redis:3.1.0-SNAPSHOT by building the JAR from the release tag and staging it on DBFS to run as part of our Databricks job, but we are still encountering the same error (for reference, we are using Databricks Runtime 9.1 with Spark 3.1.2):

Received command c on object id p0
Received command c on object id p1
2022-06-14T23:26:51.438+0000: [GC (Allocation Failure) [PSYoungGen: 5657600K->138221K(5795840K)] 5909237K->509172K(17665024K), 0.2369699 secs] [Times: user=0.56 sys=0.14, real=0.24 secs] 
There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/databricks/spark/python/pyspark/sql/utils.py", line 202, in call
    raise e
  File "/databricks/spark/python/pyspark/sql/utils.py", line 199, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "/databricks/python/lib/python3.8/site-packages/jobs/scorer.py", line 170, in writeToRedis
    df.filter(df.score >= 0.0).write.format("org.apache.spark.sql.redis").option("key.column", "ip").option(
  File "/databricks/spark/python/pyspark/sql/readwriter.py", line 1134, in save
    self._jwrite.save()
  File "/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/databricks/spark/python/pyspark/sql/utils.py", line 117, in deco
    return f(*a, **kw)
  File "/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o548.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 64) (10.150.1.92 executor 6): java.lang.NullPointerException
    at org.apache.spark.sql.redis.RedisSourceRelation.$anonfun$dataKeyId$2(RedisSourceRelation.scala:202)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.sql.redis.RedisSourceRelation.dataKeyId(RedisSourceRelation.scala:202)
    at org.apache.spark.sql.redis.RedisSourceRelation.$anonfun$insert$7(RedisSourceRelation.scala:127)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.redis.RedisSourceRelation.$anonfun$insert$6(RedisSourceRelation.scala:127)
    at org.apache.spark.sql.redis.RedisSourceRelation.$anonfun$insert$6$adapted(RedisSourceRelation.scala:125)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at org.apache.spark.sql.redis.RedisSourceRelation.$anonfun$insert$5(RedisSourceRelation.scala:125)
    at org.apache.spark.sql.redis.RedisSourceRelation.$anonfun$insert$5$adapted(RedisSourceRelation.scala:123)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1025)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1025)
    at org.apache.spark.SparkContext.$anonfun$runJob$2(SparkContext.scala:2517)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:91)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:813)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1657)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:816)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2828)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2775)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2769)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2769)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1305)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1305)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1305)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3036)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2977)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2965)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1067)
    at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2477)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2460)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2498)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2517)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2542)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1025)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:419)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1023)
    at org.apache.spark.sql.Dataset.$anonfun$foreachPartition$1(Dataset.scala:2949)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.Dataset.$anonfun$withNewRDDExecutionId$1(Dataset.scala:3814)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:130)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:273)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:223)
    at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3812)
    at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2949)
    at org.apache.spark.sql.redis.RedisSourceRelation.insert(RedisSourceRelation.scala:123)
    at org.apache.spark.sql.redis.DefaultSource.createRelation(DefaultSource.scala:23)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:96)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:213)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:257)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:209)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:167)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:166)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:1080)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:130)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:273)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:223)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:1080)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:469)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:439)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:312)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.redis.RedisSourceRelation.$anonfun$dataKeyId$2(RedisSourceRelation.scala:202)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.sql.redis.RedisSourceRelation.dataKeyId(RedisSourceRelation.scala:202)
    at org.apache.spark.sql.redis.RedisSourceRelation.$anonfun$insert$7(RedisSourceRelation.scala:127)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.redis.RedisSourceRelation.$anonfun$insert$6(RedisSourceRelation.scala:127)
    at org.apache.spark.sql.redis.RedisSourceRelation.$anonfun$insert$6$adapted(RedisSourceRelation.scala:125)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at org.apache.spark.sql.redis.RedisSourceRelation.$anonfun$insert$5(RedisSourceRelation.scala:125)
    at org.apache.spark.sql.redis.RedisSourceRelation.$anonfun$insert$5$adapted(RedisSourceRelation.scala:123)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1025)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1025)
    at org.apache.spark.SparkContext.$anonfun$runJob$2(SparkContext.scala:2517)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:91)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:813)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1657)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:816)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

I'm also observing a GC allocation failure, so I'm not sure whether this is also a two-fold problem involving spark.executor.memory.
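
For reference, the spark-redis write call in the traceback above is truncated at the second .option(. Its shape is roughly the following (a minimal sketch; the "table" value and write mode here are illustrative placeholders, not the options from the actual job):

    # Sketch of the write path from scorer.py; "scores" and mode("append") are assumed, not real values.
    (df.filter(df.score >= 0.0)
        .write
        .format("org.apache.spark.sql.redis")
        .option("table", "scores")        # illustrative table name; the real options are cut off in the traceback
        .option("key.column", "ip")
        .mode("append")
        .save())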

fe2s commented 2 years ago

It looks like some of the rows of your DataFrame have a null key. The key column is specified with the "key.column" option.
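
If so, you can verify this and drop the offending rows before the write. A minimal sketch, assuming the key column is "ip" as in the traceback:

    # Count rows whose key column is null; this should be zero before writing to Redis.
    null_key_count = df.filter(df.ip.isNull()).count()
    print(f"rows with null key: {null_key_count}")

    # Drop null-key rows, then write clean_df with the same spark-redis options as in the job.
    clean_df = df.filter(df.score >= 0.0).filter(df.ip.isNotNull())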

sazzad16 commented 1 year ago

Closed due to inactivity.