Closed: codlife closed this issue 7 years ago
Hi @codlife
We use Java multi-threading in a Spark task. See: https://github.com/intel-analytics/BigDL/blob/master/dl/src/main/scala/com/intel/analytics/bigdl/optim/DistriOptimizer.scala#L157
@yiheng Hi, I run multiple threads inside one Spark task. It works in Spark 1.6, but it fails in Spark 2.1.
I run the multithreaded task with a ThreadPoolExecutor:
val threadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
Any ideas about this problem?
17/04/26 19:52:48 DEBUG BlockManager: Getting local block rdd_17_1
17/04/26 19:52:48 TRACE BlockInfoManager: Task 20 trying to acquire read lock for rdd_17_1
17/04/26 19:52:48 TRACE BlockInfoManager: Task 20 acquired read lock for rdd_17_1
17/04/26 19:52:48 DEBUG BlockManager: Level for block rdd_17_1 is StorageLevel(memory, deserialized, 1 replicas)
17/04/26 19:52:48 INFO BlockManager: Found block rdd_17_1 locally
17/04/26 19:52:48 DEBUG BlockManager: Getting local block rdd_21_0
17/04/26 19:52:48 TRACE BlockInfoManager: Task 20 trying to acquire read lock for rdd_21_0
17/04/26 19:52:48 TRACE BlockInfoManager: Task 20 acquired read lock for rdd_21_0
17/04/26 19:52:48 DEBUG BlockManager: Level for block rdd_21_0 is StorageLevel(memory, deserialized, 1 replicas)
17/04/26 19:52:48 INFO BlockManager: Found block rdd_21_0 locally
17/04/26 19:52:48 TRACE BlockInfoManager: Task 20 releasing lock for rdd_21_0
17/04/26 19:52:48 TRACE BlockInfoManager: Task -1024 releasing lock for rdd_17_1
Exception in thread "pool-16-thread-2" java.lang.AssertionError: assertion failed: Task -1024 release lock on block rdd_17_1 more times than it acquired it
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:298)
    at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:654)
    at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:458)
    at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$Partner$1.hasNext(Iterator.scala:1209)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
    at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
    at com.tencent.angel.spark.ml.ADMM$CostFun$CalThread.run(ADMM.scala:523)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
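My reading of the trace (an interpretation, not an official explanation): Spark 2.x attributes block read locks to the current task through a thread-local task context, and a pool thread spawned inside the task has no such context. So when the cached-RDD iterator completes on the pool thread, the lock release is attributed to a non-task id (-1024 in the log) instead of Task 20, and the assertion fires. The underlying behavior, thread-locals not propagating to executor worker threads, can be shown with a plain ThreadLocal and the same kind of ThreadPoolExecutor:

```java
import java.util.concurrent.*;

public class ThreadLocalDemo {
    // Stands in for Spark's per-task context; real Spark uses TaskContext.get().
    static final ThreadLocal<String> taskContext = new ThreadLocal<>();

    public static void main(String[] args) throws Exception {
        taskContext.set("task-20"); // set on the "task" (main) thread only

        ExecutorService pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());

        // The pool thread does not inherit the ThreadLocal: it sees null,
        // just as a child thread inside a Spark task sees no task context.
        Future<String> seenByWorker =
                pool.submit(() -> String.valueOf(taskContext.get()));

        System.out.println("task thread sees:   " + taskContext.get());   // task-20
        System.out.println("worker thread sees: " + seenByWorker.get());  // null
        pool.shutdown();
    }
}
```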
BlockManager introduced a locking mechanism in Spark 2.x. If you work with BlockManager directly (or indirectly, e.g. by consuming a cached RDD's iterator from your own threads), you have to handle that locking yourself.
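One way to sidestep the locking entirely (a sketch under my assumptions, not the official BigDL fix; all names here are made up): drain the partition iterator on the task's own thread first, then hand the materialized data to the pool. That way only the task thread ever touches the CompletionIterator that releases the block read lock. The shape of the pattern, using a plain Iterator in place of a Spark partition iterator:

```java
import java.util.*;
import java.util.concurrent.*;

public class MaterializeThenFanOut {
    /**
     * Drains {@code partition} on the calling (task) thread, then sums the
     * chunks on {@code nThreads} pool threads. In Spark terms: the iterator
     * that holds the block read lock is fully consumed before any child
     * thread runs, so only the task thread ever releases the lock.
     */
    static long parallelSum(Iterator<Integer> partition, int nThreads) throws Exception {
        // 1. Materialize on the calling thread (like iterator.toArray in Scala).
        List<Integer> data = new ArrayList<>();
        partition.forEachRemaining(data::add);

        // 2. Fan out over plain in-memory chunks; no iterator is shared.
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        try {
            int chunk = (data.size() + nThreads - 1) / nThreads;
            List<Future<Long>> parts = new ArrayList<>();
            for (int i = 0; i < data.size(); i += chunk) {
                List<Integer> slice = data.subList(i, Math.min(i + chunk, data.size()));
                parts.add(pool.submit(
                        () -> slice.stream().mapToLong(Integer::longValue).sum()));
            }
            long total = 0;
            for (Future<Long> p : parts) total += p.get();
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Iterator<Integer> it = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).iterator();
        System.out.println(parallelSum(it, 4)); // 55
    }
}
```

The trade-off is that the whole partition must fit in memory at once, which is usually already true for a cached, deserialized RDD block.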
Hi devs: In my opinion, Spark uses just one thread per task, so how do you use multiple threads within a Spark task? Thank you very much!