apache / sedona

A cluster computing framework for processing large-scale geospatial data
https://sedona.apache.org/
Apache License 2.0

My parquet reader is missing `parquetFilterPushDownStringStartWith` #996

Closed: aw-west-defra closed this issue 1 year ago

aw-west-defra commented 1 year ago

Expected behaviour

I want to read and save my data using GeoParquet. I can save, but not read.

Actual behaviour

`java.lang.NoSuchMethodException: java.lang.NoSuchMethodError: org.apache.spark.sql.internal.SQLConf.parquetFilterPushDownStringStartWith()Z`

```
Py4JJavaError                             Traceback (most recent call last)
File :3
      1 sdf = spark.read.format('geoparquet').load(sf)
----> 3 display(sdf)

File /databricks/python_shell/dbruntime/display.py:87, in Display.display(self, input, *args, **kwargs)
     84     if kwargs.get('trigger'):
     85         raise Exception('Triggers can only be set for streaming queries.')
---> 87     self.add_custom_display_data("table", input._jdf)
     88 # TODO: replace this with an implementation that serves large file upload.
     89 # This version is for Serverless + Spark Connect dogfooding.
     90 elif self.spark_connect_enabled and \
     91         type(input).__module__ == 'pyspark.sql.connect.dataframe' and \
     92         type(input).__name__ == 'DataFrame':

File /databricks/python_shell/dbruntime/display.py:40, in Display.add_custom_display_data(self, data_type, data)
     38 def add_custom_display_data(self, data_type, data):
     39     custom_display_key = str(uuid.uuid4())
---> 40     return_code = self.entry_point.addCustomDisplayData(custom_display_key, data_type, data)
     41     ip_display({
     42         "application/vnd.databricks.v1+display": custom_display_key,
     43         "text/plain": ""
     44     },
     45     raw=True)
     46     if return_code == 1:

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:188, in capture_sql_exception.<locals>.deco(*a, **kw)
    186 def deco(*a: Any, **kw: Any) -> Any:
    187     try:
--> 188         return f(*a, **kw)
    189     except Py4JJavaError as e:
    190         converted = convert_exception(e.java_exception)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling t.addCustomDisplayData.
: java.lang.NoSuchMethodException: java.lang.NoSuchMethodError: org.apache.spark.sql.internal.SQLConf.parquetFilterPushDownStringStartWith()Z
  at org.apache.spark.sql.execution.datasources.parquet.GeoParquetFileFormat.buildReaderWithPartitionValues(GeoParquetFileFormat.scala:212)
  at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:2125)
  at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:2113)
  at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:2186)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$2(SparkPlan.scala:274)
  at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:274)
  at org.apache.spark.sql.execution.SparkPlan$.org$apache$spark$sql$execution$SparkPlan$$withExecuteQueryLogging(SparkPlan.scala:107)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:332)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:328)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:269)
  at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:112)
  at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:124)
  at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:126)
  at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:114)
  at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:94)
  at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:553)
  at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
  at org.apache.spark.sql.execution.qrc.ResultCacheManager.collectResult$1(ResultCacheManager.scala:545)
  at org.apache.spark.sql.execution.qrc.ResultCacheManager.computeResult(ResultCacheManager.scala:565)
  at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:426)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:419)
  at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:313)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectResult$1(SparkPlan.scala:504)
  at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
  at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:501)
  at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3726)
  at org.apache.spark.sql.Dataset.$anonfun$collectResult$1(Dataset.scala:3717)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$3(Dataset.scala:4642)
  at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:926)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4640)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:248)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:460)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:183)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1038)
  at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:133)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:410)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4640)
  at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3716)
  at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:267)
  at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:101)
  at com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:744)
  at com.databricks.backend.daemon.driver.JupyterDriverLocal.computeListResultsItem(JupyterDriverLocal.scala:1055)
  at com.databricks.backend.daemon.driver.JupyterDriverLocal$JupyterEntryPoint.addCustomDisplayData(JupyterDriverLocal.scala:239)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
  at py4j.Gateway.invoke(Gateway.java:306)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
  at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
  at java.lang.Thread.run(Thread.java:750)
  at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:104)
  at com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:744)
  at com.databricks.backend.daemon.driver.JupyterDriverLocal.computeListResultsItem(JupyterDriverLocal.scala:1055)
  at com.databricks.backend.daemon.driver.JupyterDriverLocal$JupyterEntryPoint.addCustomDisplayData(JupyterDriverLocal.scala:239)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
  at py4j.Gateway.invoke(Gateway.java:306)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
  at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
  at java.lang.Thread.run(Thread.java:750)
```

Steps to reproduce the problem

Register Sedona and get an example dataset:

```python
import geodatasets as gds
import geopandas as gpd

from pyspark.sql import functions as F
import sedona.spark
sedona.spark.SedonaContext.create(spark)

name = 'nybb'
f = f'/dbfs/tmp/{name}.parquet'
sf = f.replace('/dbfs/', 'dbfs:/')

path = gds.get_path(name)
gdf = gpd.read_file(path)
gdf.to_parquet(f)
```

Reading that dataset back in fails:

```python
sdf = spark.read.format('geoparquet').load(sf)

display(sdf)
>>> java.lang.NoSuchMethodException: java.lang.NoSuchMethodError: org.apache.spark.sql.internal.SQLConf.parquetFilterPushDownStringStartWith()Z
```

But I can save as GeoParquet:

```python
sdf = spark.read.parquet(sf).withColumn('geometry', F.expr('ST_GeomFromWKB(geometry)'))

sdf.write.format('geoparquet').save(sf)

gdf = gpd.read_parquet(f)
gdf.explore()
```

Am I missing out on bbox filtering and spatial indexing performance by not using GeoParquet?
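
For context, the kind of query where GeoParquet's bbox metadata can pay off is a spatial range filter. A minimal sketch, assuming the matching Spark 3.4 shaded jar is installed and that the Sedona release in use supports bbox-based filter pushdown for GeoParquet; the envelope coordinates are placeholders and must be in the data's CRS:

```python
# Sketch only: a spatial range query over the saved GeoParquet. With a working
# 'geoparquet' reader, Sedona can use the bbox metadata in the GeoParquet
# footer to skip files that cannot intersect the query window (support depends
# on the Sedona version). The envelope below is a placeholder.
from sedona.spark import SedonaContext

sedona = SedonaContext.create(spark)

sdf = sedona.read.format('geoparquet').load(sf)
sdf.createOrReplaceTempView('boroughs')

window = sedona.sql("""
    SELECT *
    FROM boroughs
    WHERE ST_Intersects(geometry, ST_PolygonFromEnvelope(-74.05, 40.60, -73.85, 40.80))
""")
window.show()
```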

Settings

Sedona version = 1.4.1
Apache Spark version = 3.4.0
API type = Python
Scala version = 2.12
JRE version = 1.8
Python version = 3.10
Environment = Azure Databricks - DBR 13.2

I don't believe it is due to my config/init script. I have previously failed to use GeoParquet on DBR 9 LTS and DBR 12 LTS; would it help to recreate those errors?
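
As a quick sanity check, one can print the Spark runtime and the Sedona Python package versions from the notebook (a sketch; the shaded jar installed on the cluster is a separate artifact and still has to be checked in the cluster's library list):

```python
# Sketch: confirm the Spark runtime and the apache-sedona Python package
# versions visible to the notebook. The Scala shaded jar installed on the
# cluster must match the Spark version printed below.
from importlib.metadata import version

print('Spark version:', spark.version)            # expected 3.4.0 on DBR 13.2
print('apache-sedona:', version('apache-sedona')) # expected 1.4.1
```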

Kontinuation commented 1 year ago

Sedona 1.4.1 ships two spark-shaded jars for Scala 2.12: sedona-spark-shaded-3.0_2.12-1.4.1.jar and sedona-spark-shaded-3.4_2.12-1.4.1.jar. If you are using sedona-spark-shaded-3.0_2.12-1.4.1.jar on your Databricks cluster, try switching to sedona-spark-shaded-3.4_2.12-1.4.1.jar, which targets Spark 3.4.
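
For a plain (non-Databricks) PySpark session, the matching shaded jar can also be resolved via spark.jars.packages; a sketch, where the geotools-wrapper coordinate is the one documented alongside Sedona 1.4.1 and should be double-checked against the setup docs (on Databricks the jar is normally installed as a cluster library or through an init script instead):

```python
# Sketch for a plain PySpark session outside Databricks: pull the
# Spark 3.4 / Scala 2.12 shaded jar from Maven instead of installing it by hand.
from pyspark.sql import SparkSession
from sedona.spark import SedonaContext

spark = (
    SparkSession.builder
    .config(
        'spark.jars.packages',
        'org.apache.sedona:sedona-spark-shaded-3.4_2.12:1.4.1,'
        'org.datasyntax:geotools-wrapper:1.4.0-28.2',
    )
    .getOrCreate()
)
sedona = SedonaContext.create(spark)
```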

The GeoParquet reader also has a known problem of not working on Databricks when Photon is enabled. If your cluster does not have Photon enabled, switching the spark-shaded jar should solve this problem.

aw-west-defra commented 1 year ago

Thank you! I was using the Spark 3.0 shaded jar instead of the 3.4 one.