thunder-project / thunder

scalable analysis of images and time series
http://thunder-project.org
Apache License 2.0

Missing Thunder imports on EC2 slaves #63

Closed. broxtronix closed this issue 9 years ago

broxtronix commented 9 years ago

I am running Thunder (master branch, updated today) on EC2, and I have been encountering a problem wherein my EC2 slave instances have trouble importing and running Thunder code. I've included a simple example below that triggers this behavior.

Poking around a bit in the thunder/python/thunder/utils/ec2.py script, it looks like Thunder is installed on the master node but not on the slave nodes. Only '/root/thunder/python/thunder/utils/data/' gets mirrored over to the slave nodes.

I'm guessing that this is because Spark will often pickle any Thunder code in the master's Python process and send it over to the slave processes, but this does not seem to be working correctly here. My impression had been that this pickling only works for symbols that have been imported into the master node's Python process, and not when the code called by one of those symbols then runs an import of its own. That is, if an import statement is executed from within a pickled function on a slave node, Python will look for that library in the slave node's local Python installation, which would explain why it is failing here: Thunder is not installed locally on the slaves!
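The import behavior described above can be reproduced without Spark. The following is a minimal sketch using only the standard pickle module (PySpark's own serializer differs in its details, so treat this as an illustration rather than a description of Spark internals). The module name masteronly_demo is hypothetical and stands in for a package that exists only on the master. A function defined in an importable module is pickled as a reference to its module and name, so unpickling it in a process that cannot import that module fails with an ImportError.

```python
import importlib
import os
import pickle
import sys
import tempfile


def pickle_then_load_without_module():
    """Pickle a function from a throwaway module, then try to unpickle it
    once the module can no longer be imported (like a worker without it)."""
    with tempfile.TemporaryDirectory() as tmp:
        # Hypothetical stand-in for a package installed only on the master.
        with open(os.path.join(tmp, "masteronly_demo.py"), "w") as f:
            f.write("def double(x):\n    return 2 * x\n")

        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        module = importlib.import_module("masteronly_demo")

        # The payload records the module and function names,
        # not the function's code.
        payload = pickle.dumps(module.double)

        # Simulate the worker: module not loaded and not on sys.path.
        sys.path.remove(tmp)
        del sys.modules["masteronly_demo"]
        importlib.invalidate_caches()

    try:
        pickle.loads(payload)
    except ImportError as exc:
        return payload, exc
    return payload, None


payload, error = pickle_then_load_without_module()
print(b"masteronly_demo" in payload)   # the module name travels in the payload
print(isinstance(error, ImportError))  # loading fails where the module is missing
```

This matches the last frame of the traceback in this report, where the worker fails inside the deserializer's loads call rather than inside any Thunder function.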

As a workaround for now, I have installed Thunder on the slave nodes using the following commands:

~/spark-ec2/copy-dir /root/thunder
pssh -h /root/spark-ec2/slaves "echo /root/thunder/python >> /usr/lib/python2.6/site-packages/paths.pth"

(The second command appends the Thunder source directory to a .pth file in site-packages, which puts thunder on the Python import path for the slaves.)
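For readers unfamiliar with .pth files, here is a small sketch of the mechanism the workaround relies on. It assumes nothing about the EC2 image: the directories are scratch stand-ins created in a temporary folder, and site.addsitedir is called by hand to mimic what Python does for site-packages at startup.

```python
import os
import site
import sys
import tempfile


def pth_adds_to_sys_path():
    """Write a .pth file into a scratch 'site' directory and report whether
    the directory it lists is on sys.path before and after processing."""
    with tempfile.TemporaryDirectory() as tmp:
        site_dir = os.path.join(tmp, "site-packages")      # stand-in for the real one
        code_dir = os.path.join(tmp, "thunder", "python")  # stand-in for /root/thunder/python
        os.makedirs(site_dir)
        os.makedirs(code_dir)

        # Same idea as `echo <dir> >> paths.pth`: one directory per line.
        with open(os.path.join(site_dir, "paths.pth"), "a") as f:
            f.write(code_dir + "\n")

        before = code_dir in sys.path
        site.addsitedir(site_dir)  # processes every .pth file in site_dir
        after = code_dir in sys.path

        # Undo the changes so the demo leaves sys.path as it found it.
        for entry in (site_dir, code_dir):
            while entry in sys.path:
                sys.path.remove(entry)
    return before, after


print(pth_adds_to_sys_path())
```

Only directories that actually exist are added, so the copy step has to succeed on every slave before the .pth entry has any effect.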

Let me know if I'm missing anything, or if there is any trick to getting the pickling / import to work with the code residing solely on the master node. Thanks!!

Test case

imagepath = 's3n://path/to/some/s3/data'
rdd_vols = tsc.loadImages(imagepath, inputformat='tif-stack').cache()
num_vols = rdd_vols.count() # Force data to be cached

rdd_series = rdd_vols.toSeries().cache()
num_series = rdd_series.count()

This produces the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-16-509ed1013e0d> in <module>()
----> 1 rdd_series = rdd_vols.toSeries().cache()
      2 num_series = rdd_series.count()
      3 print num_series

/root/thunder/python/thunder/rdds/images.pyc in toSeries(self, blockSize, splitsPerDim, groupingDim)
    280         blocksdata = self._scatterToBlocks(blockSize=blockSize, blocksPerDim=splitsPerDim, groupingDim=groupingDim)
    281 
--> 282         return blocksdata.toSeries(seriesDim=0)
    283 
    284     def saveAsBinarySeries(self, outputdirname, blockSize="150M", splitsPerDim=None, groupingDim=None,

/root/thunder/python/thunder/rdds/imageblocks.py in toSeries(self, seriesDim, newdtype, casting)
     24             return ImageBlocks._blockToSeries(blockVal, seriesDim)
     25 
---> 26         blockedrdd = self._groupIntoSeriesBlocks()
     27 
     28         # returns generator of (z, y, x) array data for all z, y, x

/root/thunder/python/thunder/rdds/imageblocks.py in _groupIntoSeriesBlocks(self)
     84         # sort must come after group, b/c group will mess with ordering.
     85         # return self.rdd.groupByKey().sortByKey(ascending=True).mapValues(lambda v: ImageBlockValue.fromPlanarBlocks(v, 0))
---> 86         sortedRdd = self.rdd.groupByKey().sortBy(lambda (k, _): k[::-1])
     87         return sortedRdd.mapValues(lambda v: ImageBlockValue.fromPlanarBlocks(v, 0))
     88 

/root/spark/python/pyspark/rdd.pyc in sortBy(self, keyfunc, ascending, numPartitions)
    631         [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
    632         """
--> 633         return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()
    634 
    635     def glom(self):

/root/spark/python/pyspark/rdd.pyc in sortByKey(self, ascending, numPartitions, keyfunc)
    601         # the key-space into bins such that the bins have roughly the same
    602         # number of (key, value) pairs falling into them
--> 603         rddSize = self.count()
    604         maxSampleSize = numPartitions * 20.0  # constant from Spark's RangePartitioner
    605         fraction = min(maxSampleSize / max(rddSize, 1), 1.0)

/root/spark/python/pyspark/rdd.pyc in count(self)
    845         3
    846         """
--> 847         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
    848 
    849     def stats(self):

/root/spark/python/pyspark/rdd.pyc in sum(self)
    836         6.0
    837         """
--> 838         return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
    839 
    840     def count(self):

/root/spark/python/pyspark/rdd.pyc in reduce(self, f)
    757             if acc is not None:
    758                 yield acc
--> 759         vals = self.mapPartitions(func).collect()
    760         return reduce(f, vals)
    761 

/root/spark/python/pyspark/rdd.pyc in collect(self)
    721         """
    722         with _JavaStackTrace(self.context) as st:
--> 723             bytesInJava = self._jrdd.collect().iterator()
    724         return list(self._collect_iterator_through_file(bytesInJava))
    725 

/usr/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o197.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 400 in stage 9.0 failed 4 times, most recent failure: Lost task 400.3 in stage 9.0 (TID 1743, ip-10-65-128-31.ec2.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/root/spark/python/pyspark/worker.py", line 75, in main
    command = pickleSer._read_with_length(infile)
  File "/root/spark/python/pyspark/serializers.py", line 150, in _read_with_length
    return self.loads(obj)
ImportError: No module named thunder.rdds.imageblocks

        org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
        org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:265)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

freeman-lab commented 9 years ago

@broxtronix Thanks for the detailed report! First, a question: to get here, how did you launch Thunder? Did you use the thunder executable (inside thunder/python/bin/), or did you launch pyspark and then import the relevant Thunder code?

It's true that PySpark requires independent imports to be available on the workers, but rather than installing code on the workers, this can be handled within PySpark itself by shipping egg files (and this seems to be the preferred strategy among developers). Currently, the executable does this for you automatically: it builds an egg file and ships it across the cluster via the ADD_FILES environment variable (setting this is equivalent to passing the egg to sc.addPyFile).
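As background on why shipping an egg is enough: an egg is a zip archive, and Python can import modules directly from an archive that appears on sys.path. The sketch below shows only that part, with a hypothetical archive and module name (shipped_demo) standing in for the Thunder egg. It does not involve Spark, and how PySpark distributes the file to the workers is not shown.

```python
import importlib
import os
import sys
import tempfile
import zipfile


def import_from_archive(archive_path, module_name):
    """Import module_name from a zip/egg archive by placing it on sys.path."""
    sys.path.insert(0, archive_path)
    importlib.invalidate_caches()
    try:
        return importlib.import_module(module_name)
    finally:
        sys.path.remove(archive_path)


with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical archive standing in for the Thunder egg.
    archive = os.path.join(tmp, "shipped_demo-0.1.egg")
    with zipfile.ZipFile(archive, "w") as zf:
        zf.writestr("shipped_demo.py",
                    "def greet():\n    return 'hello from the archive'\n")

    module = import_from_archive(archive, "shipped_demo")
    print(module.greet())    # code was loaded from inside the archive
    print(module.__file__)   # path points into the .egg file
```

Nothing needs to be installed for this to work, which is why no per-node setup is required once the egg reaches each worker.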

I just confirmed that if I launch Thunder via the executable, the following runs fine (identical to yours, except for the location of the data):

rdd_vols = tsc.loadExample('fish-images').cache()
num_vols = rdd_vols.count()
rdd_series = rdd_vols.toSeries().cache()
num_series = rdd_series.count()

If I launch via pyspark, the above gives an error like yours, but it is fixed by first adding the egg using sc.addPyFile. In other words, if I run this as soon as the shell starts,

sc.addPyFile('/root/thunder/python/dist/thunder_python-0.5.0_dev-py2.6.egg')
from thunder import ThunderContext
tsc = ThunderContext(sc)

then the earlier example works fine as well.
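Because the egg filename embeds the Thunder and Python versions, a small helper can avoid hard-coding it. This is a rough sketch, not part of Thunder: the function name ship_thunder_egg, the default directory, and the filename pattern are assumptions taken from the path shown above, and the only Spark call used is sc.addPyFile.

```python
import glob
import os


def ship_thunder_egg(sc, dist_dir="/root/thunder/python/dist"):
    """Find the most recently built Thunder egg in dist_dir and register it
    with the given SparkContext so that workers can import thunder.

    Hypothetical helper: dist_dir and the filename pattern are assumptions.
    """
    eggs = glob.glob(os.path.join(dist_dir, "thunder_python-*.egg"))
    if not eggs:
        raise IOError("no Thunder egg found in %s; build one first" % dist_dir)

    # Pick the newest build by modification time.
    egg = max(eggs, key=os.path.getmtime)
    sc.addPyFile(egg)
    return egg
```

In a pyspark shell or notebook, this would be called as ship_thunder_egg(sc) before the first import of thunder, mirroring the three lines shown above.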

Does this help explain what you were seeing? If so, we can definitely improve the documentation to clarify some of this.

broxtronix commented 9 years ago

Aha! Your explanation is right on the money. I had been running ipython notebook and then importing spark and thunder, and that explains why the egg files weren't automatically shipped to the slave nodes. I can confirm that this all works fine for me now if I run thunder instead of ipython notebook, or if I use

sc.addPyFile('/root/thunder/python/dist/thunder_python-0.5.0_dev-py2.6.egg')
from thunder import ThunderContext
tsc = ThunderContext(sc)

Looking at the documentation, I think it's actually pretty clear already that people should be running thunder on the command line in order to set up their environment correctly. Perhaps the one tip we could squeeze in there is to mention that you can still run thunder from pyspark or vanilla ipython notebook as long as you run the addPyFile() method first!

Thanks for the quick response on this. Cleared up my issue right away! :)