RedisLabs / spark-redis

A connector for Spark that allows reading and writing to/from Redis cluster
BSD 3-Clause "New" or "Revised" License
939 stars 372 forks source link

Using fromRedisZSetWithScore in Zeppelin Fails #52

Open seizadi opened 7 years ago

seizadi commented 7 years ago

I am trying to use Zeppelin to visualize table in Redis.

I have the following AWS EMR setup to run Zeppelin Release label:emr-5.4.0 Hadoop distribution:Amazon 2.7.3 Applications:Ganglia 3.7.2, Spark 2.1.0, Zeppelin 0.7.0

I am running with following Redis-Spark JARs % ls /usr/share/dal/lib commons-pool2-2.3.jar jedis-2.7.2.jar spark-redis-0.3.2.jar

I have setup the Zeppelin CLASSPATH environment to pull the proper JARs for Redis-Spark: $ cat /etc/zeppelin/conf/zeppelin-env.sh .... export CLASSPATH=":/usr/lib/hadoop-lzo/lib/:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/:/usr/share/aws/emr/emrfs/auxlib/:/usr/share/dal/lib/*" ....

I can ssh to the node and run this code from the spark-shell CLI on the cluster.

Here is the code that I run in the Zeppelin Spark Interpreter and the error that is returned:

import com.redislabs.provider.redis._ import org.apache.spark.sql.SparkSession

def redisSpark(host: String, port: String): SparkSession = SparkSession.builder().master("local").appName(this.getClass.getSimpleName).config("redis.host", host).config("redis.port", port).getOrCreate()

sc.stop() val spark = redisSpark("172.28.4.177", "6379")

val rdd = spark.sparkContext.fromRedisZSetWithScore("portal.300107.month:net:user:device:pid:dsrc.hits") rdd.foreach(println)

rdd: org.apache.spark.rdd.RDD[(String, Double)] = RedisZSetRDD[11] at RDD at RedisRDD.scala:117 org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.io.IOException: Failed to create local dir in /mnt/tmp/blockmgr-97b1f7a1-ca02-4d7a-a362-7f08db74109b/0c. java.io.IOException: Failed to create local dir in /mnt/tmp/blockmgr-97b1f7a1-ca02-4d7a-a362-7f08db74109b/0c. at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:70) at org.apache.spark.storage.DiskStore.remove(DiskStore.scala:111) at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1339) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:910) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:726) at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1233) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:122) at org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:88) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1411) at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:996) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:918) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:862) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1613) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at 
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1006) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:918) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:862) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1613) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:915) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at 
org.apache.spark.rdd.RDD.foreach(RDD.scala:915) ... 56 elided Caused by: java.io.IOException: Failed to create local dir in /mnt/tmp/blockmgr-97b1f7a1-ca02-4d7a-a362-7f08db74109b/0c. at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:70) at org.apache.spark.storage.DiskStore.remove(DiskStore.scala:111) at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1339) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:910) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:726) at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1233) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:122) at org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:88) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1411) at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:996) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:918) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:862) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1613) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

dolphane commented 7 years ago

Hi seizadi,

I'm running into the same problem with Zeppelin (" Failed to create local dir in /mnt/tmp/blockmgr ...") even when using the modifications you mention in your pull request. Do you follow the same steps in creating a new spark session?

Thanks

subbareddyn commented 6 years ago

Hi,

I'm also facing the same issue in Zeppelin. Could you please let me know what steps you took to overcome this?