USCDataScience / sparkler

Spark-Crawler: Apache Nutch-like crawler that runs on Apache Spark.
http://irds.usc.edu/sparkler/
Apache License 2.0

Sparkler/PF4J plugins on a Spark cluster #156

Closed · baddlan closed this issue 6 years ago

baddlan commented 6 years ago

@buggtb Any ideas on how to deploy the Sparkler/pf4j plugins on a Spark cluster?

I managed to get the Sparkler app jar deployed and loaded by the master and worker containers in the cluster, but the executors fail to load the plugins (see the error below). I have already tried copying the plugin jars and the plugins directory under SPARK_HOME and SPARK_HOME/jars, with no luck.

WARN  TaskSetManager:66 [task-result-getter-1] - Lost task 0.0 in stage 1.0 (TID 1, 192.168.57.8, executor 0): java.lang.Exception: Failed to load extension urlfilter-regex
    at edu.usc.irds.sparkler.service.PluginService$$anonfun$load$1.apply(PluginService.scala:70)
    at edu.usc.irds.sparkler.service.PluginService$$anonfun$load$1.apply(PluginService.scala:67)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at edu.usc.irds.sparkler.service.PluginService.load(PluginService.scala:67)
    at edu.usc.irds.sparkler.service.PluginService$.getExtension(PluginService.scala:147)
    at edu.usc.irds.sparkler.pipeline.FetchFunction$.apply(FetchFunction.scala:44)
    at edu.usc.irds.sparkler.pipeline.FetchFunction$.apply(FetchFunction.scala:32)
    at edu.usc.irds.sparkler.pipeline.FairFetcher.<init>(FairFetcher.scala:38)
    at edu.usc.irds.sparkler.pipeline.Crawler$$anonfun$run$1$$anonfun$2.apply(Crawler.scala:171)
    at edu.usc.irds.sparkler.pipeline.Crawler$$anonfun$run$1$$anonfun$2.apply(Crawler.scala:170)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

ERROR TaskSetManager:70 [task-result-getter-0] - Task 0 in stage 1.0 failed 4 times; aborting job
Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at edu.usc.irds.sparkler.Main$.main(Main.scala:47)
    at edu.usc.irds.sparkler.Main.main(Main.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 192.168.57.8, executor 0): java.lang.Exception: Failed to load extension urlfilter-regex
    at edu.usc.irds.sparkler.service.PluginService$$anonfun$load$1.apply(PluginService.scala:70)
    at edu.usc.irds.sparkler.service.PluginService$$anonfun$load$1.apply(PluginService.scala:67)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at edu.usc.irds.sparkler.service.PluginService.load(PluginService.scala:67)
    at edu.usc.irds.sparkler.service.PluginService$.getExtension(PluginService.scala:147)
    at edu.usc.irds.sparkler.pipeline.FetchFunction$.apply(FetchFunction.scala:44)
    at edu.usc.irds.sparkler.pipeline.FetchFunction$.apply(FetchFunction.scala:32)
    at edu.usc.irds.sparkler.pipeline.FairFetcher.<init>(FairFetcher.scala:38)
    at edu.usc.irds.sparkler.pipeline.Crawler$$anonfun$run$1$$anonfun$2.apply(Crawler.scala:171)
    at edu.usc.irds.sparkler.pipeline.Crawler$$anonfun$run$1$$anonfun$2.apply(Crawler.scala:170)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1961)
    at edu.usc.irds.sparkler.pipeline.Crawler.processFetched(Crawler.scala:194)
    at edu.usc.irds.sparkler.pipeline.Crawler$$anonfun$run$1.apply$mcVI$sp(Crawler.scala:178)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at edu.usc.irds.sparkler.pipeline.Crawler.run(Crawler.scala:161)
    at edu.usc.irds.sparkler.base.CliTool$class.run(CliTool.scala:34)
    at edu.usc.irds.sparkler.pipeline.Crawler.run(Crawler.scala:43)
    at edu.usc.irds.sparkler.pipeline.Crawler$.main(Crawler.scala:264)
    at edu.usc.irds.sparkler.pipeline.Crawler.main(Crawler.scala)
    ... 6 more
Caused by: java.lang.Exception: Failed to load extension urlfilter-regex
    at edu.usc.irds.sparkler.service.PluginService$$anonfun$load$1.apply(PluginService.scala:70)
    at edu.usc.irds.sparkler.service.PluginService$$anonfun$load$1.apply(PluginService.scala:67)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at edu.usc.irds.sparkler.service.PluginService.load(PluginService.scala:67)
    at edu.usc.irds.sparkler.service.PluginService$.getExtension(PluginService.scala:147)
    at edu.usc.irds.sparkler.pipeline.FetchFunction$.apply(FetchFunction.scala:44)
    at edu.usc.irds.sparkler.pipeline.FetchFunction$.apply(FetchFunction.scala:32)
    at edu.usc.irds.sparkler.pipeline.FairFetcher.<init>(FairFetcher.scala:38)
    at edu.usc.irds.sparkler.pipeline.Crawler$$anonfun$run$1$$anonfun$2.apply(Crawler.scala:171)
    at edu.usc.irds.sparkler.pipeline.Crawler$$anonfun$run$1$$anonfun$2.apply(Crawler.scala:170)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

Environment and Version Information

-- Thanks

thammegowda commented 6 years ago

Sparkler uses the JVM system property pf4j.pluginsDir to locate the plugins directory. As long as that property points to the correct directory, it should work. See sparkler.sh for an example of how it is set: the script simply adds -Dpf4j.pluginsDir=/Full/Path/to/dir to the command line.
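
For a single JVM, the relevant part of the launch command looks roughly like this (a sketch, not the script's exact contents; the jar name and path are placeholders):

    # Pass the plugins directory to the JVM as a system property
    java -Dpf4j.pluginsDir="/Full/Path/to/dir" -jar sparkler-app.jar "$@"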

Note: since you are running it in a distributed fashion, pf4j.pluginsDir should be set on all the workers, not just on the driver. I know the documentation is lacking here, and there are several ways to do it; Stack Overflow or the Apache Spark wiki should help.
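
For example, something along these lines should work (an untested sketch: spark.driver.extraJavaOptions and spark.executor.extraJavaOptions are standard Spark settings, but /opt/sparkler/plugins is a placeholder and the directory must exist at that path on every worker node):

    # Push the system property to the driver JVM and to every executor JVM
    spark-submit \
      --conf "spark.driver.extraJavaOptions=-Dpf4j.pluginsDir=/opt/sparkler/plugins" \
      --conf "spark.executor.extraJavaOptions=-Dpf4j.pluginsDir=/opt/sparkler/plugins" \
      --class edu.usc.irds.sparkler.Main /path/to/sparkler-app.jar ...

The same two keys can also be set once per node in conf/spark-defaults.conf instead of being passed on every submit.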

baddlan commented 6 years ago

Thanks @thammegowda! Explicitly setting the pluginsDir property did the trick. I was under the impression that Sparkler looks for plugins by default in $CWD/plugins.
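
In hindsight that default is only half the story: assuming pf4j's usual fallback of System.getProperty("pf4j.pluginsDir", "plugins"), the lookup is relative to each JVM's own working directory, and on a cluster every executor has its own. A quick way to see this in standalone mode (layout may vary by deploy mode):

    # Each executor runs from its own scratch directory on the worker,
    # typically under $SPARK_HOME/work/, so a plugins/ folder next to the
    # submit script is never visible to it:
    ls "$SPARK_HOME"/work/app-*/*/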