stratosphere / incubator-systemml

Mirror of Apache SystemML (Incubating)
Apache License 2.0

LinRegDS.dml Fails on Cluster #30

Closed carabolic closed 8 years ago

carabolic commented 8 years ago

We tried to run the LinearRegDS.dml script on the cluster (cloud-11) in hybrid_flink mode with an 80 GB training matrix, but the job fails with:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Job submission to the JobManager timed out.

Full Exception

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.sysml.api.DMLException: org.apache.sysml.runtime.DMLRuntimeException: org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in program block generated from statement block between lines 131 and 136 -- Error evaluating instruction: FLINK°mapmm°_mVar107·MATRIX·DOUBLE°X·MATRIX·DOUBLE°_mVar108·MATRIX·DOUBLE°LEFT°true°SINGLE_BLOCK
        at org.apache.sysml.api.DMLScript.executeScript(DMLScript.java:354)
        at org.apache.sysml.api.DMLScript.main(DMLScript.java:195)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
        ... 6 more
Caused by: org.apache.sysml.runtime.DMLRuntimeException: org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in program block generated from statement block between lines 131 and 136 -- Error evaluating instruction: FLINK°mapmm°_mVar107·MATRIX·DOUBLE°X·MATRIX·DOUBLE°_mVar108·MATRIX·DOUBLE°LEFT°true°SINGLE_BLOCK
        at org.apache.sysml.runtime.controlprogram.Program.execute(Program.java:151)
        at org.apache.sysml.api.DMLScript.execute(DMLScript.java:684)
        at org.apache.sysml.api.DMLScript.executeScript(DMLScript.java:340)
        ... 12 more
Caused by: org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in program block generated from statement block between lines 131 and 136 -- Error evaluating instruction: FLINK°mapmm°_mVar107·MATRIX·DOUBLE°X·MATRIX·DOUBLE°_mVar108·MATRIX·DOUBLE°LEFT°true°SINGLE_BLOCK
        at org.apache.sysml.runtime.controlprogram.ProgramBlock.executeSingleInstruction(ProgramBlock.java:333)
        at org.apache.sysml.runtime.controlprogram.ProgramBlock.executeInstructions(ProgramBlock.java:222)
        at org.apache.sysml.runtime.controlprogram.ProgramBlock.execute(ProgramBlock.java:166)
        at org.apache.sysml.runtime.controlprogram.Program.execute(Program.java:144)
        ... 14 more
Caused by: org.apache.sysml.runtime.DMLRuntimeException: Could not collect final block of org.apache.flink.api.java.operators.MapOperator@3b4d50b
        at org.apache.sysml.runtime.instructions.flink.utils.DataSetAggregateUtils.sumStable(DataSetAggregateUtils.java:52)
        at org.apache.sysml.runtime.instructions.flink.MapmmFLInstruction.processInstruction(MapmmFLInstruction.java:148)
        at org.apache.sysml.runtime.controlprogram.ProgramBlock.executeSingleInstruction(ProgramBlock.java:303)
        ... 17 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Job submission to the JobManager timed out.
        at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
        at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
        at org.apache.sysml.runtime.instructions.flink.utils.DataSetAggregateUtils.sumStable(DataSetAggregateUtils.java:50)
        ... 19 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Job submission to the JobManager timed out.
        at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:140)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:379)
        ... 25 more
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out.
        at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:256)
        at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
        at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
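
When reading a nested trace like the one above, the innermost "Caused by" entry is usually the most informative. The following is a minimal sketch of how the chain could be extracted from a saved trace; the function name `cause_chain` and the abbreviated `SAMPLE` text are illustrative and not part of SystemML or Flink.

```python
import re
from typing import List


def cause_chain(trace: str) -> List[str]:
    """Return the exception class names in a Java stack trace, outermost first."""
    chain = []
    for raw in trace.splitlines():
        line = raw.strip()
        # Skip stack frames ("at ...") and elision markers ("... 6 more").
        if line.startswith("at ") or line.startswith("..."):
            continue
        if line.startswith("Caused by: "):
            line = line[len("Caused by: "):]
        # A header line starts with a fully qualified class name.
        match = re.match(r"([\w.$]+(?:Exception|Error))\b", line)
        if match:
            chain.append(match.group(1))
    return chain


# Abbreviated sample built from three header lines of the trace in this issue.
SAMPLE = """\
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
Caused by: org.apache.sysml.api.DMLException: runtime error (message shortened)
        ... 6 more
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out.
        at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:256)
"""

chain = cause_chain(SAMPLE)
print("root cause:", chain[-1])
```

Applied to the full trace, the last element is the JobClientActorSubmissionTimeoutException, which points at the job submission step rather than at the MAPMM computation itself.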

Reproduce the Bug

The test data is already in HDFS at /user/cbruecke/systemml/linRegData_80G.train.bin and /user/cbruecke/systemml/linRegData_80G.labels.bin respectively.

YARN and HDFS must be running at all times.

Configuration

Mainly copied from the SystemML guide.

In mapred-site.xml:

<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
  <property>
    <name>mapreduce.map.java.opts</name>
    <value>-Xmx2g -Xms2g -Xmn200m</value>
  </property>
  <property>
    <name>mapreduce.reduce.java.opts</name>
    <value>-Xmx2g -Xms2g -Xmn200m</value>
  </property>
  <property>
    <name>mapreduce.map.memory.mb</name>
    <value>3072</value>
  </property>
  <property>
    <name>mapreduce.reduce.memory.mb</name>
    <value>3072</value>
  </property>
</configuration>
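
As a sanity check on these values, the JVM heap (-Xmx) of each task has to fit inside its YARN container (memory.mb) with some room left for non-heap memory. The sketch below parses a copy of the relevant properties and reports the headroom; the helper names `load_properties` and `xmx_mb` are made up for this example.

```python
import re
import xml.etree.ElementTree as ET

# Copy of the memory-related properties from the mapred-site.xml above.
MAPRED_SITE = """<configuration>
  <property>
    <name>mapreduce.map.java.opts</name>
    <value>-Xmx2g -Xms2g -Xmn200m</value>
  </property>
  <property>
    <name>mapreduce.reduce.java.opts</name>
    <value>-Xmx2g -Xms2g -Xmn200m</value>
  </property>
  <property>
    <name>mapreduce.map.memory.mb</name>
    <value>3072</value>
  </property>
  <property>
    <name>mapreduce.reduce.memory.mb</name>
    <value>3072</value>
  </property>
</configuration>"""


def load_properties(xml_text):
    """Map each <name> to its <value> in a Hadoop-style configuration."""
    root = ET.fromstring(xml_text)
    return {p.findtext("name"): p.findtext("value") for p in root.findall("property")}


def xmx_mb(java_opts):
    """Extract the -Xmx setting from a JVM option string, in megabytes."""
    match = re.search(r"-Xmx(\d+)([gGmM])", java_opts)
    if match is None:
        raise ValueError("no -Xmx setting found in: " + java_opts)
    value = int(match.group(1))
    return value * 1024 if match.group(2).lower() == "g" else value


props = load_properties(MAPRED_SITE)
report = {}
for task in ("map", "reduce"):
    heap = xmx_mb(props["mapreduce.%s.java.opts" % task])
    container = int(props["mapreduce.%s.memory.mb" % task])
    report[task] = container - heap
    print("%s: heap %d MB, container %d MB, headroom %d MB"
          % (task, heap, container, report[task]))
```

With the values above, both task types have a 2048 MB heap inside a 3072 MB container.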

In yarn-site.xml:

<configuration>
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>cloud-11.dima.tu-berlin.de</value>
    <description></description>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
</configuration>

Data Generation

hdfs@cloud-11$ bin/yarn jar /share/hadoop/cbruecke/incubator-systemml/target/SystemML.jar -f /share/hadoop/cbruecke/incubator-systemml/scripts/datagen/genLinearRegressionData.dml -nvargs numSamples=10000000 numFeatures=1000 maxFeatureValue=5 maxWeight=5 addNoise=FALSE b=0 sparsity=1.0 output=/user/cbruecke/systemml/linRegData_80G.bin format=binary perc=0.5

Data Preparation

hdfs@cloud-11$ bin/yarn jar /share/hadoop/cbruecke/incubator-systemml/target/SystemML.jar -f /share/hadoop/cbruecke/incubator-systemml/scripts/utils/splitXY.dml -nvargs X=/user/cbruecke/systemml/linRegData_80G.bin y=1001 OX=/user/cbruecke/systemml/linRegData_80G.train.bin OY=/user/cbruecke/systemml/linRegData_80G.labels.bin ofmt=binary
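
The "80G" in the file names follows from the generator arguments. A rough estimate, assuming a fully dense matrix (sparsity=1.0) stored as 8-byte doubles and ignoring any per-block overhead of the binary format:

```python
BYTES_PER_DOUBLE = 8


def dense_size_bytes(rows, cols):
    """Approximate size of a dense matrix of doubles, without format overhead."""
    return rows * cols * BYTES_PER_DOUBLE


num_samples = 10000000   # numSamples from the data generation command
num_features = 1000      # numFeatures from the data generation command

# The generated matrix holds the features plus one label column (y=1001 in splitXY).
generated = dense_size_bytes(num_samples, num_features + 1)
train = dense_size_bytes(num_samples, num_features)
labels = dense_size_bytes(num_samples, 1)

for name, size in (("generated", generated), ("train", train), ("labels", labels)):
    print("%-9s %6.2f GB" % (name, size / 1e9))
```

The training matrix alone accounts for roughly 80 GB (decimal), while the label vector is about 80 MB.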

Running in FLINK_HYBRID Mode

This is the phase that actually fails. The first phase, which includes TSMM, succeeds; only the subsequent Flink job, which includes MAPMM, cannot be submitted because of the Akka timeout.

hadoop@cloud-11$ bin/flink run -C file:///share/hadoop/cbruecke/incubator-systemml/target/lib/hadoop-mapreduce-client-jobclient-2.7.1.jar /share/hadoop/cbruecke/incubator-systemml/target/SystemML.jar -f /share/hadoop/cbruecke/incubator-systemml/scripts/algorithms/LinearRegDS.dml -exec hybrid_flink -explain -nvargs X=/user/cbruecke/systemml/linRegData_80G.train.bin Y=/user/cbruecke/systemml/linRegData_80G.labels.bin B=/user/cbruecke/systemml/betas.flink.csv fmt=csv

Notes

mvn -DskipTests -Dhadoop.version=2.7.1 clean package
export HADOOP_HOME=/share/hadoop/stable/hadoop-2.7.1
start-yarn.sh
flink run -C file:///share/hadoop/cbruecke/incubator-systemml/target/lib/hadoop-mapreduce-client-jobclient-2.7.1.jar [...] -p [DOP]

akunft commented 8 years ago

@FelixNeutatz Is this solved now, with the adjusted akka.framesize?
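
For context on why akka.framesize can matter: Akka messages larger than the configured frame size are not delivered, which can surface as a timeout rather than an explicit size error. The thread does not state which message was too large or which value was eventually configured, so the sketch below only compares a few hypothetical payload sizes, derived from the data dimensions in this issue, against Flink's documented default of 10485760 bytes.

```python
# Flink's documented default for akka.framesize ("10485760b", i.e. 10 MiB).
DEFAULT_FRAMESIZE_BYTES = 10485760


def dense_bytes(rows, cols):
    """Raw size of a dense matrix of 8-byte doubles, without serialization overhead."""
    return rows * cols * 8


def fits_in_frame(payload_bytes, framesize=DEFAULT_FRAMESIZE_BYTES):
    """True if a payload of this size stays below the frame size limit."""
    return payload_bytes < framesize


# Hypothetical payloads; whether any of these is actually sent through Akka
# in this job is an assumption, not something the issue confirms.
candidates = {
    "1000 x 1 vector": dense_bytes(1000, 1),
    "1000 x 1000 matrix": dense_bytes(1000, 1000),
    "10,000,000 x 1 vector": dense_bytes(10000000, 1),
}

for name, size in candidates.items():
    status = "fits" if fits_in_frame(size) else "exceeds default frame size"
    print("%-22s %11d bytes  %s" % (name, size, status))
```

Under these assumptions, anything on the order of the label vector (about 80 MB) would not fit into a default-sized frame, whereas a 1000 x 1000 matrix (8 MB) would.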

carabolic commented 8 years ago

Yes, there were a couple of things that needed to be tweaked to get the job running on the cluster. I've added a Notes section to the issue description.