cloudml / zen

Zen aims to provide the largest-scale and most efficient machine learning platform on top of Spark, including but not limited to logistic regression, latent Dirichlet allocation, factorization machines, and DNNs.
Apache License 2.0
170 stars 75 forks

[FM] The training process of FM algorithm is so slow #64

Open yxzf opened 7 years ago

yxzf commented 7 years ago

I use the FM algorithm in zen to train on a dataset that has 15 partitions and is about 600MB. The training is very slow, taking about 1h30m; the XGBoost and LR algorithms cost only 30min.

witgo commented 7 years ago

Yes, I have noticed this. I am currently working on an implementation based on mini-batch and MCMC.
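For context on what a mini-batch version would look like: a factorization machine predicts with a bias, linear weights, and pairwise interactions factorized through rank-k latent vectors, and the pairwise term can be computed in O(n·k). Below is a minimal, self-contained sketch of mini-batch SGD for an FM with squared loss in plain Scala over dense arrays. It is not zen's implementation; the object and method names and all hyperparameters here are hypothetical.

    import scala.util.Random

    // Minimal FM + mini-batch SGD sketch (hypothetical names, not zen's code).
    object FMSketch {
      val numFeatures = 8
      val rank = 4                                  // latent factor dimension
      val rng = new Random(42)

      var w0 = 0.0                                  // global bias
      val w = Array.fill(numFeatures)(0.0)          // linear weights
      val v = Array.fill(numFeatures, rank)(rng.nextGaussian() * 0.01) // factors

      // FM prediction in O(numFeatures * rank) using the standard rewrite:
      //   sum_{i<j} <v_i, v_j> x_i x_j
      //     = 0.5 * sum_f [ (sum_i v_{i,f} x_i)^2 - sum_i (v_{i,f} x_i)^2 ]
      def predict(x: Array[Double]): Double = {
        var pairwise = 0.0
        for (f <- 0 until rank) {
          var s = 0.0; var s2 = 0.0
          for (i <- 0 until numFeatures) {
            val d = v(i)(f) * x(i)
            s += d; s2 += d * d
          }
          pairwise += 0.5 * (s * s - s2)
        }
        w0 + (0 until numFeatures).map(i => w(i) * x(i)).sum + pairwise
      }

      // One mini-batch step. For brevity the updates are applied
      // example-by-example with the step size scaled by the batch size;
      // a strict mini-batch step would accumulate all gradients first.
      def miniBatchStep(batch: Seq[(Array[Double], Double)], lr: Double): Unit = {
        val step = lr / batch.size
        for ((x, y) <- batch) {
          val err = predict(x) - y                  // d(0.5*(yhat-y)^2)/dyhat
          w0 -= step * err
          for (i <- 0 until numFeatures) w(i) -= step * err * x(i)
          for (f <- 0 until rank) {
            val sumF = (0 until numFeatures).map(j => v(j)(f) * x(j)).sum
            for (i <- 0 until numFeatures) {
              // d(yhat)/d(v_{i,f}) = x_i * sumF - v_{i,f} * x_i^2
              v(i)(f) -= step * err * (x(i) * sumF - v(i)(f) * x(i) * x(i))
            }
          }
        }
      }

      def main(args: Array[String]): Unit = {
        // Synthetic data: target depends on an interaction of features 0 and 1.
        val data = Seq.fill(256) {
          val x = Array.fill(numFeatures)(rng.nextDouble())
          (x, 3.0 * x(0) * x(1) + x(2))
        }
        for (epoch <- 1 to 50; batch <- data.grouped(32)) miniBatchStep(batch, 0.1)
        val mse = data.map { case (x, y) => math.pow(predict(x) - y, 2) }.sum / data.size
        println(f"train MSE: $mse%.4f")
      }
    }

Mini-batching mainly reduces the number of parameter synchronizations per pass over the data, which is usually what dominates the running time in a distributed setting.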

yxzf commented 7 years ago

@witgo Also, when numIterations is larger than 300, the Spark job always fails, even on a small dataset.

witgo commented 7 years ago

@yxzf Can you provide us with more information, e.g. the command arguments and the Spark log?

yxzf commented 7 years ago

@witgo The dataset is agaricus: https://github.com/dmlc/xgboost/blob/master/demo/data/agaricus.txt.train The parameters are:

FM_DATA=/user/***/agaricus.txt.train
FM_MODEL=/user/***/fm_test_agaricus_model
${SPARK_SUBMIT} \
  --class ${CLASS} \
  --jars ${FM_JAR},${FM_EXAMPLE_JAR},${LIBFM_JAR} \
  --num-executors 40 \
  --executor-cores 4 \
  --executor-memory 4g \
  --queue ${QUEUE} \
  --master ${MASTER} \
  --driver-memory 8g \
  --conf spark.shuffle.manager=SORT \
${JAR} --task all --learner c --rank 10 --numIterations 300 --regular 0.01,0.01,0.01 --kryo ${FM_DATA} ${FM_MODEL}

The relevant part of the Spark error log is:

16/10/09 11:22:50 INFO ContainerManagementProtocolProxy: Opening proxy : rz-data-hdp-dn2865.rz.sankuai.com:8043
16/10/09 11:22:53 WARN AkkaRpcEndpointRef: Error sending message [message = RemoveExecutor(38,Yarn deallocated the executor 38 (container container_e06_1474974038482_2525421_02_000126))] in 3 attempts
org.apache.spark.rpc.RpcTimeoutException: Recipient[Actor[akka://sparkDriver/user/CoarseGrainedScheduler#325208432]] had already been terminated.. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:229)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:225)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
    at scala.util.Try$.apply(Try.scala:161)
    at scala.util.Failure.recover(Try.scala:185)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.scala$concurrent$impl$Promise$DefaultPromise$$dispatchOrAddCallback(Promise.scala:280)
    at scala.concurrent.impl.Promise$DefaultPromise.onComplete(Promise.scala:270)
    at scala.concurrent.Future$class.recover(Future.scala:324)
    at scala.concurrent.impl.Promise$DefaultPromise.recover(Promise.scala:153)
    at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:319)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:100)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.removeExecutor(CoarseGrainedSchedulerBackend.scala:309)
    at org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint$$anonfun$receive$1.applyOrElse(YarnSchedulerBackend.scala:113)
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/CoarseGrainedScheduler#325208432]] had already been terminated.
    at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
    at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:307)
    ... 26 more
16/10/09 11:22:53 ERROR AkkaRpcEnv: Ignore error: Error notifying standalone scheduler's driver endpoint
org.apache.spark.SparkException: Error notifying standalone scheduler's driver endpoint
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.removeExecutor(CoarseGrainedSchedulerBackend.scala:312)
    at org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint$$anonfun$receive$1.applyOrElse(YarnSchedulerBackend.scala:113)
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.spark.SparkException: Error sending message [message = RemoveExecutor(38,Yarn deallocated the executor 38 (container container_e06_1474974038482_2525421_02_000126))]
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:118)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.removeExecutor(CoarseGrainedSchedulerBackend.scala:309)
    ... 23 more
Caused by: org.apache.spark.rpc.RpcTimeoutException: Recipient[Actor[akka://sparkDriver/user/CoarseGrainedScheduler#325208432]] had already been terminated.. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:229)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:225)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
    at scala.util.Try$.apply(Try.scala:161)
    at scala.util.Failure.recover(Try.scala:185)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.scala$concurrent$impl$Promise$DefaultPromise$$dispatchOrAddCallback(Promise.scala:280)
    at scala.concurrent.impl.Promise$DefaultPromise.onComplete(Promise.scala:270)
    at scala.concurrent.Future$class.recover(Future.scala:324)
    at scala.concurrent.impl.Promise$DefaultPromise.recover(Promise.scala:153)
    at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:319)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:100)
    ... 25 more
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/CoarseGrainedScheduler#325208432]] had already been terminated.
    at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
    at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:307)
    ... 26 more

SatishReddy006 commented 7 years ago

@yxzf Can you provide information about how you resolved this issue?

witgo commented 7 years ago

@yxzf Does your code call sparkContext.setCheckpointDir? In my tests, it works even when numIterations is larger than 400.

SatishReddy006 commented 7 years ago

@yxzf I am not calling sc.setCheckpointDir, but while running the model I get the same exception as above.

witgo commented 7 years ago

@Spinnap You can try something similar to the following code:

    val sc = new SparkContext(conf)
    // Set a checkpoint directory before training so that intermediate RDDs
    // can be checkpointed, truncating the lineage that otherwise grows with
    // the number of iterations.
    val checkpointDir = s"$out/checkpoint"
    sc.setCheckpointDir(checkpointDir)
    val lfm = new FMClassification(trainSet, stepSize, l2, rank, useAdaGrad, 1.0, storageLevel)
    lfm.run(numIterations)
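The reason this helps: each training iteration produces an RDD whose lineage points back to the previous one, so after hundreds of iterations the lineage graph becomes very deep and the job can fail the way described above. Setting a checkpoint directory lets intermediate RDDs be written to reliable storage, cutting the lineage. A standalone sketch of the pattern follows; the path, interval, and per-iteration map are hypothetical stand-ins, not zen's internals:

    import org.apache.spark.{SparkConf, SparkContext}

    object CheckpointSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("checkpoint-sketch").setMaster("local[2]")
        val sc = new SparkContext(conf)
        sc.setCheckpointDir("/tmp/checkpoint")    // hypothetical directory

        var data = sc.parallelize(1 to 1000).map(_.toDouble)
        val checkpointInterval = 10               // hypothetical value
        for (iter <- 1 to 300) {
          data = data.map(_ * 0.999).persist()    // stand-in for one training iteration
          if (iter % checkpointInterval == 0) {
            data.checkpoint()                     // marked for checkpointing; performed
            data.count()                          // by the next action, which truncates
          }                                       // the lineage
        }
        println(s"result: ${data.sum()}")
        sc.stop()
      }
    }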