twitter / scalding

A Scala API for Cascading
http://twitter.com/scalding
Apache License 2.0

Execution can hit ConcurrentModificationException #1324

Open johnynek opened 9 years ago

johnynek commented 9 years ago
Exception in thread "main" java.util.ConcurrentModificationException
        at java.util.Hashtable$Enumerator.next(Hashtable.java:1167)
        at org.apache.hadoop.conf.Configuration.iterator(Configuration.java:2270)
        at org.apache.hadoop.fs.viewfs.InodeTree.<init>(InodeTree.java:378)
        at org.apache.hadoop.fs.viewfs.ViewFileSystem$1.<init>(ViewFileSystem.java:162)
        at org.apache.hadoop.fs.viewfs.ViewFileSystem.initialize(ViewFileSystem.java:162)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2438)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:90)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2472)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2454)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:384)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:178)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
        at com.twitter.scalding.FileSource$.glob(FileSource.scala:117)
        at com.twitter.scalding.FileSource$.globHasNonHiddenPaths(FileSource.scala:128)
        at com.twitter.scalding.FileSource.pathIsGood(FileSource.scala:158)
        at com.twitter.scalding.TimeSeqPathedSource$$anonfun$getPathStatuses$1.apply(TimePathedSource.scala:95)
        at com.twitter.scalding.TimeSeqPathedSource$$anonfun$getPathStatuses$1.apply(TimePathedSource.scala:95)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at com.twitter.scalding.TimeSeqPathedSource.getPathStatuses(TimePathedSource.scala:95)
        at com.twitter.scalding.TimeSeqPathedSource.hdfsReadPathsAreGood(TimePathedSource.scala:99)
        at com.twitter.scalding.FileSource.validateTaps(FileSource.scala:211)
        at com.twitter.scalding.FlowState$$anonfun$validateSources$1.apply(FlowState.scala:36)
        at com.twitter.scalding.FlowState$$anonfun$validateSources$1.apply(FlowState.scala:36)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
        at com.twitter.scalding.FlowState.validateSources(FlowState.scala:36)
        at com.twitter.scalding.FlowStateMap$.validateSources(FlowState.scala:76)
        at com.twitter.scalding.Execution$WriteExecution$$anonfun$runStats$11$$anonfun$apply$29.apply(Execution.scala:411)
        at com.twitter.scalding.Execution$WriteExecution$$anonfun$runStats$11$$anonfun$apply$29.apply(Execution.scala:410)

I guess this happens because the Configuration is being used from multiple threads. We can possibly fix it by having the same thread that submits work to Cascading also call validateSources, so there is only one thread in play.
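For readers following along, here is a minimal sketch of the suspected failure mode and of the single-thread idea. It is not Scalding or Hadoop code: a plain `java.util.Hashtable` stands in for the Configuration (the trace shows `Configuration.iterator` walking a `Hashtable`), and the class and method names are made up for illustration. The mutation is done from the iterating thread itself so the result is deterministic; a second thread trips the same fail-fast check, just unpredictably.

```java
import java.util.ConcurrentModificationException;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in: a plain Hashtable plays the role of the shared
// Hadoop Configuration. None of these names come from Scalding or Hadoop.
final class SharedConfSketch {

    // Returns true if walking the table while it is modified trips the
    // fail-fast check. Expects at least two entries, so that the iterator
    // is asked for another element after the first modification.
    static boolean iterationFailsWhenMutated(Hashtable<String, String> conf) {
        try {
            for (Map.Entry<String, String> entry : conf.entrySet()) {
                // Adding a new key changes the table's modification count.
                conf.put("added.while.iterating." + entry.getKey(), "x");
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Sketch of the proposed fix: every task that touches the table is
    // submitted to one single-threaded executor, so an iteration and a
    // write can never interleave.
    static int countEntriesOn(ExecutorService confThread, Hashtable<String, String> conf)
            throws Exception {
        return confThread.submit(() -> {
            int n = 0;
            for (Map.Entry<String, String> ignored : conf.entrySet()) {
                n++;
            }
            return n;
        }).get();
    }

    public static void main(String[] args) throws Exception {
        Hashtable<String, String> conf = new Hashtable<>();
        conf.put("a", "1");
        conf.put("b", "2");
        conf.put("c", "3");
        System.out.println("fails when mutated: " + iterationFailsWhenMutated(conf));

        ExecutorService confThread = Executors.newSingleThreadExecutor();
        try {
            // Writes go through the same executor as reads.
            confThread.submit(() -> { conf.put("d", "4"); }).get();
            System.out.println("entries: " + countEntriesOn(confThread, conf));
        } finally {
            confThread.shutdown();
        }
    }
}
```

The single-threaded executor only helps if every reader and writer goes through it; any code that touches the shared object directly from another thread can still hit the exception.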

ianoc commented 9 years ago

I think in this case, rather than getting the existing file system, we could open a new one and then close it. I believe the user here was aiming to call validateTaps in user land.

johnynek commented 9 years ago

The user was (shockingly) me. I was not calling validateTaps in user land. This was the call here:

https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala#L411

Perhaps moving that to https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala#L230 would do it.

ianoc commented 9 years ago

Oh, that got reported by a user @ twitter as an issue yesterday too. I've seen it before, but never reproducibly. In their case it was in user land; in yours, it seems moving it to the single-threaded portion would solve it. They would possibly still run into it, though.

johnynek commented 9 years ago

The Hadoop Configuration is a candidate for the worst code I've ever seen. I wish we could get immutable views of it, and I wish it were as fast as a HashMap.
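As a rough illustration of what an "immutable view" could look like, here is a sketch that takes a read-only snapshot of a shared table. It uses a plain `java.util.Hashtable` as a stand-in, not Hadoop's Configuration, and the class and method names are hypothetical. The copy is made while holding the table's own monitor, which works for `Hashtable` because its mutating methods synchronize on the instance; whether the same locking is enough for a real Configuration is an assumption this sketch does not verify.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;

// Hypothetical helper, not part of Hadoop or Scalding.
final class ConfSnapshotSketch {

    // Copies the current entries into a HashMap and wraps the copy so
    // callers cannot modify it. Holding the Hashtable's monitor keeps its
    // synchronized writers out while the copy iterates.
    static Map<String, String> snapshot(Hashtable<String, String> conf) {
        synchronized (conf) {
            return Collections.unmodifiableMap(new HashMap<>(conf));
        }
    }

    public static void main(String[] args) {
        Hashtable<String, String> conf = new Hashtable<>();
        conf.put("a", "1");
        conf.put("b", "2");

        Map<String, String> view = snapshot(conf);

        // Later writes to the shared table do not disturb iteration over
        // the snapshot, because the snapshot owns its own storage.
        for (Map.Entry<String, String> entry : view.entrySet()) {
            conf.put("added." + entry.getKey(), "x");
        }
        System.out.println("snapshot size: " + view.size());
        System.out.println("live size: " + conf.size());
    }
}
```

The trade-off is that a snapshot goes stale as soon as the shared table changes, so it suits read-mostly paths such as validation rather than code that must see later updates.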