RedisLabs / spark-redis

A connector for Spark that allows reading and writing to/from Redis cluster
BSD 3-Clause "New" or "Revised" License
935 stars 368 forks source link

NullPointerException when writing dataframe #339

Closed nofardi closed 2 years ago

nofardi commented 2 years ago

My spark job reads data from redis, joins the data with another new dataframe and then writes it to redis. I keep getting NullPointerException when writing although most of the data is being written. I run the job on an haddop cluster, I'm writing approx. 7M keys

This is how I read

val redisDf = sqlContext.read.format("org.apache.spark.sql.redis")
      .schema(getRedisSchema())
      .option("keys.pattern", s"membership_poc:$accountId:*")
      .option("key.column", "redis_id")
      .load()

And this is how I write:

identifiersToWrite.write
      .format("org.apache.spark.sql.redis")
      .option("table", s"membership_poc")
      .option("key.column", "redis_id")
      .mode(SaveMode.Append)
      .save()

The "redis_id" cloumns looks like this: s"$accountId:$anotherId"

The stacktrace: Lost task 842.0 in stage 237.0 (TID 20929, spotcluster-29004-130-prod.eu1.appsflyer.com, executor 14): java.lang.NullPointerException at org.apache.spark.sql.redis.RedisSourceRelation.$anonfun$dataKeyId$2(RedisSourceRelation.scala:214) at scala.Option.map(Option.scala:230) at org.apache.spark.sql.redis.RedisSourceRelation.dataKeyId(RedisSourceRelation.scala:214) at org.apache.spark.sql.redis.RedisSourceRelation.$anonfun$insert$7(RedisSourceRelation.scala:139) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.TraversableLike.map(TraversableLike.scala:238) at scala.collection.TraversableLike.map$(TraversableLike.scala:231) at scala.collection.immutable.List.map(List.scala:298) at org.apache.spark.sql.redis.RedisSourceRelation.$anonfun$insert$6(RedisSourceRelation.scala:139) at org.apache.spark.sql.redis.RedisSourceRelation.$anonfun$insert$6$adapted(RedisSourceRelation.scala:137) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at org.apache.spark.sql.redis.RedisSourceRelation.$anonfun$insert$5(RedisSourceRelation.scala:137) at org.apache.spark.sql.redis.RedisSourceRelation.$anonfun$insert$5$adapted(RedisSourceRelation.scala:135) at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:994) at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:994) at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2154) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

andrijaperovic commented 2 years ago

@nofardi running into as similar issue when writing out a large number of keys, would you be able to share you solution of how you resolved the issue?

fe2s commented 2 years ago

@andrijaperovic , please make sure there are no null keys in your dataframe