RedisLabs / spark-redis

A connector for Spark that allows reading and writing to/from Redis cluster
BSD 3-Clause "New" or "Revised" License

Spark-redis with redis.clients.jedis.exceptions.JedisDataException cannot be cast to java.util.List #196

Closed: kannyyou closed this issue 5 years ago

kannyyou commented 5 years ago

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder()
  .appName("SparkRedis")
  .config("spark.redis.host", redisHost)
  .config("spark.redis.port", redisPort)
  .getOrCreate()

val redisSchema = StructType(Array(
  StructField("key", StringType),
  StructField("value", StringType)
))

val redisDF = spark.read
  .format("org.apache.spark.sql.redis")
  .schema(redisSchema)
  .option("keys.pattern", "dims/campaign_id/")
  .load()
```

When I run the above code, it fails with the error below. Environment: Spark 2.3.0, spark-redis 2.3.1, redis.clients:jedis:3.0.0.

```
19/09/05 20:49:17 INFO DAGScheduler: ResultStage 0 (show at RedisReader.scala:40) failed in 1.283 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, hdstg-1-003.stg.bi2.las1.mz-inc.com, executor 6): java.lang.ClassCastException: redis.clients.jedis.exceptions.JedisDataException cannot be cast to java.util.List
    at org.apache.spark.sql.redis.HashRedisPersistence.decodeRow(HashRedisPersistence.scala:49)
    at org.apache.spark.sql.redis.RedisSourceRelation$$anonfun$org$apache$spark$sql$redis$RedisSourceRelation$$scanRows$1$$anonfun$apply$9.apply(RedisSourceRelation.scala:291)
    at org.apache.spark.sql.redis.RedisSourceRelation$$anonfun$org$apache$spark$sql$redis$RedisSourceRelation$$scanRows$1$$anonfun$apply$9.apply(RedisSourceRelation.scala:289)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.redis.RedisSourceRelation$$anonfun$org$apache$spark$sql$redis$RedisSourceRelation$$scanRows$1.apply(RedisSourceRelation.scala:289)
    at org.apache.spark.sql.redis.RedisSourceRelation$$anonfun$org$apache$spark$sql$redis$RedisSourceRelation$$scanRows$1.apply(RedisSourceRelation.scala:276)
    at com.redislabs.provider.redis.util.ConnectionUtils$.withConnection(ConnectionUtils.scala:20)
    at org.apache.spark.sql.redis.RedisSourceRelation.org$apache$spark$sql$redis$RedisSourceRelation$$scanRows(RedisSourceRelation.scala:276)
    at org.apache.spark.sql.redis.RedisSourceRelation$$anonfun$buildScan$3$$anonfun$apply$7$$anonfun$apply$8.apply(RedisSourceRelation.scala:179)
    at org.apache.spark.sql.redis.RedisSourceRelation$$anonfun$buildScan$3$$anonfun$apply$7$$anonfun$apply$8.apply(RedisSourceRelation.scala:178)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
```

fe2s commented 5 years ago

Hi @kannyyou,

Can you please share what `dims/campaign_id/` looks like in Redis so that I can reproduce the exception?

kannyyou commented 5 years ago

@fe2s `dims/campaign_id/` is just a string key in my Redis, and I can't get a value for any key. If I use the RDD API as described in https://github.com/RedisLabs/spark-redis/blob/master/doc/rdd.md, it works well, as in the sketch below.
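For reference, the RDD-based read from rdd.md looks roughly like this (a minimal sketch; whether the pattern needs a trailing `*` depends on how the keys are named):

```scala
import com.redislabs.provider.redis._

// spark-redis RDD API: reads string keys matching the pattern
// as an RDD of (key, value) pairs
val stringKVs = spark.sparkContext.fromRedisKV("dims/campaign_id/*")
stringKVs.collect().foreach(println)
```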

fe2s commented 5 years ago

You cannot read strings into a DataFrame; it's not supported. DataFrames are mapped to Redis hashes.
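For example, the supported round trip looks like this (a minimal sketch; the table name, sample data, host, and port are made up for illustration):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("RedisHashRoundTrip")
  .config("spark.redis.host", "localhost") // placeholder host
  .config("spark.redis.port", "6379")      // placeholder port
  .getOrCreate()

// Writing stores each row as a Redis hash under keys like "people:<id>"
val df = spark.createDataFrame(Seq(("1", "Alice"), ("2", "Bob"))).toDF("id", "name")
df.write
  .format("org.apache.spark.sql.redis")
  .option("table", "people")
  .option("key.column", "id")
  .save()

// Reading back works because the "people:*" keys hold hashes, not strings
spark.read
  .format("org.apache.spark.sql.redis")
  .option("table", "people")
  .load()
  .show()
```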

fe2s commented 5 years ago

Closing the issue.

@kannyyou, if you need any further help, please open another issue.

daciz commented 3 years ago

I have the same problem, `redis.clients.jedis.exceptions.JedisDataException cannot be cast to java.util.List`, but I use this way to create the Dataset:

```java
Dataset<Row> load = spark.read()
        .format("org.apache.spark.sql.redis")
        .schema(schema)
        .option("keys.pattern", "u:*")
        .option("infer.schema", true)
        .load();
```
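Given the discussion above, a quick way to check whether the keys matched by the pattern actually hold hashes (a sketch with plain Jedis; the host, port, and sample key are placeholders):

```scala
import redis.clients.jedis.Jedis

val jedis = new Jedis("localhost", 6379) // placeholder host/port
// The DataFrame reader can only decode hashes; if this prints "string",
// the ClassCastException above is expected
println(jedis.`type`("u:1"))
jedis.close()
```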