delta-io / delta-sharing

An open protocol for secure data sharing
https://delta.io/sharing
Apache License 2.0

load_as_spark fails with Provider org.apache.spark.sql.delta.sources.DeltaDataSource could not be instantiated #489

Closed: suckowuw closed this issue 1 month ago

suckowuw commented 1 month ago

Dear all, I need help: I can't get `load_as_spark` working.

```
spark-submit --deploy-mode client --files config.share --jars delta-core_2.13-2.1.1.jar,delta-sharing-spark_2.13-0.7.5.jar dsh_spark.py
```

```python
import os
import delta_sharing
from pyspark.sql import SparkSession

client = delta_sharing.SharingClient("config.share")
print(client.list_all_tables())

table_url = "config.share#dsh.demo_tables_ext.dshdelta1m"

spark = SparkSession.builder.appName("delta-sharing") \
    .config("spark.jars.packages", "io.delta:delta-sharing-spark_2.13:0.7.5") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

spark_df = delta_sharing.load_as_spark(table_url)

print(spark_df.count(), " rows")  # DataFrames have no len(); count() returns the row count
print("Sample rows:")
spark_df.show(3)
```

```
Traceback (most recent call last):
  File "dsh_spark.py", line 13, in <module>
    spark_df = delta_sharing.load_as_spark(table_url)
  File "/app/localstorage/skylab/python-dev/lib/python3.7/site-packages/delta_sharing/delta_sharing.py", line 163, in load_as_spark
    return df.load(url)
  File "/opt/cloudera/parcels/CDH-7.2.15-1.cdh7.2.15.p3.30895120/lib/spark3/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 158, in load
  File "/opt/cloudera/parcels/CDH-7.2.15-1.cdh7.2.15.p3.30895120/lib/spark3/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/cloudera/parcels/CDH-7.2.15-1.cdh7.2.15.p3.30895120/lib/spark3/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/opt/cloudera/parcels/CDH-7.2.15-1.cdh7.2.15.p3.30895120/lib/spark3/python/lib/py4j-0.10.9.3-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o91.load.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.delta.sources.DeltaDataSource could not be instantiated
    at java.util.ServiceLoader.fail(ServiceLoader.java:232)
    at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
    at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
    at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
    at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
    at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:720)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:188)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NoClassDefFoundError: scala/collection/IterableOnce
    at java.lang.Class.getDeclaredConstructors0(Native Method)
    at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
    at java.lang.Class.getConstructor0(Class.java:3075)
    at java.lang.Class.newInstance(Class.java:412)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
    ... 31 more
Caused by: java.lang.ClassNotFoundException: scala.collection.IterableOnce
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 36 more
```
suckowuw commented 1 month ago

Found the solution:

  1. Run `spark-shell` to get the Scala version of the Cloudera cluster: "Using Scala version 2.12.10" (or check it from PySpark; see the sketch after this list).
  2. Download the matching `delta-sharing-spark_2.12:0.7.5` jar from https://mavenlibs.com/jar/file/io.delta/delta-sharing-spark_2.12.
  3. Upload the Delta Sharing credential file to HDFS, e.g. `/user/<your account>/config.share`.
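
For context, the `Caused by: java.lang.ClassNotFoundException: scala.collection.IterableOnce` at the bottom of the traceback is the telltale sign of this mismatch: `IterableOnce` exists only in the Scala 2.13 standard library, so a Spark build on Scala 2.12 cannot instantiate classes from `_2.13` jars. A minimal sketch of the step 1 check done from PySpark instead of spark-shell (it goes through py4j's `_jvm` gateway, which is an internal PySpark API):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Ask the driver JVM which Scala version Spark was built against.
# scala.util.Properties.versionString returns e.g. "version 2.12.10",
# which means the _2.12 connector artifacts are the ones to use.
print(spark.sparkContext._jvm.scala.util.Properties.versionString())
```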

Sample code:

```python
import os
import sys
import time
import delta_sharing
from pyspark.sql import SparkSession

start = time.time()
table_url = f"config.share#dsh.demo_tables_ext.{sys.argv[1]}"

spark = SparkSession.builder.appName("delta-sharing") \
    .config("spark.jars.packages", "io.delta:delta-sharing-spark_2.12:0.7.5") \
    .getOrCreate()

spark.sparkContext.setLogLevel('WARN')

client = delta_sharing.SharingClient("config.share")
print(client.list_all_tables())

spark_df = delta_sharing.load_as_spark(table_url)
print(spark_df.count(), " rows")

print("Sample rows:")
spark_df.show(3)

elapsed = time.time() - start
print(elapsed, " seconds elapsed, reading.")  # time in seconds

start = time.time()

# container, account, and user segments of this abfss path were elided in the original
spark_df.write.option("delimiter", ";").option("header", True).csv(f"abfss://@/user//{sys.argv[1]}.csv")

elapsed = time.time() - start
print(elapsed, " seconds elapsed, writing.")  # time in seconds
```
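
For reference, `delta_sharing.load_as_spark` is a thin wrapper around the `deltaSharing` data source that the connector jar registers with Spark (the traceback above shows it ends in `df.load(url)`), so the same read can be written directly against the DataFrame reader. A sketch using the same `table_url` as above:

```python
# Equivalent to delta_sharing.load_as_spark(table_url):
# delta-sharing-spark registers "deltaSharing" as a Spark data source format.
spark_df = spark.read.format("deltaSharing").load(table_url)
```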

Run the code:

```
spark-submit --deploy-mode client --jars delta-sharing-spark_2.12-0.7.5.jar dsh_spark.py dshdelta100m
```