emmalanguage / emma

A quotation-based Scala DSL for scalable data analysis.
http://emma-language.org
Apache License 2.0

Running Emma Jobs on a cluster #60

Closed FelixNeutatz closed 9 years ago

FelixNeutatz commented 9 years ago

Running Emma Jobs on a Flink cluster:

Works fine :)

Example:

mvn clean package -Pflink 

./flink run --classpath /tmp/emma/codegen/ \
-c eu.stratosphere.emma.examples.CommandLineInterface \
/home/felix/emma/emma/emma-sketchbook/target/emma-sketchbook-1.0-SNAPSHOT.jar \
tc /home/felix/emma/emma/emma-sketchbook/src/test/resources/graphs/triangle-cnt/edges.tsv

Running Emma Jobs on a Spark cluster:

Works in general, e.g. for this case:

val algorithm = emma.parallelize {    
    val len = 10
    val inp = fromPath(input)
    val size = DataBag(inp).withFilter(_.length > len).fetch().size
    println("size: " + size)
    size
}

with command:

mvn clean package -Pspark

cd spark-1.4.1
mvn -Dscala-2.11 -DskipTests clean package

./bin/spark-submit \
--verbose \
--class eu.stratosphere.emma.examples.CommandLineInterface \
--master spark://Tardis:7077 \
--conf spark.executor.extraClassPath=/tmp/emma/codegen/ \
--conf spark.driver.extraClassPath=/tmp/emma/codegen/ \
/home/felix/emma/emma/emma-sketchbook/target/emma-sketchbook-1.0-SNAPSHOT.jar \
bt \
spark-remote spark://Tardis 7077 \
/home/felix/emma/emma/emma-common/src/test/resources/lyrics/Jabberwocky.txt

This doesn't work if you want to use CSVInputFormat. Error: macro implementation not found: materializeCSVConvertors

When I write a simple Spark job and call the macro from the jar, e.g. in a mapper, this works perfectly fine.

So the next thing I tried was to integrate the macro code into the Spark DataFlowGenerator. Now the code is executed, but it doesn't find the class it is typed with. E.g. in the case of the triangle count, it doesn't find the Edge class:

java.lang.NoClassDefFoundError: eu/stratosphere/emma/examples/graphs/TriangleCount$Schema$Edge

which is really strange, because this class is in the jar of the executed program.

I will further investigate ...

aalexandrov commented 9 years ago

The series of errors in Spark suggests that the Emma jar is not included in the toolbox classpath.

You might want to explicitly add the emma-sketchbook classpath to the toolbox classpath, as suggested in the recent discussion here; see especially the remark by @fschueler.

FelixNeutatz commented 9 years ago

Can you post the link again? It is broken somehow.

aalexandrov commented 9 years ago

I mean issue #57.

FelixNeutatz commented 9 years ago

OK, this fixed the original "macro implementation not found: materializeCSVConvertors" error. But the Edge class of the triangle count is still not found:

15/08/17 14:19:41 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-1,5,main]
java.lang.NoClassDefFoundError: eu/stratosphere/emma/examples/graphs/TriangleCount$Schema$Edge
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.getDeclaredMethod(Class.java:2128)
    at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1431)
    at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:72)
    at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:494)
    at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
        ...

and:

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaVanillaMethodMirror1.jinvokeraw(JavaMirrors.scala:373)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaMethodMirror.jinvoke(JavaMirrors.scala:339)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaVanillaMethodMirror.apply(JavaMirrors.scala:355)
    at eu.stratosphere.emma.codegen.utils.DataflowCompiler.execute(DataflowCompiler.scala:68)
    at eu.stratosphere.emma.runtime.Spark.executeTempSink(Spark.scala:40)
    at eu.stratosphere.emma.examples.graphs.TriangleCount$$anon$1.runParallel(TriangleCount.scala:28)
    at eu.stratosphere.emma.examples.graphs.TriangleCount$$anon$1.run(TriangleCount.scala:19)
    at eu.stratosphere.emma.examples.graphs.TriangleCount$$anon$1.run(TriangleCount.scala)
    at eu.stratosphere.emma.examples.graphs.TriangleCount.run(TriangleCount.scala:17)
    at eu.stratosphere.emma.examples.CommandLineInterface$$anonfun$main$3.apply(CommandLineInterface.scala:56)

aalexandrov commented 9 years ago

Can you write a small Spark job that has hard-coded logic for conversion of text lines to Edge instances, e.g.

in.fromText().map { line => val values = line.split('\t'); Edge(values(0).toInt, values(1).toInt, values(2).toInt) }

and see whether this works?

FelixNeutatz commented 9 years ago

spark.textFile("/home/felix/emma/emma/emma-sketchbook/src/test/resources/graphs/triangle-cnt/edges.tsv")
  .map { line =>
    val values = line.split('\t')
    Edge(values(0).toLong, values(1).toLong)
  }
  .collect()
  .foreach(println)

This works when I import eu.stratosphere.emma.examples.graphs.TriangleCount.Schema.Edge. I noticed that the class loader tries to load "eu.stratosphere.emma.examples.graphs.TriangleCount$Schema$Edge". Could the $ sign be the problem?

aalexandrov commented 9 years ago

Yes, this might be the case.

You can try an Edge class which is contained in a top-level compilation unit (i.e. it is in a file of its own).
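
On the $ question: scalac encodes the nesting of definitions into the JVM binary name using $, so a name such as TriangleCount$Schema$Edge is exactly what a class loader is expected to ask for. A minimal sketch, using a hypothetical BinaryNames object (not part of Emma) in place of the real TriangleCount:

```scala
object BinaryNames {
  object Schema {
    // Stand-in for the Edge class from the thread; the fields are assumptions.
    case class Edge(src: Long, dst: Long)
  }

  // The JVM binary name under which a class loader looks the class up.
  def edgeBinaryName: String = classOf[Schema.Edge].getName
}
```

Calling BinaryNames.edgeBinaryName should yield a name ending in BinaryNames$Schema$Edge, so the $ signs by themselves do not point to a problem.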

FelixNeutatz commented 9 years ago

No, that's not the problem; I still get the error ...

FelixNeutatz commented 9 years ago

when I try to read a Tuple:

val incoming = read(input, new CSVInputFormat[Tuple1[String]])
val size = incoming.withFilter(_._1.length > len).fetch().size

then tpe.toString in ConvertorsMacros is "(String,)" in this case. This throws the following error:

[ERROR] /home/felix/emma/emma2/emma/emma-sketchbook/src/main/scala/eu/stratosphere/emma/examples/graphs/BaseTest.scala:37: exception during macro expansion: 
scala.reflect.macros.ParseException: identifier expected but ')' found.
    at scala.reflect.macros.contexts.Parsers$$anonfun$parse$1.apply(Parsers.scala:18)
    at scala.reflect.macros.contexts.Parsers$$anonfun$parse$1.apply(Parsers.scala:17)
    at scala.collection.mutable.LinkedHashSet.foreach(LinkedHashSet.scala:91)
    at scala.reflect.macros.contexts.Parsers$class.parse(Parsers.scala:17)
    at scala.reflect.macros.contexts.Context.parse(Context.scala:6)
    at scala.reflect.macros.contexts.Context.parse(Context.scala:6)
    at eu.stratosphere.emma.macros.program.util.ProgramUtils$class.untypecheck(ProgramUtils.scala:23)
    at eu.stratosphere.emma.macros.program.WorkflowMacros$LiftHelper.untypecheck(WorkflowMacros.scala:28)
    at eu.stratosphere.emma.macros.program.controlflow.ControlFlowNormalization$class.normalize(ControlFlowNormalization.scala:19)
    at eu.stratosphere.emma.macros.program.WorkflowMacros$LiftHelper.normalize(WorkflowMacros.scala:28)
    at eu.stratosphere.emma.macros.program.WorkflowMacros$LiftHelper.parallelize(WorkflowMacros.scala:43)
    at eu.stratosphere.emma.macros.program.WorkflowMacros.parallelize(WorkflowMacros.scala:18)

joroKr21 commented 9 years ago

@FelixNeutatz, it seems that showCode(q"${weakTypeOf[Tuple1[String]]}") = "(String,)" which is not a valid type definition, so it's a bug.

FelixNeutatz commented 9 years ago

Is it a Scala bug, or do we have to change something?

aalexandrov commented 9 years ago

I think it's a Scala bug.

FelixNeutatz commented 9 years ago

OK, I'll try a workaround. But yeah, I found my first Scala bug :)

aalexandrov commented 9 years ago

I wish I could say that this will be the exception...

FelixNeutatz commented 9 years ago

Tuple2[String,String] works, but then I get the following error:

java.lang.NoClassDefFoundError: eu/stratosphere/emma/api/CSVConvertors
    at __wrapper$1$6ad675198ec648b0b071862d4e92e2ce.comprehension$macro$133$$anonfun$1.apply(<no source file>:5)
    at __wrapper$1$6ad675198ec648b0b071862d4e92e2ce.comprehension$macro$133$$anonfun$1.apply(<no source file>:5)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:414)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:276)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: eu.stratosphere.emma.api.CSVConvertors
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

so something is really off with the classpath :(

FelixNeutatz commented 9 years ago

Hi, I need a bit of advice. I wanted to print the classpath of the class loader in org.apache.spark.executor.Executor and org.apache.spark.storage.MemoryStore. So I added a log statement and rebuilt Spark with "mvn -Dscala-2.11 -DskipTests clean package". But when I run Spark again, it seems that nothing changed at all. Do I have to copy some jars somewhere, or what is my mistake here?

fschueler commented 9 years ago

Maybe mvn install instead of package does the trick? Or you have to copy the new packaged version into your maven repo (usually under ~/.m2/repository/).

FelixNeutatz commented 9 years ago

Good idea, I deleted the corresponding .m2 folder. I also tried mvn install. But neither works :( Really strange.

FelixNeutatz commented 9 years ago

They actually provide a script to do so:

dev/change-version-to-2.11.sh
mvn -Dscala-2.11 -DskipTests clean install

Reading the documentation solves most problems ... (next time ...)

aalexandrov commented 9 years ago

(facepalm)

FelixNeutatz commented 9 years ago

so I inserted a statement before the exception in org.apache.spark.storage.MemoryStore:

val buffer = new StringBuffer()
for (url <- ((Thread
     .currentThread()
     .getContextClassLoader()
     .asInstanceOf[java.net.URLClassLoader])).getURLs()) {
        buffer.append(new java.io.File(url.getPath()))
        buffer.append(";")
}
val classpath = buffer.toString() // == /home/felix/emma/spark-1.4.1/work/app-20150827150513-0000/0/./emma-sketchbook-1.0-SNAPSHOT.jar;

while (values.hasNext && keepUnrolling) {
//...

When I open the jar in the corresponding folder, eu/stratosphere/emma/api/CSVConvertors is in there as well ...

Any ideas what to check next?
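
A possible way to make this kind of inspection reusable is to walk the whole parent chain of a class loader instead of looking at the context class loader only. The sketch below is an assumption-laden helper (ClasspathDump and its methods are hypothetical names, not part of Spark or Emma); note that on Java 9 and later the application class loader is no longer a URLClassLoader, so such loaders are reported without URLs:

```scala
import java.net.URLClassLoader

object ClasspathDump {
  // URLs visible to a single loader; empty for loaders that are not URL-based.
  def urlsOf(cl: ClassLoader): Seq[String] = cl match {
    case u: URLClassLoader => u.getURLs.toSeq.map(_.toString)
    case _                 => Seq.empty
  }

  // The loader itself followed by all of its parents (the bootstrap loader is null and is omitted).
  def chain(cl: ClassLoader): List[ClassLoader] =
    if (cl == null) Nil else cl :: chain(cl.getParent)

  // One line per loader: its class name and the URLs it serves.
  def describe(cl: ClassLoader): String =
    chain(cl)
      .map(l => s"${l.getClass.getName}: ${urlsOf(l).mkString(";")}")
      .mkString("\n")
}
```

For example, println(ClasspathDump.describe(Thread.currentThread().getContextClassLoader)) could replace the hand-written loop above and would additionally show which loader in the chain serves which jar.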

aalexandrov commented 9 years ago

@FelixNeutatz If I follow correctly, you are now investigating the classpath based on this trace:

java.lang.NoClassDefFoundError: eu/stratosphere/emma/api/CSVConvertors
    at __wrapper$1$6ad675198ec648b0b071862d4e92e2ce.comprehension$macro$133$$anonfun$1.apply(<no source file>:5)
    at __wrapper$1$6ad675198ec648b0b071862d4e92e2ce.comprehension$macro$133$$anonfun$1.apply(<no source file>:5)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:414)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:276)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

At the moment you're printing the classpath at the MemoryStore call point. Due to the way the classpath is loaded, however, some of the entries might be missing in the generated code (the top two lines, starting with __wrapper$1$...).

The most accurate way to check the classpath is to adapt the code generation quotes in the Spark DataflowGenerator so that the classpath is printed at construction time. A straightforward way to do this is to quote the classpath printing code at line 78:

      // assemble dataFlow
      q"""object ${TermName(dataFlowName)} {
        import _root_.org.apache.spark.SparkContext._

        // add classpath printing code here

        ..${closure.UDFs.result().toSeq}
        $runMethod
      }""" ->> compile

You should then see the classpath for the object that actually causes the error. My bet is that it will be different from the one you see in the MemoryStore.

FelixNeutatz commented 9 years ago

The printed classpath is "/home/felix/emma/emma2/emma/emma-sketchbook/target/emma-sketchbook-1.0-SNAPSHOT.jar;", and that jar also contains the missing class ...

But we still get the exception.

FelixNeutatz commented 9 years ago

Hi, I tried to debug this together with @joroKr21, but we couldn't find any point where the classpath is missing the necessary jar.

The current exception / error is the following:

java.lang.NoClassDefFoundError: eu/stratosphere/emma/api/CSVConvertors
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at scala.reflect.runtime.ReflectionUtils$.staticSingletonInstance(ReflectionUtils.scala:64)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaModuleMirror.instance(JavaMirrors.scala:525)
    at eu.stratosphere.emma.codegen.utils.DataflowCompiler.execute(DataflowCompiler.scala:85)
    at eu.stratosphere.emma.runtime.Spark.executeTempSink(Spark.scala:40)
    at eu.stratosphere.emma.examples.graphs.BaseTest$$anon$1.runParallel(BaseTest.scala:38)
    at eu.stratosphere.emma.examples.graphs.BaseTest$$anon$1.run(BaseTest.scala:37)
    at eu.stratosphere.emma.examples.graphs.BaseTest$$anon$1.run(BaseTest.scala)
    at eu.stratosphere.emma.examples.graphs.BaseTest.run(BaseTest.scala:35)
    at eu.stratosphere.emma.examples.CommandLineInterface$$anonfun$main$3.apply(CommandLineInterface.scala:56)
    at eu.stratosphere.emma.examples.CommandLineInterface$$anonfun$main$3.apply(CommandLineInterface.scala:55)
    at scala.Option.foreach(Option.scala:257)
    at eu.stratosphere.emma.examples.CommandLineInterface$.main(CommandLineInterface.scala:55)
    at eu.stratosphere.emma.examples.CommandLineInterface.main(CommandLineInterface.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: eu.stratosphere.emma.api.CSVConvertors
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 24 more

It is also interesting that the actual error is "NoClassDefFoundError", which occurs in this line of the DataflowCompiler:

tb.mirror.reflect(dfMirror.instance)

Another interesting fact is what happens when I write the following:

println("test: " + tb.mirror.classLoader.loadClass("eu.stratosphere.emma.api.CSVConvertors").getCanonicalName)

This doesn't throw any error at all. So our guess is that there is something wrong with dfMirror.

Any ideas?
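
One possible reading of these two observations: the println goes through tb.mirror.classLoader, whereas the trace shows the failing lookup going through sun.misc.Launcher$AppClassLoader, and a class that one loader can resolve need not be visible to another. The following sketch illustrates only that general JVM behaviour; LoaderVisibility is a hypothetical helper and does not reproduce the Emma setup:

```scala
import java.net.{URL, URLClassLoader}

object LoaderVisibility {
  // A loader with no URLs and no parent (null means bootstrap only): it sees JDK classes only.
  def isolated(): ClassLoader = new URLClassLoader(Array.empty[URL], null)

  // True if the given loader can resolve the class name, false otherwise.
  def canLoad(cl: ClassLoader, name: String): Boolean =
    try { cl.loadClass(name); true }
    catch { case _: ClassNotFoundException => false }
}
```

Here canLoad(LoaderVisibility.isolated(), "java.lang.String") succeeds, while the same call with "scala.Option" fails even though the loader that runs the program can resolve it. Checking canLoad against each loader involved may help to narrow down which one lacks the jar.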

joroKr21 commented 9 years ago

We should try to use JHades and see if we can get more information.

FelixNeutatz commented 9 years ago

good idea :+1: I will try it :)

FelixNeutatz commented 9 years ago

Yeah, JHades did the trick. With JHades I could figure out that "--jars" doesn't make the jars sufficiently available, as I thought it would. So the current command I run is:

./bin/spark-submit \
--verbose \
--class eu.stratosphere.emma.examples.CommandLineInterface \
--master spark://Tardis:7077 \
--conf spark.executor.extraClassPath=/tmp/emma/codegen/:/home/felix/emma/emma2/emma/emma-sketchbook/target/emma-sketchbook-1.0-SNAPSHOT.jar \
--conf spark.driver.extraClassPath=/tmp/emma/codegen/:/home/felix/emma/emma2/emma/emma-sketchbook/target/emma-sketchbook-1.0-SNAPSHOT.jar \
--conf spark.executor.extraLibraryPath=/home/felix/emma/emma2/emma/emma-sketchbook/target/emma-sketchbook-1.0-SNAPSHOT.jar \
--conf spark.driver.extraLibraryPath=/home/felix/emma/emma2/emma/emma-sketchbook/target/emma-sketchbook-1.0-SNAPSHOT.jar \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
/home/felix/emma/emma2/emma/emma-sketchbook/target/emma-sketchbook-1.0-SNAPSHOT.jar \
bt \
spark-remote spark://Tardis 7077 \
/home/felix/emma/emma2/emma/emma-common/src/test/resources/lyrics/Jabberwocky.txt

Now I finally get another exception, which has nothing to do with classpaths. Finally, life is good again :)

FelixNeutatz commented 9 years ago

Yes, that was the issue. So the final command is the following:

./bin/spark-submit \
--verbose \
--class eu.stratosphere.emma.examples.CommandLineInterface \
--master spark://Tardis:7077 \
--conf spark.executor.extraClassPath=/tmp/emma/codegen/:/home/felix/emma/emma2/emma/emma-sketchbook/target/emma-sketchbook-1.0-SNAPSHOT.jar \
--conf spark.driver.extraClassPath=/tmp/emma/codegen/:/home/felix/emma/emma2/emma/emma-sketchbook/target/emma-sketchbook-1.0-SNAPSHOT.jar \
/home/felix/emma/emma2/emma/emma-sketchbook/target/emma-sketchbook-1.0-SNAPSHOT.jar \
tc \
spark-remote spark://Tardis 7077 \
/home/felix/emma/emma/emma-sketchbook/src/test/resources/graphs/triangle-cnt/edges.tsv  

To make this work properly, you have to configure the right Spark version in the parent pom file!

I would suggest copying the user jar file into "/tmp/emma/codegen/". This would also solve the Toolbox issue:

./bin/spark-submit \
--verbose \
--class eu.stratosphere.emma.examples.CommandLineInterface \
--master spark://Tardis:7077 \
--conf spark.executor.extraClassPath=/tmp/emma/codegen/:/tmp/emma/codegen/emma-sketchbook-1.0-SNAPSHOT.jar \
--conf spark.driver.extraClassPath=/tmp/emma/codegen/:/tmp/emma/codegen/emma-sketchbook-1.0-SNAPSHOT.jar \
/tmp/emma/codegen/emma-sketchbook-1.0-SNAPSHOT.jar \
tc \
spark-remote spark://Tardis 7077 \
/home/felix/emma/emma/emma-sketchbook/src/test/resources/graphs/triangle-cnt/edges.tsv

aalexandrov commented 9 years ago

"To make this work properly, you have to configure the right Spark version in the parent pom file!"

You mean changing to 1.4.1 in the Emma POM?

FelixNeutatz commented 9 years ago

I tried it out another time and it seems the Spark version doesn't matter.

I also found a solution to generalize the ToolBox creation, so we don't need to copy the user jar into the temp folder:

val cp = getClass.getClassLoader.asInstanceOf[URLClassLoader].getURLs().map(_.getFile()).mkString(System.getProperty("path.separator"))
val tb = mirror.mkToolBox(options = s"-d $codeGenDir -cp $cp")
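
The cast to URLClassLoader in the snippet above assumes a URL-based class loader, which no longer holds for the application class loader on Java 9 and later. A more defensive variant of the classpath string construction might look like the sketch below; ToolboxClasspath and classpathOf are hypothetical names, and falling back to the java.class.path system property is an assumption about what is acceptable here, not something taken from the Emma code base:

```scala
import java.io.File
import java.net.URLClassLoader

object ToolboxClasspath {
  // Builds a classpath string suitable for an option such as "-cp <classpath>".
  def classpathOf(cl: ClassLoader): String = cl match {
    case u: URLClassLoader =>
      u.getURLs
        .filter(_.getProtocol == "file")           // keep only local entries
        .map(url => new File(url.toURI).getPath)   // decode to a filesystem path
        .mkString(File.pathSeparator)
    case _ =>
      // Assumed fallback for loaders that do not expose their URLs.
      System.getProperty("java.class.path", "")
  }
}
```

The result could then be passed on as in the snippet above, e.g. s"-d $codeGenDir -cp ${ToolboxClasspath.classpathOf(getClass.getClassLoader)}".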

FelixNeutatz commented 9 years ago

I included all important changes into a PR: https://github.com/FelixNeutatz/emma/commit/21d8d57ae3235557e24d7d1035096063d0bb85e7

I will rebase tomorrow :)

FelixNeutatz commented 9 years ago

I get an error when I run:

mvn clean install -Pspark

Do you get the same error?

Results :

Tests in error: 
  AlternatingLeastSquaresTest>FunSuite.run:1559->FunSuite.org$scalatest$FunSuiteLike$$super$run:1559->FunSuite.runTests:1559->FunSuite.runTest:1559->FunSuite.withFixture:1559->validate:77 » IllegalArgument
  AlternatingLeastSquaresTest>FunSuite.run:1559->FunSuite.org$scalatest$FunSuiteLike$$super$run:1559->FunSuite.runTests:1559->FunSuite.runTest:1559->FunSuite.withFixture:1559->validate:77 » IllegalArgument

Tests run: 16, Failures: 0, Errors: 2, Skipped: 0

But when I delete AlternatingLeastSquaresTest everything works fine.

FelixNeutatz commented 9 years ago

The strange thing is, it works in IntelliJ