azavea / osmesa

OSMesa is an OpenStreetMap processing stack based on GeoTrellis and Apache Spark

Run stats updaters and watch for crashes #77

Closed by mojodna 5 years ago

mojodna commented 6 years ago

Driven by a) AugmentedDiffSource and b) ChangesetsSource.

Augmented diff-related crashes should be ameliorated by #76. Changeset-related crashes need further investigation.
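
A minimal sketch of the "watch for crashes" part, assuming a buildQuery factory that (re)constructs either updater's streaming query (hypothetical helper, not OSMesa's actual entrypoint):

import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}

// Hypothetical supervisor loop: rebuild and restart the query when it dies,
// so a transient failure in either source doesn't permanently stop the updater.
def runForever(buildQuery: () => StreamingQuery): Unit =
  while (true) {
    val query = buildQuery()
    try query.awaitTermination()
    catch {
      case e: StreamingQueryException =>
        // log and fall through to restart
        println(s"updater crashed, restarting: ${e.getMessage}")
    }
  }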

mojodna commented 6 years ago

When attempting to use #80:

18/08/02 16:03:02 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 0,5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at scala.collection.immutable.Vector$.newBuilder(Vector.scala:22)
    at spray.json.JsonParser.array(JsonParser.scala:100)
    at spray.json.JsonParser.value(JsonParser.scala:61)
    at spray.json.JsonParser.values$1(JsonParser.scala:102)
    at spray.json.JsonParser.array(JsonParser.scala:106)
    at spray.json.JsonParser.value(JsonParser.scala:61)
    at spray.json.JsonParser.values$1(JsonParser.scala:102)
    at spray.json.JsonParser.array(JsonParser.scala:106)
    at spray.json.JsonParser.value(JsonParser.scala:61)
    at spray.json.JsonParser.values$1(JsonParser.scala:102)
    at spray.json.JsonParser.array(JsonParser.scala:106)
    at spray.json.JsonParser.value(JsonParser.scala:61)
    at spray.json.JsonParser.members$1(JsonParser.scala:81)
    at spray.json.JsonParser.object(JsonParser.scala:86)
    at spray.json.JsonParser.value(JsonParser.scala:60)
    at spray.json.JsonParser.members$1(JsonParser.scala:81)
    at spray.json.JsonParser.object(JsonParser.scala:86)
    at spray.json.JsonParser.value(JsonParser.scala:60)
    at spray.json.JsonParser.values$1(JsonParser.scala:102)
    at spray.json.JsonParser.array(JsonParser.scala:106)
    at spray.json.JsonParser.value(JsonParser.scala:61)
    at spray.json.JsonParser.members$1(JsonParser.scala:81)
    at spray.json.JsonParser.object(JsonParser.scala:86)
    at spray.json.JsonParser.value(JsonParser.scala:60)
    at spray.json.JsonParser.parseJsValue(JsonParser.scala:43)
    at spray.json.JsonParser$.apply(JsonParser.scala:28)
    at spray.json.PimpedString.parseJson(package.scala:45)
    at geotrellis.vector.io.json.Implicits$RichString.parseGeoJson(Implicits.scala:66)
    at osmesa.common.ProcessOSM$Countries$2$.all(ProcessOSM.scala:673)
    at osmesa.common.ProcessOSM$CountryLookup$1.<init>(ProcessOSM.scala:695)
    at osmesa.common.ProcessOSM$$anonfun$geocode$1.apply(ProcessOSM.scala:726)
    at osmesa.common.ProcessOSM$$anonfun$geocode$1.apply(ProcessOSM.scala:725)
mojodna commented 6 years ago

I can reproduce this locally with:

spark-submit \
  --driver-java-options="-Dlog4j.configuration=file:log4j.properties" \
  -v \
  --packages org.apache.hadoop:hadoop-aws:2.7.3,org.postgresql:postgresql:42.2.2 \
  --class osmesa.analytics.oneoffs.AugmentedDiffStreamProcessor src/analytics/target/scala-2.11/osmesa-analytics.jar \
  --augmented-diff-source s3a://████/augmented-diffs/ \
  --database-url postgresql:///osmesa-stats \
  --start-sequence 3034570

java.lang.OutOfMemoryError: Java heap space
    at java.lang.ClassLoader.getClassLoadingLock(ClassLoader.java:462)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:404)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at io.netty.handler.timeout.IdleStateHandler.newIdleStateEvent(IdleStateHandler.java:378)
    at io.netty.handler.timeout.IdleStateHandler$AllIdleTimeoutTask.run(IdleStateHandler.java:566)
    at io.netty.handler.timeout.IdleStateHandler$AbstractIdleTask.run(IdleStateHandler.java:466)
    at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)
java.lang.OutOfMemoryError: Java heap space
    at java.math.BigDecimal.<init>(BigDecimal.java:516)
    at java.math.BigDecimal.<init>(BigDecimal.java:383)
    at java.math.BigDecimal.<init>(BigDecimal.java:677)
    at scala.math.BigDecimal$.exact(BigDecimal.scala:130)
    at scala.math.BigDecimal$.apply(BigDecimal.scala:272)
    at spray.json.JsNumber$.apply(JsValue.scala:100)
    at spray.json.JsonParser.number(JsonParser.scala:126)

When running directly (i.e. w/o spark-submit) from within IntelliJ, I experience no problems.

What's different between these execution environments? Max heap size?

mojodna commented 6 years ago

Driver memory is the difference: it's effectively unlimited when running from within IntelliJ, while spark-submit defaults to 1G.

3G of driver memory appears to be the minimum when running in "local" mode (where all tasks/executors run within the same JVM as the driver) with a batch size of 5 (and 8 executors).
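
For reference, a sketch of the adjusted invocation; only --driver-memory is new relative to the reproduction command above, and 3g is the empirical minimum noted here:

spark-submit \
  --driver-memory 3g \
  --driver-java-options="-Dlog4j.configuration=file:log4j.properties" \
  ... (remaining options as in the reproduction command above)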

mojodna commented 6 years ago

Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: scala.MatchError: null
=== Streaming Query ===
Identifier: update changeset metadata [id = 249696e0-0d55-403b-a96b-b6b9c942640d, runId = cd6878e2-99d6-4292-9b0a-754a102e529f]
Current Committed Offsets: {osmesa.common.streaming.ChangesetMicroBatchReader@10610b45: 107580}
Current Available Offsets: {osmesa.common.streaming.ChangesetMicroBatchReader@10610b45: 107580}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [id#22L, created_at#23, closed_at#24, user#27, uid#28L, tags#34[created_by] AS editor#49, UDF(tags#34) AS hashtags#50]
+- StreamingExecutionRelation osmesa.common.streaming.ChangesetMicroBatchReader@10610b45, [sequence#21, id#22L, created_at#23, closed_at#24, open#25, num_changes#26, user#27, uid#28L, min_lat#29, max_lat#30, min_lon#31, max_lon#32, comments_count#33, tags#34]

        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: com.azavea.shaded.demo.google.common.util.concurrent.UncheckedExecutionException: scala.MatchError: null
        at com.azavea.shaded.demo.google.common.cache.LocalCache$Segment.get(LocalCache.java:2218)
        at com.azavea.shaded.demo.google.common.cache.LocalCache.get(LocalCache.java:4147)
        at com.azavea.shaded.demo.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:5053)
        at com.softwaremill.macmemo.MemoCacheBuilder$$anon$1$$anon$2.get(MemoCacheBuilder.scala:63)
        at osmesa.common.streaming.ChangesetSource$.getCurrentSequence(ChangesetSource.scala:65)
        at osmesa.common.streaming.ChangesetMicroBatchReader.getCurrentSequence(ChangesetMicroBatchReader.scala:58)
        at osmesa.common.streaming.ReplicationStreamMicroBatchReader.setOffsetRange(ReplicationStreamMicroBatchReader.scala:62)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5$$anonfun$apply$6.apply(MicroBatchExecution.scala:276)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5$$anonfun$apply$6.apply(MicroBatchExecution.scala:272)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5.apply(MicroBatchExecution.scala:272)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5.apply(MicroBatchExecution.scala:264)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:264)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:128)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        ... 1 more
Caused by: scala.MatchError: null
        at io.circe.yaml.parser.package$.io$circe$yaml$parser$package$$yamlToJson(package.scala:79)
        at io.circe.yaml.parser.package$$anonfun$parse$1.apply(package.scala:21)
        at io.circe.yaml.parser.package$$anonfun$parse$1.apply(package.scala:20)
        at cats.syntax.EitherOps$.flatMap$extension(either.scala:148)
        at io.circe.yaml.parser.package$.parse(package.scala:20)
        at io.circe.yaml.parser.package$.parse(package.scala:24)
        at osmesa.common.streaming.ChangesetSource$.osmesa$common$streaming$ChangesetSource$$callRealBody$1(ChangesetSource.scala:71)
        at osmesa.common.streaming.ChangesetSource$$anonfun$getCurrentSequence$1.apply(ChangesetSource.scala:65)
        at osmesa.common.streaming.ChangesetSource$$anonfun$getCurrentSequence$1.apply(ChangesetSource.scala:65)
        at com.softwaremill.macmemo.MemoCacheBuilder$$anon$1$$anon$2$$anon$3.call(MemoCacheBuilder.scala:64)
        at com.azavea.shaded.demo.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:5058)
        at com.azavea.shaded.demo.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3708)
        at com.azavea.shaded.demo.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2416)
        at com.azavea.shaded.demo.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2299)
        at com.azavea.shaded.demo.google.common.cache.LocalCache$Segment.get(LocalCache.java:2212)
        ... 29 more
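
The MatchError escapes from circe-yaml while parsing the changeset state file in getCurrentSequence. A minimal defensive sketch, assuming the caller already has the response body as a string (parseSequence is a hypothetical helper, not OSMesa's API):

import scala.util.Try

import io.circe.yaml.parser

// Hypothetical guard: circe-yaml can throw scala.MatchError: null on
// malformed or empty input instead of returning a Left, so trap non-fatal
// throwables and treat the poll as a miss.
def parseSequence(body: String): Option[Long] =
  Try(parser.parse(body)).toOption                    // catches the MatchError
    .flatMap(_.right.toOption)                        // ParsingFailure => None
    .flatMap(_.hcursor.get[Long]("sequence").right.toOption)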
moradology commented 5 years ago

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:929)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:927)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:927)
        at org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:474)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        ... 1 more
Caused by: org.postgresql.util.PSQLException: The connection attempt failed.
        at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:257)
        at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
        at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:195)
        at org.postgresql.Driver.makeConnection(Driver.java:452)
        at org.postgresql.Driver.connect(Driver.java:254)
        at java.sql.DriverManager.getConnection(DriverManager.java:664)
        at java.sql.DriverManager.getConnection(DriverManager.java:270)
        at osmesa.analytics.oneoffs.ChangesetStreamProcessor$$anonfun$$lessinit$greater$1$$anon$1.open(ChangesetStreamProcessor.scala:181)
        at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:50)
        at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.UnknownHostException: █████████████
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at org.postgresql.core.PGStream.<init>(PGStream.java:69)
        at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:156)
        ... 19 more
mojodna commented 5 years ago

(Known) remaining exceptional conditions include transient database connection failures like the PSQLException / UnknownHostException above.

These can't be recovered from using the current ForeachWriter approach; #93 will address this.
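
For context, a minimal sketch of the ForeachWriter shape in question (hypothetical class and parameter names, not OSMesa's implementation); the point is that an exception escaping open() fails the task and aborts the whole micro-batch, with no retry hook:

import java.sql.{Connection, DriverManager}

import org.apache.spark.sql.{ForeachWriter, Row}

class StatsWriter(databaseUrl: String) extends ForeachWriter[Row] {
  private var connection: Connection = _

  override def open(partitionId: Long, version: Long): Boolean = {
    // A transient PSQLException/UnknownHostException thrown here kills the
    // batch; returning false would merely skip this partition's rows, not
    // retry them.
    connection = DriverManager.getConnection(databaseUrl)
    true
  }

  override def process(row: Row): Unit = {
    // upsert stats for this row using `connection`
  }

  override def close(errorOrNull: Throwable): Unit =
    if (connection != null) connection.close()
}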