databrickslabs / mosaic

An extension to the Apache Spark framework that allows easy and fast processing of very large geospatial datasets.
https://databrickslabs.github.io/mosaic/

grid_tessellateexplode() creates jts.geom.TopologyException in instances where input geometry is invalid #545

Closed FeralCatColonist closed 4 months ago

FeralCatColonist commented 5 months ago

Describe the bug
In version 0.4.1, using grid_tessellateexplode() can inadvertently trigger topology exceptions that prevent common operations like display(), count(), and saveAsTable() from completing. This same workflow did not produce any errors in version 0.3.11.

To Reproduce
Steps to reproduce the behavior:

from pyspark.sql import functions as f
import mosaic

df = (
    df
    # explode each geometry into H3 cells at resolution 6, keeping the core geometries
    .select(mosaic.grid_tessellateexplode(f.col("geom"), f.lit(6), True).alias("h3_index"), "*")
    # flatten the tessellation struct into top-level columns
    .select(f.col("h3_index.*"), "*")
)
df.count()
df.display()

ZIP Codes - MultiPolygon - Invalid Geometry.csv

Expected behavior
Because there is no st_makevalid() within the Mosaic library, my assumption is that one way to work around invalid geometries is to use grid_tessellateexplode() to break them apart into manageable H3 tiles. This has been my workflow of choice within the 0.3.x releases and was built into our data pipelines.

We regularly work with geometries that are irregular due to their location in coastal areas and/or upstream issues with their creation by other entities.

Screenshots
Produces errors like the following:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 13.0 failed 4 times, most recent failure: Lost task 3.3 in stage 13.0 (TID 80) (10.26.175.165 executor 0): org.locationtech.jts.geom.TopologyException: found non-noded intersection between LINESTRING ( -97.14452 31.11474, -97.14424 31.11453 ) and LINESTRING ( -97.14285 31.11526, -97.14424 31.11453 ) [ (-97.14424, 31.11453, NaN) ]

Additionally, dropping the geometry columns does not resolve the issue above: the DataFrame is "sticky" and retains its Mosaic lineage and the topological baggage.

Additional context
Using Databricks 13.3 LTS with Mosaic 0.4.1.

sllynn commented 5 months ago

Could you try zero-buffering the input geometry, e.g. mosaic.st_buffer(f.col("geom"), 0) in order to force it to be made valid?
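For reference, a minimal sketch of how that could slot into the reproduction above (this assumes the same "geom" column and resolution 6 from the original snippet; the buffer distance is passed as a literal column, as in the later comment):

from pyspark.sql import functions as f
import mosaic

df = (
    df
    # zero-distance buffer: a common trick to coerce an invalid geometry into a valid one
    .withColumn("geom", mosaic.st_buffer(f.col("geom"), f.lit(0)))
    # then tessellate the repaired geometry exactly as before
    .select(mosaic.grid_tessellateexplode(f.col("geom"), f.lit(6), True).alias("h3_index"), "*")
    .select(f.col("h3_index.*"), "*")
)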

FeralCatColonist commented 5 months ago

Although it is not a named topology error, I'm now getting the following out-of-bounds array error, which I am assuming derives from an unmatched pair of vertices in an invalid geometry.

I've implemented a zero-distance buffer like this, since we are restricted to a WKT column:

from pyspark.sql import functions as f
import mosaic

our_df = (
    spark.read.option("path", f"{path_geospatial_db}/{delta_tbl}")
    .table(f"{schema_common_geometry}.{delta_tbl}")
    .withColumn(
        column_geometry,
        # parse the WKT column, tag it with SRID 4326, then zero-buffer it to repair invalid geometries
        mosaic.st_buffer(
            mosaic.st_setsrid(mosaic.st_geomfromwkt(egdb_geometry), f.lit(4326)),
            f.lit(0),
        ),
    )
)

This is happening:

Py4JJavaError: An error occurred while calling o1516.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 67.0 failed 4 times, most recent failure: Lost task 3.3 in stage 67.0 (TID 15931) (10.26.69.103 executor 5): java.lang.ArrayIndexOutOfBoundsException: 0
    at org.locationtech.jts.geom.GeometryCollection.getGeometryN(GeometryCollection.java:116)
    at com.databricks.labs.mosaic.core.geometry.polygon.MosaicPolygonJTS.toInternal(MosaicPolygonJTS.scala:15)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:224)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:224)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:195)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:88)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:88)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:60)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
    at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:899)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1709)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:902)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:797)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3645)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3567)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3554)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3554)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1521)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1521)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1521)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3890)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3790)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1245)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1233)
    at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2959)
    at org.apache.spark.sql.execution.collect.Collector.$anonfun$runSparkJobs$1(Collector.scala:286)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:282)
    at org.apache.spark.sql.execution.collect.Collector.$anonfun$collect$1(Collector.scala:366)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:363)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:117)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:124)
    at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:126)
    at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:114)
    at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:94)
    at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:557)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.execution.qrc.ResultCacheManager.collectResult$1(ResultCacheManager.scala:545)
    at org.apache.spark.sql.execution.qrc.ResultCacheManager.computeResult(ResultCacheManager.scala:565)
    at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:426)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:419)
    at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:313)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectResult$1(SparkPlan.scala:519)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:516)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:492)
    at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3733)
    at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3732)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$3(Dataset.scala:4546)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:959)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4544)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$9(SQLExecution.scala:298)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:528)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:225)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1148)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:154)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:477)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4544)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:3732)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
    at py4j.Gateway.invoke(Gateway.java:306)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
    at org.locationtech.jts.geom.GeometryCollection.getGeometryN(GeometryCollection.java:116)
    at com.databricks.labs.mosaic.core.geometry.polygon.MosaicPolygonJTS.toInternal(MosaicPolygonJTS.scala:15)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
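In case it helps isolate the offending rows, here is a rough sketch of how I could filter to the geometries that are still reported invalid after the buffer (this assumes an st_isvalid function is available in this Mosaic build; column variables are the same as in the snippet above):

still_invalid_df = (
    our_df
    # flag rows whose buffered geometry is still not valid; these are the
    # likeliest candidates for the tessellation/serialization failures above
    .filter(~mosaic.st_isvalid(f.col(column_geometry)))
)
still_invalid_df.count()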
mjohns-databricks commented 4 months ago

The zip code data itself seems to be the issue in this instance. @FeralCatColonist and I connected offline and are working it out.