jayengee opened this issue 7 months ago
Hi jayengee,
You are using a library version that does not support deletion vectors (0.6.4; 2.12 is the Scala version). From here:
“Delta Format Sharing” is newly introduced since delta-sharing-spark 3.1, which supports reading shared Delta tables with advanced Delta features such as deletion vectors and column mapping.
From the same link you can find the Maven repository link for delta-sharing-spark 3.1.0.
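For example (a sketch on my side, not an official recipe; it assumes a local Spark 3.5.x built against Scala 2.12 and a profile file named profile.share):

$ pyspark --packages io.delta:delta-sharing-spark_2.12:3.1.0

# Inside the shell: read a shared table through the newer Spark connector.
# The "responseFormat" = "delta" option is what enables advanced features such as deletion vectors.
url = "profile.share#<share>.<schema>.<table>"
df = spark.read.format("deltaSharing").option("responseFormat", "delta").load(url)
df.show()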
Thanks for your response @aimtsou !
Sorry, I might be missing something basic here. From the 3.1.0 Delta release notes you shared I see links to delta-sharing-spark 2.12 and 2.13.
From the release notes:
This release of Delta introduces (https://github.com/delta-io/delta/issues/2291) a new module called delta-sharing-spark which enables reading Delta tables shared using the Delta Sharing protocol in Apache Spark™. It is migrated from the https://github.com/delta-io/delta-sharing/tree/main/spark repository to the https://github.com/delta-io/delta/tree/master/sharing repository. The last release version of delta-sharing-spark from the previous location is 1.0.4. The next release of delta-sharing-spark is with the current release of Delta, which is 3.1.0.
Does this mean that this repo isn't what I should be using moving forward? Alternatively, am I not supposed to be installing via PyPi, and instead compile delta-sharing and delta-sharing spark on my end?
FWIW the latest release in this repo seems to be 1.0.4, which was released before the delta 3.1.0 release. PyPi also still points to 1.0.3
Hi @jayengee,
You are confused, and I understand that; believe me, I tried to read up on the repo and it is not handled very well. Be aware that I am not a maintainer, so all of this is personal research while I set things up on my side.
Sorry, I might be missing something basic here. From the 3.1.0 Delta release notes you shared I see links to delta-sharing-spark 2.12 and 2.13.
2.12 and 2.13 are Scala versions. Taking 2.13 as an example, you can see all the available versions by moving one directory up in the Maven repository.
Does this mean that this repo isn't what I should be using moving forward?
I cannot answer that officially since I am not a maintainer, but both seem to be in use: this repository at least up to version 1.0.4 and for the description of the protocol. The documentation for the protocol points back to this repository, so for the moment we have a mix.
Alternatively, am I not supposed to be installing via PyPi, and instead compile delta-sharing and delta-sharing spark on my end?
There are a few things here:
You can try also to read the remote dataset that delta.io offers with this version.
And then try with your dataset but for that you will need the delta-sharing-spark 3.1.0 and a DBR version that supports Spark 3.5.0.
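For instance (a rough sketch; the open-datasets.share profile is the one from the delta.io quickstart, and COVID_19_NYT is just one of its public tables):

# Sketch: sanity-check the connector against the public delta.io share first.
import delta_sharing
profile = "/path/to/open-datasets.share"
client = delta_sharing.SharingClient(profile)
print(client.list_all_tables())
print(delta_sharing.load_as_pandas(f"{profile}#delta_sharing.default.COVID_19_NYT", limit=10))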
I hope I resolved some of your questions.
@jayengee thanks for the question. @aimtsou thanks for the answer.
The versions for delta-sharing-server, the delta-sharing python connector, and delta-sharing-spark are independent of each other.
We'll update our doc to make it more clear.
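For example, a quick local check (the Maven coordinate below is only illustrative):

# The Python connector is the PyPI package "delta-sharing".
# delta-sharing-spark is not a pip package; it is a Maven artifact you pass to Spark,
# e.g. --packages io.delta:delta-sharing-spark_2.12:3.1.0.
import importlib.metadata
print(importlib.metadata.version("delta-sharing"))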
Thanks both for the input!
@linzhou-db
The latest delta-sharing python connector is 1.0.3, but it uses the latest delta-sharing-spark installed on your machine when calling load_as_spark, not restricted to the python connector version.
Understanding that the three different versions/packages are independent of one another, how is delta-sharing-spark installed locally?
The path I had gone through initially was to simply run:
$ pip install delta-sharing
Then open up a python shell and run the previously provided commands
Given your last comment, I've also tried running this in pyspark shell locally:
$ pyspark --packages io.delta:delta-sharing-spark_2.13:3.1.0
However, I get the same error when calling load_as_pandas. I get a different error when calling load_as_spark:
>>> delta_sharing.load_as_spark(f"{profile_path}#{share_name}.{table_path_2}")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/<user>/.local/share/virtualenvs/code-QImT9lvn/lib/python3.9/site-packages/delta_sharing/delta_sharing.py", line 159, in load_as_spark
return df.load(url)
File "/Users/<user>/.local/share/virtualenvs/code-QImT9lvn/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 307, in load
return self._df(self._jreader.load(path))
File "/Users/<user>/.local/share/virtualenvs/code-QImT9lvn/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/Users/<user>/.local/share/virtualenvs/code-QImT9lvn/lib/python3.9/site-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
return f(*a, **kw)
File "/Users/<user>/.local/share/virtualenvs/code-QImT9lvn/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o47.load.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: io.delta.sharing.spark.DeltaSharingDataSource Unable to get public no-arg constructor
at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:582)
at java.base/java.util.ServiceLoader.getConstructor(ServiceLoader.java:673)
at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1233)
at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1265)
at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1300)
at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1385)
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:629)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NoClassDefFoundError: scala/collection/IterableOnce
at java.base/java.lang.Class.getDeclaredConstructors0(Native Method)
at java.base/java.lang.Class.privateGetDeclaredConstructors(Class.java:3137)
at java.base/java.lang.Class.getConstructor0(Class.java:3342)
at java.base/java.lang.Class.getConstructor(Class.java:2151)
at java.base/java.util.ServiceLoader$1.run(ServiceLoader.java:660)
at java.base/java.util.ServiceLoader$1.run(ServiceLoader.java:657)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/java.util.ServiceLoader.getConstructor(ServiceLoader.java:668)
... 33 more
Caused by: java.lang.ClassNotFoundException: scala.collection.IterableOnce
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 41 more
For context, I'm running these calls locally against a Databricks metastore, in order to POC a future use case where we would want to open up (share) tables from our Databricks metastore with users who don't have access to our Databricks workspaces. My hope is to figure out a playbook for them, which I had initially anticipated would be via just installing delta-sharing and being given a token/profile .share file.
Thanks for your help!
@jayengee:
Your procedure looks correct to me. However, from the error I see that you are missing a Java library, which seems odd given that you try to bring it in with the --packages flag.
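One possibility (my assumption, not something confirmed in this thread): the NoClassDefFoundError: scala/collection/IterableOnce usually points to a Scala 2.12 vs 2.13 mismatch, and the PyPI build of PySpark 3.5.x bundles Scala 2.12 jars, so the matching artifact would be:

$ pyspark --packages io.delta:delta-sharing-spark_2.12:3.1.0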
Can you try, in your local environment, to load the remote dataset that delta.io offers, using version 3.1.0?
Also, can you try adding the following snippet to your code? Can you read the tables with your own profile file and dataset as well?
# Create a SharingClient.
client = delta_sharing.SharingClient(profile_file)
# List all shared tables.
print(client.list_all_tables())
Hi @aimtsou
Yep I thought that was odd as well.
Output below from listing all tables on my side (pointed at Databricks), and also from the remote dataset from delta.io. In short, despite installing delta-sharing-spark_2.13:3.1.0 and being able to list the tables, I can't read from them; I get the same error about an unsupported feature. However, I am able to read from the remote delta.io dataset.
$ pyspark --packages io.delta:delta-sharing-spark_2.13:3.1.0
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.1
      /_/
Using Python version 3.9.13 (main, Jun 28 2022 16:47:28)
Spark context Web UI available at http://172.16.11.113:4040
Spark context available as 'sc' (master = local[*], app id = local-1713903316429).
SparkSession available as 'spark'.
>>> import delta_sharing
>>> profile_path = "/Users/<user>/Downloads/config.share"
>>> share_name = "delta_share_poc"
>>> table_path_2 = 'product2.customer'
>>>
>>> # Create a SharingClient.
>>> client = delta_sharing.SharingClient(profile_path)
>>>
>>> # List all shared tables.
>>> print(client.list_all_tables())
[Table(name='customer', share='delta_share_poc', schema='product2')]
>>>
>>> delta_sharing.load_as_pandas(f"{profile_path}#{share_name}.{table_path_2}", limit=10)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/<user>/.local/share/virtualenvs/code-QImT9lvn/lib/python3.9/site-packages/delta_sharing/delta_sharing.py", line 122, in load_as_pandas
return DeltaSharingReader(
File "/Users/<user>/.local/share/virtualenvs/code-QImT9lvn/lib/python3.9/site-packages/delta_sharing/reader.py", line 89, in to_pandas
response = self._rest_client.list_files_in_table(
File "/Users/<user>/.local/share/virtualenvs/code-QImT9lvn/lib/python3.9/site-packages/delta_sharing/rest_client.py", line 113, in func_with_retry
raise e
File "/Users/<user>/.local/share/virtualenvs/code-QImT9lvn/lib/python3.9/site-packages/delta_sharing/rest_client.py", line 101, in func_with_retry
return func(self, *arg, **kwargs)
File "/Users/<user>/.local/share/virtualenvs/code-QImT9lvn/lib/python3.9/site-packages/delta_sharing/rest_client.py", line 342, in list_files_in_table
with self._post_internal(
File "/Users/<user>/.pyenv/versions/3.9.13/lib/python3.9/contextlib.py", line 119, in __enter__
return next(self.gen)
File "/Users/<user>/.local/share/virtualenvs/code-QImT9lvn/lib/python3.9/site-packages/delta_sharing/rest_client.py", line 441, in _request_internal
raise HTTPError(message, response=e.response) from None
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://oregon.cloud.databricks.com/api/2.0/delta-sharing/metastores/988a4820-a048-40a3-9d85-e690d2a6b89c/shares/delta_share_poc/schemas/product2/tables/customer/query
Response from server:
{ 'details': [ { '@type': 'type.googleapis.com/google.rpc.ErrorInfo',
'domain': 'data-sharing.databricks.com',
'metadata': { 'optionStr': 'For DeletionVectors, use DBR with '
'version 14.1(14.2 for CDF and '
'streaming) or higher, or '
'delta-sharing-spark with version '
'3.1 or higher, and set option '
'("responseFormat", "delta") to '
'query the table. ',
'tableFeatures': 'delta.enableDeletionVectors',
'versionStr': ' version: 10.'},
'reason': 'DS_UNSUPPORTED_FEATURES'}],
'error_code': 'INVALID_PARAMETER_VALUE',
'message': 'Table features delta.enableDeletionVectors are found in table '
'version: 10. For DeletionVectors, use DBR with version 14.1(14.2 '
'for CDF and streaming) or higher, or delta-sharing-spark with '
'version 3.1 or higher, and set option ("responseFormat", '
'"delta") to query the table. '}
>>>
>>> profile_path_2 = "/Users/<user>/Downloads/open-datasets.share"
>>> client_2 = delta_sharing.SharingClient(profile_path_2)
>>> print(client_2.list_all_tables())
[Table(name='COVID_19_NYT', share='delta_sharing', schema='default'), Table(name='boston-housing', share='delta_sharing', schema='default'), Table(name='flight-asa_2008', share='delta_sharing', schema='default'), Table(name='lending_club', share='delta_sharing', schema='default'), Table(name='nyctaxi_2019', share='delta_sharing', schema='default'), Table(name='nyctaxi_2019_part', share='delta_sharing', schema='default'), Table(name='owid-covid-data', share='delta_sharing', schema='default')]
>>> delta_sharing.load_as_pandas(f"{profile_path_2}#delta_sharing.default.COVID_19_NYT", limit=10)
date county state fips cases deaths
0 2021-01-10 Washakie Wyoming 56043 804 21
1 2021-01-10 Weston Wyoming 56045 485 4
2 2021-01-11 Autauga Alabama 1001 4902 55
3 2021-01-11 Baldwin Alabama 1003 15417 173
4 2021-01-11 Barbour Alabama 1005 1663 35
5 2021-01-11 Bibb Alabama 1007 2060 48
6 2021-01-11 Blount Alabama 1009 5080 77
7 2021-01-11 Bullock Alabama 1011 957 28
8 2021-01-11 Butler Alabama 1013 1637 57
9 2021-01-11 Calhoun Alabama 1015 10537 178
>>>
I'm seeing the same error reported above when I attempt to read tables with deletion vectors enabled that I've shared to myself from Databricks. (The error message wording is slightly different, but it's the same error code and general gist; see below.)
If I disable deletion vectors temporarily, I am able to read them via load_as_pandas()
I am able to read all the tables in the delta.io remote dataset, but I question whether any of them have deletion vectors enabled.
Note: I'm running via Python rather than Spark Console.
Versions I'm running are...
Name: delta-sharing Version: 1.0.5 (latest)
Name: databricks-spark Version: 3.2.0 (latest)
Name: databricks-connect Version: 14.3.2 (latest)
"delta-sharing-spark" not installed (pip show doesn't find - pypi doesn't list)
SHOW TBLPROPERTIES catalog.schema.table;
ALTER TABLE catalog.schema.table SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);
{ 'details': [ { '@type': 'type.googleapis.com/google.rpc.ErrorInfo',
'domain': 'data-sharing.databricks.com',
'metadata': { 'dsError': 'DS_UNSUPPORTED_DELTA_TABLE_FEATURES',
'optionStr': 'For DeletionVectors, use DBR with '
'version 14.1(14.2 for CDF and '
'streaming) or higher, or '
'delta-sharing-spark with version '
'3.1 or higher, and set option '
'("responseFormat", "delta") to '
'query the table. Or ask your '
'provider to disable '
'DeletionVectors with\n'
' (ALTER TABLE <table_name> SET '
'TBLPROPERTIES '
'(delta.enableDeletionVectors=false)),\n'
' rewrite it without Deletion '
'Vectors (REORG TABLE '
'<table_name> APPLY(PURGE)).',
'tableFeatures': 'delta.enableDeletionVectors',
'versionStr': ' version: 6.'},
'reason': 'DS_UNSUPPORTED_DELTA_TABLE_FEATURES'}],
'error_code': 'INVALID_PARAMETER_VALUE',
'message': 'DS_UNSUPPORTED_DELTA_TABLE_FEATURES: Table features '
'delta.enableDeletionVectors are found in table version: 6. For '
'DeletionVectors, use DBR with version 14.1(14.2 for CDF and '
'streaming) or higher, or delta-sharing-spark with version 3.1 or '
'higher, and set option ("responseFormat", "delta") to query the '
'table. Or ask your provider to disable DeletionVectors with\n'
' (ALTER TABLE <table_name> SET TBLPROPERTIES '
'(delta.enableDeletionVectors=false)),\n'
' rewrite it without Deletion Vectors (REORG TABLE <table_name> '
'APPLY(PURGE)).'}
@john-grassroots:
"delta-sharing-spark" not installed (pip show doesn't find - pypi doesn't list)
delta-sharing-spark is a jar file that you give to your Spark context. If you are using Azure, you do not need to install it in your cluster. You say that you are running Databricks, but you do not mention which version.
Hey @aimtsou - thanks for your comment.
We're on AWS Databricks. When we're running non-serverless compute clusters we're generally on 14.3 LTS.
I don't see any reference to SparkContext in either of the python examples from the project.
The readme does state you should be passing delta-sharing-spark but the python example (which is the way we're running) does not have any mention of delta-sharing-spark.
In my case, I'm sharing a few tables to myself. Those tables are part of a share defined on AWS Databricks. My client code is running on my local machine via python. We're developing some example code to distribute to partners who we intend to share data with via Delta Sharing.
I haven't moved on from Python to PySpark examples as I've run into a few issues with the Python examples I'm working through (this one and another with CDF Timestamps)
If I'm not missing something obvious, I think I'm experiencing the same error here that @jayengee is.
From README.md
To run the example of PySpark in Python run spark-submit --packages io.delta:delta-sharing-spark_2.12:0.6.2 ./python/quickstart_spark.py
To run the example of pandas DataFrame in Python run python3 ./python/quickstart_pandas.py
Hey @aimtsou - thanks for your comment.
We're on AWS Databricks. When we're running non-serverless compute clusters we're generally on 14.3 LTS.
I don't see any reference to SparkContext in either of the python examples from the project.
So, if I understand correctly, you have enabled a share from your Databricks Unity Catalog. These examples are old; they are good for getting started but do not cover the latest features, e.g. deletion vectors.
The readme does state you should be passing delta-sharing-spark but the python example (which is the way we're running) does not have any mention of delta-sharing-spark.
In my case, I'm sharing a few tables to myself. Those tables are part of a share defined on AWS Databricks. My client code is running on my local machine via python. We're developing some example code to distribute to partners who we intend to share data with via Delta Sharing.
I believe the Spark session that you start needs to have delta-sharing-spark >= 3.1.0. If I understand your use case, you have created a share from your Unity Catalog and you want to share it with somebody without access to Databricks (so, the open protocol).
I will use the MS docs because they have some good examples for the open protocol. If you go to the deletion vectors section you will see that you need delta-sharing-spark 3.1.0 at least. The most interesting part is the next section about pandas: it does not mention deletion vectors at all, although it provides an example with CDF. I believe deletion vectors are not supported in load_as_pandas, but I will run a test using a new Delta table.
I haven't moved on from Python to PySpark examples as I've run into a few issues with the Python examples I'm working through (this one and another with CDF Timestamps)
If I'm not missing something obvious, I think I'm experiencing the same error here that @jayengee is.
From README.md
To run the example of PySpark in Python run spark-submit --packages io.delta:delta-sharing-spark_2.12:0.6.2 ./python/quickstart_spark.py
To run the example of pandas DataFrame in Python run python3 ./python/quickstart_pandas.py
You can run: spark-submit --packages io.delta:delta-sharing-spark_2.12:3.1.0 ./python/quickstart_spark.py
For me, even better would be to run: pyspark --packages io.delta:delta-sharing-spark_2.12:3.1.0
Then use the interactive shell to try to load the tables in Spark, once with deletion vectors set to true and once with them set to false. If it loads, it confirms my theory that with deletion vectors you can load as Spark and then convert to pandas.
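Something along these lines (a sketch only, reusing the profile path and table jayengee showed above; I have not verified it against a deletion-vector table yet):

# Sketch: read the DV-enabled table via the Spark connector, then hand it to pandas.
url = "/Users/<user>/Downloads/config.share#delta_share_poc.product2.customer"
df = spark.read.format("deltaSharing").option("responseFormat", "delta").load(url)
pdf = df.toPandas()
print(pdf.head(10))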
I will make some tests on my own and let you know.
For sure the docs and the examples need more updates
Thanks @aimtsou - I'll take a look tomorrow at the Spark stuff.
To answer your questions above... Yes, we're on Databricks utilizing Unity Catalog and we've created a Delta Share there in that environment.
I've shared that data with myself and I'm consuming it outside of Databricks as that will be the scenario most of our clients and partners will be utilizing.
That share contains several test cases... Tables with deletion vectors enabled, with deletion vectors disabled, and with change data feed enabled.
I can verify that I'm currently running the Python test project directly from VSCode and I'm not issuing any spark-submit commands from the terminal.
For tables where deletion vectors are enabled, when issuing the load_as_pandas method I get the error I mentioned above which is the same error @jayengee initially reported.
If I disable deletion vectors for that table the error goes away.
As @jayengee pointed out, there is an option that makes deletion vectors enabled by default, so explicitly disabling them isn't the solution.
I'm also not sure that first creating the DataFrame in Spark and then converting to pandas is the solution. It may do as a temporary workaround, but forcing a consumer to go one route when deletion vectors are enabled and another when they're not feels like an anti-pattern.
I'll report back tomorrow when I get into Spark.
Thanks!
Hi @john-grassroots @jayengee, there are some issues related to the local Spark and Java packages, though I'm not sure exactly which packages are involved.
But we are actively working on native support in load_as_pandas() to read shared tables with deletion vectors. We'll publish a release note once it's ready.
@linzhou-db any progress on load_as_pandas? We are experiencing the same error in our project with the 1.0.5 package.
Issue
Running a simple load_as_pandas command against tables we've created in Databricks is returning an error suggesting that this library does not support tables with deletion vectors enabled, which could be the default setting for tables in Databricks. The logs suggest that a newer version of delta-sharing-spark would support this, but this library seems to be using 2.12: https://github.com/delta-io/delta-sharing/blob/e368d5880a99c585bbbb67e0015189c552167b4b/README.md?plain=1#L130. Are there plans to make this available/upgrade?
Logs
The share for this recipient was set such that they had access to table history, which suggests that permissioning is not the issue here:
When I disabled the associated property on the table by running ALTER TABLE delta_share_poc.product2.customer SET TBLPROPERTIES (delta.enableDeletionVectors = false);, the same load_as_pandas call is successful:
Steps to reproduce
Install the delta_sharing library
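A minimal sketch of the failing call (profile path and table name are placeholders):

# Sketch: plain Python connector against a table with deletion vectors enabled.
import delta_sharing
profile_path = "/path/to/config.share"  # .share profile issued for the recipient
table_url = f"{profile_path}#<share>.<schema>.<table>"
delta_sharing.load_as_pandas(table_url, limit=10)  # fails with DS_UNSUPPORTED_FEATURES (HTTP 400)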