azavea / osmesa

OSMesa is an OpenStreetMap processing stack based on GeoTrellis and Apache Spark
Apache License 2.0

Upgrade to GT3 and VectorPipe 2 #182

Closed: CloudNiner closed this 2 years ago

CloudNiner commented 4 years ago

Alright. Here it is. This PR upgrades the osmesa.analytics and osmesa.apps packages to GeoTrellis 3.1.0 and VectorPipe 2.1.3. I got as far as both projects compiling. This is probably worth a review now to catch any egregious issues in the changes made.

Notes

Outstanding TODOs:

Other stuff:

jpolchlo commented 4 years ago
  1. We don't care about updating bm. It's being kept for posterity; we don't have an immediate use case for it, but would be a shame to lose it. I could imagine making a branch to save it, then blowing it out of master/develop.

  2. The bench project is (I think) a good idea to have around as a means to get someone up and running quickly when they need to do performance tests. Bringing it up to date is probably a good idea. I think it was commented out during the switch to depending on VP, and could stand some attention, if it's important. One may argue that benchmarking VP is more pressing and important, and we can hash that out below, if you like.

  3. Integration tests are hard. Generating the augmented diff stream, for one example, requires a non-trivial deployment of mojodna/overpass-diff-publisher on a functioning Overpass instance. If we want to come up with a suite of this kind of test, then we're going to have to commit real engineering effort to the task.

  4. Of the listed small things, the primary concern I have is for the AWS API upgrade. This will require a test deployment to our staging environment to confirm, I think.
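
For reference, if the AWS API upgrade in question is the move from the AWS SDK for Java v1 (com.amazonaws) to v2 (software.amazon.awssdk) that the GT 3 line depends on (an assumption on my part), a minimal v2 smoke test to run against staging could look roughly like the sketch below; the bucket and prefix names are placeholders, not our real ones:

    import scala.collection.JavaConverters._

    import software.amazon.awssdk.regions.Region
    import software.amazon.awssdk.services.s3.S3Client
    import software.amazon.awssdk.services.s3.model.ListObjectsV2Request

    object S3SmokeTest {
      def main(args: Array[String]): Unit = {
        // Credentials and region come from the default provider chain (instance profile on EMR).
        val s3: S3Client = S3Client.builder().region(Region.US_EAST_1).build()

        // List a prefix as a cheap end-to-end check; bucket and prefix are placeholders.
        val req = ListObjectsV2Request.builder()
          .bucket("example-staging-bucket")
          .prefix("changesets/")
          .maxKeys(10)
          .build()

        s3.listObjectsV2(req).contents().asScala.foreach(obj => println(obj.key()))
        s3.close()
      }
    }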

CloudNiner commented 4 years ago

We don't care about updating bm...would be a shame to lose it

Agreed. It's straightforward to create a branch and link to it in the README. I'll do that as part of this task as a separate PR after we've merged this one.

The bench project...

Super valid comments all around. I'd expect that we'd spend effort on this when we next need to benchmark. If the things we're testing at that point are only vectorpipe things, then moving bm into the vectorpipe project and updating it there makes sense. Either way, this seems like a separate task that we can address when the need next arises.

Integration tests are hard

Oh boy, yes. For this task, I think we should meet separately to hash out a set of tests to run, as a one-off, against both the existing system and the upgraded one, to satisfy ourselves that we haven't broken anything major in this upgrade. That can then inform future work on better unit tests or maybe even integration tests (not likely near term).

Of the listed small things, the primary concern I have is for the AWS API upgrade.

Understood. We can work out a set of tests as part of the plan described just above, if that strategy makes sense.

CloudNiner commented 4 years ago

@jpolchlo As far as other outstanding items on my side go, I just wanted to call out this one from the description above:

There was a conflict in osmesa.apps between the Spark SQL flatten and the vectorpipe UDF flatten. As far as I could tell, they both turn Seq[Seq[T]] into Seq[T], so I went with the Spark SQL one. Presumably it's faster.

Any concerns about this, and/or was this the correct way to go? IIUC, since Spark now provides this function, is there any advantage I'm missing to keeping a similar function in vectorpipe?

jpolchlo commented 4 years ago

The Spark SQL flatten will be more efficient than a UDF-based version of the same. Prefer the former IF it's really doing the same thing (which it seems like it is).
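
For concreteness, here's a minimal sketch of the two approaches; the built-in flatten stays inside Spark's codegen, while the UDF version forces a serialization round-trip per row. The object and column names below are made up for illustration, not taken from osmesa.apps:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{flatten, udf}

    object FlattenComparison {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("flatten-comparison")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        val df = Seq(Seq(Seq(1, 2), Seq(3)), Seq(Seq(4), Seq(5, 6))).toDF("nested")

        // Built-in Spark SQL flatten (available since Spark 2.4): Seq[Seq[T]] -> Seq[T].
        val builtIn = df.select(flatten($"nested").as("flat"))

        // UDF-based equivalent: rows are deserialized to Scala objects and re-serialized.
        val flattenUdf = udf((xs: Seq[Seq[Int]]) => xs.flatten)
        val viaUdf = df.select(flattenUdf($"nested").as("flat"))

        builtIn.show()
        viaUdf.show()
        spark.stop()
      }
    }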

jpolchlo commented 3 years ago

I encountered some fairly deep errors when trying to use this upgraded version. Notably, while attempting to run osmesa.apps.batch.MergeChangesets, I saw the following stack trace:

java.lang.AbstractMethodError: vectorpipe.sources.ChangesetReader.createDataReaderFactories()Ljava/util/List;
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.readerFactories$lzycompute(DataSourceV2ScanExec.scala:55)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.readerFactories(DataSourceV2ScanExec.scala:52)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.inputRDD$lzycompute(DataSourceV2ScanExec.scala:76)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.inputRDD(DataSourceV2ScanExec.scala:60)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.inputRDDs(DataSourceV2ScanExec.scala:79)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.UnionExec$$anonfun$doExecute$1.apply(basicPhysicalOperators.scala:557)
    at org.apache.spark.sql.execution.UnionExec$$anonfun$doExecute$1.apply(basicPhysicalOperators.scala:557)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:557)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:180)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
    at org.apache.spark.sql.DataFrameWriter.orc(DataFrameWriter.scala:572)
    at $line36.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:59)
    at $line36.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:64)
    at $line36.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:66)
    at $line36.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:68)
    at $line36.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:70)
    at $line36.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:72)
    at $line36.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:74)
    at $line36.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:76)
    at $line36.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:78)
    at $line36.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:80)
    at $line36.$read$$iw$$iw$$iw$$iw.<init>(<console>:82)
    at $line36.$read$$iw$$iw$$iw.<init>(<console>:84)
    at $line36.$read$$iw$$iw.<init>(<console>:86)
    at $line36.$read$$iw.<init>(<console>:88)
    at $line36.$read.<init>(<console>:90)
    at $line36.$read$.<init>(<console>:94)
    at $line36.$read$.<clinit>(<console>)
    at $line36.$eval$.$print$lzycompute(<console>:7)
    at $line36.$eval$.$print(<console>:6)
    at $line36.$eval.$print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
    at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
    at scala.tools.nsc.interpreter.ILoop.interpretCode$1(ILoop.scala:750)
    at scala.tools.nsc.interpreter.ILoop.pasteCommand(ILoop.scala:764)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$9.apply(ILoop.scala:208)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$9.apply(ILoop.scala:208)
    at scala.tools.nsc.interpreter.LoopCommands$LineCmd.apply(LoopCommands.scala:62)
    at scala.tools.nsc.interpreter.ILoop.colonCommand(ILoop.scala:688)
    at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:679)
    at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
    at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:415)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
    at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
    at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
    at org.apache.spark.repl.Main$.doMain(Main.scala:76)
    at org.apache.spark.repl.Main$.main(Main.scala:56)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

This implies that there has been some change to the Spark Data Source V2 infrastructure that needs addressing; a new issue should be lodged in VP itself to ensure that we can read changeset ORCs properly, updating our data sources along the way.
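
For context, the Data Source V2 reader contract changed between Spark releases: older Spark calls createDataReaderFactories() on a reader, while newer builds expect planInputPartitions() instead, and an AbstractMethodError like the one above is typically what you get when the running Spark expects one shape and the jar provides the other. A minimal sketch of the newer shape (an illustrative stub only, not VP's actual ChangesetReader) looks something like this:

    import java.util.{Collections, List => JList}

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
    import org.apache.spark.sql.types.StructType

    // Stub reader showing the Spark 2.4-era contract that replaced
    // createDataReaderFactories() with planInputPartitions().
    class ExampleReader(schema: StructType) extends DataSourceReader {
      override def readSchema(): StructType = schema

      override def planInputPartitions(): JList[InputPartition[InternalRow]] =
        Collections.singletonList[InputPartition[InternalRow]](new EmptyPartition)
    }

    class EmptyPartition extends InputPartition[InternalRow] {
      override def createPartitionReader(): InputPartitionReader[InternalRow] =
        new InputPartitionReader[InternalRow] {
          override def next(): Boolean = false                       // no rows; stub partition
          override def get(): InternalRow = throw new NoSuchElementException("empty")
          override def close(): Unit = ()
        }
    }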

Note: the above was generated by copy-pasting MergeChangesets into a spark-shell instance on an EMR cluster. Local tests were hampered by a lack of access to S3 readers; that can probably be overcome by copying the source changeset ORC file to the local machine.
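
If it helps, a local reproduction along those lines would be roughly the following, assuming the changeset ORC has already been copied down; the paths and bucket name are placeholders:

    // Paste into a local spark-shell after copying the ORC down, e.g.:
    //   aws s3 cp s3://example-bucket/changesets/changesets.orc /tmp/changesets.orc
    // (bucket and key above are placeholders)
    import org.apache.spark.sql.SparkSession

    // Inside spark-shell this simply returns the session the shell already created.
    val spark = SparkSession.builder().getOrCreate()

    // Reading a local copy sidesteps the S3 reader configuration entirely.
    val changesets = spark.read.orc("file:///tmp/changesets.orc")
    changesets.printSchema()
    changesets.show(5, truncate = false)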

jpolchlo commented 3 years ago

We can and should update the GT version to 3.5.1. This will probably require a new version of VP to be cut after geotrellis/vectorpipe#145 and geotrellis/vectorpipe#146 are merged.
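
As a sketch of what that bump would look like in build.sbt: only the GT version below is the one proposed here; the VP coordinates and version are placeholders for whatever release gets cut (or for a snapshot published locally via sbt publishLocal).

    // build.sbt sketch; VP coordinates/version are placeholders, not published artifacts.
    lazy val gtVersion = "3.5.1"

    libraryDependencies ++= Seq(
      "org.locationtech.geotrellis" %% "geotrellis-spark"  % gtVersion,
      "org.locationtech.geotrellis" %% "geotrellis-vector" % gtVersion,
      // Or a locally published snapshot that includes geotrellis/vectorpipe#145 and #146:
      "com.azavea.geotrellis"       %% "vectorpipe"        % "2.1.4-SNAPSHOT"
    )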

jpolchlo commented 3 years ago

I was able to successfully bulk ingest into a fresh test DB using this PR. To be certain, we should deploy the streaming updaters built from this version to a staging or scratch environment and compare against a known-good deployment, but I don't imagine there will be a problem. The bulk ingest was surprisingly fast and appears to be up to snuff, but we'll know for sure after the streaming updaters are checked. Note that during testing I relied on a locally published version of VP that incorporated the changes mentioned in the comment above.