RedisLabs / spark-redis

A connector for Spark that allows reading from and writing to a Redis cluster
BSD 3-Clause "New" or "Revised" License

Running the spark cluster with yarn #24

Open ScottWang opened 8 years ago

ScottWang commented 8 years ago

My code:

    val sc = new SparkContext(new SparkConf()
        .setMaster(sparkMode)
        .setAppName("recommendation")
        .set("redis.host", redisHost)
        .set("redis.port", redisPort)
        .set("redis.auth", redisPassword))

where sparkMode = "yarn-cluster"

I am getting the following error:

diagnostics: User class threw exception: redis.clients.jedis.exceptions.JedisDataException: ERR unknown command 'CLUSTER'

dvirsky commented 8 years ago

The CLUSTER group of commands has been available since Redis 3.0. I'm guessing you're using Redis 2.8?
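
For reference, a minimal sketch (not part of the connector) for checking whether a server supports the CLUSTER commands before submitting the Spark job; the host, port, and password are placeholders:

    import redis.clients.jedis.Jedis
    import redis.clients.jedis.exceptions.JedisDataException

    object ClusterSupportCheck {
      def main(args: Array[String]): Unit = {
        // Placeholder connection details; substitute your own.
        val jedis = new Jedis("redis-host", 6379)
        jedis.auth("password")
        try {
          // The connector issues CLUSTER SLOTS during topology discovery
          // (Jedis.clusterSlots in the trace below); pre-3.0 servers reply
          // with "ERR unknown command 'CLUSTER'".
          jedis.clusterSlots()
          println("CLUSTER commands supported")
        } catch {
          case e: JedisDataException => println("No CLUSTER support: " + e.getMessage)
        } finally {
          jedis.close()
        }
      }
    }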

sunheehnus commented 8 years ago

Hello @ScottWang, could you please give more detail about the stack trace? And could you please run INFO from redis-cli and share what it returns? Thanks.

ScottWang commented 8 years ago

Stack trace:

    Exception in thread "main" redis.clients.jedis.exceptions.JedisDataException: ERR unknown command 'CLUSTER'
        at redis.clients.jedis.Protocol.processError(Protocol.java:117)
        at redis.clients.jedis.Protocol.process(Protocol.java:142)
        at redis.clients.jedis.Protocol.read(Protocol.java:196)
        at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:288)
        at redis.clients.jedis.Connection.getRawObjectMultiBulkReply(Connection.java:233)
        at redis.clients.jedis.Connection.getObjectMultiBulkReply(Connection.java:239)
        at redis.clients.jedis.Jedis.clusterSlots(Jedis.java:3273)
        at com.redislabs.provider.redis.RedisConfig.getClusterNodes(RedisConfig.scala:216)
        at com.redislabs.provider.redis.RedisConfig.getNodes(RedisConfig.scala:253)
        at com.redislabs.provider.redis.RedisConfig.getHosts(RedisConfig.scala:168)
        at com.redislabs.provider.redis.RedisConfig.<init>(RedisConfig.scala:96)
        at com.redislabs.provider.redis.RedisContext.toRedisZSET$default$4(redisFunctions.scala:69)
        at UserVodSelfIndex$$anonfun$main$1$$anonfun$apply$1.apply$mcVID$sp(uservodselfindex.scala:147)
        at org.apache.spark.mllib.linalg.SparseVector.foreachActive(Vectors.scala:749)
        at UserVodSelfIndex$$anonfun$main$1.apply(uservodselfindex.scala:145)
        at UserVodSelfIndex$$anonfun$main$1.apply(uservodselfindex.scala:145)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
        at UserVodSelfIndex$.main(uservodselfindex.scala:145)
        at UserVodSelfIndex.main(uservodselfindex.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

INFO output from the Redis Labs instance (note redis_version:2.8.21, i.e. pre-3.0):

    # Server
    redis_version:2.8.21
    redis_git_sha1:00000000
    redis_git_dirty:0
    redis_build_id:0000000000000000000000000000000000000000
    redis_mode:standalone
    os:Linux 3.13.0-36-generic x86_64
    arch_bits:64
    multiplexing_api:epoll
    gcc_version:4.8.2
    process_id:6
    run_id:1a06ec4463f466c99d2db7e47e3b44c6edcc901f
    tcp_port:19638
    uptime_in_seconds:1724973
    uptime_in_days:19
    hz:10
    lru_clock:0

    # Clients
    connected_clients:1
    client_longest_output_list:0
    client_biggest_input_buf:0
    blocked_clients:0

    # Memory
    used_memory:11191640
    used_memory_human:10.67M
    used_memory_rss:11191640
    used_memory_peak:12017304
    used_memory_peak_human:11.46M
    used_memory_lua:36864
    mem_fragmentation_ratio:1
    mem_allocator:jemalloc-3.6.0

    # Persistence
    loading:0
    rdb_changes_since_last_save:0
    rdb_bgsave_in_progress:0
    rdb_last_save_time:1458153785
    rdb_last_bgsave_status:ok
    rdb_last_bgsave_time_sec:0
    rdb_current_bgsave_time_sec:-1
    aof_enabled:1
    aof_rewrite_in_progress:0
    aof_rewrite_scheduled:0
    aof_last_rewrite_time_sec:-1
    aof_current_rewrite_time_sec:-1
    aof_last_bgrewrite_status:ok
    aof_last_write_status:ok
    aof_current_size:0
    aof_base_size:0
    aof_pending_rewrite:0
    aof_buffer_length:0
    aof_rewrite_buffer_length:0
    aof_pending_bio_fsync:0
    aof_delayed_fsync:0

    # Stats
    total_connections_received:18
    total_commands_processed:51
    instantaneous_ops_per_sec:0
    rejected_connections:0
    sync_full:0
    sync_partial_ok:0
    sync_partial_err:0
    expired_keys:0
    evicted_keys:0
    keyspace_hits:0
    keyspace_misses:0
    pubsub_channels:0
    pubsub_patterns:0
    latest_fork_usec:0

    # Replication
    role:master
    connected_slaves:0
    master_repl_offset:0
    repl_backlog_active:0
    repl_backlog_size:1048576
    repl_backlog_first_byte_offset:0
    repl_backlog_histlen:0

    # CPU
    used_cpu_sys:0.00
    used_cpu_user:0.00
    used_cpu_sys_children:0.00
    used_cpu_user_children:0.00

    # Keyspace

dvirsky commented 8 years ago

One hint is that the line numbers in the stack trace don't match those of the current code. Perhaps your version of the connector is from before we fixed a similar issue with RLEC?

ScottWang commented 8 years ago

@dvirsky, OK, let me pull the latest and build it again, thanks.

ScottWang commented 8 years ago

The latest build does work, but I am running into another problem: while it is persisting the data into Redis, the client seems to run out of connections in the pool. Is there a way to tune this?

Error:

    WARN TaskSetManager: Lost task 45.0 in stage 1008.0 (TID 57228, ip-10-196-65-10.us-west-2.compute.internal): redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
        at redis.clients.util.Pool.getResource(Pool.java:50)
        at redis.clients.jedis.JedisPool.getResource(JedisPool.java:86)
        at com.redislabs.provider.redis.RedisEndpoint.connect(RedisConfig.scala:73)
        at com.redislabs.provider.redis.RedisNode.connect(RedisConfig.scala:83)
        at com.redislabs.provider.redis.RedisConfig.connectionForKey(RedisConfig.scala:137)
        at com.redislabs.provider.redis.RedisContext$.setZset(redisFunctions.scala:164)
        at com.redislabs.provider.redis.RedisContext$$anonfun$toRedisZSET$1.apply(redisFunctions.scala:71)
        at com.redislabs.provider.redis.RedisContext$$anonfun$toRedisZSET$1.apply(redisFunctions.scala:71)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: redis.clients.jedis.exceptions.JedisDataException: ERR max number of clients reached
        at redis.clients.jedis.Protocol.processError(Protocol.java:117)
        at redis.clients.jedis.Protocol.process(Protocol.java:142)
        at redis.clients.jedis.Protocol.read(Protocol.java:196)
        at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:288)
        at redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:187)
        at redis.clients.jedis.BinaryJedis.auth(BinaryJedis.java:2001)
        at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:87)
        at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:819)
        at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:429)
        at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:360)
        at redis.clients.util.Pool.getResource(Pool.java:48)

dvirsky commented 8 years ago

@sunheehnus The default pool size is 5 IIRC; it should be set to something like 100.
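
For context, in plain Jedis the pool size is controlled through a JedisPoolConfig (backed by commons-pool2). A minimal sketch of a larger pool; the host, port, timeout, and password are placeholders, and where exactly the connector builds its own pool (RedisConfig.scala, per the stack trace) may differ:

    import redis.clients.jedis.{JedisPool, JedisPoolConfig}

    val poolConfig = new JedisPoolConfig()
    poolConfig.setMaxTotal(100) // maximum connections the pool will hand out at once
    poolConfig.setMaxIdle(100)  // keep idle connections around instead of churning them
    val pool = new JedisPool(poolConfig, "redis-host", 6379, 2000, "password")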

ScottWang commented 8 years ago

I would prefer a tunable parameter for this, but in the meantime could you point out where the setting is, so I can rebuild the package and test it with my Spark job?

sunheehnus commented 8 years ago

Hello @dvirsky and @ScottWang, yes, we should increase the default pool size. But this time the issue does not look like it is caused by that: it is "Caused by: redis.clients.jedis.exceptions.JedisDataException: ERR max number of clients reached", i.e. a server-side limit. We can reproduce it with:

redis.conf:

    port 6380
    requirepass xxx
    maxclients 1

test code:

    import com.redislabs.provider.redis._

    // Wrapped in an object so the snippet compiles standalone.
    object MaxClientsRepro {
      def main(args: Array[String]): Unit = {
        val re = RedisEndpoint("localhost", 6380, "xxx")
        re.connect() // takes the only allowed client slot (maxclients 1)
        re.connect() // fails: ERR max number of clients reached
      }
    }

@ScottWang, could you please increase maxclients on the redis-server, or check whether too many other connected clients are being used by other programs? Thanks :-)
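
For a self-managed server, that suggestion could look like the following; this assumes the repro config above (port 6380, password xxx), and raising maxclients at runtime is subject to the process file-descriptor limit:

    redis-cli -p 6380 -a xxx config get maxclients
    redis-cli -p 6380 -a xxx config set maxclients 10000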

ScottWang commented 8 years ago

@sunheehnus, it is a Redis Labs Redis instance, so I do not think I have access to that parameter. This database is used only by the Spark job; the job has a huge data set and is currently running on a 50-node cluster. Do you think there were too many connections to the Redis server? If so, how can I rectify the situation? The job is running in cluster mode.

[Screenshot: connections chart, 2016-04-05 8:35 PM]

dvirsky commented 8 years ago

@ScottWang This is actually not the pool size: when the pool is exhausted, Jedis just waits rather than throwing this exception. I think you should tweak the RLEC connection parameters, which you can control with rladmin. Ping me privately and we'll try to sort it out.
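
For context on why an exhausted pool waits instead of throwing: the commons-pool2 defaults behind JedisPoolConfig make borrowers block. A sketch of the relevant knobs (the values shown are, to the best of my knowledge, the defaults):

    import redis.clients.jedis.JedisPoolConfig

    val cfg = new JedisPoolConfig()
    cfg.setBlockWhenExhausted(true) // borrowers wait for a free connection instead of failing fast
    cfg.setMaxWaitMillis(-1L)       // -1 = wait indefinitely; a positive timeout would throw a
                                    // pool-side JedisConnectionException, not the server-side
                                    // "ERR max number of clients reached" seen here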

dvirsky commented 8 years ago

@sunheehnus Look at the connections chart: 10k connections seems far too many for 50 nodes. It looks like it may be a bug in the connector.

sunheehnus commented 8 years ago

Yes, I will put together a patch tonight. IMHO, maybe we should just destroy the connection pool in functions like setKVs; I will do more research tonight :-)
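
To illustrate the idea (a sketch only, not the actual patch on the issue24 branch; writeZSetPartition is a hypothetical stand-in for the connector's setZset-style functions):

    import redis.clients.jedis.Jedis

    // Hypothetical stand-in for the connector's per-partition ZSET writer.
    def writeZSetPartition(jedis: Jedis, zsetName: String,
                           members: Iterator[(String, Double)]): Unit = {
      val pipeline = jedis.pipelined()
      members.foreach { case (member, score) =>
        pipeline.zadd(zsetName, score, member)
      }
      pipeline.sync()
      // Return the connection to the pool (or disconnect) as soon as the
      // partition is written, so executors do not accumulate open clients.
      jedis.close()
    }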

sunheehnus commented 8 years ago

Hello @ScottWang, I have written some changes to fix this on the issue24 branch; could you please try it again? Thanks very much :+1:


Hint: run

    git fetch origin
    git checkout origin/issue24

to apply the new changes.

ScottWang commented 8 years ago

You got it!

ScottWang commented 8 years ago

@sunheehnus and @dvirsky: The branch did not fix the problem, but the job did run much longer before it errored out. I just sent you both a private email with the monitoring graph and errors.

prasankh commented 6 years ago

Hi @ScottWang @dvirsky @sunheehnus

We are specifically looking for an update on this thread, as we have run into a similar issue. When we run spark-submit with master local[*] it works fine, but when we use spark-submit with master yarn-client we get the error below. We are using AWS EMR as our YARN cluster. Please update.

    18/02/05 11:24:36 INFO BlockManagerMasterEndpoint: Registering block manager ip-10-16-2-31.us-west-2.compute.internal:46808 with 2.8 GB RAM, BlockManagerId(6, ip-10-16-2-31.us-west-2.compute.internal, 46808, None)
    18/02/05 11:24:37 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 207, ip-10-16-3-177.us-west-2.compute.internal, executor 4): redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
        at redis.clients.util.Pool.getResource(Pool.java:53)
        at redis.clients.jedis.JedisPool.getResource(JedisPool.java:226)
        at com.redislabs.provider.redis.ConnectionPool$.connect(ConnectionPool.scala:33)
        at com.redislabs.provider.redis.RedisEndpoint.connect(RedisConfig.scala:67)
        at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getKV$1.apply(RedisRDD.scala:43)
        at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getKV$1.apply(RedisRDD.scala:41)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
        at com.redislabs.provider.redis.rdd.RedisKVRDD.getKV(RedisRDD.scala:40)
        at com.redislabs.provider.redis.rdd.RedisKVRDD.compute(RedisRDD.scala:34)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
        at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.ConnectException: Connection refused (Connection refused)
        at redis.clients.jedis.Connection.connect(Connection.java:207)
        at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:93)
        at redis.clients.jedis.BinaryJedis.connect(BinaryJedis.java:1767)
        at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:106)
        at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:868)
        at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435)
        at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363)
        at redis.clients.util.Pool.getResource(Pool.java:49)
        ... 35 more
    Caused by: java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at redis.clients.jedis.Connection.connect(Connection.java:184)
        ... 42 more

    18/02/05 11:24:37 INFO TaskSetManager: Starting task 0.1 in stage 2.0 (TID 210, ip-10-16-3-177.us-west-2.compute.internal, executor 1, partition 0, RACK_LOCAL, 6694 bytes)
    18/02/05 11:24:37 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on ip-10-16-3-177.us-west-2.compute.internal:41325 (size: 4.0 MB, free: 2.8 GB)

shaynativ commented 6 years ago

Hi, the connection error might be a result of the environment setup. Are you sure all the required ports are open on your AWS servers? Can you try opening a Redis client (redis-cli) on your AWS cluster and connecting to your Redis server?
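
For example, from one of the EMR worker nodes (host and password are placeholders):

    redis-cli -h redis-host -p 6379 -a password ping

A PONG reply confirms basic reachability; a "Connection refused" here would point at security groups or the server's bind address rather than at the connector.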

prasankh commented 6 years ago

@shaynativ Yes, the ports are open, and we have already tested with redis-cli on each node of the same cluster; it connected without any issue.

gkorland commented 6 years ago

@ScottWang did you manage to find a workaround for the issue?