delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

Add Support for Microsoft OneLake #1418

Closed djouallah closed 1 year ago

djouallah commented 1 year ago

Description

Microsoft recently released, with great fanfare, OneLake, a managed lakehouse offering based on ADLS Gen2.

Reading works fine, but writing generates an error: the Parquet file is written, but in the log we only get a tmp JSON file.

[screenshot]

Using the Polars writer (which I believe is based on the deltalake writer), ldf.write_delta("/lakehouse/default/Files/regionn") produces this error:

PyDeltaTableError                         Traceback (most recent call last)
Cell In[26], line 1
----> 1 ldf.write_delta("/lakehouse/default/Files/regionn")

File ~/cluster-env/trident_env/lib/python3.10/site-packages/polars/dataframe/frame.py:3363, in DataFrame.write_delta(self, target, mode, overwrite_schema, storage_options, delta_write_options)
   3360     if data_schema == table_schema.to_pyarrow(as_large_types=True):
   3361         data_schema = table_schema.to_pyarrow()
-> 3363 write_deltalake(
   3364     table_or_uri=target,
   3365     data=data,
   3366     mode=mode,
   3367     schema=data_schema,
   3368     overwrite_schema=overwrite_schema,
   3369     storage_options=storage_options,
   3370     **delta_write_options,
   3371 )

File ~/cluster-env/trident_env/lib/python3.10/site-packages/deltalake/writer.py:322, in write_deltalake(table_or_uri, data, schema, partition_by, filesystem, mode, file_options, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group, name, description, configuration, overwrite_schema, storage_options, partition_filters)
    302 ds.write_dataset(
    303     data,
    304     base_dir="/",
   (...)
    318     max_partitions=max_partitions,
    319 )
    321 if table is None:
--> 322     _write_new_deltalake(
    323         table_uri,
    324         schema,
    325         add_actions,
    326         mode,
    327         partition_by or [],
    328         name,
    329         description,
    330         configuration,
    331         storage_options,
    332     )
    333 else:
    334     table._table.create_write_transaction(
    335         add_actions,
    336         mode,
   (...)
    339         partition_filters,
    340     )

PyDeltaTableError: Failed to read delta log object: Generic LocalFileSystem error: Unable to rename file: No such file or directory (os error 2)
cnolanminich commented 1 year ago

This is of great interest to my team as well -- the promise of OneLake as a managed access layer for a data lake, while remaining open to any compute engine for writes, would be absolutely incredible!

rtyler commented 1 year ago

From the error and the fact that a temp file remains, it does look like the rename operation is failing. Can you share the URL scheme that the Delta table is being loaded with? I'm wondering if Azure is exposing "raw" ADLSv2 or masking it somehow. This may require us to have a specific object_store implementation :thinking:

roeap commented 1 year ago

It seems like accessing the underlying OneLake storage should be compatible with the ADLS Gen2 APIs, but requires a quite different kind of URL. https://learn.microsoft.com/en-us/fabric/onelake/onelake-access-api

It seems they are using the Gen2 APIs. Unfortunately, right now object_store always calls the Blob APIs, which work with both Gen1 and Gen2 storage but may not work with OneLake - we would have to find out.

While the docs are very limited right now, I have not found any blocking disparity between Gen2 and OneLake storage from the API side. The authorization docs are very vague, but it sounds like things may just work using the same tokens that storage accounts use.

So if we are lucky, we just need to handle URLs correctly; maybe we also need to reimplement some things to use Gen2-style requests only, as the OneLake APIs may no longer support Blob API requests.

mabasile-MSFT commented 1 year ago

Hi all, I'm on the OneLake team. I was trying to repro this on my side and was running into a "URL did not match any known pattern for scheme" error - does DeltaLake perform URL validation checks that block the 'onelake.dfs.fabric.microsoft.com' format?

Also, @roeap - right now you can use the format "onelake.blob.fabric.microsoft.com" for OneLake to support Blob API calls. However, this isn't supported for calls to the ABFS driver, so I'm not sure if that will help you here.

jocaplan commented 1 year ago

It looks like there is some validation of endpoints for Azure. Adding two more lines to the Azure section of the config might be enough. It would also need to support dfs.fabric.microsoft.com and blob.fabric.microsoft.com. https://github.com/delta-io/delta-rs/blob/b17f2863dc561834e51c1aa951260cce1a46d4f7/rust/src/storage/config.rs#L124

If you are supporting only Blob, you can replace dfs in the URL with blob, which you might already be doing for ADLS Gen2.
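
For illustration, a rough Python sketch of the kind of host-suffix check being suggested (the real code linked above is Rust; the function name and suffix list here are made up for the example):

AZURE_HOST_SUFFIXES = [
    "dfs.core.windows.net",
    "blob.core.windows.net",
    # the two additional suffixes proposed above for OneLake / Fabric
    "dfs.fabric.microsoft.com",
    "blob.fabric.microsoft.com",
]

def is_azure_host(host: str) -> bool:
    # Treat a URL as Azure storage if its host ends with a known suffix.
    return any(host.endswith(suffix) for suffix in AZURE_HOST_SUFFIXES)

assert is_azure_host("onelake.dfs.fabric.microsoft.com")
assert not is_azure_host("example.com")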

roeap commented 1 year ago

@mabasile-MSFT, @jocaplan - thanks for the info, this is great news!

The code in delta-rs is actually just there to choose the type of storage backend; we will have to do some (small) updates in the object_store crate, where the actual client implementation lives.

However this isn't supported for calls to the ABFS driver, so I'm not sure if that will help you here.

object_store treats all variants of URLs (including abfss) as specifying a location, but does not consider a protocol - we always connect using the REST APIs.

For the convenience of the user, we also like to allow shorthand URLs, like s3://<bucket>/<path> or az://<container>/<path> - in the case of Blob, requiring an additional account_name to be passed via config.

Do you think it makes sense to maybe create a convention for OneLake as well? Maybe something like

onelake://<workspace>/<item>/<path>, assuming that we would always find delta tables in a .lakehouse kind of item?

Also, are there any conventions / guarantees around the top-level folders - i.e. would Files map to blobs, and Tables to data stored via the table APIs in storage accounts (and thus not relevant for Delta)?

Last but not least, we could allow for an optional region parameter to be passed in the config, to control data residency.

I'm on the OneLake team.

May I ask one thing that you may be able to answer, or maybe forward to the storage PG? Since listing the storage is a key concern in Delta, there are some optimisations possible if the storage supports sorted results and offsets, as S3 and GCS do. IIRC, Azure does not support that thus far. Spark, and soon delta-rs, leverages these optimizations whenever possible. Since it seems MSFT went "all in" on Delta, is there a chance for this to become a feature? :)

jocaplan commented 1 year ago

Happy to pull in the storage team and discuss in more detail. It might be easier to have a quick call if you are open to it; I can send an invite. There are certainly a lot of optimizations that you could do for OneLake, since we have a well-known structure at the top and our tables are in Delta Lake format.

However, I am curious why the normal ADLS path doesn't work. Other than the domain in the URL, we did a lot of work to match the ADLS DFS/Blob endpoints so that existing ADLS apps just work, even if they call the REST APIs directly. We mapped our top-level concepts to ADLS concepts like storage accounts and containers. So far, any apps that didn't work failed for one of two reasons: either we had a bug on our side, or the app was validating the domain in the URL.

djouallah commented 1 year ago

Any update?

mabasile-MSFT commented 1 year ago

We should have a PR out this week with our fix!

roeap commented 1 year ago

@jocaplan @mabasile-MSFT - sorry for being MIA for a while.

We should have a PR out this week with our fix!

Does this mean you are adding onelake support to object store, or is there another way in the works?

Regarding the talk with the storage team - if that is still on the table... I am right now working on the Rust delta kernel implementation, trying to apply what we learned during delta-rs. As it turns out, there seems to be an assumption in Spark that one gets lexicographically sorted results from list, and it seems to work out. But I could not find any reference in the docs.

A second question would be whether there is any chance we can one day pass in a blob's key rather than an opaque marker to get list-with-offset.

Any info on that would be great :)

mabasile-MSFT commented 1 year ago

Yes, we have two draft PRs - one adding OneLake support to object_store, and one adding support in delta-rs (by updating the URL validation). We're just performing the last checks on our side and then we'll have the PRs officially submitted!

roeap commented 1 year ago

Great! I'll see that we integrate with the URL parsing from object_store on this end. There is some awkwardness in how we handle environment variables right now that might lead to unexpected behaviours (in some cases env vars might take precedence over passed config), and I have been meaning to fix that along the way...

vmuddassir-msft commented 1 year ago

@roeap - The latest version of the object_store crate, v0.7.0, has been published, which includes the changes required to support OneLake. The dependency needs to be updated in delta-rs. Is there a specific procedure for upgrading package dependencies? I raised a query, #1597, with more details.

mabasile-MSFT commented 1 year ago

@roeap - just wondering if you had an update here. Is there a way to get delta-rs to take up this latest object_store version so we can also publish the changes for delta-rs to support OneLake?

rtyler commented 1 year ago

I believe support for this has landed via #1642 and will be in the 0.16 version of the crate

vipeller commented 1 year ago

We already tested #1642 in our Rust-based solution that pushes data directly to MS Fabric; this works wonderfully.

djouallah commented 1 year ago

I hope we are not going to wait months for the Python package?

djouallah commented 1 year ago

@rtyler please reopen the bug report, I just tested it with the Python package 0.11 and got the same error

@mabasile-MSFT is it fixed or not?

2023-09-26T20:49:14.645124+00:00 vm-cca98486 blobfusev2[4368]: [CURL perform] - ==> REQUEST/RESPONSE :: PUT http://onelake.dfs.fabric.microsoft.com/bb82abdf-8e66-44a5-be95-0a0c20421794/5b0917e6-0e49-47e9-beec-9dca4dd5ae58/Tables/delta_tes/_delta_log/_commit_1063fe1e-b815-4379-b16d-2bf2a483f3e6.json.tmp?&Content-Length=0&User-Agent=azure-storage-fuse/2.1.0&x-ms-date=Tue, 26 Sep 2023 20:49:14 GMT&x-ms-version=2021-06-08&x-ms-rename-source=/bb82abdf-8e66-44a5-be95-0a0c20421794/5b0917e6-0e49-47e9-beec-9dca4dd5ae58/Tables/delta_tes/_delta_log/_commit_1063fe1e-b815-4379-b16d-2bf2a483f3e6.json.tmp%231&Authorization=****&Transfer-Encoding=--------------------------------------------------------------------------------RESPONSE Status :: 404 :: REQ ID : 440cd461-101f-000a-0bba-f0570d000000 ROOT-ACTIVITY-ID : 0d731030-4940-4439-8e84-fe4d780bbc45

djouallah commented 1 year ago

@rtyler please reopen the bug report till the issue is resolved, this simple example does not work

import pandas as pd
from deltalake.writer import write_deltalake

df = pd.DataFrame(
    {
        "a": [1, 2, 3, 4, 5],
        "fruits": ["banana", "orange", "mango", "apple", "banana"],
    }
)

write_deltalake('/lakehouse/default/Files/fruits', df, mode='overwrite')
roeap commented 1 year ago

@djouallah - It seems you are referencing a local path, in which case delta-rs will use the local filesystem implementation. To work with OneLake, you have to write it as an Azure path. Additionally, the option use_fabric_endpoint: true needs to be passed in the storage options, along with an appropriate credential. In an environment where a metadata endpoint is available, that should get picked up as well.
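
For illustration, a minimal sketch of what that could look like (the workspace/lakehouse names and the token are placeholders, and this is a sketch rather than a verified end-to-end example):

import pandas as pd
from deltalake.writer import write_deltalake

df = pd.DataFrame({"a": [1, 2, 3]})

# Azure-style URL instead of the local /lakehouse/... mount path (placeholders)
table_url = "abfss://<workspace>@onelake.dfs.fabric.microsoft.com/<lakehouse>.Lakehouse/Tables/regionn"
storage_options = {
    "bearer_token": "<aad-token>",   # any supported Azure credential option works here
    "use_fabric_endpoint": "true",
}
write_deltalake(table_url, df, mode="overwrite", storage_options=storage_options)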

djouallah commented 1 year ago

@roeap thanks, the local filesystem is just a FUSE mount to Azure storage, I guess that is a Fabric problem, not delta-rs

Is there any example of how to write to an Azure path with credentials and all?

roeap commented 1 year ago

There are some integration tests that are currently not executed in CI, since they require a live credential ... https://github.com/delta-io/delta-rs/blob/18eec383955adafc8273bf17712009b81d92c58a/rust/tests/integration_object_store.rs#L25-L49

In essence though, it's just like reading from any other Azure storage. The container name corresponds to the workspace and would be a GUID - I think. The use_fabric_endpoint option just tells object_store to construct URLs with blob.fabric.microsoft.com rather than the usual blob.core.windows.net.
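
For example, reading could then look roughly like this (GUIDs and the token are placeholders; a sketch, not a verified example):

from deltalake import DeltaTable

dt = DeltaTable(
    "abfss://<workspace-guid>@onelake.dfs.fabric.microsoft.com/<lakehouse-guid>/Tables/my_table",
    storage_options={"bearer_token": "<token>", "use_fabric_endpoint": "true"},
)
print(dt.version())    # latest table version
print(dt.files()[:5])  # a few of the data files referenced by the log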

djouallah commented 1 year ago

Thanks, it does not work running inside Fabric:

[screenshot]

roeap commented 1 year ago

could you share any error messages?

djouallah commented 1 year ago

OSError: Generic MicrosoftAzure error: Error performing token request: response error "request error", after 1 retries: error sending request for url (http://169.254.169.254/metadata/identity/oauth2/token?api-version=2019-08-01&resource=https%3A%2F%2Fstorage.azure.com): error trying to connect: tcp connect error: Connection timed out (os error 110)

roeap commented 1 year ago

Since you did not configure a credential, it tries to query the metadata endpoint, which seems not to be available. I have never worked with Fabric, so I am not sure what the environments look like and if that is expected. We have seen working instances with object_store and managed identities though ...

So you need to figure out what credential is available and configure it based on the options for azure.

roeap commented 1 year ago

Also, the specified path is still wrong: either configure the URL in the short form az://<container>/... and set the account in the options or env, or use an https URL - but mixing both will not work.
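
To illustrate the two valid forms (placeholders only; a sketch based on the general Azure handling, not something OneLake-specific):

# 1) short form: the account name goes into the options (or env), not the URL
short_url = "az://<container>/<path>"
short_options = {"account_name": "<account>", "bearer_token": "<token>"}

# 2) full https form: the account is part of the host, so no account_name option is needed
full_url = "https://<account>.dfs.core.windows.net/<container>/<path>"
full_options = {"bearer_token": "<token>"}

# Mixing the two styles (e.g. an az:// scheme combined with a full host) will not parse.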

djouallah commented 1 year ago

Thanks @roeap, I will just pass. I am not an expert in Azure authentication, I don't know how to get a token, and the account key is not exposed in Azure Fabric. Microsoft should add more documentation on how this thing is supposed to work. Thanks for your help.

vmuddassir-msft commented 1 year ago

@djouallah - You can generate a token from PowerShell using:

az login --allow-no-subscriptions
$bearerToken = Get-AzAccessToken -ResourceTypeName Storage
$bearerToken.Token | Set-Clipboard

If you do not have Azure PowerShell Az module installed, please follow steps mentioned in - https://learn.microsoft.com/en-us/powershell/azure/install-azure-powershell

Once this is done, please set the token in storage_options:

from deltalake import DeltaTable
from deltalake.writer import write_deltalake
import pandas as pd

# Delta table path
delta_table_path = "abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>"
# Bearer token generated above
storage_options = {"bearer_token": "token_from_azure"}
# Define delta table
dt = DeltaTable(delta_table_path, storage_options=storage_options)
djouallah commented 1 year ago

@vmuddassir-msft thanks, my scenario is to ingest data into Fabric onelake using a cloud function, I can't use Powershell in that case

vmuddassir-msft commented 1 year ago

@vmuddassir-msft thanks, my scenario is to ingest data into Fabric onelake using a cloud function, I can't use Powershell in that case

I meant you can use PowerShell to generate a token, which can then be used with deltalake:

storage_options = {"bearer_token": "token_from_azure"}
# Define delta table
dt = DeltaTable(delta_table_path, storage_options=storage_options)
djouallah commented 1 year ago

@vmuddassir-msft for how long will the token be valid?

RobinLin666 commented 1 year ago

Hi @rtyler, @vmuddassir-msft, please reopen the bug report till the issue is resolved, this simple example does not work.

df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})
write_deltalake("abfss://xxx@onelake.dfs.fabric.microsoft.com/test.Lakehouse/Tables/sample_table2", df,
 storage_options={"bearer_token": aadToken, "use_fabric_endpoint": "true"})

error:

---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
Cell In[48], line 2
      1 df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})
----> 2 write_deltalake("abfss://xxx@onelake.dfs.fabric.microsoft.com/test.Lakehouse/Tables/sample_table2", df,
      3  storage_options={"bearer_token": aadToken, "use_fabric_endpoint": "true"})

File /nfs4/pyenv-515f53e0-5628-453e-a741-0c6f116d93b7/lib/python3.10/site-packages/deltalake/writer.py:153, in write_deltalake(table_or_uri, data, schema, partition_by, filesystem, mode, file_options, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group, name, description, configuration, overwrite_schema, storage_options, partition_filters, large_dtypes)
    150     else:
    151         data, schema = delta_arrow_schema_from_pandas(data)
--> 153 table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options)
    155 # We need to write against the latest table version
    156 if table:

File /nfs4/pyenv-515f53e0-5628-453e-a741-0c6f116d93b7/lib/python3.10/site-packages/deltalake/writer.py:417, in try_get_table_and_table_uri(table_or_uri, storage_options)
    414     raise ValueError("table_or_uri must be a str, Path or DeltaTable")
    416 if isinstance(table_or_uri, (str, Path)):
--> 417     table = try_get_deltatable(table_or_uri, storage_options)
    418     table_uri = str(table_or_uri)
    419 else:

File /nfs4/pyenv-515f53e0-5628-453e-a741-0c6f116d93b7/lib/python3.10/site-packages/deltalake/writer.py:430, in try_get_deltatable(table_uri, storage_options)
    426 def try_get_deltatable(
    427     table_uri: Union[str, Path], storage_options: Optional[Dict[str, str]]
    428 ) -> Optional[DeltaTable]:
    429     try:
--> 430         return DeltaTable(table_uri, storage_options=storage_options)
    431     except TableNotFoundError:
    432         return None

File /nfs4/pyenv-515f53e0-5628-453e-a741-0c6f116d93b7/lib/python3.10/site-packages/deltalake/table.py:250, in DeltaTable.__init__(self, table_uri, version, storage_options, without_files, log_buffer_size)
    231 """
    232 Create the Delta Table from a path with an optional version.
    233 Multiple StorageBackends are currently supported: AWS S3, Azure Data Lake Storage Gen2, Google Cloud Storage (GCS) and local URI.
   (...)
    247 
    248 """
    249 self._storage_options = storage_options
--> 250 self._table = RawDeltaTable(
    251     str(table_uri),
    252     version=version,
    253     storage_options=storage_options,
    254     without_files=without_files,
    255     log_buffer_size=log_buffer_size,
    256 )
    257 self._metadata = Metadata(self._table)

OSError: Encountered object with invalid path: Error parsing Path "test.Lakehouse/Tables/sample_table2/_delta_log/_commit_ed2503ff-f28f-40c2-9a41-5be43ede8930.json.tmp#1": Encountered illegal character sequence "#" whilst parsing path segment "_commit_ed2503ff-f28f-40c2-9a41-5be43ede8930.json.tmp#1"
RobinLin666 commented 1 year ago

@roeap thanks, the local filesystem is just a FUSE mount to Azure storage, I guess that is a Fabric problem, not delta-rs

Is there any example of how to write to an Azure path with credentials and all?

As for fuse's issue, I'd like to answer it.

First, it writes a tmp#1 file locally.

[screenshot]

But it does not call the flush system call, so blobfuse will not upload the tmp#1 file to remote storage.

Then it renames tmp#1 to the tmp file, so blobfuse calls the rename system call.

[screenshot]

However, it cannot find the tmp#1 file on the remote, so it fails with a 404.

So, I hope delta-rs can fix it. Thanks!

roeap commented 1 year ago

Hi @rtyler, @vmuddassir-msft, please reopen the bug report till the issue is resolved, this simple example does not work. ... illegal character sequence "#" whilst parsing path segment "_commit_ed2503ff-f28f-40c2-9a41-5be43ede8930.json.tmp#1"

Hi @RobinLin666,

Based on the error message we see, it seems there is an artifact from a failed commit in your table that contains a character deemed illegal in the object_store crate (specifically '#'). While it may be legal for Azure Blob, this is not the case for all object stores out there, and object_store tries to be fully consistent across all its implementations.

https://github.com/apache/arrow-rs/blob/51ac6fec8755147cd6b1dfe7d76bfdcfacad0463/object_store/src/path/parts.rs#L85-L109

roeap commented 1 year ago

As for fuse's issue, I'd like to answer it.

Thanks for the context. Just making sure I understand: so in Fabric environments users work against a filesystem mounted via blobfuse, and this filesystem then makes the calls you describe, i.e. buffers files locally before sending them along to the remote store?

The file suffixed with #1 should be a blobfuse-internal thing; as per the comment above, delta-rs will not be able to handle that path. Since there is no dedicated implementation for a local filesystem mounted with fuse, we currently also have no control over when flush is called.

I have to think on that a little more :).

djouallah commented 1 year ago

I opened a bug report with GlareDB, which uses delta-rs, and it seems it is a blobfuse bug: https://github.com/GlareDB/glaredb/issues/1809

RobinLin666 commented 1 year ago

Hi @rtyler, @vmuddassir-msft, please reopen the bug report till the issue is resolved, this simple example does not work. ... illegal character sequence "#" whilst parsing path segment "_commit_ed2503ff-f28f-40c2-9a41-5be43ede8930.json.tmp#1"

Hi @RobinLin666,

Based on the error message we see, it seems there is an artifact from a failed commit in your table that contains a character deemed illegal in the object_store crate (specifically '#'). While it may be legal for Azure Blob, this is not the case for all object stores out there, and object_store tries to be fully consistent across all its implementations.

https://github.com/apache/arrow-rs/blob/51ac6fec8755147cd6b1dfe7d76bfdcfacad0463/object_store/src/path/parts.rs#L85-L109

Hi @roeap, thanks for your quick reply. I am using the latest version (0.12.0) of deltalake, and I think the _commit_ed2503ff-f28f-40c2-9a41-5be43ede8930.json.tmp#1 file is written by deltalake. I don't fully understand what you mean - do I need to modify my code or upgrade some packages to make writes work, or will delta-rs fix the bug?

RobinLin666 commented 1 year ago

As for fuse's issue, I'd like to answer it.

Thanks for the context. Just making sure I understand: so in Fabric environments users work against a filesystem mounted via blobfuse, and this filesystem then makes the calls you describe, i.e. buffers files locally before sending them along to the remote store?

The file suffixed with #1 should be a blobfuse-internal thing; as per the comment above, delta-rs will not be able to handle that path. Since there is no dedicated implementation for a local filesystem mounted with fuse, we currently also have no control over when flush is called.

I have to think on that a little more :).

Thank you @djouallah. Just as @scsmithr said, it needs to close (drop) the file before calling rename.

[screenshot]

Blobfuse likely does this for the sake of performance, so the uploaded file is only available remotely once the file is closed. Refer: https://github.com/Azure/azure-storage-fuse/blob/master/blobfuse/fileapis.cpp#L346
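
To illustrate the ordering blobfuse requires (a hypothetical local sketch with a made-up path, not delta-rs code):

import os

# Hypothetical path on a blobfuse mount
path = "/lakehouse/default/Files/example/_delta_log/_commit_x.json.tmp#1"

# blobfuse only uploads the blob when the file is closed/flushed, so the handle
# must be closed before the rename; otherwise the server-side rename targets a
# blob that does not exist yet and fails with 404.
with open(path, "w") as f:
    f.write('{"commitInfo": {}}')
# The file is closed here, so blobfuse uploads it to the remote store.
os.rename(path, path.rsplit("#", 1)[0])  # now the rename has a source blob to act on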

djouallah commented 1 year ago

Thanks @RobinLin666 for the code showing how to get a token; it does work for me using this code.

thank you everyone for your help, and sorry for my tone :)

import pandas as pd
from deltalake.writer import write_deltalake
from trident_token_library_wrapper import PyTridentTokenLibrary
aadToken = PyTridentTokenLibrary.get_access_token("storage")
df = pd.DataFrame(
    {
        "a": [1, 2, 3, 4, 5],
        "fruits": ["banana", "orange", "mango", "apple", "mandarin"],
    }
)

write_deltalake("abfss://xxxxxx@onelake.dfs.fabric.microsoft.com/yyyyy.Lakehouse/Tables/fruit",
 df,storage_options={"bearer_token": aadToken, "use_fabric_endpoint": "true"})
alexwilcoxson-rel commented 9 months ago

@roeap @jocaplan @mabasile-MSFT was there any discussion off this thread about opening a request to the Azure storage team to support pushing down an offset filter to the list API?

roeap commented 9 months ago

@alexwilcoxson-rel - unfortunately, due to me not following up, we never reached out to the storage team. So right now, the list_from method does some internal stuff to emulate this behaviour, but actually does a normal list, since we do not have a marker. However, given MSFT's commitment to Delta, maybe the storage team is open to the optimization of allowing actual blob key names to be pushed down ...

alexwilcoxson-rel commented 7 months ago

@roeap after speaking with our Microsoft contact I opened this: https://github.com/apache/arrow-rs/issues/5653